-
Notifications
You must be signed in to change notification settings - Fork 0
/
tools.py
1652 lines (1337 loc) · 68.2 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.signal
from sklearn.preprocessing import MinMaxScaler, normalize
from scipy.signal import firwin, lfilter, medfilt, periodogram,welch
import scipy.signal as signal
from scipy.signal import find_peaks,stft
from scipy.interpolate import interp1d
import math
from pyti import catch_errors
from pyti.weighted_moving_average import weighted_moving_average as wma
import h5py
import os
import pdb
from dtw import accelerated_dtw # 动态时间规整评价时域信号相似性
# Normalize data into [0, 1]
def min_max_norm(x):
    """Min-max normalization: linearly map x so min -> 0 and max -> 1."""
    values = np.array(x)
    lo = np.min(values)
    hi = np.max(values)
    return (values - lo) / (hi - lo)
# Normalize data roughly into [-1, 1]
def mean_max_norm(x):
    """Center x on its mean and scale by its range (peak-to-peak)."""
    values = np.array(x)
    return (values - np.mean(values)) / (np.max(values) - np.min(values))
# Standardize data (z-score)
def standar_scaler(x):
    """Z-score standardization; degrades to mean-centering when std is 0."""
    values = np.array(x)
    center = np.mean(values)
    spread = np.std(values)
    if spread == 0:
        return values - center
    return (values - center) / spread
# Signal-to-noise ratio: mean / std along an axis
def SignaltoNoiseRatio(Arr, axis=0, ddof=0):
    """Return mean/std of Arr along *axis*, with 0 wherever std is 0."""
    data = np.asanyarray(Arr)
    mean_val = data.mean(axis=axis)
    std_val = data.std(axis=axis, ddof=ddof)
    return np.where(std_val == 0, 0, mean_val / std_val)
# Peak signal-to-noise ratio between two 8-bit images
def PeakSignaltoNoiseRatio(img1, img2):
    """PSNR in dB for images on a 0-255 scale; returns 100 when effectively identical."""
    diff = img1 / 255. - img2 / 255.
    mse = np.mean(diff ** 2)
    if mse < 1.0e-10:
        # near-zero MSE: treat as identical to avoid log of ~0
        return 100
    return 20 * math.log10(1 / math.sqrt(mse))
# SNR of a cPPG signal from its spectrum (FFT by default), counting the harmonic band as signal
def SignaltoNoiseRatio_cPPG(signal, fps, method='fft', power_max_index=None, draw_fig_flag=False):
    """Spectral SNR of a contact-PPG signal.

    Signal power = power in a window of +/- `win` bins around the dominant
    frequency bin plus a second (harmonic) band; noise power = the rest of
    the spectrum. Power outside the plausible heart-rate band [0.7, 3.0] Hz
    is zeroed before the dominant bin is chosen.

    Args:
        signal: 1-D PPG samples.
        fps: sampling rate (Hz).
        method: 'stft' uses a median-STFT spectrum, anything else uses the FFT.
        power_max_index: optional precomputed dominant-bin index; overrides argmax.
        draw_fig_flag: plot the signal and its spectrum when True.
    Returns:
        fft_snr: ratio of in-band power to out-of-band power.
    """
    if method=='stft':
        # STFT-based spectrum: segment length capped at 15 s or half the signal
        nperseg = int(min(len(signal) * 0.5, fps * 15))
        half_freqs, half_power, win = get_stft_freq_power(signal, fps, nperseg=nperseg)
    else:
        # FFT-based spectrum
        half_freqs, half_power, win = get_fft_freq_power(signal, fps)
    # Band-limit to the plausible HR range [0.7, 3.0] Hz
    half_power[half_freqs <= 0.7] = 0
    half_power[half_freqs >= 3.0] = 0
    power_max_idx = np.argmax(half_power)
    if power_max_index is not None:
        power_max_idx = power_max_index
    # Fundamental band: +/- win bins around the dominant bin.
    # NOTE(review): if power_max_idx < win the start index goes negative and
    # the slice wraps around — verify behavior for very short signals.
    power_main_region = half_power[int(power_max_idx - win):int(power_max_idx + win + 1)]
    freq_main_reqion = half_freqs[int(power_max_idx - win):int(power_max_idx + win + 1)]
    # Harmonic band: the slice *endpoints* (not the center) are doubled, which
    # targets ~2x the dominant frequency but with a width that grows with win.
    # NOTE(review): bins shared by the two slices are counted twice — confirm intent.
    power_sub_region = half_power[int(power_max_idx - win)*2:int(power_max_idx + win + 1)*2]
    freq_sub_reqion = half_freqs[int(power_max_idx - win)*2:int(power_max_idx + win + 1)*2]
    signal_power = np.sum(power_main_region) + np.sum(power_sub_region)
    fft_snr = signal_power / (np.sum(half_power) - signal_power)
    if draw_fig_flag:
        hr_freqs = half_freqs[power_max_idx]
        HR_est = hr_freqs * 60  # dominant frequency (Hz) -> bpm
        print(f'SignaltoNoiseRatio_rPPG power_max_idx, {power_max_idx}, fft_snr {fft_snr}, HR_est {HR_est}')
        plt.figure('SignaltoNoiseRatio_rPPG',figsize=(9,6))
        plt.subplot((211))
        plt.plot(np.linspace(0,len(signal)/fps,len(signal)),signal,label='HR '+ str(np.round(HR_est,2)))
        plt.legend(loc='upper right')
        plt.subplot((212))
        plt.plot(half_freqs[half_freqs < 5], half_power[half_freqs < 5])
        plt.scatter(hr_freqs, np.max(half_power), label='hr_freqs ' + str(np.round(hr_freqs, 2)))
        plt.vlines(hr_freqs, 0, int(np.max(half_power) * 1.2), colors='y', linestyles='dashed')
        plt.plot(freq_main_reqion, power_main_region,'r',label='SNR ' + str(np.round(fft_snr,2)))
        plt.plot(freq_sub_reqion, power_sub_region, 'r', label='SNR ' + str(np.round(fft_snr, 2)))
        plt.legend(loc='upper right')
        plt.show()
    return fft_snr
# SNR of an rPPG signal; harmonics are usually buried in noise, so only the fundamental band counts
def SignaltoNoiseRatio_rPPG(signal, fps, power_max_index=None, method='fft', draw_fig_flag=False):
    """Spectral SNR of a remote-PPG signal (fundamental band only).

    Unlike SignaltoNoiseRatio_cPPG, no harmonic band is added to the signal
    power, because rPPG harmonics tend to be masked by noise.

    Args:
        signal: 1-D rPPG samples.
        fps: sampling rate (Hz).
        power_max_index: optional precomputed dominant-bin index; overrides argmax.
        method: 'stft', 'fft', or 'welch' spectrum estimator.
        draw_fig_flag: plot the signal and spectrum when True.
    Returns:
        fft_snr: in-band power divided by out-of-band power.
    """
    if method=='stft':
        # STFT-based spectrum: segment length capped at 15 s or half the signal
        nperseg = int(min(len(signal) * 0.5, fps * 15))
        half_freqs, half_power, win = get_stft_freq_power(signal, fps, nperseg=nperseg)
    elif method=='fft':
        # FFT-based spectrum
        half_freqs, half_power, win = get_fft_freq_power(signal, fps)
    elif method == 'welch':
        # Welch PSD estimate (median-averaged, flattop window)
        nperseg = int(min(len(signal) * 0.5, fps * 15))
        half_freqs, half_power = welch(signal, fps, 'flattop', nperseg=nperseg, average='median')
        # Frequency window corresponding to roughly +/- 5 bpm of HR error
        win = int(len(half_freqs) / (12 * np.max(half_freqs)) + 1)
    else:
        # Unknown method: drop into the debugger (development aid, not production handling)
        print(f'Warning: FFT method not defined')
        half_freqs, half_power = None, None
        pdb.set_trace()
    # Band-limit to the plausible HR range [0.7, 3.0] Hz
    half_power[half_freqs <= 0.7] = 0
    half_power[half_freqs >= 3.0] = 0
    power_max_idx = np.argmax(half_power)
    if power_max_index is not None:
        power_max_idx = power_max_index
    # Fundamental band: +/- win bins around the dominant bin.
    # NOTE(review): power_max_idx < win makes the slice start negative (wraps) — verify for short inputs.
    power_main_region = half_power[int(power_max_idx - win):int(power_max_idx + win + 1)]
    freq_main_reqion = half_freqs[int(power_max_idx - win):int(power_max_idx + win + 1)]
    signal_power = np.sum(power_main_region)
    fft_snr = signal_power / (np.sum(half_power) - signal_power)
    if draw_fig_flag:
        hr_freqs = half_freqs[power_max_idx]
        HR_est = hr_freqs * 60  # Hz -> bpm
        print(f'SignaltoNoiseRatio_rPPG method {method}, power_max_idx, {power_max_idx},'
              f' fft_snr {fft_snr}, HR_est {HR_est}')
        plt.figure('SignaltoNoiseRatio_rPPG',figsize=(9,6))
        plt.subplot((211))
        plt.plot(np.linspace(0,len(signal)/fps,len(signal)),signal,label='HR '+ str(np.round(HR_est,2)))
        plt.legend(loc='upper right')
        plt.subplot((212))
        plt.plot(half_freqs[half_freqs < 5], half_power[half_freqs < 5])
        plt.scatter(hr_freqs, np.max(half_power), label='hr_freqs ' + str(np.round(hr_freqs, 2)))
        plt.vlines(hr_freqs, 0, int(np.max(half_power) * 1.2), colors='y', linestyles='dashed')
        plt.plot(freq_main_reqion, power_main_region,'r',label='SNR ' + str(np.round(fft_snr,2)))
        plt.legend(loc='upper right')
        plt.show()
    return fft_snr
# Dimensionality reduction: collapse each (rows, cols) frame to its column means
def npydata_reduction_fn(file_path):
    """Load an (N, M, K) array from *file_path* and return the (N, K) array
    of per-frame column means.

    The original implementation looped over the first axis filling a
    preallocated buffer; ``mean(axis=1)`` is the equivalent vectorized form.
    """
    data = np.load(file_path)
    return data.mean(axis=1)
# Resample a signal with cubic interpolation
def Cubic_Interpolation(signals, fps, time_points=30, time_duration='None'):
    """Resample cPPG/rPPG *signals* to ``time_points`` samples per second.

    Args:
        signals: 1-D signal (squeezed if the duration is derived).
        fps: original sampling rate (Hz).
        time_points: target samples per second.
        time_duration: explicit duration in seconds; the historical string
            sentinel ``'None'`` is kept for backward compatibility, and a
            real ``None`` is now accepted too — both mean "derive the
            duration from len(signals) / fps".
    Returns:
        The signal resampled onto int(time_duration * time_points) points.
    Raises:
        ValueError: from interp1d when a requested point falls outside the
            original time range (bounds_error=True).
    """
    if time_duration is None or time_duration == 'None':
        signals = np.squeeze(signals)
        time_duration = len(signals) / fps
    signal_num = int(time_duration * time_points)
    t0 = np.linspace(0, time_duration, len(signals))
    t1 = np.linspace(0, time_duration, signal_num)
    interpolation = interp1d(t0, signals, kind='cubic', bounds_error=True)
    return interpolation(t1)
# usage: exist_file_names = check_exist_files(path)
def check_exist_files(path):
    """Return the name (text before the first '.') of every file in *path*;
    an empty list when the path does not exist."""
    if not os.path.exists(path):
        return []
    return [fname.split('.')[0] for fname in os.listdir(path)]
# Build start/stop indices for sliding-window segmentation
def CreateWindows(data, fs, win_time, step_time=1, overlap=None):
    """Return (idx_start, idx_stop) index arrays for windows of *win_time*
    seconds over *data* sampled at *fs* Hz.

    The stride is derived from *overlap* (fraction in [0, 1)) when given,
    otherwise from *step_time* seconds.
    """
    samples = np.array(data)
    win_len = int(win_time * fs)
    n_valid_starts = samples.shape[0] - win_len + 1
    if overlap is None:
        step_len = int(step_time * fs)
    else:
        step_len = int(win_len * (1 - overlap))
    idx_start = np.round(np.arange(0, n_valid_starts, step_len)).astype(int)
    idx_stop = np.round(idx_start + win_len)
    return idx_start, idx_stop
# Build start/stop indices for sliding-window segmentation (sample-count variant)
def CreateWindows_bk(N_samp, win_time, step_time, fs):
    """Like CreateWindows but takes the total sample count directly."""
    win_len = int(win_time * fs)
    step_len = int(step_time * fs)
    last_start = N_samp - win_len + 1
    idx_start = np.round(np.arange(0, last_start, step_len)).astype(int)
    return idx_start, np.round(idx_start + win_len)
# Check for NaN values in an ndarray and fill them with the column mean.
def fill_ndarray(t1):
    """Replace NaN entries of *t1* with the mean of the non-NaN values.

    2-D input is processed column by column; 1-D input is treated as a
    single column. The array is modified in place (views), and returned.

    Returns:
        (changed, t1): ``changed`` is True when at least one NaN was filled.
    """
    t1 = np.array(t1)
    ret = False
    try:
        for i in range(t1.shape[1]):
            temp_col = t1[:, i]
            # x != x is True only for NaN
            nan_num = np.count_nonzero(temp_col != temp_col)
            if nan_num != 0:
                ret = True
                temp_not_nan_col = temp_col[temp_col == temp_col]
                temp_col[np.isnan(temp_col)] = temp_not_nan_col.mean()
    except IndexError:
        # 1-D input has no shape[1]; fill the whole array as one column.
        # (The original used a bare `except:`, which hid unrelated errors.)
        temp_col = t1
        nan_num = np.count_nonzero(temp_col != temp_col)
        print(f'nan_num {nan_num}')
        if nan_num != 0:
            ret = True
            temp_not_nan_col = temp_col[temp_col == temp_col]
            temp_col[np.isnan(temp_col)] = temp_not_nan_col.mean()
    return ret, t1
# Fourier analysis: estimate the heart rate (bpm) from the periodogram peak
def fourier_analysis(signal, fps, draw_figures_flag=False):
    """Estimate heart rate (bpm) from the strongest periodogram frequency
    inside the [42, 180] bpm band.

    Raises StopIteration (via next) when fps is too low for the band edges.
    """
    MIN_BPM = 42
    MAX_BPM = 180
    freqs, psd = periodogram(signal, fs=fps, window=None, detrend='constant',
                             return_onesided=True, scaling='density')
    # First bin strictly above each band edge, widened by one bin on each side
    lo = next(i for i, f in enumerate(freqs) if f > MIN_BPM / 60.0) - 1
    hi = next(i for i, f in enumerate(freqs) if f > MAX_BPM / 60.0) + 1
    peak_freq = freqs[lo + np.argmax(psd[lo:hi])]
    if draw_figures_flag:
        shown = freqs[freqs < 4]
        plt.figure(figsize=(9, 6))
        plt.plot(shown, psd[:len(shown)], color='r')
        plt.axvline(x=peak_freq, c="b", ls="--", lw=2)
        plt.title('fourier_analysis')
        plt.show()
    return peak_freq * 60
# FFT spectrum extraction: (freqs, power, win)
def get_fft_freq_power(signal, fps, return_onesided=True):
    """Detrend and z-score the signal, then return its FFT power spectrum.

    Returns (freqs, power, win) where win is the number of frequency bins
    corresponding to roughly +/- 5 bpm of heart-rate error.
    """
    detrended = scipy.signal.detrend(signal)
    # z-score standardization (mean-centering only when the std is zero)
    mu = np.mean(detrended)
    sigma = np.std(detrended)
    standardized = detrended - mu if sigma == 0 else (detrended - mu) / sigma
    spectrum = np.fft.fft(standardized)
    power = np.abs(spectrum) ** 2
    freqs = np.fft.fftfreq(spectrum.size, 1 / fps)
    win = int(len(freqs[0 <= freqs]) / (12 * np.max(freqs)) + 1)
    if return_onesided:
        nonneg = 0 <= freqs
        power = power[nonneg]
        freqs = freqs[nonneg]
    return freqs, power, win
# STFT spectrum extraction: (freqs, aggregated power, win)
def get_stft_freq_power(signal, fps, method='median', return_onesided=True, nperseg=256):
    """Detrend and z-score the signal, then return its STFT power spectrum
    aggregated over time segments.

    method='median' (robust to transient noise) or 'mean'; any other value
    returns None. win is the bin count for roughly +/- 5 bpm of HR error.
    """
    detrended = scipy.signal.detrend(signal)
    mu = np.mean(detrended)
    sigma = np.std(detrended)
    standardized = detrended - mu if sigma == 0 else (detrended - mu) / sigma
    freqs, times, segments = stft(standardized, fs=fps, nperseg=nperseg,
                                  return_onesided=return_onesided)
    powers = np.abs(segments) ** 2
    win = int(len(freqs[0 <= freqs]) / (12 * np.max(freqs)) + 1)
    if method == 'median':
        return freqs, np.median(powers, axis=1), win
    if method == 'mean':
        return freqs, np.mean(powers, axis=1), win
def get_stft_maxidx(signal, fps, win=None):
    """Heuristically pick the FFT bin of the heart-rate fundamental.

    Candidates are the five strongest bins in the band (0.7, 2.5) Hz. A bin
    wins outright if it is at least twice the mean of the top five;
    otherwise candidates are scored by the power summed over a +/- win
    neighborhood, and the result is refined by a local argmax over a
    half-window.

    BUG(review): the median-HR branch references ``HR_stft_med`` which is
    never defined in this function (and is not visible anywhere in this
    file) — reaching that line raises NameError unless a module-level
    global of that name is defined elsewhere. Confirm and fix.
    """
    # Normalize and standardize the signal
    signal = mean_max_norm(signal)
    signal = standar_scaler(signal)
    # FFT power spectrum
    signal_fft = np.fft.fft(signal)
    power = np.abs(signal_fft) ** 2
    freqs = np.fft.fftfreq(signal_fft.size, 1 / fps)  # frequency of each FFT bin
    half_power = power[0 <= freqs]
    half_freqs = freqs[0 <= freqs]
    # Zero power outside the plausible HR band (0.7, 2.5) Hz
    half_power[half_freqs <= 0.7] = 0
    half_power[half_freqs >= 2.5] = 0
    if win is None:
        # Bin window corresponding to roughly +/- 5 bpm of HR error
        win = int(len(half_freqs) / (12 * np.max(half_freqs)) + 1)
    # Default choice: the globally strongest bin
    power_max_default = np.argmax(half_power)
    HR_default = half_freqs[power_max_default] * 60
    # Top-5 strongest bins as candidates
    power_index_top = list(np.argsort(-half_power)[:5])
    power_index_top_mean = np.mean(half_power[power_index_top])
    HR_stft_error_flag = 10
    power_sum_flag = 0
    power_stft_med_idx = None
    power_sum_idx = None
    HR_sum = None
    HR_med = None
    ret = False
    for i in power_index_top:
        # A candidate at least twice the top-5 mean is accepted immediately
        if half_power[i] >= power_index_top_mean * 2:
            power_max_default = i
            power_sum_idx = i
            ret = True
            break
        up = int(i + win) + 1
        low = max(int(i - win), 0)
        power_sum = np.sum(half_power[low:up])
        # Keep the candidate with the largest neighborhood power
        if power_sum >= power_sum_flag:
            power_sum_flag = power_sum
            power_sum_idx = i
            HR_sum = half_freqs[power_sum_idx] * 60
        temp_HR = half_freqs[i]*60
        # BUG(review): HR_stft_med is undefined here — NameError when this line runs.
        HR_med_error = abs(HR_stft_med - temp_HR)
        if HR_stft_error_flag >= HR_med_error:
            HR_stft_error_flag = HR_med_error
            power_stft_med_idx = i
            HR_med = half_freqs[power_stft_med_idx] * 60
    power_max_idx = None
    if ret:
        power_max_idx = power_max_default
    elif power_stft_med_idx is not None:
        # Among the three candidate indices, prefer the one closest to 80 bpm
        # (within 20 bpm); otherwise fall back to the median of the three.
        power_idx_list = [power_max_default, power_stft_med_idx, power_sum_idx]
        HR_error_flag = 20
        temp_ret = True
        for j in power_idx_list:
            temp_HR_ = half_freqs[j] * 60
            HR_error = abs(temp_HR_ - 80)
            if HR_error_flag >= HR_error:
                HR_error_flag = HR_error
                power_max_idx = j
                temp_ret = False
        if temp_ret:
            power_max_idx = np.sort(power_idx_list)[1]
    else:
        power_max_idx = power_sum_idx
    # Refine: local argmax within half a window around the chosen bin
    sub_win = int(win/2)
    half_power = fill_zero_nainf(half_power)
    if sub_win != 0:
        power_max_idx = (power_max_idx - sub_win) + np.argmax(half_power[power_max_idx - sub_win:power_max_idx + sub_win + 1])
    if power_max_idx is None:
        power_max_idx = power_max_default
    return power_max_idx
# Pick a robust spectral peak; loud noise can mask the pulse, so re-screen among neighbors
def get_power_maxidx(signal, fps, method='fft', draw_subfig_flag=False):
    """Select the spectrum bin most likely to be the heart-rate fundamental.

    The globally strongest bin wins if it dominates (>= 2x the mean of the
    top five); otherwise the top-5 candidate whose +/- win neighborhood
    carries the most power is chosen, refined by a local argmax.

    Args:
        signal: 1-D PPG samples.
        fps: sampling rate (Hz).
        method: 'stft' or (default) 'fft' spectrum estimator.
        draw_subfig_flag: plot each candidate's band when True.
    Returns:
        power_max_idx: index into the one-sided spectrum.
    """
    def get_main_region(power_max_index):
        # Band of +/- win bins around the candidate, plus its SNR against the rest
        power_main_region = half_power[int(power_max_index - win):int(power_max_index + win + 1)]
        freq_main_reqion = half_freqs[int(power_max_index - win):int(power_max_index + win + 1)]
        signal_power = np.sum(power_main_region)
        fft_snr = signal_power / (np.sum(half_power) - signal_power)
        return power_main_region, freq_main_reqion, fft_snr
    if method=='stft':
        # STFT-based spectrum: segment length capped at 15 s or half the signal
        nperseg = int(min(len(signal) * 0.5, fps * 15))
        half_freqs, half_power, win = get_stft_freq_power(signal, fps, nperseg=nperseg)
    else:
        # FFT-based spectrum
        half_freqs, half_power, win = get_fft_freq_power(signal, fps)
    # Remove power outside the plausible HR band [0.7, 3.0] Hz
    half_power[half_freqs > 3.0] = 0
    half_power[half_freqs < 0.7] = 0
    # Globally strongest bin and the top-5 candidates
    power_max_default = np.argmax(half_power)
    power_index_top = list(np.argsort(-half_power)[:5])
    power_index_top_mean = np.mean(half_power[power_index_top])
    power_sum_flag = 0
    power_idx = None
    for i in power_index_top:
        up = int(i + win) + 1
        low = max(int(i - win),0)
        # If the default bin clearly dominates, keep it and stop screening
        if half_power[power_max_default] >= power_index_top_mean * 2:
            power_idx = None
            break
        # Otherwise score each candidate by its neighborhood power
        power_sum = np.sum(half_power[low:up])
        if power_sum >= power_sum_flag:
            power_sum_flag = power_sum
            power_idx = i
        if draw_subfig_flag:
            power_main_region, freq_main_reqion, fft_snr = get_main_region(i)
            print(f'{i}, power_idx {power_idx}, fft_snr {fft_snr}'
                  f'HR_freq {half_freqs[i]}, HR {half_freqs[i]*60}')
            plt.figure(figsize=(9,6))
            plt.subplot((211))
            plt.plot(half_freqs[half_freqs < 5.0], half_power[half_freqs < 5.0])
            plt.plot(freq_main_reqion, power_main_region,c='r',
                     label='fft_snr ' + str(np.round(fft_snr, 2))
                     )
            plt.scatter(half_freqs[i], half_power[i], alpha=0.5, label='HR ' + str(np.round(half_freqs[i]*60, 1)))
            plt.legend(loc='upper right')
            plt.subplot((212))
            plt.plot(freq_main_reqion, power_main_region, c='r',
                     label='freq win ' + str(np.round(half_freqs[low], 2)) + '_' + str(np.round(half_freqs[up], 2))
                     )
            plt.scatter(half_freqs[i], half_power[i], alpha=0.5, label='HR freqs ' + str(np.round(half_freqs[i], 1)))
            plt.vlines(half_freqs[i], -1, int(np.max(half_power) * 1.2), colors='r', linestyles='dashed')
            plt.legend(loc='upper right')
            plt.show()
            plt.close()
    if power_idx is not None:
        # Refine to the strongest bin inside the winning candidate's window.
        # NOTE(review): power_idx - win can go negative and wrap the slice — confirm for low indices.
        power_max_idx = (power_idx - win) + np.argmax(half_power[(power_idx - win):(power_idx + win +1)])
    else:
        power_max_idx = power_max_default
    return power_max_idx
# Sanitize a spectrum before the inverse transform: NaN/Inf entries become 0
def fill_zero_nainf(signal_fft_new):
    """Replace NaN and +/-Inf entries with 0, in place, and return the array."""
    inf_mask = np.isinf(signal_fft_new)
    nan_mask = np.isnan(signal_fft_new)
    if inf_mask.any() or nan_mask.any():
        signal_fft_new[inf_mask] = 0
        signal_fft_new[nan_mask] = 0
        print(f'nan or inf in signal_fft_new')
    return signal_fft_new
# Boost the heart-rate spectral line and attenuate the rest; bins closer to the
# HR bin are attenuated more strongly
def enhance_signal_fft(signal, fps, power_max_index=None, item=1):
    """Return the FFT of *signal* with the heart-rate bin amplified and
    competing strong bins attenuated in proportion to their distance from it.

    Args:
        signal: 1-D samples.
        fps: sampling rate (Hz).
        power_max_index: optional precomputed HR bin index; otherwise
            get_stft_maxidx picks one.
        item: 1 = linear boost/attenuation (weight = max(|1/snr|, 3));
            2 = quadratic boost/attenuation.
    Returns:
        The modified complex FFT array (NaN/Inf sanitized); callers are
        expected to run an inverse FFT on it.
    """
    # z-score the input (normalization alternatives left disabled upstream)
    signal = standar_scaler(signal)
    # Forward FFT, sanitized against NaN/Inf
    signal_fft = np.fft.fft(signal)
    signal_fft = fill_zero_nainf(signal_fft)
    # NOTE(review): this binds a second name to the SAME array, not a copy —
    # signal_fft_ori is mutated along with signal_fft below.
    signal_fft_ori = signal_fft
    power = np.abs(signal_fft) ** 2
    freqs = np.fft.fftfreq(signal_fft.size, 1 / fps)  # frequency of each FFT bin
    half_power = power[0 <= freqs]
    half_freqs = freqs[0 <= freqs]
    snr = SignaltoNoiseRatio_rPPG(signal, fps)
    if power_max_index is not None:
        power_max_idx = power_max_index
    else:
        power_max_idx = get_stft_maxidx(signal, fps)
    # Strongest bins below 5 Hz, strongest first
    power_index_list = np.argsort(-half_power[half_freqs < 5.0]).tolist()
    if item == 1:
        # Linear scheme: weight grows as SNR shrinks, floored at 3
        w = max(abs(1/snr),3)
        # Amplify the HR bin and its mirrored (negative-frequency) twin
        signal_fft[power_max_idx] = signal_fft[power_max_idx] * w
        signal_fft[len(signal_fft) - power_max_idx] = signal_fft[len(signal_fft) - power_max_idx] * w
        # Attenuate the top 10% of competing bins by their distance to the HR
        # bin, skipping the HR bin itself and bins near its first harmonic
        for i in range(int(len(power_index_list) * 0.1)):
            if (power_max_idx - power_index_list[i]) != 0 and abs(power_max_idx*2 - power_index_list[i])>2:
                signal_fft[power_index_list[i]] = signal_fft[power_index_list[i]] / (abs(power_max_idx - power_index_list[i]))
                # NOTE(review): the mirrored bin is overwritten with the already-
                # attenuated positive bin, not its own mirrored value — confirm intent.
                signal_fft[len(signal_fft) - power_index_list[i]] = signal_fft[power_index_list[i]] / (abs(
                    power_max_idx - power_index_list[i]))
    elif item == 2:
        # Quadratic scheme: square the HR bin, divide others by squared distance
        signal_fft[power_max_idx] = signal_fft[power_max_idx] **2
        signal_fft[len(signal_fft) - power_max_idx] = signal_fft[len(signal_fft) - power_max_idx] **2
        for i in range(int(len(power_index_list) * 0.1)):
            if (power_max_idx - power_index_list[i]) != 0 and abs(power_max_idx*2 - power_index_list[i])>2:
                signal_fft[power_index_list[i]] = signal_fft[power_index_list[i]] / (abs(power_max_idx - power_index_list[i])**2)
                signal_fft[len(signal_fft) - power_index_list[i]] = signal_fft[power_index_list[i]] / (abs(
                    power_max_idx - power_index_list[i])**2)
    signal_fft = fill_zero_nainf(signal_fft)
    return signal_fft
# # 增强心电信号的频谱,消减非心电信号的频谱,与心电信号的频谱相近越大,消减越明显
# def enhance_signal_fft(freqs, power, power_max_idx, snr, item=1):
# # # 平滑处理可以过滤重搏波
# # signal = hull_moving_average(signal, window_size=5)
# # 数据标准化,及归一化
# # signal = mean_max_norm(signal)
# # signal = standar_scaler(signal)
# # 傅里叶变换
# # power = np.fft.fft(signal)
# # power = fill_zero_nainf(power)
# power_ori = power
# half_power = power[0 <= freqs]
# half_freqs = freqs[0 <= freqs]
#
# # print(f'enhance_signal_fft power_max_idx {power_max_idx} ')
# # power_index_list = np.argsort(-half_power[half_freqs < 5.0]).tolist()
# # draw_2_signals(freqs, power)
# # plt.plot(freqs, power)
# # plt.show()
#
# power_index_list = np.argsort(power[abs(freqs) < 5.0]).tolist()
# print(f'enhance_signal_fft power_index_list {len(power_index_list)}, {power_index_list} ')
#
# if item == 1:
# w = max(abs(2/snr),10)
# # w =1
# print(f'enhance_signal_fft power_max_idx {power_max_idx} w {w} ',
# power[power_max_idx]
# )
# power[power_max_idx] = power[power_max_idx] * w
#
# power[len(power) - power_max_idx] = power[len(power) - power_max_idx] * w
#
# # for i in range(int(len(power_index_list) * 0.1)):
# # if (power_max_idx - power_index_list[i]) != 0 and abs(power_max_idx*2 - power_index_list[i])>2:
# # power[power_index_list[i]] = power[power_index_list[i]] / (abs(power_max_idx - power_index_list[i]))
# # power[len(power) - power_index_list[i]] = power[power_index_list[i]] / (abs(
# # power_max_idx - power_index_list[i]))
#
# elif item == 2:
# power[power_max_idx] = power[power_max_idx] **2
# power[len(power) - power_max_idx] = power[len(power) - power_max_idx] **2
#
# for i in range(int(len(power_index_list) * 0.1)):
# if (power_max_idx - power_index_list[i]) != 0 and abs(power_max_idx*2 - power_index_list[i])>2:
# power[power_index_list[i]] = power[power_index_list[i]] / (abs(power_max_idx - power_index_list[i])**2)
# power[len(power) - power_index_list[i]] = power[power_index_list[i]] / (abs(
# power_max_idx - power_index_list[i])**2)
#
# power = fill_zero_nainf(power)
# # print(f'enhance_signal_fft power_ori {power_ori} ')
# # print(f'enhance_signal_fft power {power} ')
#
# # draw_2_signals(np.real(power_ori),np.real(power),title='enhance_signal_fft')
# return power
# Peak detection: estimate instantaneous heart rate from inter-peak intervals
def find_peaks_analysis(signals, fps, draw_figures_flag=False):
    """Estimate instantaneous heart rate (bpm) from detected signal peaks.

    The signal is detrended, the dominant peak polarity (upward vs downward)
    is decided from the spread of the largest vs smallest samples, peaks of
    that polarity are detected with an adaptive height threshold, duplicates
    closer than 0.5 s are merged (keeping the larger amplitude), and the
    instantaneous HR is 60 / (inter-peak interval).

    Returns:
        hr: array of instantaneous heart-rate values, one per peak pair.
    """
    signals = signal.detrend(signals)
    sample_count = len(signals)
    duration = int(sample_count / fps)
    time_line = np.linspace(0, duration, sample_count)  # timestamp of each sample
    # Number of extreme samples used to judge peak polarity (<= 300)
    test_peaks = min(300,int(sample_count * 0.1))
    # Lower extreme samples: the 30%-70% band of the smallest values
    down_peak = np.sort(signals)[int(test_peaks * 0.3):int(test_peaks * 0.7)]
    # Upper extreme samples: the 30%-70% band of the largest values
    up_peak = np.sort(signals)[-int(test_peaks * 0.7):-int(test_peaks * 0.3)]
    median_peak = np.median(signals)
    down_height = np.abs(down_peak - median_peak)  # spread of the lower extremes
    up_height = np.abs(up_peak - median_peak)      # spread of the upper extremes
    # Positive -> upward peaks dominate; negative -> downward peaks dominate
    main_peak = np.sum(up_height) - np.sum(down_height)
    def main_peak_test(mean_peak, main_peak, up_height, down_height):
        # Detect peaks of the dominant polarity with an adaptive height threshold.
        # NOTE(review): returns None implicitly if main_peak is NaN — both
        # branches are guarded, neither catches NaN.
        if main_peak >= 0:
            mean_height = np.median(up_height) * 0.5 + mean_peak
            peaks, properties = signal.find_peaks(signals, height=mean_height,distance=int(fps*0.5))
            return peaks, properties
        elif main_peak < 0:
            # Downward-dominant: search the negated signal.
            # NOTE(review): no `distance` constraint here, unlike the upward branch — confirm intent.
            mean_height = np.median(down_height) * 0.5 - mean_peak
            peaks, properties = signal.find_peaks(-signals, height=mean_height)
            return peaks, properties
    def draw_figure(signals,peaks,properties):
        plt.figure(figsize=(9,6))
        plt.plot(signals,color='r')
        plt.scatter(peaks,properties['peak_heights'],color='b')
        plt.title('find_peaks_analysis')
        plt.show()
    peaks, properties = main_peak_test(median_peak, main_peak, up_height, down_height)
    if draw_figures_flag:
        draw_figure(signals, peaks, properties)
    peaks_height = []  # amplitude at each accepted peak
    peaks_time = []    # timestamp of each accepted peak
    for tp, h in enumerate(signals):
        time_peak = time_line[tp]
        if tp in peaks:
            if len(peaks_time) == 0:
                peaks_height.append(h)
                peaks_time.append(time_peak)
            else:
                time_duration = float(time_peak) - float(peaks_time[-1])
                # Peaks at least 0.5 s apart are kept as distinct beats
                if time_duration >= 0.5:
                    peaks_height.append(h)
                    peaks_time.append(time_peak)
                # Peaks closer than 0.5 s: keep only the larger-amplitude one
                if time_duration < 0.5 and abs(float(peaks_height[-1])) < abs(float(h)):
                    peaks_height.pop()
                    peaks_time.pop()
                    peaks_height.append(h)
                    peaks_time.append(time_peak)
    # Inter-peak intervals -> instantaneous heart rate
    delta_time = np.array(peaks_time[1:]) - np.array(peaks_time[:-1])
    hr = 60 / delta_time
    return hr
def max_min_scaler(data):
    """Scale *data* to [0, 1] per feature with sklearn's MinMaxScaler.

    Accepts 1-D or 2-D input: MinMaxScaler requires a 2-D array, so a 1-D
    input (which raises ValueError) is promoted to a single column and the
    scaled result is squeezed back to 1-D.
    """
    try:
        data_scaler = MinMaxScaler(feature_range=(0, 1))
        data_scaled = data_scaler.fit_transform(data)
    except ValueError:
        # 1-D input: promote to (n, 1). The original caught every exception
        # here (bare except), hiding unrelated failures.
        data = np.expand_dims(data, axis=1)
        data_scaler = MinMaxScaler(feature_range=(0, 1))
        data_scaled = data_scaler.fit_transform(data)
    return np.squeeze(data_scaled)
def fir_filter(signals, filter_coef=40, cutoffFreq=32, fs=255):
    """Low-pass FIR filter for cPPG signals.

    The reference article uses fs = 255 Hz and a 32 Hz cutoff; the default
    *fs* is now 255 accordingly. (The previous default of fs=60 combined
    with cutoffFreq=32 violated firwin's requirement that the cutoff be
    below the Nyquist frequency fs/2 and always raised ValueError, so no
    working caller can have relied on it.)

    Args:
        signals: 1-D samples to filter.
        filter_coef: number of FIR taps.
        cutoffFreq: low-pass cutoff in Hz (must be < fs / 2).
        fs: sampling frequency of *signals* in Hz.
    Returns:
        The filtered signal (same length as the input).
    """
    # FIR filter: the denominator polynomial is just 1
    denominator = 1
    taps = firwin(filter_coef, cutoff=cutoffFreq, pass_zero='lowpass', fs=fs)
    return lfilter(taps, denominator, signals)
def moving_average(signals, window_size=30):
    """Simple moving average via a 'valid' convolution.

    Output length is len(signals) - window_size + 1. (The reference article
    uses a window of 300 samples.)
    """
    kernel = np.ones(window_size, dtype=int) / window_size
    return np.convolve(list(signals), kernel, 'valid')
# MATLAB-compatible smooth(): yy(1)=y(1), yy(2)=(y1+y2+y3)/3, yy(3)=(y1+..+y5)/5, ...
def smooth(data, window_size):
    """Smooth a 1-D array the way MATLAB's smooth() does.

    The interior uses a centered moving average of *window_size* (must be
    odd, as in the MATLAB original); the edges shrink the window so the
    first/last outputs average 1, 3, 5, ... samples. *data* must be 1-D —
    use np.ravel()/np.squeeze() first otherwise.
    """
    data = np.squeeze(data)
    core = np.convolve(data, np.ones(window_size, dtype=int), 'valid') / window_size
    shrink_divisors = np.arange(1, window_size - 1, 2)
    head = np.cumsum(data[:window_size - 1])[::2] / shrink_divisors
    tail = (np.cumsum(data[:-window_size:-1])[::2] / shrink_divisors)[::-1]
    return np.concatenate((head, core, tail))
# Alternative moving average; edge samples are biased (implicit zero padding)
def moving_avg_2(data, window_size):
    """Moving average via a 'same' convolution (output length equals input)."""
    width = int(window_size)
    return np.convolve(data, np.full(width, 1.0 / width), 'same')
# Faster cumsum-based moving average; for input length m and window t the
# output has length m - t + 1
def moving_avg_3(data, window_size):
    """Moving average computed from a cumulative sum (no convolution)."""
    totals = np.cumsum(np.insert(data, 0, 0))
    return (totals[window_size:] - totals[:-window_size]) / window_size
def hull_moving_average(data, window_size):
    """
    Hull Moving Average.
    Formula:
    HMA = WMA(2*WMA(n/2) - WMA(n)), sqrt(n)
    """
    catch_errors.check_for_period_error(data, window_size)
    half_wma = wma(data, int(window_size / 2))
    full_wma = wma(data, window_size)
    hma = wma(2 * half_wma - full_wma, int(np.sqrt(window_size)))
    # The leading entries of the WMA cascade are NaN; backfill them with the raw data.
    lead_nan_count = np.isnan(hma).sum()
    hma[:lead_nan_count] = data[:lead_nan_count]
    return hma
# Butterworth band-pass filter
# refer to: https://blog.csdn.net/SeaBiscuitUncle/article/details/103943900
def butterBandPassFilter(signals, lowcut, highcut, fps, order):
    """Band-pass *signals* between lowcut and highcut (Hz) with a Butterworth
    filter of the given order; fps is the sampling rate."""
    nyquist = fps * 0.5
    b, a = signal.butter(order, [lowcut / nyquist, highcut / nyquist], btype='bandpass')
    return lfilter(b, a, signals)
# Butterworth band-stop (notch) filter
def butterBandStopFilter(signals,lowcut, highcut, fps, order):
    """Suppress the [lowcut, highcut] Hz band of *signals* with a Butterworth
    band-stop filter of the given order; fps is the sampling rate."""
    nyquist = fps * 0.5
    b, a = signal.butter(order, [lowcut / nyquist, highcut / nyquist], btype='bandstop')
    return lfilter(b, a, signals)
# Butterworth low-pass filter
def butterLowpassFilter(signals,lowcut, fps, order):
    """Low-pass *signals* at *lowcut* Hz with a Butterworth filter of the
    given order; fps is the sampling rate."""
    nyquist = fps * 0.5
    b, a = signal.butter(order, [lowcut / nyquist], btype='lowpass')
    return lfilter(b, a, signals)
def find_signal_peaks(signal, fps, draw_figure=False):
    """Detect local maxima AND minima of *signal* and return their times (s).

    Bug fix: the original did ``indices = indices_up + indices_down`` which
    concatenates the two ``(peak_indices, properties)`` tuples returned by
    scipy's find_peaks; ``indices[0]`` then yielded only the up-peaks, so
    every detected minimum was silently discarded. Peaks of both polarities
    are now merged and sorted.

    Args:
        signal: 1-D ndarray of samples.
        fps: sampling rate (Hz), used to build the time axis.
        draw_figure: plot the signal with peak markers when True.
    Returns:
        time_peak: sorted timestamps (seconds) of all detected extrema.
    """
    dur_time = len(signal) / fps
    time_line = np.linspace(0, dur_time, len(signal))
    # Local maxima (upward peaks), at least 5 samples apart
    peaks_up, _ = find_peaks(signal, height=None, threshold=None, distance=5,
                             prominence=None, width=None, wlen=None,
                             rel_height=None, plateau_size=None)
    # Local minima, found as maxima of the negated signal
    peaks_down, _ = find_peaks(-signal, height=None, threshold=None, distance=5,
                               prominence=None, width=None, wlen=None,
                               rel_height=None, plateau_size=None)
    indices = np.sort(np.concatenate((peaks_up, peaks_down)))
    time_peak = time_line[indices]
    if draw_figure:
        plt.figure(figsize=(10, 6))
        plt.plot(time_line, signal)
        plt.plot(time_peak, signal[indices], 'o')
        plt.show()
    return time_peak
# Signal feature extraction
# refer to: https://blog.csdn.net/qq_34705900/article/details/88389319
def Extract_Signal_Features(signals, fps, time_points):
    """Extract statistical and derivative features from a 1-D signal.

    Args:
        signals: 1-D samples (squeezed on entry).
        fps: sampling rate (Hz), used for the time axis of the derivatives.
        time_points: resampling rate passed to Cubic_Interpolation.
    Returns:
        [mean, variance, rms, skewness, kurtosis, shape factor, crest factor,
         impulse factor, clearance factor, first-derivative samples (list),
         second-derivative samples (list)] — same order as before.
    """
    signals = np.squeeze(signals)
    df_mean = signals.mean()   # mean
    df_var = signals.var()     # variance
    df_std = signals.std()     # standard deviation (feeds the RMS below)
    df_rms = math.sqrt(pow(df_mean, 2) + pow(df_std, 2))  # root mean square
    df_skew = pd.Series(signals).skew()  # skewness
    df_kurt = pd.Series(signals).kurt()  # kurtosis
    # Sum of sqrt(|x|): the original built this with a manual loop that
    # shadowed the builtin `sum`; vectorized here.
    sqrt_abs_total = np.sqrt(np.abs(signals)).sum()
    df_boxing = df_rms / (abs(signals).mean())            # shape factor
    df_fengzhi = (max(signals)) / df_rms                  # crest factor
    df_maichong = (max(signals)) / (abs(signals).mean())  # impulse factor
    df_yudu = (max(signals)) / pow((sqrt_abs_total / signals.shape[0]), 2)  # clearance factor
    # First derivative (finite differences over the time axis)
    dur_time = signals.shape[0] / fps
    timeline = np.linspace(0, dur_time, signals.shape[0])
    delta_h1 = signals[1:] - signals[:-1]
    delta_t1 = timeline[1:] - timeline[:-1]
    signal_diff1 = delta_h1 / delta_t1
    signal_diff1_30 = Cubic_Interpolation(signal_diff1, fps, time_points=time_points)
    # Second derivative
    delta_h2 = signal_diff1[1:] - signal_diff1[:-1]
    delta_t2 = timeline[1:-1] - timeline[:-2]
    signal_diff2 = delta_h2 / delta_t2
    signal_diff2_30 = Cubic_Interpolation(signal_diff2, fps, time_points=time_points)
    return [df_mean, df_var, df_rms, df_skew, df_kurt, df_boxing, df_fengzhi,
            df_maichong, df_yudu, list(signal_diff1_30), list(signal_diff2_30)]
# List the names of files already present at *path*
def Check_exist_file(path):
    """Return the name (before the first '.') of every file in *path*;
    an empty list when the path does not exist."""
    if not os.path.exists(path):
        return []
    return [entry.split('.')[0] for entry in os.listdir(path)]
# 中值滤波器,去除基线漂移
def signal_detrend_mdf(signal, fps, draw_fig_flag=False):
# draw_fig_flag = True
fliter = int(fps)
Give_up_size = int(fliter / 2)
if fliter % 2 != 1:
kenerl_size = fliter + 1