NOTE(review): this patch was restored from a whitespace-collapsed copy. Indentation and
blank context lines are reconstructed from the hunk counts - verify against the target
file before applying.

diff --git a/streamer/streamer.py b/streamer/streamer.py
index 1fa266f..b01a862 100644
--- a/streamer/streamer.py
+++ b/streamer/streamer.py
@@ -75,6 +75,8 @@
 import subprocess
 import tempfile
 import time
+import sys
+import numpy as np
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from datetime import datetime, timedelta
 from os.path import join as pjoin, basename, dirname, exists
@@ -205,7 +207,15 @@ def make_out_prefix(self, input_fname, dest_dir):
         abs_fname = basename(input_fname)
         r = re.compile(r"(?<=_)[-\d]+")
         indices = r.findall(abs_fname)
-        x_index, y_index, datetime = indices[-3:]
+        r = re.compile(r"\{\w+\}")
+        LOG.debug("src template %s", self.src_template)
+        key_indices = r.findall(self.src_template)
+        LOG.debug("indices from file name %s", indices)
+        if len(key_indices) > 3:
+            x_index, y_index, datetime = indices[-len(key_indices):-(len(key_indices) - 3)]
+        else:
+            x_index, y_index, datetime = indices[-len(key_indices):]
+
         time_dict = {}
         year = re.search(r"\d{4}", datetime)
         month = re.search(r'(?<=\d{4})\d{2}', datetime)
@@ -267,8 +277,13 @@ def _dataset_to_yaml(self, prefix, dataset_array: xarray.DataArray, rastercount)
 
         # Update band urls
         for key, value in dataset['image']['bands'].items():
-            value['layer'] = '1'
-            value['path'] = prefix + '_' + key + '.tif'
+            if rastercount == 1:
+                tif_path = basename(prefix + '_' + key + '.tif')
+            else:
+                tif_path = basename(prefix + '_' + key + '_' + str(i+1) + '.tif')
+
+            value['layer'] = str(i+1)
+            value['path'] = tif_path
 
         dataset['format'] = {'name': 'GeoTIFF'}
         dataset['lineage'] = {'source_datasets': {}}
@@ -418,6 +433,7 @@ def generate_work_list(product_name, year, month):
         print(item)
 
 
+from mpi4py import MPI
 @cli.command()
 @click.option('--config', '-c', help='Config file')
 @click.option('--output-dir', help='Output directory', required=True)
@@ -439,8 +455,78 @@ def convert_cog(config, output_dir, product, filenames):
 
     product_config = cfg['products'][product]
     cog_convert = COGNetCDF(**product_config)
-    for filename in filenames:
+
+    # When spawned by mpi_convert_cog the parent intercommunicator is valid
+    # and each rank converts one file; a plain run gets COMM_NULL and
+    # converts every file serially.
+    comm = MPI.Comm.Get_parent()
+    if comm == MPI.COMM_NULL:
+        LOG.info("Run with single process")
+        for filename in filenames:
+            cog_convert(filename, output_dir)
+    else:
+        size = comm.Get_size()
+        rank = comm.Get_rank()
+        LOG.debug("rank is %d, size is %d", rank, size)
+        filename = filenames[rank]
+        LOG.debug("file name %s", filename)
         cog_convert(filename, output_dir)
+        comm.Barrier()
+        comm.Disconnect()
+
+
+from time import sleep
+
+
+def _spawn_workers(args, numprocs):
+    """Spawn ``numprocs`` MPI workers running ``args`` and wait for them.
+
+    A failed spawn is retried once per second; only MPI errors are retried
+    so that Ctrl-C and programming errors still propagate.
+    """
+    while True:
+        try:
+            comm = MPI.COMM_SELF.Spawn(sys.executable, args=args,
+                                       maxprocs=numprocs)
+        except MPI.Exception as err:
+            LOG.debug("MPI spawn failed (%s), retrying", err)
+            sleep(1)
+        else:
+            break
+    # Each worker hits the matching Barrier() when its conversion is done.
+    comm.Barrier()
+    comm.Disconnect()
+
+
+@cli.command()
+@click.option('--config', '-c', help='Config file')
+@click.option('--output-dir', help='Output directory', required=True)
+@click.option('--product', help='Product name', required=True)
+@click.option('--numprocs', type=int, help='Number of processes', required=True, default=1)
+@click.option('--cog-path', help='cog convert script path', required=True, default='../COG-Conversion/streamer/streamer.py')
+@click.argument('filelist', nargs=1, required=True)
+def mpi_convert_cog(config, output_dir, product, numprocs, cog_path, filelist):
+    """Convert the files listed in FILELIST with batches of MPI workers.
+
+    FILELIST is a text file of input paths. They are passed to the
+    ``convert_cog`` command in batches of at most ``numprocs`` files, one
+    spawned worker per file; a batch starts once the previous one is done.
+    """
+    if numprocs < 1:
+        raise click.BadParameter('must be at least 1', param_hint='--numprocs')
+    args = [cog_path, 'convert_cog', '-c', config,
+            '--output-dir', output_dir, '--product', product]
+    with open(filelist, 'r') as fb:
+        # genfromtxt yields a 0-d array for a one-line file, which has no
+        # len() and cannot be sliced; atleast_1d normalises that case.
+        file_list = np.atleast_1d(np.genfromtxt(fb, dtype='str'))
+    LOG.debug("file_batch %d", len(file_list) // numprocs)
+    LOG.debug("file_odd %d", len(file_list) % numprocs)
+    # Full batches of numprocs files first; the final batch is the remainder.
+    for start in range(0, len(file_list), numprocs):
+        batch = list(file_list[start:start + numprocs])
+        _spawn_workers(args + batch, len(batch))
+
 
 if __name__ == '__main__':
     LOG = logging.getLogger(__name__)