opener for https added
konstntokas committed Jun 18, 2024
1 parent 7ac4488 commit af2b30f
Showing 2 changed files with 174 additions and 18 deletions.
144 changes: 144 additions & 0 deletions xcube_stac/opener.py
@@ -0,0 +1,144 @@
# The MIT License (MIT)
# Copyright (c) 2024 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import rioxarray
import xarray as xr
from xcube.core.store import DataOpener
from xcube.core.store.fs.impl.dataset import GEOTIFF_OPEN_DATA_PARAMS_SCHEMA
from xcube.util.jsonschema import (
    JsonArraySchema,
    JsonBooleanSchema,
    JsonNumberSchema,
    JsonObjectSchema,
    JsonStringSchema,
)


class HttpsNetcdfDataOpener(DataOpener):
    """Data opener for the NetCDF format accessed via the HTTPS protocol."""

    def get_open_data_params_schema(self, data_id: str = None) -> JsonObjectSchema:
        open_params = dict(
            tile_size=JsonArraySchema(
                items=(
                    JsonNumberSchema(minimum=256, default=512),
                    JsonNumberSchema(minimum=256, default=512),
                ),
                title="Tile size [y, x] for chunking",
                default=[512, 512],
            ),
        )
        return JsonObjectSchema(
            properties=open_params,
            required=[],
            additional_properties=False,
        )

    def open_data(self, data_id: str, **open_params) -> xr.Dataset:
        stac_schema = self.get_open_data_params_schema()
        stac_schema.validate_instance(open_params)
        tile_size = open_params.get("tile_size", (512, 512))
        # tile_size is given as [y, x]; map it onto the dask chunk sizes accordingly
        return rioxarray.open_rasterio(data_id, chunks=dict(zip(("y", "x"), tile_size)))


class HttpsTiffDataOpener(DataOpener):
    """Data opener for the TIFF and GeoTIFF formats accessed via the HTTPS protocol."""

def get_open_data_params_schema(self, data_id: str = None) -> JsonObjectSchema:
return GEOTIFF_OPEN_DATA_PARAMS_SCHEMA

def open_data(self, data_id: str, **open_params) -> xr.Dataset:
stac_schema = self.get_open_data_params_schema()
stac_schema.validate_instance(open_params)
tile_size = open_params.get("tile_size", (512, 512))
overview_level = open_params.get("overview_level", None)
return rioxarray.open_rasterio(
data_id,
overview_level=overview_level,
chunks=dict(zip(("x", "y"), tile_size)),
)


class HttpsZarrDataOpener(DataOpener):
    """Data opener for the Zarr format accessed via the HTTPS protocol."""

    def get_open_data_params_schema(self, data_id: str = None) -> JsonObjectSchema:
        open_params = dict(
            group=JsonStringSchema(
                description="Group path (a.k.a. path in Zarr terminology).",
                min_length=1,
            ),
chunks=JsonObjectSchema(
description="Optional chunk sizes along each dimension."
' Chunk size values may be None, "auto"'
" or an integer value.",
examples=[
{"time": None, "lat": "auto", "lon": 90},
{"time": 1, "y": 512, "x": 512},
],
additional_properties=True,
),
            decode_cf=JsonBooleanSchema(
                description="Whether to decode variables, assuming"
                " they were saved according to CF conventions.",
                default=True,
            ),
            mask_and_scale=JsonBooleanSchema(
                description="If True, replace array values equal to the"
                ' attribute "_FillValue" with NaN, and use the'
                ' "scale_factor" and "add_offset" attributes to'
                " compute actual values.",
                default=True,
            ),
decode_times=JsonBooleanSchema(
description="If True, decode times encoded in the"
" standard NetCDF datetime format "
"into datetime objects. Otherwise,"
" leave them encoded as numbers.",
default=True,
),
decode_coords=JsonBooleanSchema(
description='If True, decode the "coordinates"'
" attribute to identify coordinates in "
"the resulting dataset.",
default=True,
),
            drop_variables=JsonArraySchema(
                description="Names of variables to be dropped"
                " from the dataset.",
                items=JsonStringSchema(min_length=1),
            ),
)
        return JsonObjectSchema(
            properties=open_params,
            required=[],
            additional_properties=False,
        )

def open_data(self, data_id: str, **open_params) -> xr.Dataset:
stac_schema = self.get_open_data_params_schema()
stac_schema.validate_instance(open_params)
return xr.open_zarr(data_id, **open_params)
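A minimal usage sketch of the new openers, for context; the URL below is hypothetical, and opening Zarr over HTTPS assumes fsspec with HTTP support (aiohttp) is installed:

    from xcube_stac.opener import HttpsZarrDataOpener

    # Hypothetical HTTPS URL of a Zarr store referenced by a STAC asset.
    url = "https://example.com/data/cube.zarr"

    opener = HttpsZarrDataOpener()
    # Open parameters are validated against the schema inside open_data().
    ds = opener.open_data(url, decode_cf=True)
    print(ds)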
48 changes: 30 additions & 18 deletions xcube_stac/store.py
@@ -44,6 +44,7 @@
)
from xcube.util.jsonschema import JsonObjectSchema, JsonStringSchema

+from .opener import HttpsNetcdfDataOpener, HttpsTiffDataOpener, HttpsZarrDataOpener
from .constants import (
AWS_REGEX_BUCKET_NAME,
AWS_REGION_NAMES,
@@ -59,9 +60,6 @@ class StacDataStore(DataStore):
Args:
url: URL to STAC catalog
data_id_delimiter: Delimiter used to separate
collections, items and assets from each other.
Defaults to "/".
"""

def __init__(
@@ -102,7 +100,7 @@ def __init__(
def get_data_store_params_schema(cls) -> JsonObjectSchema:
stac_params = dict(url=JsonStringSchema(title="URL to STAC catalog"))
return JsonObjectSchema(
description=("Describes the parameters of the xcube data store 'stac'."),
description="Describes the parameters of the xcube data store 'stac'.",
properties=stac_params,
required=["url"],
additional_properties=False,
@@ -125,7 +123,7 @@ def get_data_ids(
yield data_id
else:
attrs = self._get_attrs_from_item(item, include_attrs)
-                yield (data_id, attrs)
+                yield data_id, attrs

def has_data(self, data_id: str, data_type: DataTypeLike = None) -> bool:
if self._is_valid_data_type(data_type):
@@ -177,7 +175,6 @@ def get_open_data_params_schema(
def open_data(
self, data_id: str, opener_id: str = None, **open_params
) -> xr.Dataset:
-        # ToDo: Actual access of the data needs to be implemented.
stac_schema = self.get_open_data_params_schema()
stac_schema.validate_instance(open_params)
self._assert_valid_opener_id(opener_id)
@@ -422,7 +419,7 @@ def _do_bboxes_intersect(self, item: pystac.Item, **open_params) -> bool:
"""
return box(*item.bbox).intersects(box(*open_params["bbox"]))
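The bbox check above is a plain rectangle intersection via shapely; a self-contained sketch with illustrative coordinates:

    from shapely.geometry import box

    # Bounding boxes given as (west, south, east, north), the STAC convention.
    item_bbox = (10.0, 50.0, 12.0, 52.0)   # illustrative item extent
    query_bbox = (11.0, 51.0, 13.0, 53.0)  # illustrative query extent

    # intersects() is True when the rectangles overlap or touch.
    print(box(*item_bbox).intersects(box(*query_bbox)))  # True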

-    def _access_item(self, data_id: str) -> pystac.Item:
+    def _access_item(self, data_id: str) -> Union[pystac.Item, str]:
"""Access item for a given data ID.
Args:
@@ -488,7 +485,7 @@ def _get_data_id_from_item(self, item: pystac.Item) -> str:

def _get_attrs_from_item(
self, item: pystac.Item, include_attrs: Container[str]
-    ) -> str:
+    ) -> Dict[str, Any]:
"""Extracts the desired attributes from an item object.
Args:
@@ -550,30 +547,30 @@ def _decode_href(self, href: str) -> Tuple[str, str, str, str]:
if re.search(r"^https://s3\.amazonaws\.com/.{3,63}/", href) is not None:
tmp = href[8:].split("/")
root = tmp[1]
fs_path = ("/").join(tmp[2:])
fs_path = "/".join(tmp[2:])
elif re.search(r"^s3://.{3,63}/", href) is not None:
tmp = href[5:].split("/")
root = tmp[0]
fs_path = ("/").join(tmp[1:])
fs_path = "/".join(tmp[1:])
elif re.search(r"^https://.{3,63}\.s3\.amazonaws\.com/", href) is not None:
tmp = href[8:].split("/")
root = tmp[0][:-17]
fs_path = ("/").join(tmp[1:])
fs_path = "/".join(tmp[1:])
elif (
re.search(r"^https://s3-.{9,14}\.amazonaws\.com/.{3,63}/", href) is not None
):
tmp = href[8:].split("/")
region_name = tmp[0][3:-14]
root = tmp[1]
fs_path = ("/").join(tmp[2:])
fs_path = "/".join(tmp[2:])
elif (
re.search(r"^https://.{3,63}\.s3-.{9,14}\.amazonaws\.com/", href)
is not None
):
tmp = href[8:].split("/")
region_name = tmp[0].split(".s3-")[-1][:-14]
root = tmp[0].replace(".s3-" + region_name + ".amazonaws.com", "")
fs_path = ("/").join(tmp[1:])
fs_path = "/".join(tmp[1:])
elif (
re.search(r"^https://.{3,63}\.s3\..{9,14}\.amazonaws\.com/", href)
is not None
@@ -636,11 +633,9 @@ def _build_dataset(
href = asset.href
(protocol, root, fs_path, region_name) = self._decode_href(href)
if protocol == "https":
-            if self._store_https is None:
-                self._store_https = new_data_store("https", root=root)
-            ds_asset = self._store_https.open_data(
-                fs_path, opener_id=opener_id_asset[0], **open_params
-            )
+            uri = f"{protocol}://{root}/{fs_path}"
+            opener = self._get_https_opener(opener_id)
+            ds_asset = opener().open_data(data_id=uri, **open_params)
elif protocol == "s3":
if self._store_s3 is None:
self._initialize_new_s3_data_store(root, region_name)
@@ -686,3 +681,20 @@ def _initialize_new_s3_data_store(self, root: str, region_name: str = None):
)
self._store_s3_root = root
self._store_s3_region_name = region_name

+    def _get_https_opener(
+        self, opener_id: str
+    ) -> type[HttpsNetcdfDataOpener | HttpsTiffDataOpener | HttpsZarrDataOpener]:
+        data_format = opener_id.split(":")[1]
+        if data_format == "netcdf":
+            opener = HttpsNetcdfDataOpener
+        elif data_format == "geotiff":
+            opener = HttpsTiffDataOpener
+        elif data_format == "zarr":
+            opener = HttpsZarrDataOpener
+        else:
+            raise DataStoreError(
+                f"Only the opener IDs {DATASET_OPENER_ID} are supported,"
+                f" but {opener_id!r} was given."
+            )
+        return opener
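Taken together with open_data above, a sketch of how the https branch would be exercised end to end; the catalog URL and data ID are hypothetical, and the opener ID is assumed to follow xcube's "dataset:<format>:<protocol>" pattern:

    from xcube.core.store import new_data_store

    # Hypothetical STAC catalog URL and data ID; both depend on the catalog layout.
    store = new_data_store("stac", url="https://example.com/stac/catalog.json")
    ds = store.open_data(
        "collection-id/item-id/asset-zarr",
        opener_id="dataset:zarr:https",
    )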
