-
Notifications
You must be signed in to change notification settings - Fork 8
/
lstm.py
114 lines (91 loc) · 3.68 KB
/
lstm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import numpy as np
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.models import Sequential
def augment_data(X, y, duplication_ratio):
    """
    Randomly duplicate training samples to augment the data set.

    See Data Augmentation section at
    http://simaaron.github.io/Estimating-rainfall-from-weather-radar-readings
    -using-recurrent-neural-networks/
    :param X: Each row is a training sequence
    :param y: The target we train and will later predict
    :param duplication_ratio: (float) the percentage of data to duplicate.
    :return X, y: augmented data
    """
    # Upper bound on copies per sample; int() because randint requires
    # integer bounds (the original passed a float).
    nb_duplicates = int(duplication_ratio * len(X))
    print('nb_duplicates:', nb_duplicates)
    X_hat = []
    y_hat = []
    for i in range(len(X)):
        # np.random.random_integers (inclusive upper bound) was removed from
        # NumPy; randint's upper bound is exclusive, so +1 keeps the original
        # inclusive range [0, nb_duplicates].
        # NOTE(review): a sample drawn 0 times is dropped from the output
        # entirely — this matches the original behavior; confirm it is
        # intended rather than "keep original + add copies".
        for _ in range(np.random.randint(0, nb_duplicates + 1)):
            X_hat.append(X[i, :])
            y_hat.append(y[i])
    return np.asarray(X_hat), np.asarray(y_hat)
def z_norm(result):
    """Standardize an array in place to zero mean and unit variance.

    The input array is mutated (``-=`` / ``/=``) and the same object is
    returned alongside its original mean.  Note the standard deviation is
    not returned, so the transform cannot be inverted exactly by callers.

    :param result: (ndarray) data to normalize; modified in place.
    :return: (normalized array, original mean) tuple.
    """
    mu = result.mean()
    sigma = result.std()
    result -= mu
    result /= sigma
    return result, mu
def create_model(sequence_length, layers):
    """Build and compile a three-layer stacked-LSTM regressor.

    :param sequence_length: (int) sequence length; the network consumes
        ``sequence_length - 1`` timesteps per sample.
    :param layers: (dict) layer widths keyed by 'input', 'hidden1',
        'hidden2', 'hidden3' and 'output'.
    :return: compiled Keras ``Sequential`` model (mse loss, rmsprop).
    """
    hidden_sizes = (layers['hidden1'], layers['hidden2'], layers['hidden3'])
    model = Sequential()
    for depth, nb_units in enumerate(hidden_sizes):
        kwargs = {'units': nb_units,
                  # Only the last LSTM collapses the sequence to one vector.
                  'return_sequences': depth < len(hidden_sizes) - 1}
        if depth == 0:
            # First layer must declare the input shape.
            kwargs['input_shape'] = (sequence_length - 1, layers['input'])
        model.add(LSTM(**kwargs))
        model.add(Dropout(0.2))
    model.add(Dense(units=layers['output']))
    model.add(Activation("linear"))
    model.compile(loss="mse", optimizer="rmsprop")
    return model
def create_train_and_test(data, sequence_length, split_index,
                          duplication_ratio, normalize=True):
    """
    :param data: (array)
        Data to convert. The last value is the label.
    :param sequence_length: (int)
        Length of the sequence.
    :param split_index: (int)
        Train / test split index.
    :param duplication_ratio: (float)
        Data duplication percentage for the data augmentation step.
    :param normalize: (bool)
        Whether to normalize the input data. Default is True.
    :return X_train, y_train, X_test, y_test: (tuple of arrays)
        Train set with targets and test set with targets.
    """
    nb_records = len(data)
    print("Total number of records:", nb_records)

    print("Creating train data...")
    result = []
    for index in range(split_index - sequence_length):
        result.append(data[index: index + sequence_length])
    result = np.array(result)  # shape = (samples, sequence_length)
    if normalize:
        # NOTE(review): train and test windows are normalized with their own
        # independent statistics (see below) — this leaks test-set
        # information; confirm this matches the original blog's intent.
        result, result_mean = z_norm(result)
        # Fix: the mean only exists when normalize=True; printing it
        # unconditionally raised NameError for normalize=False.
        print("Mean of train data :", result_mean)
    print("Train data shape  :", result.shape)

    # result has only split_index - sequence_length rows, so this slice
    # always takes every row (Python clamps out-of-range slices).
    train = result[:split_index, :]
    np.random.shuffle(train)
    X_train = train[:, :-1]   # all but last timestep = input sequence
    y_train = train[:, -1]    # last timestep = label
    X_train, y_train = augment_data(X_train, y_train, duplication_ratio)

    print("Creating test data...")
    result = []
    for index in range(split_index, nb_records - sequence_length):
        result.append(data[index: index + sequence_length])
    result = np.array(result)  # shape = (samples, sequence_length)
    if normalize:
        result, result_mean = z_norm(result)
        print("Mean of test data : ", result_mean)
    print("Test data shape  : ", result.shape)

    X_test = result[:, :-1]
    y_test = result[:, -1]

    print("Shape X_train", np.shape(X_train))
    print("Shape X_test", np.shape(X_test))

    # Add a trailing feature axis: LSTM input is (samples, timesteps, 1).
    X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
    X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))
    return X_train, y_train, X_test, y_test