Adding ReaderDaskAdaptor
Implements a DaskRasterReader adaptor on top of a plain ReaderDriver
that has no native dask support.
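
A minimal sketch of the intended wiring, assuming a hypothetical ``MyDriver`` that follows the ``ReaderDriver`` protocol; only ``ReaderDaskAdaptor`` below comes from this change, and the exact shape of the ``dask_reader`` property is an assumption based on the class docstring:

# Hypothetical driver; only ReaderDaskAdaptor is provided by this commit.
from odc.loader._reader import ReaderDaskAdaptor


class MyDriver:  # assumed to satisfy the ReaderDriver protocol
    ...

    @property
    def dask_reader(self) -> ReaderDaskAdaptor:
        # With no explicit env/ctx/src the adaptor captures this driver's
        # environment via capture_env(); open() later binds source and context.
        return ReaderDaskAdaptor(self)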
Kirill888 committed Jun 4, 2024
1 parent 303155d commit 1d64d46
Showing 2 changed files with 103 additions and 2 deletions.
66 changes: 66 additions & 0 deletions odc/loader/_reader.py
@@ -11,17 +11,83 @@
from typing import Any, Optional, Sequence

import numpy as np
from dask import delayed
from numpy.typing import DTypeLike
from odc.geo.geobox import GeoBox

from .types import (
    RasterBandMetadata,
    RasterLoadParams,
    RasterSource,
    ReaderDriver,
    ReaderSubsetSelection,
    with_default,
)


def _dask_read_adaptor(
    src: RasterSource,
    ctx: Any,
    cfg: RasterLoadParams,
    dst_geobox: GeoBox,
    driver: ReaderDriver,
    env: dict[str, Any],
    selection: Optional[ReaderSubsetSelection] = None,
) -> tuple[tuple[slice, slice], np.ndarray]:
    # Runs when the delayed task is computed: re-create the driver environment,
    # open the source and read the requested window into ``dst_geobox``.
    with driver.restore_env(env, ctx) as local_ctx:
        rdr = driver.open(src, local_ctx)
        return rdr.read(cfg, dst_geobox, selection=selection)


class ReaderDaskAdaptor:
"""
Creates default ``DaskRasterReader`` from a ``ReaderDriver``.
Suitable for implementing ``.dask_reader`` property for generic reader drivers.
"""

def __init__(
self,
driver: ReaderDriver,
env: dict[str, Any] | None = None,
ctx: Any | None = None,
src: RasterSource | None = None,
) -> None:
if env is None:
env = driver.capture_env()

self._driver = driver
self._env = env
self._ctx = ctx
self._src = src

def read(
self,
cfg: RasterLoadParams,
dst_geobox: GeoBox,
*,
selection: Optional[ReaderSubsetSelection] = None,
) -> Any:
assert self._src is not None
assert self._ctx is not None
read_op = delayed(_dask_read_adaptor)

# TODO: supply `dask_key_name=` that makes sense
return read_op(
self._src,
self._ctx,
cfg,
dst_geobox,
self._driver,
self._env,
selection=selection,
)

def open(self, src: RasterSource, ctx: Any) -> "ReaderDaskAdaptor":
return ReaderDaskAdaptor(self._driver, self._env, ctx, src)


def resolve_load_cfg(
    bands: dict[str, RasterBandMetadata],
    resampling: str | dict[str, str] | None = None,
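``ReaderDaskAdaptor.read`` returns a ``dask.delayed`` object that computes to the ``(yx_roi, pixels)`` tuple produced by ``_dask_read_adaptor``. A rough sketch of how a caller could fold several such delayed reads into one destination array; the ``stitch`` helper and the ``delayed_reads`` list are illustrative assumptions, not part of this commit:

import numpy as np
from dask import compute


def stitch(dst: np.ndarray, delayed_reads) -> np.ndarray:
    # Each task computes to (yx_roi, pixels); yx_roi is a (slice, slice)
    # pair indexing the y/x plane of the destination array.
    for yx_roi, pix in compute(*delayed_reads):
        dst[yx_roi] = pix
    return dst
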
39 changes: 37 additions & 2 deletions odc/loader/test_reader.py
@@ -8,12 +8,14 @@
import pytest
import rasterio
import xarray as xr
from dask import is_dask_collection
from numpy import ma
from numpy.testing import assert_array_equal
from odc.geo.geobox import GeoBox
from odc.geo.xr import xr_zeros

from ._reader import (
    ReaderDaskAdaptor,
    expand_selection,
    pick_overview,
    resolve_band_query,
@@ -23,8 +25,13 @@
    same_nodata,
)
from ._rio import RioDriver, configure_rio, get_rio_env, rio_read
from .testing.fixtures import with_temp_tiff
from .types import RasterBandMetadata, RasterLoadParams, RasterSource
from .testing.fixtures import FakeReaderDriver, with_temp_tiff
from .types import (
    RasterBandMetadata,
    RasterGroupMetadata,
    RasterLoadParams,
    RasterSource,
)


def test_same_nodata():
@@ -358,3 +365,31 @@ def test_reader_fail_on_error():
    assert yy.shape == (0, 0)
    assert yy.dtype == cfg.dtype
    assert roi == np.s_[0:0, 0:0]


@pytest.mark.parametrize("dtype", ["int16", "float32"])
def test_dask_reader_adaptor(dtype: str):
    gbox = GeoBox.from_bbox((-180, -90, 180, 90), shape=(160, 320), tight=True)

    meta = RasterBandMetadata(dtype, 333)
    group_md = RasterGroupMetadata({(b, 1): meta for b in ("aa", "bb")})

    base_driver = FakeReaderDriver(group_md)
    driver = ReaderDaskAdaptor(base_driver)

    ctx = base_driver.new_load(gbox, chunks={"x": 64, "y": 64})

    src = RasterSource("mem://", meta=meta)
    rdr = driver.open(src, ctx)

    assert isinstance(rdr, ReaderDaskAdaptor)

    cfg = RasterLoadParams.same_as(src)
    xx = rdr.read(cfg, gbox)
    assert is_dask_collection(xx)

    yy = xx.compute(scheduler="synchronous")
    assert isinstance(yy, tuple)
    yx_roi, pix = yy
    assert pix.shape == gbox[yx_roi].shape.yx
    assert pix.dtype == dtype
