import collections
import os
import random

import numpy as np
import tensorflow as tf
import yaml
from tensorflow.keras.preprocessing.image import (
    ImageDataGenerator,
    img_to_array,
    load_img,
)


def get_configs(config_path: str):
    """
    Get settings based on the contents of the specified config file.
    Input:
        config_path: String containing the config file path
    Output:
        configs: Dict of configs (empty if the file could not be read)
    """
    # Read in the config file for later use
    configs = {}
    try:
        with open(config_path, "r") as config_file:
            configs = yaml.safe_load(config_file)
    except Exception as e:
        print(e)
    return configs
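

# Usage sketch (hedged): "config.yaml" is a hypothetical path; the keys shown
# are the ones this module reads elsewhere (see get_train_validation_sets).
#
#   configs = get_configs("config.yaml")
#   use_val = configs.get("use_validation_dir", False)
#   seed = configs.get("seed", 0)

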
def make_train_and_test_sets(
    images_dir: str, train_fraction: float = 0.8, random_seed: int = 0
):
    """
    Split the data into train and test sets and get the label classes.
    Input:
        images_dir: String containing the image directory file path
        train_fraction: The fraction of images that will be used for training
        random_seed: Random seed to ensure we can replicate the image split
    Output:
        train_examples: Array of training examples (image path, label class)
        test_examples: Array of testing examples (image path, label class)
        classes: OrderedDict mapping each label class to its Bristol scale rating
    """
    train_examples, test_examples = [], []
    shuffler = random.Random(random_seed)
    is_root = True
    classes = None
    for (dirname, subdirs, filenames) in tf.io.gfile.walk(images_dir):
        # The root directory gives us the classes
        if is_root:
            subdirs = sorted(subdirs)
            classes = collections.OrderedDict(enumerate(subdirs))
            label_to_class = {x: i for i, x in enumerate(subdirs)}
            is_root = False
        # The subdirectories give us the image files for training.
        else:
            filenames.sort()
            shuffler.shuffle(filenames)
            full_filenames = [os.path.join(dirname, f) for f in filenames]
            _, label = os.path.split(dirname)
            label_class = label_to_class[label]
            # An example is the image file and its label class.
            examples = list(zip(full_filenames, [label_class] * len(filenames)))
            num_train = int(len(filenames) * train_fraction)
            train_examples.extend(examples[:num_train])
            test_examples.extend(examples[num_train:])
    shuffler.shuffle(train_examples)
    shuffler.shuffle(test_examples)
    return np.array(train_examples), np.array(test_examples), classes
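

# Usage sketch (hedged): assumes a hypothetical layout in which images_dir
# contains one subdirectory per Bristol rating, each holding .jpg files.
#
#   train, test, classes = make_train_and_test_sets("augmented_data", 0.8, 0)
#   print(train.shape, test.shape)  # e.g. (N_train, 2) (N_test, 2)
#   print(classes)                  # e.g. OrderedDict([(0, "1"), (1, "2"), ...])

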
def make_separate_train_test_sets(train_dir: str, test_dir: str, random_seed: int = 0):
    """
    Build training and testing sets from two pre-split directories.
    Input:
        train_dir: String containing the training folder file path
        test_dir: String containing the testing folder file path
        random_seed: Random seed to ensure we can replicate the shuffling
    Output:
        train_data: Model training data
        test_data: Model testing data
        classes: OrderedDict of label classes (Bristol scale ratings)
    """
    # Get training data (train_fraction=1.0 keeps every image in one split)
    train_data, _, classes = make_train_and_test_sets(train_dir, 1.0, random_seed)
    # Get test (validation) data
    test_data, _, _ = make_train_and_test_sets(test_dir, 1.0, random_seed)
    return train_data, test_data, classes


def get_train_validation_sets(configs: dict):
    """
    Use a dedicated validation directory if the config file specifies one.
    Input:
        configs: Dict containing config information
    Output:
        Either separate train and test sets (from two directories) or a
        single directory split according to train_fraction
    """
    # Creates train and test sets from separate directories
    if configs["use_validation_dir"]:
        return make_separate_train_test_sets(
            "augmented_data", configs["validation_data_dir"], configs["seed"]
        )
    # Creates train and test sets by splitting one directory
    return make_train_and_test_sets(
        "augmented_data", configs["train_fraction"], configs["seed"]
    )
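

# Usage sketch (hedged): ties get_configs to the dataset builders; the
# "config.yaml" path and its contents are hypothetical.
#
#   configs = get_configs("config.yaml")
#   train_data, test_data, classes = get_train_validation_sets(configs)

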
def get_label(example):
    """
    Get the label (number) for a given example.
    Input:
        example: A piece of sample data (image and label class)
    Output:
        That example's label class (Bristol scale rating)
    """
    return example[1]


def get_encoded_image(image_path: str):
    """
    Get the image data (encoded jpg) of a given example.
    Input:
        image_path: A string path to the image to be decoded
    Output:
        The raw bytes of the image file
    """
    # Use a context manager so the file handle is closed promptly
    with tf.io.gfile.GFile(image_path, "rb") as image_file:
        return image_file.read()


def decode_and_resize_images(
    train_data: list,
    test_data: list,
    classes: collections.OrderedDict,
    input_dimensions: list,
):
    """
    Decode and resize the input images based on the CNN's expected input size.
    Input:
        train_data: List of training images in bytes
        test_data: List of test images in bytes
        classes: OrderedDict of classes used in this network
        input_dimensions: List of size 2 denoting the dimensions of input images to the network
    Output:
        Decoded and resized train and test images plus their one-hot labels
    """
    # Convert the training data
    train_images, train_labels = get_images_and_labels(train_data, len(classes))
    size = tf.convert_to_tensor(
        np.full((len(train_images), 2), input_dimensions, dtype=np.int32),
        dtype=tf.int32,
    )
    train_images = tf.map_fn(
        decode_and_resize_image,
        (tf.convert_to_tensor(train_images), size),
        dtype=tf.float32,
    )
    # Convert the testing data
    test_images, test_labels = get_images_and_labels(test_data, len(classes))
    size = tf.convert_to_tensor(
        np.full((len(test_images), 2), input_dimensions, dtype=np.int32),
        dtype=tf.int32,
    )
    test_images = tf.map_fn(
        decode_and_resize_image,
        (tf.convert_to_tensor(test_images), size),
        dtype=tf.float32,
    )
    return train_images, np.array(train_labels), test_images, np.array(test_labels)


def decode_and_resize_image(args):
    """
    Decode an image from bytes and crop or pad it to the target size.
    The single-tuple argument mirrors the elems structure that tf.map_fn
    passes in (see decode_and_resize_images() and train.py).
    Input:
        args: tuple of (bytes string of image data, tensor of crop dimensions)
    Output:
        Decoded and cropped image
    """
    # Parse args
    encoded = args[0]
    image_dimensions = args[1].numpy()
    # Decode image
    decoded = tf.image.decode_jpeg(encoded, channels=3)
    decoded = tf.image.convert_image_dtype(decoded, tf.float32)
    # Return the center-cropped (or zero-padded) image
    return tf.image.resize_with_crop_or_pad(
        decoded, image_dimensions[0], image_dimensions[1]
    )
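

# Usage sketch (hedged): runs a single image through the decoder eagerly,
# outside tf.map_fn. "sample.jpg" is a hypothetical path; .numpy() on the
# size tensor requires eager execution.
#
#   encoded = get_encoded_image("sample.jpg")
#   img = decode_and_resize_image((encoded, tf.constant([224, 224])))
#   print(img.shape)  # (224, 224, 3)

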
def get_batch(images: np.ndarray, labels: np.ndarray, batch_size=None):
    """
    Get a random batch of examples.
    Input:
        images: Array of images
        labels: Array of labels corresponding to images
        batch_size: The number of images that will be in the batch
    Output:
        Images and corresponding labels in the batch
    """
    # If a batch size is given, sample a batch without replacement
    if batch_size:
        idx = np.random.choice(images.shape[0], batch_size, replace=False)
        return images[idx], labels[idx]
    return images, labels
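

# Usage sketch (hedged): images and labels are hypothetical numpy arrays with
# matching first dimensions; with batch_size=None the full arrays come back.
#
#   batch_images, batch_labels = get_batch(images, labels, batch_size=32)

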
def get_images_and_labels(batch_examples, num_classes):
    """
    Get the images and corresponding labels for use.
    Input:
        batch_examples: Batch of examples we want images and labels of
        num_classes: Number of available classes to label
    Output:
        Images and one hot labels from that batch.
    """
    # Creates arrays of encoded images and one-hot labels for those images
    images = [get_encoded_image(e[0]) for e in batch_examples]
    one_hot_labels = [get_label_one_hot(e, num_classes) for e in batch_examples]
    return images, one_hot_labels


def get_label_one_hot(example, num_classes):
    """
    Get the one hot encoding vector for the example.
    Input:
        example: A single image and label pair
        num_classes: Number of classes available to label
    Output:
        Vector containing one hot label for the example
    """
    # Creates one hot encoding for label classes; labels read back out of a
    # numpy example array are strings, so cast to int before indexing
    one_hot_vector = np.zeros(num_classes, dtype=int)
    np.put(one_hot_vector, int(get_label(example)), 1)
    return one_hot_vector
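

# Usage sketch (hedged): a hypothetical example with label class 2 out of 7.
#
#   get_label_one_hot(("img.jpg", 2), 7)  # -> array([0, 0, 1, 0, 0, 0, 0])

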
def separate_train_by_labels(train_data: np.ndarray, classes: collections.OrderedDict):
    """
    Separate training data by Bristol scale rating.
    Input:
        train_data: Array of training examples
        classes: Available classes that we are labelling
    Output:
        Training data separated by class (Bristol scale rating)
    """
    # Creates a dict so we can store training data by label
    train_data_by_label = {key: list() for key in classes.values()}
    # Divides training data by label; the label column holds strings
    for num, label in classes.items():
        train_data_by_label[label] = train_data[np.where(train_data[:, 1] == str(num))]
    return train_data_by_label
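

# Usage sketch (hedged): buckets the (path, label) rows by class name.
#
#   by_label = separate_train_by_labels(train_data, classes)
#   for name, rows in by_label.items():
#       print(name, len(rows))

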
def augment_image(img_path: str, save_to: str, img_num=200):
    """
    Augment a single image using a Keras Image Data Generator.
    Input:
        img_path: String containing image path
        save_to: Directory to save images to
        img_num: Number of augmented images to produce based on that image
    Output:
        No output, but the augmented images will be saved to save_to
    """
    # Basic image data generator from https://blog.keras.io/building-powerful-image-classification-models-using-very-little-data.html
    # Parameters for transformations
    datagen = ImageDataGenerator(
        rotation_range=360,  # degree range for random rotations
        width_shift_range=0.2,  # fraction of total width that we can shift the image
        height_shift_range=0.2,  # fraction of total height that we can shift the image
        rescale=1.0 / 255,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        vertical_flip=True,
        fill_mode="nearest",
    )
    # Load the image for the Keras generator
    img = load_img(img_path)
    x = img_to_array(img)
    x = x.reshape((1,) + x.shape)
    # Create the directory we will save to
    if not os.path.isdir(save_to):
        os.makedirs(save_to)
    # Augment the image img_num times and save as a .jpg; the generator
    # yields forever, so break once enough images have been written
    i = 0
    for batch in datagen.flow(
        x,
        batch_size=1,
        save_to_dir=save_to,
        save_prefix="aug",
        save_format="jpg",
    ):
        i += 1
        if i >= img_num:
            break
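

# Usage sketch (hedged): paths are hypothetical; writes img_num (default 200)
# files named aug_*.jpg into augmented_data/3/.
#
#   augment_image("training_data/3/stool_01.jpg", "augmented_data/3")

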
def augment_all_images(root_dir="training_data", new_dir="augmented_data"):
    """
    Recursively goes through the directory containing the images and augments all images.
    Saves images to the corresponding class subdirectory of new_dir.
    Input:
        root_dir: Starting directory containing subdirectories based on labels.
                  Each subdirectory should then contain images.
        new_dir: Directory to save augmented images
    Output:
        No output, but the augmented images will be saved to new_dir in each class' directory
    """
    branch_dirs = []
    # Find all class directories we will be working with; use a separate loop
    # variable so root_dir itself is not clobbered by the walk
    for _dirname, dirs, _files in os.walk(root_dir, topdown=False):
        for name in dirs:
            branch_dirs.append(name)
    # Create the new directory for augmented images as needed
    if not os.path.isdir(new_dir):
        os.mkdir(new_dir)
    # Augment all images using augment_image()
    for directory in branch_dirs:
        for filename in os.listdir(os.path.join(root_dir, directory)):
            # If the image is augmented somehow already, ignore it
            if "aug" in filename:
                continue
            if filename.endswith((".jpg", ".jpeg")):
                augment_image(
                    os.path.join(root_dir, directory, filename),
                    os.path.join(new_dir, directory),
                )
            else:
                # Removes all non-JPG, non-JPEG files from the source directory
                os.remove(os.path.join(root_dir, directory, filename))
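

# Usage sketch (hedged): expects the default layout training_data/<label>/*.jpg
# and populates augmented_data/<label>/ with generated images. Note that the
# else branch above is destructive: non-JPEG files in the source tree are
# deleted.
#
#   augment_all_images()

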
def downsample_train_data(train_data: np.ndarray, classes: collections.OrderedDict):
    """
    Downsample the training data so each class contributes at most nine examples.
    Input:
        train_data: Array of training examples
        classes: Available classes that we are labelling
    Output:
        Shuffled array of the downsampled training examples
    """
    data = separate_train_by_labels(np.array(train_data), classes)
    # Take the first nine examples of each class and stack them back together
    train = np.vstack([data[label][:9] for label in data])
    # Shuffle rows with numpy; random.shuffle on a 2D array swaps views rather
    # than copies and silently duplicates rows
    np.random.shuffle(train)
    return train


def crop_image_from_mask(args):
    """
    Black out the parts of an image that fall outside the image mask.
    Input:
        args: A tuple of (2D numpy array of image data, 2D numpy array of mask data)
    Output:
        image: Numpy array of the image with areas outside the mask zeroed out.
    """
    # Parse args
    image = args[0]
    mask = args[1]
    # Get row/column indices where the mask is not 1
    indices = np.where(mask != 1)[:2]
    # Zero out image pixels at indices not covered by the mask
    image[indices] = [0, 0, 0]
    return image
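

# Usage sketch (hedged): a hypothetical 4x4 RGB image and a mask that keeps
# only the top-left 2x2 corner; every other pixel becomes black. The input
# image is modified in place.
#
#   image = np.ones((4, 4, 3), dtype=np.uint8) * 255
#   mask = np.zeros((4, 4), dtype=np.uint8)
#   mask[:2, :2] = 1
#   masked = crop_image_from_mask((image, mask))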