-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpreprocessing_multi.py
136 lines (99 loc) · 4.42 KB
/
preprocessing_multi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# this is the same as preprocessing but arrays are of arbitrary length given by
# the maximum frames of every file.
import glob
import os
import numpy as np
from librosa import power_to_db, load
from librosa.feature import melspectrogram
from pywt import dwt2
sr = 44100 # resample all to 44.1kHz
window_size = 1024 # ~23[mseg] at 44.1kHz
def extract_features(signal, normalize, wavelet):
# handle less than 3 [seg]
L = sr*3 # Total length for samples ~3[seg]
signal_length = signal.shape[0]
if signal_length < L:
#pad by repeating signal
signal = np.pad(signal, (0, L-signal_length), mode='wrap')
# Calculate melspectrogram
melspec = melspectrogram(signal, sr=sr, center=False, #fmax = sr/2
hop_length=window_size, win_length=window_size,
n_mels=128) # shape:[bands, frames]
# Transform to log scale and transpose
melspec = power_to_db(melspec, ref=np.amax(melspec)).T # shape:[frames, bands]
if normalize:
melspec = (melspec - np.mean(melspec))/np.std(melspec)
# 2D Discrete Wavelet Transform
if wavelet != 0:
LL, (LH, HL, HH) = dwt2(melspec, wavelet)
melspec = np.stack([LL,LH,HL,HH],axis=-1) # shape: [frames, bands, 4]
else:
melspec = melspec[..., np.newaxis]
# Reshape
features = melspec # shape : [frames, bands, channels]
return features
def extract_fold(parent_dir, fold, frames, bands, channels, normalize, wavelet):
# Extract features from one fold
features = [] # shape : [samples, frames, bands, channels]
labels = []
for filename in glob.glob(parent_dir+"/"+fold+"/*.wav"):
# load signal
signal = load(filename, sr=sr)[0]
#extract features
features_yield = extract_features(signal, normalize, wavelet)
features.append(features_yield)
#extract label
labels_yield = int(filename.split('-')[-3]) # filenames: [fsID]-[classID]-[occurrenceID]-[sliceID].wav
labels.append(labels_yield)
return features, labels
def save_folds(data_dir, save_dir, frames=128, bands=128, channels=1, normalize=False, wavelet = 0):
# Preprocess all folds and save
assure_path_exists(save_dir)
for k in range(1,10+1):
fold_name = 'fold' + str(k)
print ("\nSaving " + fold_name)
features, labels = extract_fold(data_dir, fold_name, frames, bands, channels, normalize, wavelet)
print ("Number of samples in ", fold_name , ": ", len(labels))
feature_file = os.path.join(save_dir, fold_name + '_x.npy')
labels_file = os.path.join(save_dir, fold_name + '_y.npy')
np.save(feature_file, features, allow_pickle = True)
print ("Saved " + feature_file)
np.save(labels_file, labels, allow_pickle = True)
print ("Saved " + labels_file)
return
def load_folds(load_dir, validation_fold, bands=128, frames=128, channels=1):
#load all folds except the validation fold, and a random testing fold
train_x = []
train_y = []
# take out validation from training set
train_set = set(np.arange(1,10+1))-set([validation_fold])
# take one random fold from the remaining for testing
test_fold = np.random.choice(list(train_set),1)[0]
train_set = train_set-set([test_fold])
print("\n*** Train on", train_set,
"Validate on", validation_fold, "Test on", test_fold, "***")
for k in range(1,10+1):
fold_name = 'fold' + str(k)
feature_file = os.path.join(load_dir, fold_name + '_x.npy')
labels_file = os.path.join(load_dir, fold_name + '_y.npy')
loaded_features = np.load(feature_file, allow_pickle=True)
loaded_labels = np.load(labels_file, allow_pickle=True)
if k == validation_fold:
val_x = loaded_features
val_y = loaded_labels
elif k == test_fold:
test_x = loaded_features
test_y = loaded_labels
else:
train_x.extend(loaded_features)
train_y.extend(loaded_labels)
print("val len: ", len(val_y))
print("test len: ", len(test_y))
print("train len: ", len(train_y))
return train_x, test_x, val_x, train_y, test_y, val_y
def assure_path_exists(path):
# checks if path exists, if it dosen't it is created
mydir = os.path.join(os.getcwd(), path)
if not os.path.exists(mydir):
os.makedirs(mydir)
return