diff --git a/src/main/python/systemds/scuro/dataloader/timeseries_loader.py b/src/main/python/systemds/scuro/dataloader/timeseries_loader.py index 6b697e6a7ad..8e6c11316b0 100644 --- a/src/main/python/systemds/scuro/dataloader/timeseries_loader.py +++ b/src/main/python/systemds/scuro/dataloader/timeseries_loader.py @@ -36,6 +36,8 @@ class TimeseriesStats: num_signals: int output_shape: tuple output_shape_is_known: bool + avg_length: float + sampling_rate: int class TimeseriesLoader(BaseLoader): @@ -49,15 +51,14 @@ def __init__( sampling_rate: Optional[int] = None, normalize: bool = True, file_format: str = "npy", + modality_type: Optional[ModalityType] = ModalityType.TIMESERIES, ): - super().__init__( - source_path, indices, data_type, chunk_size, ModalityType.TIMESERIES - ) + super().__init__(source_path, indices, data_type, chunk_size, modality_type) self.signal_names = signal_names self.sampling_rate = sampling_rate self.normalize = normalize self.file_format = file_format.lower() - self.stats = self.get_stats(source_path) + self.stats = self.get_stats(source_path, sampling_rate) if self.file_format not in ["npy", "mat", "hdf5", "txt"]: raise ValueError(f"Unsupported file format: {self.file_format}") @@ -148,17 +149,25 @@ def _load_csv_with_header(self, file: str, delimiter: str = None) -> np.ndarray: data = df[selected].to_numpy(dtype=self._data_type) return data - def get_stats(self, source_path: str): + def get_stats(self, source_path: str, sampling_rate: int): self.file_sanity_check(source_path) max_length = 0 num_instances = 0 num_signals = 0 + avg_length = 0 for file_name in self.indices: file = source_path + file_name + "." + self.file_format data = self._load_data(file) max_length = max(max_length, data.shape[0]) + avg_length += data.shape[0] num_instances += 1 num_signals = max(num_signals, data.shape[1]) return TimeseriesStats( - max_length, num_instances, num_signals, (max_length,), True + max_length, + num_instances, + num_signals, + (max_length,), + True, + avg_length / num_instances, + sampling_rate, ) diff --git a/src/main/python/systemds/scuro/dataloader/video_loader.py b/src/main/python/systemds/scuro/dataloader/video_loader.py index b35a22a8b66..bf7bdd846c7 100644 --- a/src/main/python/systemds/scuro/dataloader/video_loader.py +++ b/src/main/python/systemds/scuro/dataloader/video_loader.py @@ -33,6 +33,7 @@ class VideoStats: fps: int max_length: int + avg_length: float max_width: int max_height: int max_channels: int @@ -117,6 +118,7 @@ def get_stats(self, source_path: str): max_height = 0 max_num_channels = 0 num_instances = 0 + avg_length = 0 for file in os.listdir(source_path): file_name = file.split(".")[0] if file_name not in self.indices: @@ -129,6 +131,7 @@ def get_stats(self, source_path: str): height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) num_channels = 3 max_length = max(max_length, length) + avg_length += length max_width = max(max_width, width) max_height = max(max_height, height) max_num_channels = max(max_num_channels, num_channels) @@ -142,6 +145,7 @@ def get_stats(self, source_path: str): return VideoStats( fps, max_length, + avg_length / num_instances, max_width, max_height, max_num_channels, diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py index 0305f613b63..4f04bffcbe5 100644 --- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py +++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py @@ -233,6 +233,8 @@ def __init__(self, tasks, modalities): def add_result(self, results): # TODO: Check if order of best results matters (deterministic) for result in results: + if result is None: + continue if result.mm_opt: self.results[result.task_name]["mm_results"].append(result) else: diff --git a/src/main/python/systemds/scuro/drsearch/node_executor.py b/src/main/python/systemds/scuro/drsearch/node_executor.py index 4b9b2acc658..a6a7ffe2ca4 100644 --- a/src/main/python/systemds/scuro/drsearch/node_executor.py +++ b/src/main/python/systemds/scuro/drsearch/node_executor.py @@ -234,6 +234,7 @@ def _execute_multiple_reps_for_leaf_dependencies( def _execute_node_worker(node, input_mods, task, rep_cache, gpu_id): + start_time = time.perf_counter() if gpu_id is not None: device = torch.device(f"cuda:{gpu_id}") torch.cuda.set_device(device) @@ -287,23 +288,28 @@ def _run_node_op(): ) else: result = _run_node_op() - + end_time = time.perf_counter() + pid = os.getpid() return { "result": result, "peak_bytes": peak_delta_bytes, "peak_abs_rss_bytes": peak_abs_rss, "gpu_peak_bytes": gpu_peak_bytes, "operation_name": operation_name, + "start_time": start_time, + "end_time": end_time, + "pid": pid, } def _execute_task_worker( task_node_id: str, task: Any, - data: Any, + modality: Any, gpu_id: Optional[int], + aggregation: AggregatedRepresentation = None, ) -> Dict[str, Any]: - + start_time = time.perf_counter() if DEBUG: print(f"Executing task {task_node_id} on GPU {gpu_id}") if gpu_id is not None: @@ -316,6 +322,14 @@ def _execute_task_worker( def _run_task(): start = time.perf_counter() + if aggregation is not None: + data = ( + aggregation.operation(params=aggregation.parameters) + .transform(modality) + .data + ) + else: + data = modality.data scores = task.run(data) end = time.perf_counter() return scores, end - start @@ -336,12 +350,16 @@ def _run_task(): ) else: result = _run_task() - + end_time = time.perf_counter() + pid = os.getpid() return { "scores": result[0], "task_time": result[1], "peak_bytes": peak_delta_bytes, "gpu_peak_bytes": gpu_peak_bytes, + "start_time": start_time, + "end_time": end_time, + "pid": pid, } @@ -385,6 +403,9 @@ def __init__( checkpoint_every=1, resume=False, ) + self.statistics = {} + self.statistics["worker_stats"] = {} + self.statistics["node_stats"] = {} def _shm_names_for_submit( self, parent_node_ids: List[str], payload_data: Any @@ -451,6 +472,10 @@ def submit_node(node_id: str): ] if self._is_task_node(node): + # potentially batch task nodes and then execute them together + # by the the same task type (index, gpu vs cpu) + # either enough nodes to batch or enough time to batch whatever happens first + task_result = ResultEntry( dag=self._get_dag_from_node_ids(node_id), representation_time=parent_results[0].transform_time, @@ -458,17 +483,20 @@ def submit_node(node_id: str): task_results[node_id] = task_result task_idx = int(node.parameters.get("_task_idx", 0)) payload_data = ( - self.modalities[0].data + self.modalities[0] if parent_results is None - else parent_results[0].data + else parent_results[0] ) retained = self._retain_for_submit(parent_node_ids, payload_data) + aggregation = node.aggregation + future = executor.submit( _execute_task_worker, node_id, self.tasks[task_idx], payload_data, gpu_id, + aggregation, ) else: payload_data = ( @@ -538,6 +566,14 @@ def submit_new_ready_nodes(): peak_bytes = result["peak_bytes"] gpu_peak_bytes = result["gpu_peak_bytes"] node = self.scheduler.mapping[node_id] + self.statistics["worker_stats"][result["pid"]] = { + "start_time": result["start_time"], + "end_time": result["end_time"], + } + self.statistics["node_stats"][node_id] = { + "start_time": result["start_time"], + "end_time": result["end_time"], + } if self._is_task_node(node): task_results[node_id].task_time = result["task_time"] task_results[node_id].train_score = result["scores"][ @@ -594,7 +630,10 @@ def submit_new_ready_nodes(): self.result_cache.cleanup_all() self._cleanup_leaf_shared_memory() - return {"task_results": list(task_results.values())} + return { + "task_results": list(task_results.values()), + "statistics": self.statistics, + } def _handle_modality_result( self, diff --git a/src/main/python/systemds/scuro/drsearch/operator_registry.py b/src/main/python/systemds/scuro/drsearch/operator_registry.py index bc9bd406a8f..4c5641fdd91 100644 --- a/src/main/python/systemds/scuro/drsearch/operator_registry.py +++ b/src/main/python/systemds/scuro/drsearch/operator_registry.py @@ -19,7 +19,7 @@ # # ------------------------------------------------------------- from typing import Union, List - +import math from systemds.scuro.modality.type import ModalityType from systemds.scuro.representations.representation import Representation @@ -57,6 +57,24 @@ def set_representations(self, modality_type, representations): else: self._representations[modality_type] = [representations] + def set_context_operators(self, modality_type, context_operators): + if isinstance(context_operators, list): + self._context_operators[modality_type] = context_operators + else: + self._context_operators[modality_type] = [context_operators] + + def set_context_representation_operators( + self, modality_type, context_representation_operators + ): + if isinstance(context_representation_operators, list): + self._context_representation_operators[modality_type] = ( + context_representation_operators + ) + else: + self._context_representation_operators[modality_type] = [ + context_representation_operators + ] + def add_representation( self, representation: Representation, modality: ModalityType ): @@ -139,6 +157,64 @@ def get_representation_by_name(self, representation_name, modality_type): def get_context_representations(self, modality_type): return self._context_representation_operators[modality_type] + def get_context_lenghts_for_modality(self, modality_type, statistics): + if modality_type == ModalityType.AUDIO: + window_lengths = [ + 0.010, + 0.020, + 0.050, + 0.075, + 0.100, + 0.5, + 1, + 2, + 5, + 10, + ] # seconds + + if ( + modality_type == ModalityType.TIMESERIES + or modality_type == ModalityType.PHYSIOLOGICAL + ): + window_lengths = [0.05, 0.1, 0.5, 0.75, 1, 2, 5, 10, 30, 60] # seconds + + if modality_type == ModalityType.VIDEO: + window_lengths = [0.5, 1, 2, 5, 10] # seconds + + if ( + modality_type == ModalityType.AUDIO + or modality_type == ModalityType.TIMESERIES + or modality_type == ModalityType.PHYSIOLOGICAL + ): + max_length_in_seconds = statistics.max_length / statistics.sampling_rate + window_lengths = [ + length for length in window_lengths if length <= max_length_in_seconds + ] + + effective_window_lenghts = [ + statistics.sampling_rate * length for length in window_lengths + ] + num_windows = [ + math.ceil(statistics.avg_length / length) + for length in effective_window_lenghts + ] + return effective_window_lenghts, num_windows + + if modality_type == ModalityType.VIDEO: + max_length_in_seconds = statistics.max_length / statistics.fps + window_lengths = [ + length for length in window_lengths if length <= max_length_in_seconds + ] + + effective_window_lenghts = [ + statistics.fps * length for length in window_lengths + ] + num_windows = [ + math.ceil(statistics.avg_length / length) + for length in effective_window_lenghts + ] + return effective_window_lenghts, num_windows + def register_representation(modalities: Union[ModalityType, List[ModalityType]]): """ diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py index b1d5835ad3f..a19c44396fd 100644 --- a/src/main/python/systemds/scuro/drsearch/representation_dag.py +++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py @@ -80,6 +80,7 @@ class RepresentationNode: representation_index: int = None parameters: Dict[str, Any] = field(default_factory=dict) gpu_id: int = None + aggregation: AggregatedRepresentation = None @dataclass diff --git a/src/main/python/systemds/scuro/drsearch/task.py b/src/main/python/systemds/scuro/drsearch/task.py index 404624ec3a5..e74e791794c 100644 --- a/src/main/python/systemds/scuro/drsearch/task.py +++ b/src/main/python/systemds/scuro/drsearch/task.py @@ -259,7 +259,9 @@ def run(self, data): val_y = self._gather_by_indices(self.labels, fold_val_indices) self._run_fold(model, train_X, train_y, val_X, val_y, test_X, test_y) - + if hasattr(model, "clean_up"): + model.clean_up() + del model return [ self.train_scores.compute_averages(), self.val_scores.compute_averages(), diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py index 8dc2e1a082f..a67cbe12029 100644 --- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -20,6 +20,8 @@ # ------------------------------------------------------------- import copy import pickle +import csv +from pathlib import Path import time from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass @@ -39,6 +41,7 @@ ) from systemds.scuro.modality.modality import Modality from systemds.scuro.drsearch.operator_registry import Registry +from systemds.scuro.representations.window_aggregation import WindowAggregation from systemds.scuro.utils.checkpointing import CheckpointManager from systemds.scuro.drsearch.representation_dag import ( RepresentationDag, @@ -63,6 +66,8 @@ def __init__( resume: bool = False, max_num_workers: int = -1, enable_checkpointing: bool = True, + enable_execution_profile: bool = False, + execution_profile_path: Optional[str] = None, ): self.enable_checkpointing = enable_checkpointing self.modalities = modalities @@ -92,7 +97,8 @@ def __init__( self.operator_performance = UnimodalResults( modalities, tasks, debug, True, k, self.metric_name ) - + self.enable_execution_profile = enable_execution_profile + self.execution_profile_path = execution_profile_path self._tasks_require_same_dims = True self.expected_dimensions = tasks[0].expected_dim @@ -242,7 +248,7 @@ def optimize(self): for modality in self.modalities: try: - local_result = self._process_modality( + local_result, execution_time = self._process_modality( modality, ( int( @@ -274,6 +280,7 @@ def optimize(self): self.operator_performance.results, {} ) raise + return execution_time def _expand_dags_with_task_roots( self, dags: List[RepresentationDag] @@ -281,7 +288,16 @@ def _expand_dags_with_task_roots( expanded_dags: List[RepresentationDag] = [] for dag in dags: - root_id = dag.root_node_id + dag = copy.deepcopy(dag) + root_node = dag.get_node_by_id(dag.root_node_id) + if root_node and root_node.operation == AggregatedRepresentation: + aggregation = root_node + dag.nodes = [n for n in dag.nodes if n.node_id != root_node.node_id] + root_id = aggregation.inputs[0] + dag.root_node_id = root_id + else: + aggregation = None + root_id = dag.root_node_id for task_idx, _ in enumerate(self.tasks): task_node_id = f"task_{root_id}_{task_idx}" @@ -294,6 +310,7 @@ def _expand_dags_with_task_roots( "_task_idx": task_idx, "_dag_root_id": root_id, }, + aggregation=aggregation, ) task_root_dag = RepresentationDag( @@ -326,12 +343,18 @@ def _process_modality(self, modality, skip_remaining: int = 0, scheduler=None): self.result_path, enable_checkpointing=self.enable_checkpointing, ) - + start_time = time.perf_counter() exec_out = node_executor.run() + end_time = time.perf_counter() task_results = exec_out["task_results"] for task_result in task_results: local_results.add_task_result(task_result, dags) + statistics = exec_out["statistics"] + for worker_stat in statistics["worker_stats"]: + local_results.add_worker_stat(worker_stat, modality.modality_id) + for node_stat in statistics["node_stats"]: + local_results.add_node_stat(node_stat, modality.modality_id) if self.save_all_results: timestr = time.strftime("%Y%m%d-%H%M%S") @@ -339,7 +362,7 @@ def _process_modality(self, modality, skip_remaining: int = 0, scheduler=None): with open(file_name, "wb") as f: pickle.dump(local_results.results, f) - return local_results + return local_results, end_time - start_time def _build_execution_dags_for_modality( self, modality: Modality, skip_remaining: int = 0 @@ -354,6 +377,7 @@ def _build_execution_dags_for_modality( if ( modality.modality_type == ModalityType.TIMESERIES or modality.modality_type == ModalityType.AUDIO + or modality.modality_type == ModalityType.PHYSIOLOGICAL ): dags.extend( self.temporal_context_operators( @@ -509,7 +533,6 @@ def _build_modality_dag( def _aggregation_needed(self, dag: RepresentationDag) -> bool: input_stats = {} - # TODO: adapt this to the fusion of multiple modalities, list of input stats needed for modality in self.modalities: if modality.modality_id == dag.nodes[0].modality_id: input_stats[dag.nodes[0].node_id] = modality.stats @@ -584,16 +607,26 @@ def temporal_context_operators(self, modality, builder, leaf_id): modality.modality_type ) context_operators = self._get_context_operators(modality.modality_type) - + window_lengths, num_windows = ( + self.operator_registry.get_context_lenghts_for_modality( + modality.modality_type, modality.stats + ) + ) dags = [] for agg in aggregators: for context_operator in context_operators: - context_node_id = builder.create_operation_node( - context_operator, - [leaf_id], - context_operator(agg()).get_current_parameters(), - ) - dags.append(builder.build(context_node_id)) + for window_size, num_window in zip(window_lengths, num_windows): + context_operator_instance = context_operator(agg()) + if hasattr(context_operator, "num_windows"): + context_operator_instance.num_windows = num_window + elif hasattr(context_operator_instance, "window_size"): + context_operator_instance.window_size = window_size + context_node_id = builder.create_operation_node( + context_operator, + [leaf_id], + context_operator_instance.get_current_parameters(), + ) + dags.append(builder.build(context_node_id)) return dags @@ -619,6 +652,8 @@ def __init__( for modality in self.modality_ids: self.results[modality] = {task_name: [] for task_name in self.task_names} self.cache[modality] = {task_name: [] for task_name in self.task_names} + self.worker_stats = {} + self.node_stats = {} def add_task_result(self, task_result: ResultEntry, dags: List[RepresentationDag]): dag_id = task_result.dag.dag_id @@ -750,6 +785,12 @@ def get_k_best_results( return results, cache + def add_worker_stat(self, worker_stats, modality_id): + self.worker_stats[modality_id] = worker_stats + + def add_node_stat(self, node_stats, modality_id): + self.node_stats[modality_id] = node_stats + def get_dag_by_id(dags: List[RepresentationDag], dag_id: int) -> RepresentationDag: for dag in dags: diff --git a/src/main/python/systemds/scuro/modality/type.py b/src/main/python/systemds/scuro/modality/type.py index 0493edf5bdd..a648044562a 100644 --- a/src/main/python/systemds/scuro/modality/type.py +++ b/src/main/python/systemds/scuro/modality/type.py @@ -66,6 +66,11 @@ class ModalitySchemas: "num_columns": "integer", } + PHYSIOLOGICAL_SCHEMA = { + **TEMPORAL_BASE_SCHEMA, + "num_columns": "integer", + } + _metadata_handlers = {} @classmethod @@ -184,6 +189,22 @@ def handle_timeseries_metadata(md, data): return md +@ModalitySchemas.register_metadata_handler("PHYSIOLOGICAL") +def handle_physiological_metadata(md, data): + new_frequency = calculate_new_frequency(len(data), md["length"], md["frequency"]) + md.update( + { + "length": len(data), + "num_columns": ( + data.shape[1] if isinstance(data, np.ndarray) and data.ndim > 1 else 1 + ), + "frequency": new_frequency, + "timestamp": create_timestamps(new_frequency, len(data)), + } + ) + return md + + @ModalitySchemas.register_metadata_handler("TEXT") def handle_text_metadata(md, data): md.update({"length": len(data)}) @@ -205,6 +226,7 @@ class ModalityType(Flag): "VIDEO": "create_video_metadata", "IMAGE": "create_image_metadata", "TIMESERIES": "create_ts_metadata", + "PHYSIOLOGICAL": "create_ts_metadata", "EMBEDDING": "create_embedding_metadata", } diff --git a/src/main/python/systemds/scuro/representations/lstm.py b/src/main/python/systemds/scuro/representations/lstm.py index 7243b65966a..c15776284ce 100644 --- a/src/main/python/systemds/scuro/representations/lstm.py +++ b/src/main/python/systemds/scuro/representations/lstm.py @@ -25,7 +25,7 @@ from torch import nn from torch.utils.data import DataLoader, TensorDataset from typing import List, Dict, Any -from systemds.scuro.utils.static_variables import get_device, get_device_for_model +from systemds.scuro.utils.static_variables import get_device import numpy as np from systemds.scuro.modality.modality import Modality diff --git a/src/main/python/systemds/scuro/representations/mel_spectrogram.py b/src/main/python/systemds/scuro/representations/mel_spectrogram.py index 6d378806475..3cf25b44c4a 100644 --- a/src/main/python/systemds/scuro/representations/mel_spectrogram.py +++ b/src/main/python/systemds/scuro/representations/mel_spectrogram.py @@ -36,6 +36,14 @@ PY_LIST_SLOT_BYTES, ) +import warnings + +warnings.filterwarnings( + "ignore", + message=r"n_fft=\d+ is too (?:small|large) for input signal", + module=r"librosa", +) + @register_representation(ModalityType.AUDIO) @register_context_representation_operator(ModalityType.AUDIO) @@ -56,6 +64,7 @@ def __init__(self, n_mels=128, hop_length=512, n_fft=2048, params=None): self.n_mels = int(n_mels) self.hop_length = int(hop_length) self.n_fft = int(n_fft) + self.window_size = self.n_fft def transform(self, modality, aggregation=None): transformed_modality = TransformedModality( diff --git a/src/main/python/systemds/scuro/representations/mfcc.py b/src/main/python/systemds/scuro/representations/mfcc.py index 406fc6616c1..483ae3eef81 100644 --- a/src/main/python/systemds/scuro/representations/mfcc.py +++ b/src/main/python/systemds/scuro/representations/mfcc.py @@ -35,43 +35,54 @@ PY_LIST_HEADER_BYTES, PY_LIST_SLOT_BYTES, ) +import warnings + +warnings.filterwarnings( + "ignore", + message=r"n_fft=\d+ is too (?:small|large) for input signal", + module=r"librosa", +) @register_representation(ModalityType.AUDIO) @register_context_representation_operator(ModalityType.AUDIO) class MFCC(UnimodalRepresentation): - def __init__(self, n_mfcc=12, dct_type=2, n_mels=128, hop_length=512, params=None): + def __init__( + self, n_mfcc=12, dct_type=2, n_mels=128, hop_length=512, n_fft=2048, params=None + ): parameters = { "n_mfcc": [x for x in range(10, 26)], "dct_type": [1, 2, 3], "hop_length": [256, 512, 1024, 2048], "n_mels": [20, 32, 64, 128], - } # TODO + "n_fft": [1024, 2048, 4096], + } + super().__init__("MFCC", ModalityType.TIMESERIES, parameters, False) - # Allow construction from a parameter dict (used by optimizer) if params is not None: n_mfcc = params.get("n_mfcc", n_mfcc) dct_type = params.get("dct_type", dct_type) n_mels = params.get("n_mels", n_mels) hop_length = params.get("hop_length", hop_length) + n_fft = params.get("n_fft", n_fft) self.n_mfcc = int(n_mfcc) self.dct_type = int(dct_type) self.n_mels = int(n_mels) self.hop_length = int(hop_length) + self.n_fft = int(n_fft) + self.window_size = self.n_fft def transform(self, modality, aggregation=None): transformed_modality = TransformedModality( modality, self, self.output_modality_type ) result = [] - - for i, sample in enumerate(modality.data): - sr = modality.metadata[i]["frequency"] - computed_feature = self.compute_feature(sample, sr) + sr = modality.metadata[0]["frequency"] if modality.metadata else 22050 + for sample in modality.data: + computed_feature = self.compute_feature(sample, sr=sr) result.append(computed_feature) - transformed_modality.data = result return transformed_modality @@ -79,20 +90,22 @@ def compute_feature(self, instance, sr=None): if sr is None: sr = 22050 mfcc = librosa.feature.mfcc( - y=np.array(instance), + y=np.asarray(instance, dtype=np.float32), sr=sr, n_mfcc=self.n_mfcc, dct_type=self.dct_type, hop_length=self.hop_length, n_mels=self.n_mels, + n_fft=self.n_fft, ) if mfcc.ndim == 2: - mean = np.mean(mfcc, keepdims=True) - std = np.std(mfcc, keepdims=True) + mean = mfcc.mean(keepdims=True) + std = mfcc.std(keepdims=True) else: - mean = np.mean(mfcc, axis=(1, 2), keepdims=True) - std = np.std(mfcc, axis=(1, 2), keepdims=True) - mfcc = (mfcc - mean) / np.maximum(std, 1e-8) + mean = mfcc.mean(axis=(1, 2), keepdims=True) + std = mfcc.std(axis=(1, 2), keepdims=True) + mfcc -= mean + mfcc /= np.maximum(std, 1e-8) if instance.ndim == 1: return mfcc.T diff --git a/src/main/python/systemds/scuro/representations/mlp_averaging.py b/src/main/python/systemds/scuro/representations/mlp_averaging.py index 46fe04899aa..8c8d67a06ec 100644 --- a/src/main/python/systemds/scuro/representations/mlp_averaging.py +++ b/src/main/python/systemds/scuro/representations/mlp_averaging.py @@ -54,12 +54,8 @@ def __init__(self, output_dim=512, batch_size=32, params=None): "batch_size": [8, 16, 32, 64, 128], } super().__init__("MLPAveraging", parameters) - if params is not None: - self.output_dim = params.get("output_dim", output_dim) - self.batch_size = params.get("batch_size", batch_size) - else: - self.output_dim = output_dim - self.batch_size = batch_size + self.output_dim = output_dim + self.batch_size = batch_size self.device = None self.data_type = np.float32 self.gpu_id = None @@ -74,10 +70,7 @@ def gpu_id(self, gpu_id): self.device = get_device(gpu_id) def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationStats: - if ( - len(input_stats.output_shape) > 1 - and np.prod(input_stats.output_shape) > self.output_dim - ): + if len(input_stats.output_shape) > 1: return RepresentationStats( input_stats.num_instances, (self.output_dim,), @@ -94,13 +87,7 @@ def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationSt ) return RepresentationStats( input_stats.num_instances, - ( - ( - np.prod(input_stats.output_shape) - if np.prod(input_stats.output_shape) < self.output_dim - else self.output_dim - ), - ), + (self.output_dim,), output_shape_is_known=input_stats.output_shape_is_known, ) diff --git a/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py b/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py index 066f3432159..af5a83c7e50 100644 --- a/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py +++ b/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py @@ -60,6 +60,13 @@ def __init__( self.batch_size = int(batch_size) self.num_epochs = int(num_epochs) self.learning_rate = float(learning_rate) + if params is not None: + self.hidden_dim = int(params.get("hidden_dim", self.hidden_dim)) + self.num_heads = int(params.get("num_heads", self.num_heads)) + self.dropout = float(params.get("dropout", self.dropout)) + self.batch_size = int(params.get("batch_size", self.batch_size)) + self.num_epochs = int(params.get("num_epochs", self.num_epochs)) + self.learning_rate = float(params.get("learning_rate", self.learning_rate)) self.needs_training = True self.needs_alignment = True diff --git a/src/main/python/systemds/scuro/representations/spectrogram.py b/src/main/python/systemds/scuro/representations/spectrogram.py index ab0c8a6c649..9d14931ef0e 100644 --- a/src/main/python/systemds/scuro/representations/spectrogram.py +++ b/src/main/python/systemds/scuro/representations/spectrogram.py @@ -35,6 +35,13 @@ PY_LIST_HEADER_BYTES, PY_LIST_SLOT_BYTES, ) +import warnings + +warnings.filterwarnings( + "ignore", + message=r"n_fft=\d+ is too (?:small|large) for input signal", + module=r"librosa", +) @register_representation(ModalityType.AUDIO) @@ -45,6 +52,7 @@ def __init__(self, hop_length=512, n_fft=2048, params=None): super().__init__("Spectrogram", ModalityType.TIMESERIES, parameters, False) self.hop_length = int(hop_length) self.n_fft = int(n_fft) + self.window_size = self.n_fft def transform(self, modality, aggregation=None): transformed_modality = TransformedModality( diff --git a/src/main/python/systemds/scuro/representations/timeseries_representations.py b/src/main/python/systemds/scuro/representations/timeseries_representations.py index 26c8c1a8d98..14fcacf724f 100644 --- a/src/main/python/systemds/scuro/representations/timeseries_representations.py +++ b/src/main/python/systemds/scuro/representations/timeseries_representations.py @@ -51,9 +51,13 @@ def transform(self, modality, aggregation=None): feature = self.compute_feature(signal) result.append(feature) - transformed_modality.data = np.vstack(np.array(result)).astype( - modality.metadata[0]["data_layout"]["type"] - ) + maxlen = max(r.size for r in result) + padded_result = [ + np.pad(r, (0, maxlen - r.size), mode="constant", constant_values=0.0) + for r in result + ] + dtype = modality.metadata[0]["data_layout"]["type"] + transformed_modality.data = np.vstack(np.asarray(padded_result)).astype(dtype) return transformed_modality def get_output_stats(self, input_stats): @@ -71,8 +75,10 @@ def estimate_peak_memory_bytes(self, input_stats): } -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Mean(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Mean") @@ -81,8 +87,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.mean(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Min(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Min") @@ -91,8 +98,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.min(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Max(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Max") @@ -101,8 +109,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.max(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Sum(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Sum") @@ -111,8 +120,10 @@ def compute_feature(self, signal, axis=-1): return np.array(np.sum(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Std(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Std") @@ -121,8 +132,10 @@ def compute_feature(self, signal, axis=-1): return np.array(np.std(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Skew(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Skew") @@ -131,12 +144,13 @@ def compute_feature(self, signal, axis=-1): return np.array(stats.skew(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Quantile(TimeSeriesRepresentation): def __init__(self, quantile=0.9, params=None): super().__init__( - "Qunatile", {"quantile": [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]} + "Qunatile", {"quantile": [0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]} ) self.quantile = quantile @@ -144,8 +158,10 @@ def compute_feature(self, signal, axis=-1): return np.array(np.quantile(signal, self.quantile, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Kurtosis(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Kurtosis") @@ -154,8 +170,10 @@ def compute_feature(self, signal, axis=-1): return np.array(stats.kurtosis(signal, fisher=True, bias=True, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class RMS(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("RMS") @@ -164,8 +182,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.sqrt(np.mean(np.square(signal), axis=axis))) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class ZeroCrossingRate(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("ZeroCrossingRate") @@ -174,8 +193,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.sum(np.diff(np.signbit(signal), axis=axis) != 0, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class ACF(TimeSeriesRepresentation): def __init__(self, k=1, params=None): super().__init__("ACF", {"k": [1, 2, 5, 10, 20, 25, 50, 100, 200, 500]}) @@ -209,8 +229,9 @@ def get_k_values(self, max_length, percent=0.2, num=10, log=False): return k_vals.tolist() -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class FrequencyMagnitude(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("FrequencyMagnitude") @@ -219,7 +240,8 @@ def compute_feature(self, signal, axis=-1): return np.array(np.abs(np.fft.rfft(signal, axis=axis))) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) @register_context_representation_operator(ModalityType.TIMESERIES) class SpectralCentroid(TimeSeriesRepresentation): def __init__(self, fs=1.0, params=None): @@ -240,7 +262,8 @@ def compute_feature(self, signal, axis=-1): return np.array(num / den) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) @register_context_representation_operator(ModalityType.TIMESERIES) class BandpowerFFT(TimeSeriesRepresentation): def __init__(self, fs=1.0, f1=0.0, f2=0.5, params=None): diff --git a/src/main/python/systemds/scuro/representations/wav2vec.py b/src/main/python/systemds/scuro/representations/wav2vec.py index ece034e099b..5e03baf8bc4 100644 --- a/src/main/python/systemds/scuro/representations/wav2vec.py +++ b/src/main/python/systemds/scuro/representations/wav2vec.py @@ -28,10 +28,11 @@ from systemds.scuro.representations.representation import RepresentationStats from systemds.scuro.representations.unimodal import UnimodalRepresentation from systemds.scuro.drsearch.operator_registry import register_representation +from systemds.scuro.utils.memory_utility import get_device -import warnings +from transformers.utils import logging as transformers_logging -warnings.filterwarnings("ignore", message="Some weights of") +transformers_logging.set_verbosity_error() @register_representation(ModalityType.AUDIO) diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py index 36e12c0cd5c..52a17401959 100644 --- a/src/main/python/systemds/scuro/representations/window_aggregation.py +++ b/src/main/python/systemds/scuro/representations/window_aggregation.py @@ -155,11 +155,21 @@ def __init__( pad=True, params=None, ): + window_size_set = False + if isinstance(aggregation_function, Representation) and hasattr( + aggregation_function, "window_size" + ): + window_size = aggregation_function.window_size + window_size_set = True + if params is not None: if isinstance( params.get("aggregation_function"), (Aggregation, Representation) ): aggregation_function = params["aggregation_function"] + if hasattr(aggregation_function, "window_size"): + window_size = aggregation_function.window_size + window_size_set = True else: nested_agg = { key[len("aggregation_function_") :]: value @@ -177,7 +187,8 @@ def __init__( aggregation_function = params.get( "aggregation_function", aggregation_function ) - window_size = params["window_size"] + + window_size = params["window_size"] if not window_size_set else window_size pad = params.get("pad", True) super().__init__("WindowAggregation", aggregation_function) self.parameters["window_size"] = (4, 128) @@ -234,6 +245,7 @@ def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict: effective_seq_len = in_shape[0] in_numel = effective_seq_len * self._rest_numel(in_shape) + output_bytes = self.estimate_output_memory_bytes(input_stats) one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize input_bytes = one_instance_bytes * input_stats.num_instances @@ -269,6 +281,7 @@ def execute(self, modality): ) original_lengths.append(windowed_instance.shape[0]) windowed_data.append(windowed_instance) + if self.pad and not isinstance(windowed_data, np.ndarray): target_length = max(original_lengths) @@ -322,7 +335,17 @@ def window_aggregate_single_level(self, instance, new_length): full_result = self.aggregation_function.compute_feature(full_batches) if tail.size: tail_result = self.aggregation_function.compute_feature(tail) - full_result = np.concatenate([full_result, tail_result[None, :]]) + if tail_result.shape == full_result.shape[1:]: + tail_row = tail_result + else: + tail_row = np.zeros_like(full_result[0]) + slices = tuple( + slice(0, min(d, s)) + for d, s in zip(tail_row.shape, tail_result.shape) + ) + tail_row[slices] = tail_result[slices] + full_result = np.concatenate([full_result, tail_row[None, :]]) + return full_result def window_aggregate_nested_level(self, instance, new_length): diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py index a51ea510ea3..937fd622d85 100644 --- a/src/main/python/tests/scuro/data_generator.py +++ b/src/main/python/tests/scuro/data_generator.py @@ -75,6 +75,7 @@ def __init__(self, indices, chunk_size, modality_type, data, data_type, metadata self.stats = VideoStats( 30, max(d.shape[0] for d in data), + sum(d.shape[0] for d in data) / len(data), max(d.shape[1] for d in data), max(d.shape[2] for d in data), max(d.shape[3] for d in data), @@ -88,6 +89,8 @@ def __init__(self, indices, chunk_size, modality_type, data, data_type, metadata sum(len(d) for d in data) / len(data), (max(len(d) for d in data),), True, + sum(len(d) for d in data) / len(data), + 16000, ) elif modality_type == ModalityType.IMAGE: self.stats = ImageStats(