From 49ef5a2cb32e01357587c1e7403a84ed8302a754 Mon Sep 17 00:00:00 2001 From: Kirill Kouzoubov Date: Tue, 17 Oct 2023 15:43:23 +1100 Subject: [PATCH] Add output to disk option to save_cog_with_dask --- odc/geo/cog/_mpu_fs.py | 2 +- odc/geo/cog/_tifffile.py | 36 +++++++++++++++++++++++++----------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/odc/geo/cog/_mpu_fs.py b/odc/geo/cog/_mpu_fs.py index c23a2a13..a0364fa7 100644 --- a/odc/geo/cog/_mpu_fs.py +++ b/odc/geo/cog/_mpu_fs.py @@ -35,7 +35,7 @@ def __init__( @property def min_write_sz(self) -> int: - return self._limits.get("min_write_sz", 0) + return self._limits.get("min_write_sz", 4096) @property def max_write_sz(self) -> int: diff --git a/odc/geo/cog/_tifffile.py b/odc/geo/cog/_tifffile.py index 27c0c6e0..75b4e537 100644 --- a/odc/geo/cog/_tifffile.py +++ b/odc/geo/cog/_tifffile.py @@ -18,6 +18,8 @@ from .._interop import have from ..geobox import GeoBox from ..types import MaybeNodata, Shape2d, Unset, shape_ +from ._mpu import mpu_write +from ._mpu_fs import MPUFileSink from ._s3 import MultiPartUpload, s3_parse_url from ._shared import ( GDAL_COMP, @@ -594,6 +596,13 @@ def save_cog_with_dask( if aws is None: aws = {} + upload_params = {k: kw.pop(k) for k in ["writes_per_chunk", "spill_sz"] if k in kw} + upload_params.update( + {k: aws.pop(k) for k in ["writes_per_chunk", "spill_sz"] if k in aws} + ) + + parts_base = kw.pop("parts_base", None) + # normalize compression and remove GDAL compat options from kw predictor, compression, compressionargs = _norm_compression_tifffile( xx.dtype, predictor, compression, compressionargs, level=level, kw=kw @@ -652,17 +661,6 @@ def save_cog_with_dask( "_stats": _stats, } - bucket, key = s3_parse_url(dst) - if not bucket: - raise ValueError("Currently only support output to S3") - - upload_params = { - k: aws.pop(k) for k in ["writes_per_chunk", "spill_sz"] if k in aws - } - upload_params[ - "ContentType" - ] = "image/tiff;application=geotiff;profile=cloud-optimized" - tiles_write_order = _tiles[::-1] if len(tiles_write_order) > 4: tiles_write_order = [ @@ -670,6 +668,22 @@ def save_cog_with_dask( *tiles_write_order[4:], ] + bucket, key = s3_parse_url(dst) + if not bucket: + # assume disk output + write = MPUFileSink(dst, parts_base=parts_base) + return mpu_write( + tiles_write_order, + write, + mk_header=_patch_hdr, + user_kw={"meta": meta, "hdr0": hdr0, "stats": _stats}, + **upload_params, + ) + + upload_params[ + "ContentType" + ] = "image/tiff;application=geotiff;profile=cloud-optimized" + cleanup = aws.pop("cleanup", False) s3_sink = MultiPartUpload(bucket, key, **aws) if cleanup: