Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel with mpi #7

Merged
merged 3 commits into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions streamer/cogeo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""rio_cogeo.cogeo: translate a file to a cloud optimized geotiff."""

import sys

import click

import numpy

import rasterio
from rasterio.io import MemoryFile
from rasterio.enums import Resampling
from rasterio.shutil import copy


def cog_translate(
    src_path,
    dst_path,
    dst_kwargs,
    indexes=None,
    nodata=None,
    alpha=None,
    overview_level=5,
    overview_resampling=None,
    config=None,
):
    """
    Create Cloud Optimized Geotiff.

    The source is copied block by block into an in-memory dataset, an
    optional mask and optional overviews are added, and the result is
    copied to ``dst_path``.

    Parameters
    ----------
    src_path : str or PathLike object
        A dataset path or URL. Will be opened in "r" mode.
    dst_path : str or PathLike object
        An output dataset path. Will be opened in "w" mode.
    dst_kwargs : dict
        Output dataset creation options.
    indexes : tuple, int, optional
        Raster band indexes to copy. A single int is treated as a
        one-element tuple. Defaults to all bands of the source.
    nodata : int, optional
        nodata value for mask creation.
    alpha : int, optional
        alpha band index for mask creation (used only when ``nodata``
        is None).
    overview_level : int, optional (default: 5)
        COGEO overview (decimation) level.
    overview_resampling : str, optional
        Name of a ``rasterio.enums.Resampling`` member used to build the
        overviews. When None (the default) no overviews are built.
    config : dict
        Rasterio Env options.

    """
    config = config or {}

    with rasterio.Env(**config):
        with rasterio.open(src_path) as src:

            if not indexes:
                indexes = src.indexes
            elif isinstance(indexes, (int, numpy.integer)):
                # A scalar index has no len() and would make src.read()
                # return a 2-D array; keep everything band-first 3-D.
                indexes = (indexes,)

            meta = src.meta
            meta["count"] = len(indexes)
            meta.pop("nodata", None)
            meta.pop("alpha", None)

            meta.update(**dst_kwargs)
            # Compression/photometric are applied by the final copy() via
            # dst_kwargs, not on the intermediate in-memory dataset.
            meta.pop("compress", None)
            meta.pop("photometric", None)

            with MemoryFile() as memfile:
                with memfile.open(**meta) as mem:
                    wind = list(mem.block_windows(1))
                    with click.progressbar(
                        wind, length=len(wind), file=sys.stderr, show_percent=True
                    ) as windows:
                        for _, w in windows:
                            matrix = src.read(window=w, indexes=indexes)
                            mem.write(matrix, window=w)

                            if nodata is not None:
                                # Valid (255) only where every band differs
                                # from the nodata value.
                                mask_value = (
                                    numpy.all(matrix != nodata, axis=0).astype(
                                        numpy.uint8
                                    )
                                    * 255
                                )
                            elif alpha is not None:
                                mask_value = src.read(alpha, window=w)
                            else:
                                # Neither nodata nor alpha: write no mask.
                                mask_value = None

                            if mask_value is not None:
                                mem.write_mask(mask_value, window=w)

                    if overview_resampling is not None:
                        resampling = Resampling[overview_resampling]
                        # Decimation factors 2, 4, ..., 2 ** overview_level.
                        overviews = [2 ** j for j in range(1, overview_level + 1)]

                        mem.build_overviews(overviews, resampling)
                        mem.update_tags(OVR_RESAMPLING_ALG=resampling.name.upper())

                    copy(mem, dst_path, copy_src_overviews=True, **dst_kwargs)
48 changes: 48 additions & 0 deletions streamer/mpi_wofls_convert.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash
# Configuration for the WOfS NetCDF -> COG conversion run.
JOBDIR=$PWD                                   # directory the PBS jobs cd into
PRDDIR=/g/data/v10/users/ea6141/wofls_cog     # passed as --output-dir
SRCDIR=/g/data/fk4/datacube/002/WOfS/WOfS_25_2_1/   # searched for *.nc inputs
COGS=/home/547/ea6141/datafile/COG-Conversion/streamer/streamer.py
FILEL=wofs_list          # prefix of the per-job file lists (wofs_list1, ...)
FILEN=wofs_empty_list    # zero-byte source files are recorded here instead
NCPUS=128
MEM=512GB
JOBFS=32GB

# Split all *.nc files under SRCDIR into lists of at most 100000 paths each.
# i counts entries in the current list, j is the current list number.
# NOTE: lists are appended to, so entries from a previous run are kept.
i=1
j=1
find "$SRCDIR" -name "*.nc" | \
while IFS= read -r file
do
    # IFS= and -r keep whitespace and backslashes in the path intact.
    SIZE=$(stat -c%s "$file")
    if [ "$((SIZE))" -eq 0 ]
    then
        echo "$file" >> "$FILEN"
    else
        echo "$file" >> "$FILEL$((j))"
        i=$((i+1))
    fi
    # Current list is full: start the next one.
    if [ "$((i))" -gt 100000 ]
    then
        i=1
        j=$((j+1))
    fi
done

# Submit one PBS job per non-empty file list. Each job after the first
# depends (afterany) on the previously submitted one, so they run one at a
# time. The list is checked BEFORE submitting, so no job is queued for a
# list file that does not exist.
j=1
f_j=
while [ -s "$FILEL$j" ]
do
    depend=()
    if [ "$j" -gt 1 ]
    then
        depend=(-W "depend=afterany:$f_j")
    fi

    # Variables are expanded here, at submission time.
    cmd="source $HOME/.bashrc; cd $JOBDIR;\
mpirun --oversubscribe -n 1 python3 $COGS mpi_convert_cog -c cog_fcp.yaml --output-dir $PRDDIR --product wofls --numprocs $((NCPUS-1)) --cog-path $COGS $FILEL$j"

    # f_j holds qsub's output (the job ID) for the next job's dependency.
    f_j=$(qsub "${depend[@]}" -P v10 -q normal \
        -l walltime=48:00:00,mem=$MEM,jobfs=$JOBFS,ncpus=$NCPUS,wd -- \
        bash -l -c "$cmd") || {
        # Without a job ID the dependency chain cannot be continued.
        echo "qsub failed for $FILEL$j" >&2
        exit 1
    }
    j=$((j+1))
done
Loading