-
-
Notifications
You must be signed in to change notification settings - Fork 43
/
time_series_classification.py
170 lines (145 loc) · 6.88 KB
/
time_series_classification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
######################
# So you want to train a Neural CDE model?
# Let's get started!
######################
import math
import torch
import torchcde
######################
# A CDE model looks like
#
# z_t = z_0 + \int_0^t f_\theta(z_s) dX_s
#
# Where X is your data and f_\theta is a neural network. So the first thing we need to do is define such an f_\theta.
# That's what this CDEFunc class does.
# Here we've built a small single-hidden-layer neural network, whose hidden layer is of width 128.
######################
class CDEFunc(torch.nn.Module):
    """Vector field f_theta of the CDE, built as a single-hidden-layer network.

    Maps a hidden state ``z`` whose last dimension is ``hidden_channels`` to a
    tensor whose two trailing dimensions are ``(hidden_channels, input_channels)``;
    all leading (batch) dimensions are passed through unchanged.
    """

    def __init__(self, input_channels, hidden_channels):
        ######################
        # input_channels is the number of input channels in the data X. (Determined by the data.)
        # hidden_channels is the number of channels for z_t. (Determined by you!)
        ######################
        super(CDEFunc, self).__init__()
        self.input_channels = input_channels
        self.hidden_channels = hidden_channels

        # Hidden layer of width 128; the output holds one flattened
        # (hidden_channels x input_channels) matrix per batch element.
        self.linear1 = torch.nn.Linear(hidden_channels, 128)
        self.linear2 = torch.nn.Linear(128, input_channels * hidden_channels)

    ######################
    # For most purposes the t argument can probably be ignored; unless you want your CDE to behave differently at
    # different times, which would be unusual. But it's there if you need it!
    ######################
    def forward(self, t, z):
        """Evaluate the vector field at hidden state ``z``.

        Args:
            t: current time; unused by this network.
            z: tensor of shape (..., hidden_channels).

        Returns:
            Tensor of shape (..., hidden_channels, input_channels) with entries
            in [-1, 1] (because of the final tanh).
        """
        z = self.linear1(z)
        z = z.relu()
        z = self.linear2(z)
        ######################
        # Easy-to-forget gotcha: Best results tend to be obtained by adding a final tanh nonlinearity.
        ######################
        z = z.tanh()
        ######################
        # Ignoring the batch dimensions, the shape of the output tensor must be a matrix,
        # because we need it to represent a linear map from R^input_channels to R^hidden_channels.
        # Only the last dimension is unflattened, so any number of leading batch dimensions is supported.
        ######################
        z = z.view(*z.shape[:-1], self.hidden_channels, self.input_channels)
        return z
######################
# Next, we need to package CDEFunc up into a model that computes the integral.
######################
class NeuralCDE(torch.nn.Module):
    """Neural CDE model: embeds the first observation, integrates the CDE along
    the interpolated path, and applies a linear readout to the terminal state.
    """

    def __init__(self, input_channels, hidden_channels, output_channels, interpolation="cubic"):
        """
        Args:
            input_channels: number of channels of the data path X.
            hidden_channels: number of channels of the hidden state z_t.
            output_channels: size of the readout applied to the terminal state.
            interpolation: 'cubic' or 'linear'; selects how `coeffs` are turned
                into a path in `forward`. Any other value makes `forward`
                raise ValueError.
        """
        super(NeuralCDE, self).__init__()

        self.func = CDEFunc(input_channels, hidden_channels)
        self.initial = torch.nn.Linear(input_channels, hidden_channels)
        self.readout = torch.nn.Linear(hidden_channels, output_channels)
        self.interpolation = interpolation

    def forward(self, coeffs):
        """Compute predictions from interpolation coefficients.

        Args:
            coeffs: interpolation coefficients accepted by torchcde.CubicSpline
                or torchcde.LinearInterpolation (matching `self.interpolation`).
                Presumably of shape (..., length, channels) -- confirm against
                the torchcde documentation.

        Returns:
            The readout of the terminal hidden state; last dimension is
            `output_channels`.

        Raises:
            ValueError: if `self.interpolation` is neither 'cubic' nor 'linear'.
        """
        if self.interpolation == 'cubic':
            X = torchcde.CubicSpline(coeffs)
        elif self.interpolation == 'linear':
            X = torchcde.LinearInterpolation(coeffs)
        else:
            raise ValueError("Only 'linear' and 'cubic' interpolation methods are implemented.")

        ######################
        # Easy to forget gotcha: Initial hidden state should be a function of the first observation.
        ######################
        X0 = X.evaluate(X.interval[0])
        z0 = self.initial(X0)

        ######################
        # Actually solve the CDE.
        ######################
        z_T = torchcde.cdeint(X=X,
                              z0=z0,
                              func=self.func,
                              t=X.interval)

        ######################
        # Both the initial value and the terminal value are returned from cdeint; extract just the terminal value,
        # and then apply a linear map.
        # The time axis is the second-to-last one, so index it from the right: this picks the final time point
        # without assuming exactly one batch dimension.
        ######################
        z_T = z_T[..., -1, :]
        pred_y = self.readout(z_T)
        return pred_y
######################
# Now we need some data.
# Here we have a simple example which generates some spirals, some going clockwise, some going anticlockwise.
######################
def get_data(num_timepoints=100, num_samples=128):
    """Generate a toy dataset of noisy, decaying spirals.

    Args:
        num_timepoints: number of regularly spaced observation times in
            [0, 4*pi].
        num_samples: total number of spirals. The first ``num_samples // 2``
            (before shuffling) are mirrored in x, which makes them clockwise.

    Returns:
        X: tensor of observations, of shape
            (batch=num_samples, sequence=num_timepoints, channels=3), with
            channels (time, x, y).
        y: tensor of labels, of shape (batch=num_samples,), either 0 or 1
            corresponding to anticlockwise or clockwise respectively.
    """
    t = torch.linspace(0., 4 * math.pi, num_timepoints)
    num_clockwise = num_samples // 2

    # Random starting angle per spiral; the radius decays as 1 / (1 + 0.5 * t).
    start = torch.rand(num_samples) * 2 * math.pi
    x_pos = torch.cos(start.unsqueeze(1) + t.unsqueeze(0)) / (1 + 0.5 * t)
    # Mirroring x reverses the direction of rotation.
    x_pos[:num_clockwise] *= -1
    y_pos = torch.sin(start.unsqueeze(1) + t.unsqueeze(0)) / (1 + 0.5 * t)
    x_pos += 0.01 * torch.randn_like(x_pos)
    y_pos += 0.01 * torch.randn_like(y_pos)
    ######################
    # Easy to forget gotcha: time should be included as a channel; Neural CDEs need to be explicitly told the
    # rate at which time passes. Here, we have a regularly sampled dataset, so appending time is pretty simple.
    ######################
    X = torch.stack([t.unsqueeze(0).repeat(num_samples, 1), x_pos, y_pos], dim=2)
    y = torch.zeros(num_samples)
    y[:num_clockwise] = 1

    # Shuffle so the two classes are interleaved.
    perm = torch.randperm(num_samples)
    X = X[perm]
    y = y[perm]
    return X, y
def main(num_epochs=30):
    """Train a NeuralCDE on the spiral dataset, then print its test accuracy.

    Args:
        num_epochs: number of passes over the training set.
    """
    train_X, train_y = get_data()

    ######################
    # input_channels=3 because we have both the horizontal and vertical position of a point in the spiral, and time.
    # hidden_channels=8 is the number of hidden channels for the evolving z_t, which we get to choose.
    # output_channels=1 because we're doing binary classification.
    ######################
    model = NeuralCDE(input_channels=3, hidden_channels=8, output_channels=1)
    optimizer = torch.optim.Adam(model.parameters())

    ######################
    # Now we turn our dataset into a continuous path. We do this here via Hermite cubic spline interpolation.
    # The resulting `train_coeffs` is a tensor describing the path.
    # For most problems, it's probably easiest to save this tensor and treat it as the dataset.
    ######################
    train_coeffs = torchcde.hermite_cubic_coefficients_with_backward_differences(train_X)

    train_dataset = torch.utils.data.TensorDataset(train_coeffs, train_y)
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32)
    for epoch in range(num_epochs):
        for batch in train_dataloader:
            batch_coeffs, batch_y = batch
            # The model emits one logit per sample; drop the trailing channel dimension to match the labels.
            pred_y = model(batch_coeffs).squeeze(-1)
            loss = torch.nn.functional.binary_cross_entropy_with_logits(pred_y, batch_y)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        # Reports the loss of the last batch of the epoch.
        print('Epoch: {} Training loss: {}'.format(epoch, loss.item()))

    test_X, test_y = get_data()
    test_coeffs = torchcde.hermite_cubic_coefficients_with_backward_differences(test_X)
    # Evaluation needs no gradients: skip building the autograd graph through the CDE solve.
    with torch.no_grad():
        pred_y = model(test_coeffs).squeeze(-1)
    binary_prediction = (torch.sigmoid(pred_y) > 0.5).to(test_y.dtype)
    prediction_matches = (binary_prediction == test_y).to(test_y.dtype)
    proportion_correct = prediction_matches.sum() / test_y.size(0)
    print('Test Accuracy: {}'.format(proportion_correct))
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()