-
Notifications
You must be signed in to change notification settings - Fork 0
/
gru_tensorflow.py
232 lines (165 loc) · 6.62 KB
/
gru_tensorflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import tensorflow as tf
from sklearn import datasets
from sklearn.cross_validation import train_test_split
import sys
# # GRU cell class and functions
class RNN_cell(object):
    """
    GRU-style recurrent cell expressed with TensorFlow graph ops.

    Takes 3 arguments for initialization.
    input_size = Input vector size
    hidden_layer_size = Hidden layer size
    target_size = Output vector size

    Constructing an instance creates the trainable variables, the
    `_inputs` placeholder of shape [batch, seq, input_size], the
    scan-ready view of that placeholder (`processed_input`) and an
    all-zero initial hidden state (`initial_hidden`).
    """

    def __init__(self, input_size, hidden_layer_size, target_size):
        # Keep the layer dimensions on the instance
        self.input_size = input_size
        self.hidden_layer_size = hidden_layer_size
        self.target_size = target_size

        # Shapes reused by the variables below
        input_to_hidden = [self.input_size, self.hidden_layer_size]
        hidden_to_hidden = [self.hidden_layer_size, self.hidden_layer_size]
        hidden_bias = [self.hidden_layer_size]

        # Input weights for the candidate state (Wx), the reset gate (Wr)
        # and the update gate (Wz); all start at zero
        self.Wx = tf.Variable(tf.zeros(input_to_hidden))
        self.Wr = tf.Variable(tf.zeros(input_to_hidden))
        self.Wz = tf.Variable(tf.zeros(input_to_hidden))

        # Gate biases, drawn from a truncated normal centred on 1
        self.br = tf.Variable(tf.truncated_normal(hidden_bias, mean=1))
        self.bz = tf.Variable(tf.truncated_normal(hidden_bias, mean=1))

        # Recurrent (hidden-to-hidden) weights, starting at zero
        self.Wh = tf.Variable(tf.zeros(hidden_to_hidden))

        # Output layer weights and bias
        self.Wo = tf.Variable(tf.truncated_normal(
            [self.hidden_layer_size, self.target_size],
            mean=1, stddev=.01))
        self.bo = tf.Variable(tf.truncated_normal(
            [self.target_size], mean=1, stddev=.01))

        # Placeholder for input vector with shape [batch, seq, embeddings]
        self._inputs = tf.placeholder(
            tf.float32,
            shape=[None, None, self.input_size],
            name='inputs')

        # Rearranged copy of the inputs that tf.scan can iterate over
        self.processed_input = process_batch_input_for_RNN(self._inputs)

        # Initial hidden state.
        # A state of shape [1, hidden_layer_size] would rely on
        # broadcasting in the first time step, and (per the original
        # author's note) that triggered a low-level error during back
        # propagation. As a small hack, the first time step of the inputs
        # is multiplied by an all-zero matrix, which yields zeros of shape
        # [batch_size, hidden_layer_size] without knowing the batch size
        # in advance.
        first_time_step = self._inputs[:, 0, :]
        self.initial_hidden = tf.matmul(
            first_time_step, tf.zeros([input_size, hidden_layer_size]))

    def Gru(self, previous_hidden_state, x):
        """
        One GRU step: combine the previous hidden state with input `x`.

        The update gate (z) and reset gate (r) are computed from `x`
        alone. The candidate state adds the input projection to the
        reset-scaled recurrent projection, and the result blends the
        candidate with the previous state using z.
        """
        update_gate = tf.sigmoid(tf.matmul(x, self.Wz) + self.bz)
        reset_gate = tf.sigmoid(tf.matmul(x, self.Wr) + self.br)

        input_term = tf.matmul(x, self.Wx)
        recurrent_term = tf.matmul(
            previous_hidden_state, self.Wh) * reset_gate
        candidate = tf.tanh(input_term + recurrent_term)

        # update_gate weights the old state, (1 - update_gate) the candidate
        from_candidate = tf.multiply((1 - update_gate), candidate)
        from_previous = tf.multiply(previous_hidden_state, update_gate)
        return from_candidate + from_previous

    def get_states(self):
        """
        Run the GRU step over every time step and return all hidden states
        """
        return tf.scan(self.Gru,
                       self.processed_input,
                       initializer=self.initial_hidden,
                       name='states')

    def get_output(self, hidden_state):
        """
        Map one hidden state to an output through the ReLU output layer
        """
        projected = tf.matmul(hidden_state, self.Wo) + self.bo
        return tf.nn.relu(projected)

    def get_outputs(self):
        """
        Apply the output layer to the hidden state of every time step
        """
        hidden_states = self.get_states()
        return tf.map_fn(self.get_output, hidden_states)
# Function to convert batch input data to use scan ops of tensorflow.
def process_batch_input_for_RNN(batch_input):
    """
    Swap the first two axes of a rank-3 tensor.

    Process tensor of size [5,3,2] to [3,5,2], so that tf.scan, which
    iterates over the leading axis, walks the axis that was second in
    `batch_input`.
    """
    # perm=[1, 0, 2] gives the same result as transposing with
    # perm=[2, 0, 1] and then reversing all axes, using one op instead
    # of two.
    return tf.transpose(batch_input, perm=[1, 0, 2])
"""
Example of using GRU
"""
# Initializing variables.
hidden_layer_size = 30
input_size = 8
target_size = 10

# Placeholder for the one-hot targets, shape [batch, target_size].
# It is named 'targets' so that it does not collide with the 'inputs'
# placeholder created inside RNN_cell.
y = tf.placeholder(tf.float32, shape=[None, target_size], name='targets')

# # Models

# Initializing rnn object
rnn = RNN_cell(input_size, hidden_layer_size, target_size)

# Getting all outputs from rnn
outputs = rnn.get_outputs()

# Output of the final time step (index -1 along the leading axis)
last_output = outputs[-1]

# The model's output layer ends in a ReLU, so softmax is applied on top
# to obtain class probabilities for prediction.
output = tf.nn.softmax(last_output)

# Cross entropy loss, summed over the batch. It is computed from the
# pre-softmax values with the fused op: taking tf.log of the softmax
# output yields -inf when a probability underflows to 0, and
# 0 * -inf turns the whole loss into NaN.
cross_entropy = tf.reduce_sum(
    tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=last_output))

# Training with the Adam optimizer
train_step = tf.train.AdamOptimizer().minimize(cross_entropy)

# Calculation of correct predictions and accuracy (in percent)
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(output, 1))
accuracy = (tf.reduce_mean(tf.cast(correct_prediction, tf.float32))) * 100
# # Dataset Preparation
# Function to build a one-hot vector
def get_on_hot(number, num_classes=10):
    """
    Return the one-hot encoding of `number` as a list of ints.

    number = class label, must satisfy 0 <= number < num_classes
    num_classes = length of the returned list (defaults to 10)

    Raises IndexError when the label is out of range. Negative labels
    are rejected as well; plain list indexing would silently wrap them
    around to a different class.
    """
    if not 0 <= number < num_classes:
        raise IndexError(
            "label %r is outside the range [0, %d)" % (number, num_classes))
    on_hot = [0] * num_classes
    on_hot[number] = 1
    return on_hot
# Using scikit-learn's bundled digits dataset (loaded with load_digits;
# this is not the full MNIST set).
digits = datasets.load_digits()
X = digits.images
Y_ = digits.target
# Build a real list: on Python 3 `map` returns a one-shot iterator, which
# train_test_split cannot index or measure.
Y = [get_on_hot(label) for label in Y_]

# Getting Train and test Dataset
# NOTE(review): train_test_split is imported from sklearn.cross_validation
# at the top of the file; that module was removed in scikit-learn 0.20 and
# the function now lives in sklearn.model_selection.
X_train, X_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.22, random_state=42)

# Cutting to 1400 samples so they divide evenly into batches of 100
X_train = X_train[:1400]
y_train = y_train[:1400]

sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

batch_size = 100
num_train = len(X_train)

# Iterations to do training
for epoch in range(200):
    # One pass over the training set in consecutive mini-batches
    for start in range(0, num_train, batch_size):
        batch_x = X_train[start:start + batch_size]
        batch_y = y_train[start:start + batch_size]
        sess.run(train_step, feed_dict={rnn._inputs: batch_x, y: batch_y})

    # Loss is reported on the last mini-batch of the epoch; accuracies
    # are measured on the full train and test splits.
    Loss = str(sess.run(cross_entropy,
                        feed_dict={rnn._inputs: batch_x, y: batch_y}))
    Train_accuracy = str(sess.run(accuracy, feed_dict={
        rnn._inputs: X_train, y: y_train}))
    Test_accuracy = str(sess.run(accuracy, feed_dict={
        rnn._inputs: X_test, y: y_test}))

    # The leading "\r" with no trailing newline rewrites the same console
    # line every epoch. sys.stdout.write behaves the same on Python 2
    # and 3, unlike the `print(...),` trailing-comma idiom.
    sys.stdout.write(
        "\rIteration: %s Loss: %s Train Accuracy: %s Test Accuracy: %s" %
        (epoch, Loss, Train_accuracy, Test_accuracy))
    sys.stdout.flush()