Add output to disk option to save_cog_with_dask
Kirill888 committed Oct 17, 2023
1 parent 40fdccd commit 49ef5a2
Showing 2 changed files with 26 additions and 12 deletions.
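For context, a hedged sketch of how the entry point this commit extends might be called once disk output is supported. Only the `parts_base` keyword and the S3-versus-local-path distinction come from the diff below; the import paths, the helper used to build `xx`, and the other keyword arguments are assumptions for illustration, not something this commit shows.

# Illustrative usage only; construction of `xx` uses odc.geo helpers assumed
# to exist, and keyword names other than `parts_base` are likewise assumptions.
from odc.geo.geobox import GeoBox
from odc.geo.xr import xr_zeros
from odc.geo.cog import save_cog_with_dask

gbox = GeoBox.from_bbox((-180, -90, 180, 90), "epsg:4326", resolution=0.05)
xx = xr_zeros(gbox, dtype="uint8", chunks=(2048, 2048))  # dask-backed raster

# Existing behaviour: stream the COG to S3 via multipart upload.
save_cog_with_dask(xx, "s3://my-bucket/output/cog.tif", compression="deflate")

# New with this commit: a destination that is not an s3:// URL is written to
# local disk instead; `parts_base` (seen in the diff) appears to control where
# intermediate part files are staged.
save_cog_with_dask(xx, "/data/output/cog.tif", parts_base="/scratch/parts")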
2 changes: 1 addition & 1 deletion odc/geo/cog/_mpu_fs.py
@@ -35,7 +35,7 @@ def __init__(

    @property
    def min_write_sz(self) -> int:
-        return self._limits.get("min_write_sz", 0)
+        return self._limits.get("min_write_sz", 4096)

    @property
    def max_write_sz(self) -> int:
36 changes: 25 additions & 11 deletions odc/geo/cog/_tifffile.py
@@ -18,6 +18,8 @@
from .._interop import have
from ..geobox import GeoBox
from ..types import MaybeNodata, Shape2d, Unset, shape_
+from ._mpu import mpu_write
+from ._mpu_fs import MPUFileSink
from ._s3 import MultiPartUpload, s3_parse_url
from ._shared import (
    GDAL_COMP,
@@ -594,6 +596,13 @@ def save_cog_with_dask(
    if aws is None:
        aws = {}

+    upload_params = {k: kw.pop(k) for k in ["writes_per_chunk", "spill_sz"] if k in kw}
+    upload_params.update(
+        {k: aws.pop(k) for k in ["writes_per_chunk", "spill_sz"] if k in aws}
+    )
+
+    parts_base = kw.pop("parts_base", None)
+
    # normalize compression and remove GDAL compat options from kw
    predictor, compression, compressionargs = _norm_compression_tifffile(
        xx.dtype, predictor, compression, compressionargs, level=level, kw=kw
@@ -652,24 +661,29 @@
"_stats": _stats,
}

bucket, key = s3_parse_url(dst)
if not bucket:
raise ValueError("Currently only support output to S3")

upload_params = {
k: aws.pop(k) for k in ["writes_per_chunk", "spill_sz"] if k in aws
}
upload_params[
"ContentType"
] = "image/tiff;application=geotiff;profile=cloud-optimized"

tiles_write_order = _tiles[::-1]
if len(tiles_write_order) > 4:
tiles_write_order = [
dask.bag.concat(tiles_write_order[:4]),
*tiles_write_order[4:],
]

bucket, key = s3_parse_url(dst)
if not bucket:
# assume disk output
write = MPUFileSink(dst, parts_base=parts_base)
return mpu_write(
tiles_write_order,
write,
mk_header=_patch_hdr,
user_kw={"meta": meta, "hdr0": hdr0, "stats": _stats},
**upload_params,
)

upload_params[
"ContentType"
] = "image/tiff;application=geotiff;profile=cloud-optimized"

cleanup = aws.pop("cleanup", False)
s3_sink = MultiPartUpload(bucket, key, **aws)
if cleanup:
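To summarize the control flow added above: when `s3_parse_url` finds no bucket in the destination, the writer now falls back to an on-disk multipart sink instead of raising. Below is a minimal, self-contained sketch of that dispatch idea; `pick_sink` is a hypothetical helper written only for illustration and is not part of odc-geo.

# Hypothetical sketch of the destination dispatch (not the library's code):
# an "s3://bucket/key" URL selects the S3 multipart-upload sink, anything
# else falls through to the on-disk, MPUFileSink-style path.
from urllib.parse import urlparse


def pick_sink(dst: str) -> tuple[str, str, str]:
    u = urlparse(dst)
    if u.scheme == "s3" and u.netloc:
        return ("s3", u.netloc, u.path.lstrip("/"))  # upload to bucket/key
    return ("file", dst, "")  # write parts to local disk, then assemble


assert pick_sink("s3://my-bucket/cog.tif") == ("s3", "my-bucket", "cog.tif")
assert pick_sink("/tmp/cog.tif")[0] == "file"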
