From c6d0d2cf877110f083c7412af5afc0701e79a36a Mon Sep 17 00:00:00 2001 From: Christina Dionysio Date: Tue, 23 Jun 2026 11:30:26 +0200 Subject: [PATCH 1/3] reduce resources used in scuro tests --- src/main/python/tests/scuro/test_hp_tuner.py | 9 ++- .../tests/scuro/test_multimodal_join.py | 8 +-- .../tests/scuro/test_unimodal_optimizer.py | 36 ++++------ .../scuro/test_unimodal_representations.py | 70 +++++-------------- .../tests/scuro/test_window_operations.py | 22 +++--- 5 files changed, 53 insertions(+), 92 deletions(-) diff --git a/src/main/python/tests/scuro/test_hp_tuner.py b/src/main/python/tests/scuro/test_hp_tuner.py index c418cefcae8..03ffc1c2dad 100644 --- a/src/main/python/tests/scuro/test_hp_tuner.py +++ b/src/main/python/tests/scuro/test_hp_tuner.py @@ -24,6 +24,7 @@ import numpy as np +from systemds.scuro import Mean from systemds.scuro.drsearch.multimodal_optimizer import MultimodalOptimizer from systemds.scuro.representations.average import Average from systemds.scuro.representations.color_histogram import ColorHistogram @@ -128,7 +129,7 @@ def run_hp_for_modality( { ModalityType.TEXT: [BoW, W2V], ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, Pitch], - ModalityType.TIMESERIES: [ResNet], + ModalityType.TIMESERIES: [Mean], ModalityType.VIDEO: [ResNet], ModalityType.IMAGE: [ResNet, ColorHistogram], ModalityType.EMBEDDING: [], @@ -136,7 +137,9 @@ def run_hp_for_modality( ): registry = Registry() registry._fusion_operators = [LSTM] - unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False) + unimodal_optimizer = UnimodalOptimizer( + modalities, self.tasks, False, k=2, max_num_workers=1 + ) unimodal_optimizer.optimize() hp = HyperparameterTuner( @@ -165,7 +168,7 @@ def run_hp_for_modality( ) else: - hp.tune_unimodal_representations(max_eval_per_rep=10) + hp.tune_unimodal_representations(max_eval_per_rep=2) assert len(hp.optimization_results.results) == len(self.tasks) if multimodal: diff --git 
a/src/main/python/tests/scuro/test_multimodal_join.py b/src/main/python/tests/scuro/test_multimodal_join.py index 14ce9376be1..4a53129db33 100644 --- a/src/main/python/tests/scuro/test_multimodal_join.py +++ b/src/main/python/tests/scuro/test_multimodal_join.py @@ -47,7 +47,7 @@ def setUpClass(cls): cls.num_instances = 4 cls.indices = np.array(range(cls.num_instances)) cls.audio_data, cls.audio_md = ModalityRandomDataGenerator().create_audio_data( - cls.num_instances, 32000 + cls.num_instances, 500 ) cls.video_data, cls.video_md = ( @@ -104,7 +104,7 @@ def _prepare_data(self, l_chunk_size=None, r_chunk_size=None): l_chunk_size, ModalityType.VIDEO, copy.deepcopy(self.video_data), - np.float32, + np.uint8, copy.deepcopy(self.video_md), ) ) @@ -118,9 +118,7 @@ def _join(self, left_modality, right_modality, window_size): left_modality.join( right_modality, JoinCondition("timestamp", "timestamp", "<") ) - .apply_representation( - ResNet(layer_name="layer1.0.conv2", model_name="ResNet18") - ) + .apply_representation(ResNet()) .window_aggregation(window_size, "mean") .combine("concat") ) diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py b/src/main/python/tests/scuro/test_unimodal_optimizer.py index ad824b0335f..11c3aa29ea6 100644 --- a/src/main/python/tests/scuro/test_unimodal_optimizer.py +++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py @@ -23,17 +23,17 @@ import unittest import numpy as np -from systemds.scuro.representations.clip import CLIPText, CLIPVisual from systemds.scuro.representations.color_histogram import ColorHistogram from systemds.scuro.drsearch.operator_registry import Registry from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer -from systemds.scuro.representations.mfcc import MFCC +from systemds.scuro.representations.covarep_audio_features import ZeroCrossing + +from systemds.scuro.representations.resnet import ResNet from systemds.scuro.representations.mel_spectrogram import MelSpectrogram -from 
systemds.scuro.representations.word2vec import W2V +from systemds.scuro.representations.tfidf import TfIdf from systemds.scuro.representations.bow import BoW from systemds.scuro.representations.bert import Bert from systemds.scuro.modality.unimodal_modality import UnimodalModality -from systemds.scuro.representations.resnet import ResNet from tests.scuro.data_generator import ( ModalityRandomDataGenerator, TestDataLoader, @@ -53,6 +53,15 @@ from unittest.mock import patch +LIGHTWEIGHT_REGISTRY = { + ModalityType.TEXT: [BoW, TfIdf], + ModalityType.AUDIO: [MelSpectrogram, ZeroCrossing], + ModalityType.VIDEO: [ResNet], + ModalityType.IMAGE: [ColorHistogram], + ModalityType.TIMESERIES: [], + ModalityType.EMBEDDING: [], +} + class TestUnimodalRepresentationOptimizer(unittest.TestCase): data_generator = None @@ -198,24 +207,7 @@ def optimize_unimodal_representation_for_modality(self, modalities): with patch.object( Registry, "_representations", - { - ModalityType.TEXT: [ - W2V, - BoW, - Bert, - CLIPText, - ], - ModalityType.AUDIO: [ - MFCC, - MelSpectrogram, - ], - ModalityType.VIDEO: [ - ResNet, - CLIPVisual, - ], - ModalityType.IMAGE: [ColorHistogram, CLIPVisual], - ModalityType.EMBEDDING: [], - }, + LIGHTWEIGHT_REGISTRY, ): registry = Registry() diff --git a/src/main/python/tests/scuro/test_unimodal_representations.py b/src/main/python/tests/scuro/test_unimodal_representations.py index 2f474be7fd9..59bef40ef64 100644 --- a/src/main/python/tests/scuro/test_unimodal_representations.py +++ b/src/main/python/tests/scuro/test_unimodal_representations.py @@ -19,18 +19,10 @@ # # ------------------------------------------------------------- -import time import unittest import copy import numpy as np -from systemds.scuro.representations.bert import ( - Bert, - ALBERT, - ELECTRA, - RoBERTa, - DistillBERT, -) -from systemds.scuro.representations.clip import CLIPVisual, CLIPText + from systemds.scuro.representations.bow import BoW from 
systemds.scuro.representations.covarep_audio_features import ( Spectral, @@ -38,20 +30,13 @@ Pitch, ZeroCrossing, ) -from systemds.scuro.representations.glove import GloVe -from systemds.scuro.representations.wav2vec import Wav2Vec +from systemds.scuro.representations.color_histogram import ColorHistogram from systemds.scuro.representations.spectrogram import Spectrogram -from systemds.scuro.representations.window_aggregation import WindowAggregation -from systemds.scuro.representations.word2vec import W2V from systemds.scuro.representations.tfidf import TfIdf -from systemds.scuro.representations.x3d import X3D -from systemds.scuro.representations.x3d import I3D -from systemds.scuro.representations.color_histogram import ColorHistogram +from systemds.scuro.representations.resnet import ResNet from systemds.scuro.modality.unimodal_modality import UnimodalModality from systemds.scuro.representations.mel_spectrogram import MelSpectrogram from systemds.scuro.representations.mfcc import MFCC -from systemds.scuro.representations.resnet import ResNet -from systemds.scuro.representations.swin_video_transformer import SwinVideoTransformer from tests.scuro.data_generator import ( TestDataLoader, ModalityRandomDataGenerator, @@ -72,7 +57,6 @@ ZeroCrossingRate, BandpowerFFT, ) -from systemds.scuro.representations.vgg import VGG19 class TestUnimodalRepresentations(unittest.TestCase): @@ -103,12 +87,11 @@ def _create_audio_modality(self, signal_length=1000): return audio def test_audio_representation_transform_output_shapes(self): - audio = self._create_audio_modality() + audio = self._create_audio_modality(signal_length=200) audio_representations = [ (MFCC(), (2, 12)), (MelSpectrogram(), (2, 128)), (Spectrogram(), (2, 1025)), - (Wav2Vec(), (1, None)), (Spectral(), (2, 4)), (ZeroCrossing(), (2, None)), (RMSE(), (2, None)), @@ -138,14 +121,13 @@ def test_audio_representations(self): MFCC(), MelSpectrogram(), Spectrogram(), - Wav2Vec(), Spectral(), ZeroCrossing(), RMSE(), Pitch(), 
] audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( - self.num_instances, 1000 + self.num_instances, 200 ) audio = UnimodalModality( @@ -181,7 +163,7 @@ def test_timeseries_representations(self): BandpowerFFT(), ] ts_data, ts_md = ModalityRandomDataGenerator().create_timeseries_data( - self.num_instances, 1000 + self.num_instances, 100 ) ts = UnimodalModality( @@ -201,10 +183,8 @@ def test_timeseries_representations(self): assert (ts.data[i] == original_data[i]).all() def test_image_representations(self): - image_representations = [ColorHistogram(), CLIPVisual(), ResNet()] - image_data, image_md = ModalityRandomDataGenerator().create_visual_modality( - self.num_instances, 1 + self.num_instances, 1, height=8, width=8 ) image = UnimodalModality( @@ -213,10 +193,9 @@ def test_image_representations(self): ) ) - for representation in image_representations: - r = image.apply_representation(representation) - assert r.data is not None - assert len(r.data) == self.num_instances + r = image.apply_representation(ColorHistogram()) + assert r.data is not None + assert len(r.data) == self.num_instances # def test_video_representations(self): # video_representations = [ @@ -241,47 +220,34 @@ def test_image_representations(self): # assert len(r.data) == self.num_instances def test_text_representations(self): - test_representations = [ - CLIPText(), - Bert(), - BoW(2, 2), - TfIdf(), - W2V(), - GloVe(), - ALBERT(), - ELECTRA(), - RoBERTa(), - DistillBERT(), - ] text_data, text_md = ModalityRandomDataGenerator().create_text_data( - self.num_instances, 100 + self.num_instances, 3 ) text = UnimodalModality( TestDataLoader( self.indices, None, ModalityType.TEXT, text_data, str, text_md ) ) - for representation in test_representations: + for representation in [BoW(2, 2), TfIdf()]: r = text.apply_representation(representation) assert r.data is not None assert len(r.data) == self.num_instances def test_chunked_video_representations(self): - video_representations = 
[ResNet()] video_data, video_md = ModalityRandomDataGenerator().create_visual_modality( - self.num_instances, 25 + self.num_instances, 30 ) video = UnimodalModality( TestDataLoader( self.indices, None, ModalityType.VIDEO, video_data, np.float32, video_md ) ) - for representation in video_representations: - r = video.apply_representation(representation) - assert r.data is not None - assert len(r.data) == self.num_instances - assert len(r.metadata) == self.num_instances + r = video.apply_representation(ResNet(model_name="ResNet18")) + assert r.data is not None + assert len(r.data) == self.num_instances + assert len(r.metadata) == self.num_instances +# TODO: add unit tests for the other representations if __name__ == "__main__": unittest.main() diff --git a/src/main/python/tests/scuro/test_window_operations.py b/src/main/python/tests/scuro/test_window_operations.py index 2eaf5985db1..a8c86374801 100644 --- a/src/main/python/tests/scuro/test_window_operations.py +++ b/src/main/python/tests/scuro/test_window_operations.py @@ -39,13 +39,13 @@ class TestWindowOperations(unittest.TestCase): @classmethod def setUpClass(cls): - cls.num_instances = 40 + cls.num_instances = 4 cls.data_generator = ModalityRandomDataGenerator() cls.aggregations = ["mean", "sum", "max", "min"] def test_static_window(self): num_windows = 5 - data, md = self.data_generator.create_visual_modality(self.num_instances, 50) + data, md = self.data_generator.create_visual_modality(self.num_instances, 10) modality = UnimodalModality( TestDataLoader( [i for i in range(0, self.num_instances)], @@ -63,7 +63,7 @@ def test_static_window(self): def test_dynamic_window(self): num_windows = 5 - data, md = self.data_generator.create_visual_modality(self.num_instances, 50) + data, md = self.data_generator.create_visual_modality(self.num_instances, 10) modality = UnimodalModality( TestDataLoader( [i for i in range(0, self.num_instances)], @@ -93,19 +93,21 @@ def test_window_operations_on_text_representations(self): 
self.run_window_aggregation_for_modality(ModalityType.TEXT, window_size) def run_window_aggregation_for_modality(self, modality_type, window_size): - r = self.data_generator.create1DModality(40, 5000, modality_type) + r = self.data_generator.create1DModality(self.num_instances, 200, modality_type) for aggregation in self.aggregations: windowed_modality = r.window_aggregation(window_size, aggregation) self.verify_window_operation(aggregation, r, windowed_modality, window_size) def test_window_aggregation_on_3d_modality(self): - data, _ = self.data_generator.create_3d_modality(40, (100, 28, 28)) + data, _ = self.data_generator.create_3d_modality( + self.num_instances, (100, 8, 8) + ) embedding_modality = TransformedModality( self.data_generator, "test_transformation" ) embedding_modality.data = data - embedding_modality.stats = RepresentationStats(40, (100, 28, 28)) + embedding_modality.stats = RepresentationStats(self.num_instances, (100, 8, 8)) num_windows = 10 for window_operator in [ @@ -115,17 +117,17 @@ def test_window_aggregation_on_3d_modality(self): ]: stats = window_operator.get_output_stats(embedding_modality.stats) assert stats.num_instances == self.num_instances - assert stats.output_shape == (num_windows, 28, 28) + assert stats.output_shape == (num_windows, 8, 8) windowed_modality = embedding_modality.context(window_operator) def test_window_aggregation_on_2d_modality(self): - data, _ = self.data_generator.create_2d_modality(40, (100, 28)) + data, _ = self.data_generator.create_2d_modality(self.num_instances, (100, 8)) embedding_modality = TransformedModality( self.data_generator, "test_transformation" ) embedding_modality.data = data - embedding_modality.stats = RepresentationStats(40, (100, 28)) + embedding_modality.stats = RepresentationStats(self.num_instances, (100, 8)) num_windows = 10 for window_operator in [ @@ -135,7 +137,7 @@ def test_window_aggregation_on_2d_modality(self): ]: stats = window_operator.get_output_stats(embedding_modality.stats) 
assert stats.num_instances == self.num_instances - assert stats.output_shape == (num_windows, 28) + assert stats.output_shape == (num_windows, 8) windowed_modality = embedding_modality.context(window_operator) From fab8f07f57d7a734d8d5eb80099f5f730159b0d9 Mon Sep 17 00:00:00 2001 From: Christina Dionysio Date: Wed, 1 Jul 2026 13:54:17 +0200 Subject: [PATCH 2/3] add fine grained window operators --- .../scuro/dataloader/timeseries_loader.py | 21 +++-- .../systemds/scuro/dataloader/video_loader.py | 12 ++- .../scuro/drsearch/hyperparameter_tuner.py | 2 + .../systemds/scuro/drsearch/node_executor.py | 53 +++++++++++-- .../scuro/drsearch/operator_registry.py | 78 ++++++++++++++++++- .../scuro/drsearch/representation_dag.py | 1 + .../python/systemds/scuro/drsearch/task.py | 4 +- .../scuro/drsearch/unimodal_optimizer.py | 67 ++++++++++++---- .../python/systemds/scuro/modality/type.py | 22 ++++++ .../systemds/scuro/representations/lstm.py | 2 +- .../scuro/representations/mel_spectrogram.py | 9 +++ .../systemds/scuro/representations/mfcc.py | 41 ++++++---- .../scuro/representations/mlp_averaging.py | 21 +---- .../multimodal_attention_fusion.py | 7 ++ .../scuro/representations/spectrogram.py | 8 ++ .../timeseries_representations.py | 59 +++++++++----- .../systemds/scuro/representations/wav2vec.py | 5 +- .../representations/window_aggregation.py | 27 ++++++- 18 files changed, 353 insertions(+), 86 deletions(-) diff --git a/src/main/python/systemds/scuro/dataloader/timeseries_loader.py b/src/main/python/systemds/scuro/dataloader/timeseries_loader.py index 6b697e6a7ad..8e6c11316b0 100644 --- a/src/main/python/systemds/scuro/dataloader/timeseries_loader.py +++ b/src/main/python/systemds/scuro/dataloader/timeseries_loader.py @@ -36,6 +36,8 @@ class TimeseriesStats: num_signals: int output_shape: tuple output_shape_is_known: bool + avg_length: float + sampling_rate: int class TimeseriesLoader(BaseLoader): @@ -49,15 +51,14 @@ def __init__( sampling_rate: Optional[int] = None, 
normalize: bool = True, file_format: str = "npy", + modality_type: Optional[ModalityType] = ModalityType.TIMESERIES, ): - super().__init__( - source_path, indices, data_type, chunk_size, ModalityType.TIMESERIES - ) + super().__init__(source_path, indices, data_type, chunk_size, modality_type) self.signal_names = signal_names self.sampling_rate = sampling_rate self.normalize = normalize self.file_format = file_format.lower() - self.stats = self.get_stats(source_path) + self.stats = self.get_stats(source_path, sampling_rate) if self.file_format not in ["npy", "mat", "hdf5", "txt"]: raise ValueError(f"Unsupported file format: {self.file_format}") @@ -148,17 +149,25 @@ def _load_csv_with_header(self, file: str, delimiter: str = None) -> np.ndarray: data = df[selected].to_numpy(dtype=self._data_type) return data - def get_stats(self, source_path: str): + def get_stats(self, source_path: str, sampling_rate: int): self.file_sanity_check(source_path) max_length = 0 num_instances = 0 num_signals = 0 + avg_length = 0 for file_name in self.indices: file = source_path + file_name + "." 
+ self.file_format data = self._load_data(file) max_length = max(max_length, data.shape[0]) + avg_length += data.shape[0] num_instances += 1 num_signals = max(num_signals, data.shape[1]) return TimeseriesStats( - max_length, num_instances, num_signals, (max_length,), True + max_length, + num_instances, + num_signals, + (max_length,), + True, + avg_length / num_instances, + sampling_rate, ) diff --git a/src/main/python/systemds/scuro/dataloader/video_loader.py b/src/main/python/systemds/scuro/dataloader/video_loader.py index b35a22a8b66..b4a5409e66c 100644 --- a/src/main/python/systemds/scuro/dataloader/video_loader.py +++ b/src/main/python/systemds/scuro/dataloader/video_loader.py @@ -33,6 +33,7 @@ class VideoStats: fps: int max_length: int + avg_length: float max_width: int max_height: int max_channels: int @@ -117,6 +118,7 @@ def get_stats(self, source_path: str): max_height = 0 max_num_channels = 0 num_instances = 0 + avg_length = 0 for file in os.listdir(source_path): file_name = file.split(".")[0] if file_name not in self.indices: @@ -128,10 +130,11 @@ def get_stats(self, source_path: str): width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) num_channels = 3 - max_length = max(max_length, length) - max_width = max(max_width, width) - max_height = max(max_height, height) - max_num_channels = max(max_num_channels, num_channels) + max_length = int((max_length + length) / 2) + avg_length += length + max_width = int((max_width + width) / 2) + max_height = int((max_height + height) / 2) + max_num_channels = int((max_num_channels + num_channels) / 2) num_instances += 1 num_total_instances = num_instances num_instances = ( @@ -142,6 +145,7 @@ def get_stats(self, source_path: str): return VideoStats( fps, max_length, + avg_length / num_instances, max_width, max_height, max_num_channels, diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py 
b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py index 0305f613b63..4f04bffcbe5 100644 --- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py +++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py @@ -233,6 +233,8 @@ def __init__(self, tasks, modalities): def add_result(self, results): # TODO: Check if order of best results matters (deterministic) for result in results: + if result is None: + continue if result.mm_opt: self.results[result.task_name]["mm_results"].append(result) else: diff --git a/src/main/python/systemds/scuro/drsearch/node_executor.py b/src/main/python/systemds/scuro/drsearch/node_executor.py index 4b9b2acc658..a6a7ffe2ca4 100644 --- a/src/main/python/systemds/scuro/drsearch/node_executor.py +++ b/src/main/python/systemds/scuro/drsearch/node_executor.py @@ -234,6 +234,7 @@ def _execute_multiple_reps_for_leaf_dependencies( def _execute_node_worker(node, input_mods, task, rep_cache, gpu_id): + start_time = time.perf_counter() if gpu_id is not None: device = torch.device(f"cuda:{gpu_id}") torch.cuda.set_device(device) @@ -287,23 +288,28 @@ def _run_node_op(): ) else: result = _run_node_op() - + end_time = time.perf_counter() + pid = os.getpid() return { "result": result, "peak_bytes": peak_delta_bytes, "peak_abs_rss_bytes": peak_abs_rss, "gpu_peak_bytes": gpu_peak_bytes, "operation_name": operation_name, + "start_time": start_time, + "end_time": end_time, + "pid": pid, } def _execute_task_worker( task_node_id: str, task: Any, - data: Any, + modality: Any, gpu_id: Optional[int], + aggregation: AggregatedRepresentation = None, ) -> Dict[str, Any]: - + start_time = time.perf_counter() if DEBUG: print(f"Executing task {task_node_id} on GPU {gpu_id}") if gpu_id is not None: @@ -316,6 +322,14 @@ def _execute_task_worker( def _run_task(): start = time.perf_counter() + if aggregation is not None: + data = ( + aggregation.operation(params=aggregation.parameters) + .transform(modality) + .data + ) + else: + data = 
modality.data scores = task.run(data) end = time.perf_counter() return scores, end - start @@ -336,12 +350,16 @@ def _run_task(): ) else: result = _run_task() - + end_time = time.perf_counter() + pid = os.getpid() return { "scores": result[0], "task_time": result[1], "peak_bytes": peak_delta_bytes, "gpu_peak_bytes": gpu_peak_bytes, + "start_time": start_time, + "end_time": end_time, + "pid": pid, } @@ -385,6 +403,9 @@ def __init__( checkpoint_every=1, resume=False, ) + self.statistics = {} + self.statistics["worker_stats"] = {} + self.statistics["node_stats"] = {} def _shm_names_for_submit( self, parent_node_ids: List[str], payload_data: Any @@ -451,6 +472,10 @@ def submit_node(node_id: str): ] if self._is_task_node(node): + # potentially batch task nodes and then execute them together + # by the same task type (index, gpu vs cpu) + # either enough nodes to batch or enough time to batch, whichever happens first + task_result = ResultEntry( dag=self._get_dag_from_node_ids(node_id), representation_time=parent_results[0].transform_time, @@ -458,17 +483,20 @@ def submit_node(node_id: str): task_results[node_id] = task_result task_idx = int(node.parameters.get("_task_idx", 0)) payload_data = ( - self.modalities[0].data + self.modalities[0] if parent_results is None - else parent_results[0].data + else parent_results[0] ) retained = self._retain_for_submit(parent_node_ids, payload_data) + aggregation = node.aggregation + future = executor.submit( _execute_task_worker, node_id, self.tasks[task_idx], payload_data, gpu_id, + aggregation, ) else: payload_data = ( @@ -538,6 +566,14 @@ def submit_new_ready_nodes(): peak_bytes = result["peak_bytes"] gpu_peak_bytes = result["gpu_peak_bytes"] node = self.scheduler.mapping[node_id] + self.statistics["worker_stats"][result["pid"]] = { + "start_time": result["start_time"], + "end_time": result["end_time"], + } + self.statistics["node_stats"][node_id] = { + "start_time": result["start_time"], + "end_time": result["end_time"], + } 
if self._is_task_node(node): task_results[node_id].task_time = result["task_time"] task_results[node_id].train_score = result["scores"][ @@ -594,7 +630,10 @@ def submit_new_ready_nodes(): self.result_cache.cleanup_all() self._cleanup_leaf_shared_memory() - return {"task_results": list(task_results.values())} + return { + "task_results": list(task_results.values()), + "statistics": self.statistics, + } def _handle_modality_result( self, diff --git a/src/main/python/systemds/scuro/drsearch/operator_registry.py b/src/main/python/systemds/scuro/drsearch/operator_registry.py index bc9bd406a8f..4c5641fdd91 100644 --- a/src/main/python/systemds/scuro/drsearch/operator_registry.py +++ b/src/main/python/systemds/scuro/drsearch/operator_registry.py @@ -19,7 +19,7 @@ # # ------------------------------------------------------------- from typing import Union, List - +import math from systemds.scuro.modality.type import ModalityType from systemds.scuro.representations.representation import Representation @@ -57,6 +57,24 @@ def set_representations(self, modality_type, representations): else: self._representations[modality_type] = [representations] + def set_context_operators(self, modality_type, context_operators): + if isinstance(context_operators, list): + self._context_operators[modality_type] = context_operators + else: + self._context_operators[modality_type] = [context_operators] + + def set_context_representation_operators( + self, modality_type, context_representation_operators + ): + if isinstance(context_representation_operators, list): + self._context_representation_operators[modality_type] = ( + context_representation_operators + ) + else: + self._context_representation_operators[modality_type] = [ + context_representation_operators + ] + def add_representation( self, representation: Representation, modality: ModalityType ): @@ -139,6 +157,64 @@ def get_representation_by_name(self, representation_name, modality_type): def get_context_representations(self, 
modality_type): return self._context_representation_operators[modality_type] + def get_context_lenghts_for_modality(self, modality_type, statistics): + if modality_type == ModalityType.AUDIO: + window_lengths = [ + 0.010, + 0.020, + 0.050, + 0.075, + 0.100, + 0.5, + 1, + 2, + 5, + 10, + ] # seconds + + if ( + modality_type == ModalityType.TIMESERIES + or modality_type == ModalityType.PHYSIOLOGICAL + ): + window_lengths = [0.05, 0.1, 0.5, 0.75, 1, 2, 5, 10, 30, 60] # seconds + + if modality_type == ModalityType.VIDEO: + window_lengths = [0.5, 1, 2, 5, 10] # seconds + + if ( + modality_type == ModalityType.AUDIO + or modality_type == ModalityType.TIMESERIES + or modality_type == ModalityType.PHYSIOLOGICAL + ): + max_length_in_seconds = statistics.max_length / statistics.sampling_rate + window_lengths = [ + length for length in window_lengths if length <= max_length_in_seconds + ] + + effective_window_lenghts = [ + statistics.sampling_rate * length for length in window_lengths + ] + num_windows = [ + math.ceil(statistics.avg_length / length) + for length in effective_window_lenghts + ] + return effective_window_lenghts, num_windows + + if modality_type == ModalityType.VIDEO: + max_length_in_seconds = statistics.max_length / statistics.fps + window_lengths = [ + length for length in window_lengths if length <= max_length_in_seconds + ] + + effective_window_lenghts = [ + statistics.fps * length for length in window_lengths + ] + num_windows = [ + math.ceil(statistics.avg_length / length) + for length in effective_window_lenghts + ] + return effective_window_lenghts, num_windows + def register_representation(modalities: Union[ModalityType, List[ModalityType]]): """ diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py index b1d5835ad3f..a19c44396fd 100644 --- a/src/main/python/systemds/scuro/drsearch/representation_dag.py +++ 
b/src/main/python/systemds/scuro/drsearch/representation_dag.py @@ -80,6 +80,7 @@ class RepresentationNode: representation_index: int = None parameters: Dict[str, Any] = field(default_factory=dict) gpu_id: int = None + aggregation: AggregatedRepresentation = None @dataclass diff --git a/src/main/python/systemds/scuro/drsearch/task.py b/src/main/python/systemds/scuro/drsearch/task.py index 404624ec3a5..e74e791794c 100644 --- a/src/main/python/systemds/scuro/drsearch/task.py +++ b/src/main/python/systemds/scuro/drsearch/task.py @@ -259,7 +259,9 @@ def run(self, data): val_y = self._gather_by_indices(self.labels, fold_val_indices) self._run_fold(model, train_X, train_y, val_X, val_y, test_X, test_y) - + if hasattr(model, "clean_up"): + model.clean_up() + del model return [ self.train_scores.compute_averages(), self.val_scores.compute_averages(), diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py index 8dc2e1a082f..a67cbe12029 100644 --- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -20,6 +20,8 @@ # ------------------------------------------------------------- import copy import pickle +import csv +from pathlib import Path import time from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass @@ -39,6 +41,7 @@ ) from systemds.scuro.modality.modality import Modality from systemds.scuro.drsearch.operator_registry import Registry +from systemds.scuro.representations.window_aggregation import WindowAggregation from systemds.scuro.utils.checkpointing import CheckpointManager from systemds.scuro.drsearch.representation_dag import ( RepresentationDag, @@ -63,6 +66,8 @@ def __init__( resume: bool = False, max_num_workers: int = -1, enable_checkpointing: bool = True, + enable_execution_profile: bool = False, + execution_profile_path: Optional[str] = None, ): 
self.enable_checkpointing = enable_checkpointing self.modalities = modalities @@ -92,7 +97,8 @@ def __init__( self.operator_performance = UnimodalResults( modalities, tasks, debug, True, k, self.metric_name ) - + self.enable_execution_profile = enable_execution_profile + self.execution_profile_path = execution_profile_path self._tasks_require_same_dims = True self.expected_dimensions = tasks[0].expected_dim @@ -242,7 +248,7 @@ def optimize(self): for modality in self.modalities: try: - local_result = self._process_modality( + local_result, execution_time = self._process_modality( modality, ( int( @@ -274,6 +280,7 @@ def optimize(self): self.operator_performance.results, {} ) raise + return execution_time def _expand_dags_with_task_roots( self, dags: List[RepresentationDag] @@ -281,7 +288,16 @@ def _expand_dags_with_task_roots( expanded_dags: List[RepresentationDag] = [] for dag in dags: - root_id = dag.root_node_id + dag = copy.deepcopy(dag) + root_node = dag.get_node_by_id(dag.root_node_id) + if root_node and root_node.operation == AggregatedRepresentation: + aggregation = root_node + dag.nodes = [n for n in dag.nodes if n.node_id != root_node.node_id] + root_id = aggregation.inputs[0] + dag.root_node_id = root_id + else: + aggregation = None + root_id = dag.root_node_id for task_idx, _ in enumerate(self.tasks): task_node_id = f"task_{root_id}_{task_idx}" @@ -294,6 +310,7 @@ def _expand_dags_with_task_roots( "_task_idx": task_idx, "_dag_root_id": root_id, }, + aggregation=aggregation, ) task_root_dag = RepresentationDag( @@ -326,12 +343,18 @@ def _process_modality(self, modality, skip_remaining: int = 0, scheduler=None): self.result_path, enable_checkpointing=self.enable_checkpointing, ) - + start_time = time.perf_counter() exec_out = node_executor.run() + end_time = time.perf_counter() task_results = exec_out["task_results"] for task_result in task_results: local_results.add_task_result(task_result, dags) + statistics = exec_out["statistics"] + for worker_stat in 
statistics["worker_stats"]: + local_results.add_worker_stat(worker_stat, modality.modality_id) + for node_stat in statistics["node_stats"]: + local_results.add_node_stat(node_stat, modality.modality_id) if self.save_all_results: timestr = time.strftime("%Y%m%d-%H%M%S") @@ -339,7 +362,7 @@ def _process_modality(self, modality, skip_remaining: int = 0, scheduler=None): with open(file_name, "wb") as f: pickle.dump(local_results.results, f) - return local_results + return local_results, end_time - start_time def _build_execution_dags_for_modality( self, modality: Modality, skip_remaining: int = 0 @@ -354,6 +377,7 @@ def _build_execution_dags_for_modality( if ( modality.modality_type == ModalityType.TIMESERIES or modality.modality_type == ModalityType.AUDIO + or modality.modality_type == ModalityType.PHYSIOLOGICAL ): dags.extend( self.temporal_context_operators( @@ -509,7 +533,6 @@ def _build_modality_dag( def _aggregation_needed(self, dag: RepresentationDag) -> bool: input_stats = {} - # TODO: adapt this to the fusion of multiple modalities, list of input stats needed for modality in self.modalities: if modality.modality_id == dag.nodes[0].modality_id: input_stats[dag.nodes[0].node_id] = modality.stats @@ -584,16 +607,26 @@ def temporal_context_operators(self, modality, builder, leaf_id): modality.modality_type ) context_operators = self._get_context_operators(modality.modality_type) - + window_lengths, num_windows = ( + self.operator_registry.get_context_lenghts_for_modality( + modality.modality_type, modality.stats + ) + ) dags = [] for agg in aggregators: for context_operator in context_operators: - context_node_id = builder.create_operation_node( - context_operator, - [leaf_id], - context_operator(agg()).get_current_parameters(), - ) - dags.append(builder.build(context_node_id)) + for window_size, num_window in zip(window_lengths, num_windows): + context_operator_instance = context_operator(agg()) + if hasattr(context_operator, "num_windows"): + 
context_operator_instance.num_windows = num_window + elif hasattr(context_operator_instance, "window_size"): + context_operator_instance.window_size = window_size + context_node_id = builder.create_operation_node( + context_operator, + [leaf_id], + context_operator_instance.get_current_parameters(), + ) + dags.append(builder.build(context_node_id)) return dags @@ -619,6 +652,8 @@ def __init__( for modality in self.modality_ids: self.results[modality] = {task_name: [] for task_name in self.task_names} self.cache[modality] = {task_name: [] for task_name in self.task_names} + self.worker_stats = {} + self.node_stats = {} def add_task_result(self, task_result: ResultEntry, dags: List[RepresentationDag]): dag_id = task_result.dag.dag_id @@ -750,6 +785,12 @@ def get_k_best_results( return results, cache + def add_worker_stat(self, worker_stats, modality_id): + self.worker_stats[modality_id] = worker_stats + + def add_node_stat(self, node_stats, modality_id): + self.node_stats[modality_id] = node_stats + def get_dag_by_id(dags: List[RepresentationDag], dag_id: int) -> RepresentationDag: for dag in dags: diff --git a/src/main/python/systemds/scuro/modality/type.py b/src/main/python/systemds/scuro/modality/type.py index 0493edf5bdd..a648044562a 100644 --- a/src/main/python/systemds/scuro/modality/type.py +++ b/src/main/python/systemds/scuro/modality/type.py @@ -66,6 +66,11 @@ class ModalitySchemas: "num_columns": "integer", } + PHYSIOLOGICAL_SCHEMA = { + **TEMPORAL_BASE_SCHEMA, + "num_columns": "integer", + } + _metadata_handlers = {} @classmethod @@ -184,6 +189,22 @@ def handle_timeseries_metadata(md, data): return md +@ModalitySchemas.register_metadata_handler("PHYSIOLOGICAL") +def handle_physiological_metadata(md, data): + new_frequency = calculate_new_frequency(len(data), md["length"], md["frequency"]) + md.update( + { + "length": len(data), + "num_columns": ( + data.shape[1] if isinstance(data, np.ndarray) and data.ndim > 1 else 1 + ), + "frequency": new_frequency, + 
"timestamp": create_timestamps(new_frequency, len(data)), + } + ) + return md + + @ModalitySchemas.register_metadata_handler("TEXT") def handle_text_metadata(md, data): md.update({"length": len(data)}) @@ -205,6 +226,7 @@ class ModalityType(Flag): "VIDEO": "create_video_metadata", "IMAGE": "create_image_metadata", "TIMESERIES": "create_ts_metadata", + "PHYSIOLOGICAL": "create_ts_metadata", "EMBEDDING": "create_embedding_metadata", } diff --git a/src/main/python/systemds/scuro/representations/lstm.py b/src/main/python/systemds/scuro/representations/lstm.py index 7243b65966a..c15776284ce 100644 --- a/src/main/python/systemds/scuro/representations/lstm.py +++ b/src/main/python/systemds/scuro/representations/lstm.py @@ -25,7 +25,7 @@ from torch import nn from torch.utils.data import DataLoader, TensorDataset from typing import List, Dict, Any -from systemds.scuro.utils.static_variables import get_device, get_device_for_model +from systemds.scuro.utils.static_variables import get_device import numpy as np from systemds.scuro.modality.modality import Modality diff --git a/src/main/python/systemds/scuro/representations/mel_spectrogram.py b/src/main/python/systemds/scuro/representations/mel_spectrogram.py index 6d378806475..3cf25b44c4a 100644 --- a/src/main/python/systemds/scuro/representations/mel_spectrogram.py +++ b/src/main/python/systemds/scuro/representations/mel_spectrogram.py @@ -36,6 +36,14 @@ PY_LIST_SLOT_BYTES, ) +import warnings + +warnings.filterwarnings( + "ignore", + message=r"n_fft=\d+ is too (?:small|large) for input signal", + module=r"librosa", +) + @register_representation(ModalityType.AUDIO) @register_context_representation_operator(ModalityType.AUDIO) @@ -56,6 +64,7 @@ def __init__(self, n_mels=128, hop_length=512, n_fft=2048, params=None): self.n_mels = int(n_mels) self.hop_length = int(hop_length) self.n_fft = int(n_fft) + self.window_size = self.n_fft def transform(self, modality, aggregation=None): transformed_modality = TransformedModality( diff 
--git a/src/main/python/systemds/scuro/representations/mfcc.py b/src/main/python/systemds/scuro/representations/mfcc.py index 406fc6616c1..483ae3eef81 100644 --- a/src/main/python/systemds/scuro/representations/mfcc.py +++ b/src/main/python/systemds/scuro/representations/mfcc.py @@ -35,43 +35,54 @@ PY_LIST_HEADER_BYTES, PY_LIST_SLOT_BYTES, ) +import warnings + +warnings.filterwarnings( + "ignore", + message=r"n_fft=\d+ is too (?:small|large) for input signal", + module=r"librosa", +) @register_representation(ModalityType.AUDIO) @register_context_representation_operator(ModalityType.AUDIO) class MFCC(UnimodalRepresentation): - def __init__(self, n_mfcc=12, dct_type=2, n_mels=128, hop_length=512, params=None): + def __init__( + self, n_mfcc=12, dct_type=2, n_mels=128, hop_length=512, n_fft=2048, params=None + ): parameters = { "n_mfcc": [x for x in range(10, 26)], "dct_type": [1, 2, 3], "hop_length": [256, 512, 1024, 2048], "n_mels": [20, 32, 64, 128], - } # TODO + "n_fft": [1024, 2048, 4096], + } + super().__init__("MFCC", ModalityType.TIMESERIES, parameters, False) - # Allow construction from a parameter dict (used by optimizer) if params is not None: n_mfcc = params.get("n_mfcc", n_mfcc) dct_type = params.get("dct_type", dct_type) n_mels = params.get("n_mels", n_mels) hop_length = params.get("hop_length", hop_length) + n_fft = params.get("n_fft", n_fft) self.n_mfcc = int(n_mfcc) self.dct_type = int(dct_type) self.n_mels = int(n_mels) self.hop_length = int(hop_length) + self.n_fft = int(n_fft) + self.window_size = self.n_fft def transform(self, modality, aggregation=None): transformed_modality = TransformedModality( modality, self, self.output_modality_type ) result = [] - - for i, sample in enumerate(modality.data): - sr = modality.metadata[i]["frequency"] - computed_feature = self.compute_feature(sample, sr) + sr = modality.metadata[0]["frequency"] if modality.metadata else 22050 + for sample in modality.data: + computed_feature = self.compute_feature(sample, 
sr=sr) result.append(computed_feature) - transformed_modality.data = result return transformed_modality @@ -79,20 +90,22 @@ def compute_feature(self, instance, sr=None): if sr is None: sr = 22050 mfcc = librosa.feature.mfcc( - y=np.array(instance), + y=np.asarray(instance, dtype=np.float32), sr=sr, n_mfcc=self.n_mfcc, dct_type=self.dct_type, hop_length=self.hop_length, n_mels=self.n_mels, + n_fft=self.n_fft, ) if mfcc.ndim == 2: - mean = np.mean(mfcc, keepdims=True) - std = np.std(mfcc, keepdims=True) + mean = mfcc.mean(keepdims=True) + std = mfcc.std(keepdims=True) else: - mean = np.mean(mfcc, axis=(1, 2), keepdims=True) - std = np.std(mfcc, axis=(1, 2), keepdims=True) - mfcc = (mfcc - mean) / np.maximum(std, 1e-8) + mean = mfcc.mean(axis=(1, 2), keepdims=True) + std = mfcc.std(axis=(1, 2), keepdims=True) + mfcc -= mean + mfcc /= np.maximum(std, 1e-8) if instance.ndim == 1: return mfcc.T diff --git a/src/main/python/systemds/scuro/representations/mlp_averaging.py b/src/main/python/systemds/scuro/representations/mlp_averaging.py index 46fe04899aa..8c8d67a06ec 100644 --- a/src/main/python/systemds/scuro/representations/mlp_averaging.py +++ b/src/main/python/systemds/scuro/representations/mlp_averaging.py @@ -54,12 +54,8 @@ def __init__(self, output_dim=512, batch_size=32, params=None): "batch_size": [8, 16, 32, 64, 128], } super().__init__("MLPAveraging", parameters) - if params is not None: - self.output_dim = params.get("output_dim", output_dim) - self.batch_size = params.get("batch_size", batch_size) - else: - self.output_dim = output_dim - self.batch_size = batch_size + self.output_dim = output_dim + self.batch_size = batch_size self.device = None self.data_type = np.float32 self.gpu_id = None @@ -74,10 +70,7 @@ def gpu_id(self, gpu_id): self.device = get_device(gpu_id) def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationStats: - if ( - len(input_stats.output_shape) > 1 - and np.prod(input_stats.output_shape) > self.output_dim - ): + if 
len(input_stats.output_shape) > 1: return RepresentationStats( input_stats.num_instances, (self.output_dim,), @@ -94,13 +87,7 @@ def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationSt ) return RepresentationStats( input_stats.num_instances, - ( - ( - np.prod(input_stats.output_shape) - if np.prod(input_stats.output_shape) < self.output_dim - else self.output_dim - ), - ), + (self.output_dim,), output_shape_is_known=input_stats.output_shape_is_known, ) diff --git a/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py b/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py index 066f3432159..af5a83c7e50 100644 --- a/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py +++ b/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py @@ -60,6 +60,13 @@ def __init__( self.batch_size = int(batch_size) self.num_epochs = int(num_epochs) self.learning_rate = float(learning_rate) + if params is not None: + self.hidden_dim = int(params.get("hidden_dim", self.hidden_dim)) + self.num_heads = int(params.get("num_heads", self.num_heads)) + self.dropout = float(params.get("dropout", self.dropout)) + self.batch_size = int(params.get("batch_size", self.batch_size)) + self.num_epochs = int(params.get("num_epochs", self.num_epochs)) + self.learning_rate = float(params.get("learning_rate", self.learning_rate)) self.needs_training = True self.needs_alignment = True diff --git a/src/main/python/systemds/scuro/representations/spectrogram.py b/src/main/python/systemds/scuro/representations/spectrogram.py index ab0c8a6c649..9d14931ef0e 100644 --- a/src/main/python/systemds/scuro/representations/spectrogram.py +++ b/src/main/python/systemds/scuro/representations/spectrogram.py @@ -35,6 +35,13 @@ PY_LIST_HEADER_BYTES, PY_LIST_SLOT_BYTES, ) +import warnings + +warnings.filterwarnings( + "ignore", + message=r"n_fft=\d+ is too (?:small|large) for input signal", + 
module=r"librosa", +) @register_representation(ModalityType.AUDIO) @@ -45,6 +52,7 @@ def __init__(self, hop_length=512, n_fft=2048, params=None): super().__init__("Spectrogram", ModalityType.TIMESERIES, parameters, False) self.hop_length = int(hop_length) self.n_fft = int(n_fft) + self.window_size = self.n_fft def transform(self, modality, aggregation=None): transformed_modality = TransformedModality( diff --git a/src/main/python/systemds/scuro/representations/timeseries_representations.py b/src/main/python/systemds/scuro/representations/timeseries_representations.py index 26c8c1a8d98..14fcacf724f 100644 --- a/src/main/python/systemds/scuro/representations/timeseries_representations.py +++ b/src/main/python/systemds/scuro/representations/timeseries_representations.py @@ -51,9 +51,13 @@ def transform(self, modality, aggregation=None): feature = self.compute_feature(signal) result.append(feature) - transformed_modality.data = np.vstack(np.array(result)).astype( - modality.metadata[0]["data_layout"]["type"] - ) + maxlen = max(r.size for r in result) + padded_result = [ + np.pad(r, (0, maxlen - r.size), mode="constant", constant_values=0.0) + for r in result + ] + dtype = modality.metadata[0]["data_layout"]["type"] + transformed_modality.data = np.vstack(np.asarray(padded_result)).astype(dtype) return transformed_modality def get_output_stats(self, input_stats): @@ -71,8 +75,10 @@ def estimate_peak_memory_bytes(self, input_stats): } -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Mean(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Mean") @@ -81,8 +87,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.mean(signal, axis=axis)) 
-@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Min(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Min") @@ -91,8 +98,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.min(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Max(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Max") @@ -101,8 +109,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.max(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Sum(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Sum") @@ -111,8 +120,10 @@ def compute_feature(self, signal, axis=-1): return np.array(np.sum(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Std(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Std") @@ -121,8 +132,10 @@ def compute_feature(self, signal, axis=-1): return np.array(np.std(signal, axis=axis)) 
-@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Skew(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Skew") @@ -131,12 +144,13 @@ def compute_feature(self, signal, axis=-1): return np.array(stats.skew(signal, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class Quantile(TimeSeriesRepresentation): def __init__(self, quantile=0.9, params=None): super().__init__( - "Qunatile", {"quantile": [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]} + "Qunatile", {"quantile": [0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]} ) self.quantile = quantile @@ -144,8 +158,10 @@ def compute_feature(self, signal, axis=-1): return np.array(np.quantile(signal, self.quantile, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class Kurtosis(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("Kurtosis") @@ -154,8 +170,10 @@ def compute_feature(self, signal, axis=-1): return np.array(stats.kurtosis(signal, fisher=True, bias=True, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) 
+@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) +@register_context_representation_operator(ModalityType.AUDIO) class RMS(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("RMS") @@ -164,8 +182,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.sqrt(np.mean(np.square(signal), axis=axis))) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class ZeroCrossingRate(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("ZeroCrossingRate") @@ -174,8 +193,9 @@ def compute_feature(self, signal, axis=-1): return np.array(np.sum(np.diff(np.signbit(signal), axis=axis) != 0, axis=axis)) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class ACF(TimeSeriesRepresentation): def __init__(self, k=1, params=None): super().__init__("ACF", {"k": [1, 2, 5, 10, 20, 25, 50, 100, 200, 500]}) @@ -209,8 +229,9 @@ def get_k_values(self, max_length, percent=0.2, num=10, log=False): return k_vals.tolist() -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) @register_context_representation_operator(ModalityType.TIMESERIES) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) class FrequencyMagnitude(TimeSeriesRepresentation): def __init__(self, params=None): super().__init__("FrequencyMagnitude") @@ -219,7 +240,8 @@ def compute_feature(self, signal, axis=-1): return np.array(np.abs(np.fft.rfft(signal, axis=axis))) -@register_representation([ModalityType.TIMESERIES]) 
+@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) @register_context_representation_operator(ModalityType.TIMESERIES) class SpectralCentroid(TimeSeriesRepresentation): def __init__(self, fs=1.0, params=None): @@ -240,7 +262,8 @@ def compute_feature(self, signal, axis=-1): return np.array(num / den) -@register_representation([ModalityType.TIMESERIES]) +@register_representation([ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL]) +@register_context_representation_operator(ModalityType.PHYSIOLOGICAL) @register_context_representation_operator(ModalityType.TIMESERIES) class BandpowerFFT(TimeSeriesRepresentation): def __init__(self, fs=1.0, f1=0.0, f2=0.5, params=None): diff --git a/src/main/python/systemds/scuro/representations/wav2vec.py b/src/main/python/systemds/scuro/representations/wav2vec.py index ece034e099b..5e03baf8bc4 100644 --- a/src/main/python/systemds/scuro/representations/wav2vec.py +++ b/src/main/python/systemds/scuro/representations/wav2vec.py @@ -28,10 +28,11 @@ from systemds.scuro.representations.representation import RepresentationStats from systemds.scuro.representations.unimodal import UnimodalRepresentation from systemds.scuro.drsearch.operator_registry import register_representation +from systemds.scuro.utils.memory_utility import get_device -import warnings +from transformers.utils import logging as transformers_logging -warnings.filterwarnings("ignore", message="Some weights of") +transformers_logging.set_verbosity_error() @register_representation(ModalityType.AUDIO) diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py index 36e12c0cd5c..52a17401959 100644 --- a/src/main/python/systemds/scuro/representations/window_aggregation.py +++ b/src/main/python/systemds/scuro/representations/window_aggregation.py @@ -155,11 +155,21 @@ def __init__( 
pad=True, params=None, ): + window_size_set = False + if isinstance(aggregation_function, Representation) and hasattr( + aggregation_function, "window_size" + ): + window_size = aggregation_function.window_size + window_size_set = True + if params is not None: if isinstance( params.get("aggregation_function"), (Aggregation, Representation) ): aggregation_function = params["aggregation_function"] + if hasattr(aggregation_function, "window_size"): + window_size = aggregation_function.window_size + window_size_set = True else: nested_agg = { key[len("aggregation_function_") :]: value @@ -177,7 +187,8 @@ def __init__( aggregation_function = params.get( "aggregation_function", aggregation_function ) - window_size = params["window_size"] + + window_size = params["window_size"] if not window_size_set else window_size pad = params.get("pad", True) super().__init__("WindowAggregation", aggregation_function) self.parameters["window_size"] = (4, 128) @@ -234,6 +245,7 @@ def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict: effective_seq_len = in_shape[0] in_numel = effective_seq_len * self._rest_numel(in_shape) + output_bytes = self.estimate_output_memory_bytes(input_stats) one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize input_bytes = one_instance_bytes * input_stats.num_instances @@ -269,6 +281,7 @@ def execute(self, modality): ) original_lengths.append(windowed_instance.shape[0]) windowed_data.append(windowed_instance) + if self.pad and not isinstance(windowed_data, np.ndarray): target_length = max(original_lengths) @@ -322,7 +335,17 @@ def window_aggregate_single_level(self, instance, new_length): full_result = self.aggregation_function.compute_feature(full_batches) if tail.size: tail_result = self.aggregation_function.compute_feature(tail) - full_result = np.concatenate([full_result, tail_result[None, :]]) + if tail_result.shape == full_result.shape[1:]: + tail_row = tail_result + else: + tail_row = np.zeros_like(full_result[0]) 
+ slices = tuple( + slice(0, min(d, s)) + for d, s in zip(tail_row.shape, tail_result.shape) + ) + tail_row[slices] = tail_result[slices] + full_result = np.concatenate([full_result, tail_row[None, :]]) + return full_result def window_aggregate_nested_level(self, instance, new_length): From 407acb0aaa5a921f5342700d930f20c1fa75f87e Mon Sep 17 00:00:00 2001 From: Christina Dionysio Date: Wed, 1 Jul 2026 14:30:52 +0200 Subject: [PATCH 3/3] fix video stats --- src/main/python/systemds/scuro/dataloader/video_loader.py | 8 ++++---- src/main/python/tests/scuro/data_generator.py | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/python/systemds/scuro/dataloader/video_loader.py b/src/main/python/systemds/scuro/dataloader/video_loader.py index b4a5409e66c..bf7bdd846c7 100644 --- a/src/main/python/systemds/scuro/dataloader/video_loader.py +++ b/src/main/python/systemds/scuro/dataloader/video_loader.py @@ -130,11 +130,11 @@ def get_stats(self, source_path: str): width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) num_channels = 3 - max_length = int((max_length + length) / 2) + max_length = max(max_length, length) avg_length += length - max_width = int((max_width + width) / 2) - max_height = int((max_height + height) / 2) - max_num_channels = int((max_num_channels + num_channels) / 2) + max_width = max(max_width, width) + max_height = max(max_height, height) + max_num_channels = max(max_num_channels, num_channels) num_instances += 1 num_total_instances = num_instances num_instances = ( diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py index a51ea510ea3..937fd622d85 100644 --- a/src/main/python/tests/scuro/data_generator.py +++ b/src/main/python/tests/scuro/data_generator.py @@ -75,6 +75,7 @@ def __init__(self, indices, chunk_size, modality_type, data, data_type, metadata self.stats = VideoStats( 30, max(d.shape[0] for d in data), + sum(d.shape[0] for 
d in data) / len(data), max(d.shape[1] for d in data), max(d.shape[2] for d in data), max(d.shape[3] for d in data), @@ -88,6 +89,8 @@ def __init__(self, indices, chunk_size, modality_type, data, data_type, metadata sum(len(d) for d in data) / len(data), (max(len(d) for d in data),), True, + sum(len(d) for d in data) / len(data), + 16000, ) elif modality_type == ModalityType.IMAGE: self.stats = ImageStats(