Skip to content

Commit

Permalink
parallel with mpi
Browse files Browse the repository at this point in the history
  • Loading branch information
Emma Ai committed Oct 23, 2018
1 parent dcef56c commit c25f929
Showing 1 changed file with 65 additions and 4 deletions.
69 changes: 65 additions & 4 deletions streamer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import subprocess
import tempfile
import time
import sys
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime, timedelta
from os.path import join as pjoin, basename, dirname, exists
Expand Down Expand Up @@ -205,7 +207,15 @@ def make_out_prefix(self, input_fname, dest_dir):
abs_fname = basename(input_fname)
r = re.compile(r"(?<=_)[-\d]+")
indices = r.findall(abs_fname)
x_index, y_index, datetime = indices[-3:]
r = re.compile(r"\{\w+\}")
LOG.debug("src template %s", self.src_template)
key_indices = r.findall(self.src_template)
LOG.debug("indices from file name %s", indices)
if len(key_indices) > 3:
x_index, y_index, datetime = indices[-len(key_indices):-(len(key_indices) - 3)]
else:
x_index, y_index, datetime = indices[-len(key_indices):]

time_dict = {}
year = re.search(r"\d{4}", datetime)
month = re.search(r'(?<=\d{4})\d{2}', datetime)
Expand Down Expand Up @@ -267,8 +277,13 @@ def _dataset_to_yaml(self, prefix, dataset_array: xarray.DataArray, rastercount)

# Update band urls
for key, value in dataset['image']['bands'].items():
value['layer'] = '1'
value['path'] = prefix + '_' + key + '.tif'
if rastercount == 1:
tif_path = basename(prefix + '_' + key + '.tif')
else:
tif_path = basename(prefix + '_' + key + '_' + str(i+1) + '.tif')

value['layer'] = str(i+1)
value['path'] = tif_path

dataset['format'] = {'name': 'GeoTIFF'}
dataset['lineage'] = {'source_datasets': {}}
Expand Down Expand Up @@ -418,6 +433,7 @@ def generate_work_list(product_name, year, month):
print(item)


from mpi4py import MPI
@cli.command()
@click.option('--config', '-c', help='Config file')
@click.option('--output-dir', help='Output directory', required=True)
Expand All @@ -439,8 +455,53 @@ def convert_cog(config, output_dir, product, filenames):
product_config = cfg['products'][product]

cog_convert = COGNetCDF(**product_config)
for filename in filenames:

comm = MPI.Comm.Get_parent()
try:
size = comm.Get_size()
rank = comm.Get_rank()
LOG.debug("rank is %d, size is %d", rank, size)
except:
LOG.info("Run with single process")
for filename in filenames:
cog_convert(filename, output_dir)
else:
filename = filenames[rank]
LOG.debug("file name %s", filename)
cog_convert(filename, output_dir)
comm.Disconnect()

@cli.command()
@click.option('--config', '-c', help='Config file')
@click.option('--output-dir', help='Output directory', required=True)
@click.option('--product', help='Product name', required=True)
@click.option('--numprocs', type=int, help='Number of processes', required=True, default=1)
@click.option('--cog-path', help='cog convert script path', required=True, default='../COG-Conversion/streamer/streamer.py')
@click.argument('filelist', nargs=1, required=True)
def mpi_convert_cog(config, output_dir, product, numprocs, cog_path, filelist):
comdLine = [cog_path] + ['convert_cog', '-c'] + [config] + ['--output-dir'] + [output_dir] + ['--product' ] + [product]
args = comdLine
with open(filelist, 'r') as fb:
file_list = np.genfromtxt(fb, dtype='str')
file_batch = int(len(file_list) / numprocs)
file_odd = len(file_list) % numprocs
LOG.debug("file_batch %d", file_batch)
LOG.debug("file_odd %d", file_odd)
for i in range(file_batch):
margs = args + list(file_list[i*numprocs:(i+1)*numprocs])
comm = MPI.COMM_SELF.Spawn(sys.executable,
args=margs,
maxprocs=numprocs)

comm.Disconnect()
if file_odd > 0:
numprocs = file_odd
margs = args + list(file_list[-file_odd:])
comm = MPI.COMM_SELF.Spawn(sys.executable,
args=margs,
maxprocs=numprocs)

comm.Disconnect()

if __name__ == '__main__':
LOG = logging.getLogger(__name__)
Expand Down

0 comments on commit c25f929

Please sign in to comment.