-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdataplot.py
More file actions
89 lines (72 loc) · 3.15 KB
/
Copy pathdataplot.py
File metadata and controls
89 lines (72 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy.signal import butter, filtfilt, resample
def bandpass_filter(data, lowcut, highcut, fs, order=4):
    """Apply a zero-phase Butterworth filter to *data*.

    A band-pass between ``lowcut`` and ``highcut`` is designed when ``lowcut``
    is truthy. When ``lowcut`` is falsy (``None`` or ``0``) the filter
    degenerates to a low-pass at ``highcut``.

    Args:
        data: Samples to filter (passed straight to ``scipy.signal.filtfilt``).
        lowcut: Lower cut-off, in the same unit as ``fs``, or a falsy value
            to request a low-pass.
        highcut: Upper cut-off, in the same unit as ``fs``.
        fs: Sampling rate of ``data``.
        order: Order handed to ``scipy.signal.butter``.

    Returns:
        The filtered samples as returned by ``filtfilt``.
    """
    # butter() expects cut-offs as a fraction of the Nyquist frequency.
    nyquist = 0.5 * fs
    upper = highcut / nyquist

    if not lowcut:
        coeffs = butter(order, upper, btype='low')
    else:
        coeffs = butter(order, [lowcut / nyquist, upper], btype='band')

    # coeffs is the (b, a) pair; filtfilt runs it forwards and backwards.
    return filtfilt(*coeffs, data)
def normalize(data):
    """Return *data* with its mean removed, divided by its standard deviation.

    A small constant (1e-8) is added to the standard deviation so that a
    constant input yields zeros instead of a division by zero.
    """
    centered = data - np.mean(data)
    spread = np.std(data) + 1e-8
    return centered / spread
def load_and_process(filename, folder, fs_target=500, cutoff_sec=0):
    """Load one recording and return its three signals on a common time base.

    Three files are read from *folder*:

    * ``<filename>_acc.txt``  - comma-separated; column index 2 becomes ``scg``
    * ``<filename>_gyro.txt`` - comma-separated; column index 1 becomes ``gcg``
    * ``<filename>.wav``      - in-ear PCG audio, one or more channels

    The text files carry no sampling rate, so it is estimated as
    ``samples / audio duration``. This assumes all three streams span the same
    time interval. Each stream is then trimmed, filtered (0.5-40 band-pass for
    SCG/GCG, 40 low-pass for the audio, in the unit of the sampling rate),
    resampled to ``fs_target`` and normalized.

    Args:
        filename: Common file-name stem of the recording.
        folder: Directory containing the three files.
        fs_target: Sampling rate of the returned signals.
        cutoff_sec: Seconds to discard from the start of every stream.

    Returns:
        ``(scg, gcg, inearpcg_channels)`` where ``scg`` and ``gcg`` are 1-D
        arrays and ``inearpcg_channels`` is a list with one 1-D array per audio
        channel. All arrays have the same length.

    Raises:
        ValueError: If the WAV file holds no samples, or ``cutoff_sec`` /
            ``fs_target`` leave nothing to process.
    """
    acc_path = os.path.join(folder, filename + '_acc.txt')
    gyro_path = os.path.join(folder, filename + '_gyro.txt')
    wav_path = os.path.join(folder, filename + '.wav')

    scg = np.loadtxt(acc_path, delimiter=',')[:, 2]
    gcg = np.loadtxt(gyro_path, delimiter=',')[:, 1]
    fs_inearpcg, inearpcg = wavfile.read(wav_path)

    # Always work with a list of 1-D channels, whatever the WAV layout.
    if inearpcg.ndim > 1:
        inearpcg_channels = [inearpcg[:, ch] for ch in range(inearpcg.shape[1])]
        print(f"In-ear PCG has {inearpcg.shape[1]} channels")
    else:
        inearpcg_channels = [inearpcg]
        print("In-ear PCG has 1 channel")

    n_audio = len(inearpcg_channels[0])
    if n_audio == 0:
        # Previously surfaced as a ZeroDivisionError a few lines further down.
        raise ValueError(f"{wav_path} contains no audio samples")

    duration_sec = n_audio / fs_inearpcg
    print(f"Recording duration: {duration_sec:.1f} seconds")
    if not 0 <= cutoff_sec < duration_sec:
        raise ValueError(
            f"cutoff_sec={cutoff_sec} must be >= 0 and shorter than the "
            f"recording ({duration_sec:.3f} s)"
        )

    # Keep the estimated rates as floats. Truncating them to int (e.g.
    # 99.7 -> 99) designs the filters for the wrong rate and makes the
    # resampled SCG/GCG longer than the audio, i.e. stretched in time.
    fs_scg = len(scg) / duration_sec
    fs_gcg = len(gcg) / duration_sec
    print(f"SCG sampling rate: {fs_scg:.2f}, GCG sampling rate: {fs_gcg:.2f}")

    # Drop the first `cutoff_sec` seconds of every stream.
    scg = scg[int(fs_scg * cutoff_sec):]
    gcg = gcg[int(fs_gcg * cutoff_sec):]
    inearpcg_channels = [ch[int(fs_inearpcg * cutoff_sec):] for ch in inearpcg_channels]

    scg = bandpass_filter(scg, 0.5, 40, fs_scg)
    gcg = bandpass_filter(gcg, 0.5, 40, fs_gcg)
    inearpcg_channels = [bandpass_filter(ch, None, 40, fs_inearpcg) for ch in inearpcg_channels]

    # The audio is the only stream with a known rate, so it fixes the output
    # length. Resampling every stream to that same length keeps them aligned
    # from the first sample to the last.
    n_target = int(len(inearpcg_channels[0]) * fs_target / fs_inearpcg)
    if n_target < 1:
        raise ValueError(
            f"fs_target={fs_target} leaves no samples for this recording"
        )
    scg = resample(scg, n_target)
    gcg = resample(gcg, n_target)
    inearpcg_channels = [resample(ch, n_target) for ch in inearpcg_channels]

    print('min length: ', n_target)
    print('length of inearpcg channels:', [len(ch) for ch in inearpcg_channels])

    scg = normalize(scg)
    gcg = normalize(gcg)
    inearpcg_channels = [normalize(ch) for ch in inearpcg_channels]
    return scg, gcg, inearpcg_channels
if __name__ == "__main__":
    # Recording to display; the folder is relative to the working directory.
    participant = 'participant'
    folder = 'data/participant'
    filename = 'participant_spk_earphones'

    scg, gcg, inearpcg_channels = load_and_process(filename, folder)

    # Time axis in seconds; 500 is the rate load_and_process resamples to.
    t = np.arange(len(scg)) / 500

    fig, ax = plt.subplots(figsize=(15, 6))

    # Every trace is shifted 8 units further down so the curves stack
    # instead of overlapping.
    ax.plot(t, gcg - 8, label='GCG', linewidth=2.0, color='#2ca02c')
    ax.plot(t, scg - 16, label='SCG', linewidth=2.0, color='#455a64')
    for idx, channel in enumerate(inearpcg_channels):
        offset = 24 + 8 * idx
        ax.plot(t, channel - offset, label=f'In-ear PCG ch{idx + 1}')

    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Normalized Value')
    ax.set_title(f'{filename}')
    ax.legend()
    fig.tight_layout()
    plt.show()