parallelizing hdf5 writes, adding crash support and compression
MosNicholas committed Apr 27, 2016
1 parent c629ae9 commit 359a91c
Showing 2 changed files with 53 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,6 +10,8 @@
 *.pem
 *.jpg
 *.npz
+*.txt
+*.h5
 frames_train*/
 frames_test*/
 labels_train*/
76 changes: 51 additions & 25 deletions create_datasets.py
@@ -197,16 +197,57 @@ def load_data_into_lmdb(data_source_folder, target_folder):
             frame_paths, offset_start, offset_end, i
         )).start()
 
+def load_data_hdf5_process(frame_paths, target_folder, shape, offsets, offset_start, offset_end, saved_files, pnum):
+    stacked = np.zeros((shape[0], shape[1], 20))
+    prev_stacked = np.zeros((shape[0], shape[1], 9))
+
+    for i in xrange(9):
+        prev_stacked[:, :, i] = greyscale(imread(frame_paths % (i + offset_start)))
+        prev_stacked[:, :, i] = prev_stacked[:, :, i]/prev_stacked[:, :, i].max()
+
+    for split in xrange(offset_start, offset_end):
+        if split in saved_files: continue
+        shifted = split - offset_start
+        stacked[:, :, 10:] = 0
+        stacked[:, :, :9] = prev_stacked
+        stacked[:, :, 9] = greyscale(imread(frame_paths % (split + 9)))
+        stacked[:, :, 9] = stacked[:, :, 9]/stacked[:, :, 9].max()
+        prev_stacked = stacked[:, :, 1:10]
+        stacked[:, :, 10:] = stacked[:, :, offsets[shifted] - 1:9 + offsets[shifted]]
+
+        outfile = target_folder % split
+
+        with h5py.File(outfile, 'w') as f:
+            f.create_dataset(
+                'left', data=stacked[:, :, :10],
+                compression='gzip', compression_opts=1
+            )
+            f.create_dataset(
+                'right', data=stacked[:, :, 10:],
+                compression='gzip', compression_opts=1
+            )
+            label = np.zeros((1, 1, 1, 1))
+            label[0, 0, 0, 0] = offsets[shifted]
+            f.create_dataset(
+                'label', data=label,
+                compression='gzip', compression_opts=1
+            )
+
+        if shifted % 500 == 0:
+            print '%s splits processed on process %d' % (shifted, pnum)
+
 def load_data_into_hdf5(data_source_folder, target_folder):
     offset_file_path = os.path.join(data_source_folder, 'offsets.npz')
     if not os.path.isdir(target_folder): os.makedirs(target_folder)
     train_test_file = os.path.join(target_folder, 'train_test_indices.npz')
     training_examples = os.path.join(target_folder, 'training_examples.txt')
     test_examples = os.path.join(target_folder, 'test_examples.txt')
     h5_titles = 'frame-%06d.h5'
+    h5_output_dir = os.path.join(target_folder, h5_titles)
 
     offsets = np.load(offset_file_path)['offsets']
     num_splits = len(offsets)
+    saved_files = set(map(lambda x: int(x.split('.')[0].split('-')[1]) if x.endswith('.h5') else 0, os.listdir(target_folder)))
 
     if os.path.isfile(train_test_file):
         train = np.load(train_test_file)['train']
@@ -221,31 +262,16 @@ def load_data_into_hdf5(data_source_folder, target_folder):
     frame_paths = os.path.join(data_source_folder, video_title)
     frame_shape = imread(frame_paths % 1).shape
 
-    stacked = np.zeros((frame_shape[0], frame_shape[1], 20))
-    prev_stacked = np.zeros((frame_shape[0], frame_shape[1], 9))
-
-    for i in xrange(9):
-        prev_stacked[:, :, i] = greyscale(imread(frame_paths % i))
-        prev_stacked[:, :, i] = prev_stacked[:, :, i]/prev_stacked[:, :, i].max()
-
-    for split in xrange(num_splits):
-        stacked[:, :, 10:] = 0
-        stacked[:, :, :9] = prev_stacked
-        stacked[:, :, 9] = greyscale(imread(frame_paths % (split + 9)))
-        stacked[:, :, 9] = stacked[:, :, 9]/stacked[:, :, 9].max()
-        prev_stacked = stacked[:, :, 1:10]
-        stacked[:, :, 10:] = stacked[:, :, offsets[split] - 1:9 + offsets[split]]
-
-        outfile = os.path.join(target_folder, h5_titles % split)
-
-        with h5py.File(outfile, 'w') as f:
-            f['left'] = stacked[:, :, :10]
-            f['right'] = stacked[:, :, 10:]
-            f['label'] = np.zeros((1, 1, 1, 1))
-            f['label'][0, 0, 0, 0] = offsets[split]
-
-        if split % 500 == 0:
-            print '%s splits processed' % (split)
+    num_cpus = multiprocessing.cpu_count()
+    indices_per_cpu = num_splits / num_cpus
+    for i in xrange(num_cpus):
+        offset_start = i * indices_per_cpu
+        offset_end = num_splits if i == num_cpus - 1 else (i + 1) * indices_per_cpu
+        multiprocessing.Process(target=load_data_hdf5_process, args=(
+            frame_paths, h5_output_dir, frame_shape,
+            offsets[offset_start:offset_end], offset_start, offset_end,
+            saved_files, i
+        )).start()
 
 def download_trump():
     return download_raw_youtube_video(TRUMP_ID, args.target_folder, 'china.mp4', True)
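
Since every dataset is now written with gzip compression and already-finished splits are skipped on restart, a quick way to sanity-check a run (or a crash recovery) is to open one of the generated files and inspect its datasets. The following is a minimal sketch, not part of the commit: it assumes h5py is available and that an output file such as frame-000000.h5 already exists; the directory name is a placeholder for whatever target folder load_data_into_hdf5 was given.

import h5py

# Hypothetical output path: substitute the target folder used for the run.
path = 'hdf5_data/frame-000000.h5'

with h5py.File(path, 'r') as f:
    for name in ('left', 'right', 'label'):
        dset = f[name]
        # gzip decompression is transparent on read; .compression and
        # .compression_opts only report how the dataset was stored.
        print('%s: shape=%s, compression=%s, level=%s'
              % (name, dset.shape, dset.compression, dset.compression_opts))
    # The label is a 1x1x1x1 array holding this split's frame offset.
    print('offset label: %s' % f['label'][0, 0, 0, 0])

Because load_data_into_hdf5 rebuilds saved_files from the *.h5 names in the target folder on every run, deleting a file that a crash left truncated is enough to make the next run regenerate just that split.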