diff --git a/streamer/cogeo.py b/streamer/cogeo.py new file mode 100644 index 0000000..1534de5 --- /dev/null +++ b/streamer/cogeo.py @@ -0,0 +1,98 @@ +"""rio_cogeo.cogeo: translate a file to a cloud optimized geotiff.""" + +import sys + +import click + +import numpy + +import rasterio +from rasterio.io import MemoryFile +from rasterio.enums import Resampling +from rasterio.shutil import copy + + +def cog_translate( + src_path, + dst_path, + dst_kwargs, + indexes=None, + nodata=None, + alpha=None, + overview_level=5, + overview_resampling=None, + config=None, +): + """ + Create Cloud Optimized Geotiff. + + Parameters + ---------- + src_path : str or PathLike object + A dataset path or URL. Will be opened in "r" mode. + dst_path : str or Path-like object + An output dataset path or or PathLike object. + Will be opened in "w" mode. + dst_kwargs: dict + output dataset creation options. + indexes : tuple, int, optional + Raster band indexes to copy. + nodata, int, optional + nodata value for mask creation. + alpha, int, optional + alpha band index for mask creation. + overview_level : int, optional (default: 6) + COGEO overview (decimation) level + config : dict + Rasterio Env options. + + """ + config = config or {} + + with rasterio.Env(**config): + with rasterio.open(src_path) as src: + + indexes = indexes if indexes else src.indexes + meta = src.meta + meta["count"] = len(indexes) + meta.pop("nodata", None) + meta.pop("alpha", None) + + meta.update(**dst_kwargs) + meta.pop("compress", None) + meta.pop("photometric", None) + + with MemoryFile() as memfile: + with memfile.open(**meta) as mem: + wind = list(mem.block_windows(1)) + with click.progressbar( + wind, length=len(wind), file=sys.stderr, show_percent=True + ) as windows: + for ij, w in windows: + matrix = src.read(window=w, indexes=indexes) + mem.write(matrix, window=w) + + if nodata is not None: + mask_value = ( + numpy.all(matrix != nodata, axis=0).astype( + numpy.uint8 + ) + * 255 + ) + elif alpha is not None: + mask_value = src.read(alpha, window=w) + else: + mask_value = None + #mask_value = src.dataset_mask(window=w) + if mask_value is not None: + mem.write_mask(mask_value, window=w) + + if overview_resampling is not None: + overviews = [2 ** j for j in range(1, overview_level + 1)] + + mem.build_overviews(overviews, Resampling[overview_resampling]) + mem.update_tags( + OVR_RESAMPLING_ALG=Resampling[overview_resampling].name.upper() + ) + + copy(mem, dst_path, copy_src_overviews=True, **dst_kwargs) diff --git a/streamer/mpi_wofls_convert.sh b/streamer/mpi_wofls_convert.sh new file mode 100755 index 0000000..51f6a89 --- /dev/null +++ b/streamer/mpi_wofls_convert.sh @@ -0,0 +1,48 @@ +#!/bin/bash +JOBDIR=$PWD +PRDDIR=/g/data/v10/users/ea6141/wofls_cog +SRCDIR=/g/data/fk4/datacube/002/WOfS/WOfS_25_2_1/ +COGS=/home/547/ea6141/datafile/COG-Conversion/streamer/streamer.py +FILEL=wofs_list +FILEN=wofs_empty_list +NCPUS=128 +MEM=512GB +JOBFS=32GB + +i=1 +j=1 +find $SRCDIR -name "*.nc" | \ + while read file + do + SIZE=$(stat -c%s $file) + if [ $((SIZE)) -eq 0 ] + then + echo $file >> $FILEN + else + echo $file >> $FILEL$((j)) + i=$((i+1)) + #echo $((i)) + fi + if [ $((i)) -gt 100000 ] + then + i=1 + j=$((j+1)) + fi + done + +j=1 +f_j=$(qsub -P v10 -q normal -l walltime=48:00:00,mem=$MEM,jobfs=$JOBFS,ncpus=$NCPUS,wd -- \ + bash -l -c "\ + source $HOME/.bashrc; cd $JOBDIR;\ + mpirun --oversubscribe -n 1 python3 $COGS mpi_convert_cog -c cog_fcp.yaml --output-dir $PRDDIR --product wofls --numprocs $((NCPUS-1)) --cog-path $COGS $FILEL$((j))") + +j=2 +while + n_j=$(qsub -W depend=afterany:$f_j -P v10 -q normal -l walltime=48:00:00,mem=$MEM,jobfs=$JOBFS,ncpus=$NCPUS,wd -- \ + bash -l -c "\ + source $HOME/.bashrc; cd $JOBDIR;\ + mpirun --oversubscribe -n 1 python3 $COGS mpi_convert_cog -c cog_fcp.yaml --output-dir $PRDDIR --product wofls --numprocs $((NCPUS-1)) --cog-path $COGS $FILEL$((j))") + f_j=$n_j + j=$((j+1)) + [ -s $FILEL$((j)) ] +do :; done diff --git a/streamer/streamer.py b/streamer/streamer.py index 1fa266f..b420c83 100644 --- a/streamer/streamer.py +++ b/streamer/streamer.py @@ -75,6 +75,8 @@ import subprocess import tempfile import time +import sys +import numpy as np from concurrent.futures import ProcessPoolExecutor, as_completed from datetime import datetime, timedelta from os.path import join as pjoin, basename, dirname, exists @@ -94,6 +96,7 @@ from parse import * from parse import compile +from cogeo import * import re @@ -205,7 +208,15 @@ def make_out_prefix(self, input_fname, dest_dir): abs_fname = basename(input_fname) r = re.compile(r"(?<=_)[-\d]+") indices = r.findall(abs_fname) - x_index, y_index, datetime = indices[-3:] + r = re.compile(r"\{\w+\}") + LOG.debug("src template %s", self.src_template) + key_indices = r.findall(self.src_template) + LOG.debug("indices from file name %s", indices) + if len(key_indices) > 3: + x_index, y_index, datetime = indices[-len(key_indices):-(len(key_indices) - 3)] + else: + x_index, y_index, datetime = indices[-len(key_indices):] + time_dict = {} year = re.search(r"\d{4}", datetime) month = re.search(r'(?<=\d{4})\d{2}', datetime) @@ -267,8 +278,13 @@ def _dataset_to_yaml(self, prefix, dataset_array: xarray.DataArray, rastercount) # Update band urls for key, value in dataset['image']['bands'].items(): - value['layer'] = '1' - value['path'] = prefix + '_' + key + '.tif' + if rastercount == 1: + tif_path = basename(prefix + '_' + key + '.tif') + else: + tif_path = basename(prefix + '_' + key + '_' + str(i+1) + '.tif') + + value['layer'] = str(i+1) + value['path'] = tif_path dataset['format'] = {'name': 'GeoTIFF'} dataset['lineage'] = {'source_datasets': {}} @@ -291,81 +307,57 @@ def _dataset_to_cog(self, prefix, subdatasets): if self.nonpym_list is not None: re_nonpym = "|".join(self.nonpym_list) - with tempfile.TemporaryDirectory() as tmpdir: - for dts in subdatasets[:-1]: - rastercount = gdal.Open(dts[0]).RasterCount - for i in range(rastercount): - band_name = dts[0].split(':')[-1] - - # Only do specified bands if specified - if self.black_list is not None: - if re.search(re_black, band_name) is not None: - continue - - if self.white_list is not None: - if re.search(re_white, band_name) is None: - continue + for dts in subdatasets[:-1]: + rastercount = gdal.Open(dts[0]).RasterCount + for i in range(rastercount): + band_name = dts[0].split(':')[-1] - if rastercount == 1: - out_fname = prefix + '_' + band_name + '.tif' - else: - out_fname = prefix + '_' + band_name + '_' + str(i+1) + '.tif' + # Only do specified bands if specified + if self.black_list is not None: + if re.search(re_black, band_name) is not None: + continue - # Check the done files might need a force option later - if exists(out_fname): + if self.white_list is not None: + if re.search(re_white, band_name) is None: continue - # Resampling method of this band - resampling_method = None - if self.bands_rsp is not None: - resampling_method = self.bands_rsp.get(band_name) - if resampling_method is None: - resampling_method = self.default_rsp - - temp_fname = pjoin(tmpdir, basename(out_fname)) - try: - # copy to a tempfolder - to_cogtif = [ - 'gdal_translate', - '-of', 'GTIFF', - '-b', str(i+1), - dts[0], - temp_fname] - run_command(to_cogtif, tmpdir) - - # Add Overviews - # gdaladdo - Builds or rebuilds overview images. - # 2, 4, 8,16, 32 are levels which is a list of integral overview levels to build. - if self.nonpym_list is None or (self.nonpym_list is not None and - re.search(re_nonpym, band_name) is None): - add_ovr = [ - 'gdaladdo', - '-r', resampling_method, - '--config', 'GDAL_TIFF_OVR_BLOCKSIZE', '512', - temp_fname, - '2', '4', '8', '16', '32'] - run_command(add_ovr, tmpdir) - LOG.debug("resampling %s with %s", temp_fname, resampling_method) - - # Convert to COG - cogtif = [ - 'gdal_translate', - '-co', 'TILED=YES', - '-co', 'COPY_SRC_OVERVIEWS=YES', - '-co', 'COMPRESS=DEFLATE', - '-co', 'ZLEVEL=9', - '--config', 'GDAL_TIFF_OVR_BLOCKSIZE', '512', - '-co', 'BLOCKXSIZE=512', - '-co', 'BLOCKYSIZE=512', - '-co', 'PREDICTOR=2', - '-co', 'PROFILE=GeoTIFF', - temp_fname, - out_fname] - run_command(cogtif, dirname(out_fname)) - os.remove(out_fname + '.aux.xml') - except Exception as e: - LOG.error("Failure during COG conversion: %s", out_fname) - return rastercount + if rastercount == 1: + out_fname = prefix + '_' + band_name + '.tif' + else: + out_fname = prefix + '_' + band_name + '_' + str(i+1) + '.tif' + + # Check the done files might need a force option later + if exists(out_fname): + continue + + # Resampling method of this band + resampling_method = None + if self.bands_rsp is not None: + resampling_method = self.bands_rsp.get(band_name) + if resampling_method is None: + resampling_method = self.default_rsp + if self.nonpym_list is not None: + if re.search(re_nonpym, band_name) is not None: + resampling_method = None + LOG.debug("resampling method %s", resampling_method) + + default_profile = {'driver': 'GTiff', + 'interleave': 'pixel', + 'tiled': True, + 'blockxsize': 512, + 'blockysize': 512, + 'compress': 'DEFLATE', + 'predictor': 2, + 'zlevel': 9} + default_config = {'NUM_THREADS': 1, 'GDAL_TIFF_OVR_BLOCKSIZE': 512} + + cog_translate(dts[0], out_fname, + default_profile, + indexes=[i+1], + overview_resampling=resampling_method, + overview_level=5, + config=default_config) + return rastercount @@ -418,6 +410,7 @@ def generate_work_list(product_name, year, month): print(item) +from mpi4py import MPI @cli.command() @click.option('--config', '-c', help='Config file') @click.option('--output-dir', help='Output directory', required=True) @@ -439,8 +432,70 @@ def convert_cog(config, output_dir, product, filenames): product_config = cfg['products'][product] cog_convert = COGNetCDF(**product_config) - for filename in filenames: + + comm = MPI.Comm.Get_parent() + try: + size = comm.Get_size() + rank = comm.Get_rank() + except: + LOG.info("Run with single process") + for filename in filenames: + cog_convert(filename, output_dir) + else: + comm.Merge(True) + filename = filenames[rank] cog_convert(filename, output_dir) + comm.Disconnect() + LOG.debug("Finish rank is %d, size is %d, filename %s", rank, size, filename) + + +from time import sleep +@cli.command() +@click.option('--config', '-c', help='Config file') +@click.option('--output-dir', help='Output directory', required=True) +@click.option('--product', help='Product name', required=True) +@click.option('--numprocs', type=int, help='Number of processes', required=True, default=1) +@click.option('--cog-path', help='cog convert script path', required=True, default='../COG-Conversion/streamer/streamer.py') +@click.argument('filelist', nargs=1, required=True) +def mpi_convert_cog(config, output_dir, product, numprocs, cog_path, filelist): + comdLine = [cog_path] + ['convert_cog', '-c'] + [config] + ['--output-dir'] + [output_dir] + ['--product' ] + [product] + args = comdLine + with open(filelist, 'r') as fb: + file_list = np.genfromtxt(fb, dtype='str') + file_batch = int(len(file_list) / numprocs) + file_odd = len(file_list) % numprocs + LOG.debug("file_batch %d", file_batch) + LOG.debug("file_odd %d", file_odd) + for i in range(file_batch): + margs = args + list(file_list[i*numprocs:(i+1)*numprocs]) + while True: + try: + comm = MPI.COMM_SELF.Spawn(sys.executable, + args=margs, + maxprocs=numprocs) + + except: + sleep(1) + else: + comm.Merge() + break + comm.Disconnect() + LOG.debug("batch %d done", i) + if file_odd > 0: + numprocs = file_odd + margs = args + list(file_list[-file_odd:]) + while True: + try: + comm = MPI.COMM_SELF.Spawn(sys.executable, + args=margs, + maxprocs=numprocs) + except: + sleep(1) + else: + comm.Merge() + break + comm.Disconnect() + LOG.debug("Job done") if __name__ == '__main__': LOG = logging.getLogger(__name__)