Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/main/python/systemds/scuro/dataloader/timeseries_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TimeseriesStats:
num_signals: int
output_shape: tuple
output_shape_is_known: bool
avg_length: float
sampling_rate: int


class TimeseriesLoader(BaseLoader):
Expand All @@ -49,15 +51,14 @@ def __init__(
sampling_rate: Optional[int] = None,
normalize: bool = True,
file_format: str = "npy",
modality_type: Optional[ModalityType] = ModalityType.TIMESERIES,
):
super().__init__(
source_path, indices, data_type, chunk_size, ModalityType.TIMESERIES
)
super().__init__(source_path, indices, data_type, chunk_size, modality_type)
self.signal_names = signal_names
self.sampling_rate = sampling_rate
self.normalize = normalize
self.file_format = file_format.lower()
self.stats = self.get_stats(source_path)
self.stats = self.get_stats(source_path, sampling_rate)
if self.file_format not in ["npy", "mat", "hdf5", "txt"]:
raise ValueError(f"Unsupported file format: {self.file_format}")

Expand Down Expand Up @@ -148,17 +149,25 @@ def _load_csv_with_header(self, file: str, delimiter: str = None) -> np.ndarray:
data = df[selected].to_numpy(dtype=self._data_type)
return data

def get_stats(self, source_path: str):
def get_stats(self, source_path: str, sampling_rate: int):
self.file_sanity_check(source_path)
max_length = 0
num_instances = 0
num_signals = 0
avg_length = 0
for file_name in self.indices:
file = source_path + file_name + "." + self.file_format
data = self._load_data(file)
max_length = max(max_length, data.shape[0])
avg_length += data.shape[0]
num_instances += 1
num_signals = max(num_signals, data.shape[1])
return TimeseriesStats(
max_length, num_instances, num_signals, (max_length,), True
max_length,
num_instances,
num_signals,
(max_length,),
True,
avg_length / num_instances,
sampling_rate,
)
4 changes: 4 additions & 0 deletions src/main/python/systemds/scuro/dataloader/video_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
class VideoStats:
fps: int
max_length: int
avg_length: float
max_width: int
max_height: int
max_channels: int
Expand Down Expand Up @@ -117,6 +118,7 @@ def get_stats(self, source_path: str):
max_height = 0
max_num_channels = 0
num_instances = 0
avg_length = 0
for file in os.listdir(source_path):
file_name = file.split(".")[0]
if file_name not in self.indices:
Expand All @@ -129,6 +131,7 @@ def get_stats(self, source_path: str):
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
num_channels = 3
max_length = max(max_length, length)
avg_length += length
max_width = max(max_width, width)
max_height = max(max_height, height)
max_num_channels = max(max_num_channels, num_channels)
Expand All @@ -142,6 +145,7 @@ def get_stats(self, source_path: str):
return VideoStats(
fps,
max_length,
avg_length / num_instances,
max_width,
max_height,
max_num_channels,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ def __init__(self, tasks, modalities):
def add_result(self, results):
# TODO: Check if order of best results matters (deterministic)
for result in results:
if result is None:
continue
if result.mm_opt:
self.results[result.task_name]["mm_results"].append(result)
else:
Expand Down
53 changes: 46 additions & 7 deletions src/main/python/systemds/scuro/drsearch/node_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def _execute_multiple_reps_for_leaf_dependencies(


def _execute_node_worker(node, input_mods, task, rep_cache, gpu_id):
start_time = time.perf_counter()
if gpu_id is not None:
device = torch.device(f"cuda:{gpu_id}")
torch.cuda.set_device(device)
Expand Down Expand Up @@ -287,23 +288,28 @@ def _run_node_op():
)
else:
result = _run_node_op()

end_time = time.perf_counter()
pid = os.getpid()
return {
"result": result,
"peak_bytes": peak_delta_bytes,
"peak_abs_rss_bytes": peak_abs_rss,
"gpu_peak_bytes": gpu_peak_bytes,
"operation_name": operation_name,
"start_time": start_time,
"end_time": end_time,
"pid": pid,
}


def _execute_task_worker(
task_node_id: str,
task: Any,
data: Any,
modality: Any,
gpu_id: Optional[int],
aggregation: AggregatedRepresentation = None,
) -> Dict[str, Any]:

start_time = time.perf_counter()
if DEBUG:
print(f"Executing task {task_node_id} on GPU {gpu_id}")
if gpu_id is not None:
Expand All @@ -316,6 +322,14 @@ def _execute_task_worker(

def _run_task():
start = time.perf_counter()
if aggregation is not None:
data = (
aggregation.operation(params=aggregation.parameters)
.transform(modality)
.data
)
else:
data = modality.data
scores = task.run(data)
end = time.perf_counter()
return scores, end - start
Expand All @@ -336,12 +350,16 @@ def _run_task():
)
else:
result = _run_task()

end_time = time.perf_counter()
pid = os.getpid()
return {
"scores": result[0],
"task_time": result[1],
"peak_bytes": peak_delta_bytes,
"gpu_peak_bytes": gpu_peak_bytes,
"start_time": start_time,
"end_time": end_time,
"pid": pid,
}


Expand Down Expand Up @@ -385,6 +403,9 @@ def __init__(
checkpoint_every=1,
resume=False,
)
self.statistics = {}
self.statistics["worker_stats"] = {}
self.statistics["node_stats"] = {}

def _shm_names_for_submit(
self, parent_node_ids: List[str], payload_data: Any
Expand Down Expand Up @@ -451,24 +472,31 @@ def submit_node(node_id: str):
]

if self._is_task_node(node):
# Potentially batch task nodes and execute them together,
# grouped by the same task type (index, GPU vs. CPU).
# Flush a batch once enough nodes have accumulated or enough time has passed, whichever happens first.

task_result = ResultEntry(
dag=self._get_dag_from_node_ids(node_id),
representation_time=parent_results[0].transform_time,
)
task_results[node_id] = task_result
task_idx = int(node.parameters.get("_task_idx", 0))
payload_data = (
self.modalities[0].data
self.modalities[0]
if parent_results is None
else parent_results[0].data
else parent_results[0]
)
retained = self._retain_for_submit(parent_node_ids, payload_data)
aggregation = node.aggregation

future = executor.submit(
_execute_task_worker,
node_id,
self.tasks[task_idx],
payload_data,
gpu_id,
aggregation,
)
else:
payload_data = (
Expand Down Expand Up @@ -538,6 +566,14 @@ def submit_new_ready_nodes():
peak_bytes = result["peak_bytes"]
gpu_peak_bytes = result["gpu_peak_bytes"]
node = self.scheduler.mapping[node_id]
self.statistics["worker_stats"][result["pid"]] = {
"start_time": result["start_time"],
"end_time": result["end_time"],
}
self.statistics["node_stats"][node_id] = {
"start_time": result["start_time"],
"end_time": result["end_time"],
}
if self._is_task_node(node):
task_results[node_id].task_time = result["task_time"]
task_results[node_id].train_score = result["scores"][
Expand Down Expand Up @@ -594,7 +630,10 @@ def submit_new_ready_nodes():

self.result_cache.cleanup_all()
self._cleanup_leaf_shared_memory()
return {"task_results": list(task_results.values())}
return {
"task_results": list(task_results.values()),
"statistics": self.statistics,
}

def _handle_modality_result(
self,
Expand Down
78 changes: 77 additions & 1 deletion src/main/python/systemds/scuro/drsearch/operator_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
# -------------------------------------------------------------
from typing import Union, List

import math
from systemds.scuro.modality.type import ModalityType
from systemds.scuro.representations.representation import Representation

Expand Down Expand Up @@ -57,6 +57,24 @@ def set_representations(self, modality_type, representations):
else:
self._representations[modality_type] = [representations]

def set_context_operators(self, modality_type, context_operators):
    """Store the context operators registered for ``modality_type``.

    Any entry already present for ``modality_type`` in
    ``self._context_operators`` is overwritten. A ``list`` argument is
    stored as-is (the same object); any other value is wrapped in a
    single-element list.
    """
    if not isinstance(context_operators, list):
        context_operators = [context_operators]
    self._context_operators[modality_type] = context_operators

def set_context_representation_operators(
    self, modality_type, context_representation_operators
):
    """Store the context representation operators for ``modality_type``.

    Overwrites the entry for ``modality_type`` in
    ``self._context_representation_operators``. A ``list`` argument is kept
    as the same object; anything else becomes a single-element list.
    """
    operators = context_representation_operators
    if not isinstance(operators, list):
        operators = [operators]
    self._context_representation_operators[modality_type] = operators

def add_representation(
self, representation: Representation, modality: ModalityType
):
Expand Down Expand Up @@ -139,6 +157,64 @@ def get_representation_by_name(self, representation_name, modality_type):
def get_context_representations(self, modality_type):
return self._context_representation_operators[modality_type]

def get_context_lenghts_for_modality(self, modality_type, statistics):
    """Derive candidate context-window lengths for a modality.

    A per-modality list of window durations (in seconds) is filtered to
    those not longer than the longest instance
    (``statistics.max_length / rate``) and then scaled by the rate.

    Args:
        modality_type: ``ModalityType.AUDIO``, ``TIMESERIES`` and
            ``PHYSIOLOGICAL`` read the rate from
            ``statistics.sampling_rate``; ``VIDEO`` reads it from
            ``statistics.fps``.
        statistics: Object exposing ``max_length``, ``avg_length`` and the
            rate attribute named above.

    Returns:
        A tuple ``(effective_window_lengths, num_windows)``:
        ``effective_window_lengths`` holds ``rate * seconds`` for every
        retained duration, and ``num_windows`` holds
        ``ceil(statistics.avg_length / length)`` for each of those lengths.
        Returns ``None`` for any other modality type.

    Raises:
        ValueError: If the rate is ``None`` or not positive. Without this
            check the division below fails with an opaque ``TypeError`` or
            ``ZeroDivisionError``.
    """
    if modality_type == ModalityType.AUDIO:
        window_seconds = [0.010, 0.020, 0.050, 0.075, 0.100, 0.5, 1, 2, 5, 10]
        rate_attr = "sampling_rate"
    elif modality_type in (ModalityType.TIMESERIES, ModalityType.PHYSIOLOGICAL):
        window_seconds = [0.05, 0.1, 0.5, 0.75, 1, 2, 5, 10, 30, 60]
        rate_attr = "sampling_rate"
    elif modality_type == ModalityType.VIDEO:
        window_seconds = [0.5, 1, 2, 5, 10]
        rate_attr = "fps"
    else:
        # No context lengths are defined for other modalities.
        return None

    rate = getattr(statistics, rate_attr)
    if rate is None or rate <= 0:
        raise ValueError(
            f"Cannot derive context lengths for {modality_type}: "
            f"statistics.{rate_attr} must be a positive number, got {rate!r}"
        )

    # Windows longer than the longest instance can never be filled.
    max_length_in_seconds = statistics.max_length / rate
    effective_window_lengths = [
        rate * seconds
        for seconds in window_seconds
        if seconds <= max_length_in_seconds
    ]
    num_windows = [
        math.ceil(statistics.avg_length / length)
        for length in effective_window_lengths
    ]
    return effective_window_lengths, num_windows


def register_representation(modalities: Union[ModalityType, List[ModalityType]]):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class RepresentationNode:
representation_index: int = None
parameters: Dict[str, Any] = field(default_factory=dict)
gpu_id: int = None
aggregation: AggregatedRepresentation = None


@dataclass
Expand Down
4 changes: 3 additions & 1 deletion src/main/python/systemds/scuro/drsearch/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ def run(self, data):
val_y = self._gather_by_indices(self.labels, fold_val_indices)

self._run_fold(model, train_X, train_y, val_X, val_y, test_X, test_y)

if hasattr(model, "clean_up"):
model.clean_up()
del model
return [
self.train_scores.compute_averages(),
self.val_scores.compute_averages(),
Expand Down
Loading
Loading