file data access for creodias vm added
konstntokas committed Nov 25, 2024
1 parent 2ab9d1c commit 1824ad5
Showing 5 changed files with 212 additions and 14 deletions.
48 changes: 43 additions & 5 deletions xcube_stac/accessor.py
@@ -72,6 +72,43 @@ def open_data(
)


class Sentinel2DataAccessor:
"""Implementation of the data accessor supporting
the jp2 format of Sentinel-2 data.
"""

def __init__(self, root):
self._root = root

@property
def root(self) -> str:
return self._root

def open_data(
self,
access_params: dict,
opener_id: str = None,
data_type: DataTypeLike = None,
**open_params,
) -> Union[xr.Dataset, MultiLevelDataset]:
if opener_id is None:
opener_id = ""
if "tile_size" in open_params:
LOG.info(
"The parameter tile_size is set to (1024, 1024), which is the "
"native chunk size of the jp2 files in the Sentinel-2 archive."
)
file_path = f"{access_params["root"]}/{access_params["fs_path"]}"
if is_valid_ml_data_type(data_type) or opener_id.split(":")[0] == "mldataset":
return Jp2MultiLevelDataset(file_path, **open_params)
else:
return rioxarray.open_rasterio(
file_path,
chunks=dict(x=1024, y=1024),
band_as_variable=True,
)


class S3DataAccessor:
"""Implementation of the data accessor supporting
the zarr, geotiff and netcdf format via the AWS S3 protocol.
@@ -160,14 +197,15 @@ def open_data(
"The parameter tile_size is set to (1024, 1024), which is the "
"native chunk size of the jp2 files in the Sentinel-2 archive."
)
        file_path = (
            f"{access_params['protocol']}://{access_params['root']}/"
            f"{access_params['fs_path']}"
        )
if is_valid_ml_data_type(data_type) or opener_id.split(":")[0] == "mldataset":
return Jp2MultiLevelDataset(access_params, **open_params)
return Jp2MultiLevelDataset(file_path, **open_params)
else:
return rioxarray.open_rasterio(
(
f"{access_params["protocol"]}://{access_params["root"]}/"
f"{access_params["fs_path"]}"
),
file_path,
chunks=dict(x=1024, y=1024),
band_as_variable=True,
)
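
For orientation, here is a minimal sketch of how the new local-file accessor might be exercised on a CREODIAS VM. It is not part of the commit: the root, fs_path and opener ID below are made-up placeholders, and only the behaviour shown in the diff above (plain dataset vs. multi-level dataset, 1024 x 1024 chunking) is assumed.

# Sketch only; paths and opener ID are hypothetical placeholders.
from xcube_stac.accessor import Sentinel2DataAccessor

accessor = Sentinel2DataAccessor(root="eodata")
access_params = dict(
    root="eodata",
    fs_path="Sentinel-2/example_granule/T32TMT_20240101T101301_B02_10m.jp2",
)

# Default: a plain xarray.Dataset, chunked to the native 1024 x 1024 jp2 tiling
ds = accessor.open_data(access_params)

# An opener ID starting with "mldataset" yields a multi-level (pyramid) dataset
mlds = accessor.open_data(access_params, opener_id="mldataset:jp2:file")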
137 changes: 137 additions & 0 deletions xcube_stac/helper.py
@@ -6,9 +6,11 @@
from xcube.core.store import DataStoreError
from xcube.util.jsonschema import JsonObjectSchema
import s3fs
import fsspec

from .accessor import S3DataAccessor
from .accessor import S3Sentinel2DataAccessor
from .accessor import Sentinel2DataAccessor
from .constants import MAP_CDSE_COLLECTION_FORMAT
from .constants import MLDATASET_FORMATS
from .constants import STAC_SEARCH_PARAMETERS
@@ -43,6 +45,7 @@ def __init__(self):
self.schema_search_params = STAC_SEARCH_PARAMETERS
self.schema_search_params_stack = STAC_SEARCH_PARAMETERS_STACK_MODE
self.s3_accessor = S3DataAccessor
self.file_accessor = None

def parse_item(self, item: pystac.Item, **open_params) -> pystac.Item:
return item
@@ -317,3 +320,137 @@ def search_items(
            if processing_level[1:] not in item.properties["processingLevel"]:
continue
yield item


class HelperCdseCreodiasVM(Helper):

def __init__(self):
super().__init__()
self.supported_protocols = ["file"]
self.supported_format_ids = ["netcdf", "zarr", "geotiff", "jp2"]
self.schema_open_params = dict(
**STAC_OPEN_PARAMETERS, spatial_res=SCHEMA_SPATIAL_RES
)
open_params_stack = dict(
**STAC_OPEN_PARAMETERS_STACK_MODE, processing_level=SCHEMA_PROCESSING_LEVEL
)
del open_params_stack["query"]
self.schema_open_params_stack = open_params_stack
self.schema_search_params = dict(
**STAC_SEARCH_PARAMETERS_STACK_MODE,
collections=SCHEMA_COLLECTIONS,
processing_level=SCHEMA_PROCESSING_LEVEL,
)
self._fs = fsspec.filesystem("file")
self.file_accessor = Sentinel2DataAccessor

def parse_item(self, item: pystac.Item, **open_params) -> pystac.Item:
processing_level = open_params.pop("processing_level", "L2A")
open_params["asset_names"] = open_params.get(
"asset_names", CDSE_SENITNEL_2_BANDS[processing_level]
)
href_base = item.assets["PRODUCT"].extra_fields["alternate"]["s3"]["href"][1:]
res_want = open_params.get("spatial_res", CDSE_SENTINEL_2_MIN_RESOLUTIONS)
if "crs" in open_params:
target_crs = normalize_crs(open_params["crs"])
if target_crs.is_geographic:
res_want = open_params["spatial_res"] * 111320
time_end = None
for asset_name in open_params["asset_names"]:
res_avail = CDSE_SENTINEL_2_LEVEL_BAND_RESOLUTIONS[processing_level][
asset_name
]
res_select = res_avail[np.argmin(abs(np.array(res_avail) - res_want))]
if time_end is None:
hrefs = self._fs.glob(
f"{href_base}/**/*_{asset_name}_{res_select}m.jp2"
)
assert len(hrefs) == 1, "No unique jp2 file found"
href_mod = hrefs[0]
time_end = hrefs[0].split("/IMG_DATA/")[0][-15:]
else:
id_parts = item.id.split("_")
href_mod = (
f"{href_base}/GRANULE/L2A_T{item.properties["tileId"]}_"
f"A{item.properties["orbitNumber"]:06}_{time_end}/IMG_DATA/"
f"R{res_select}m/T{item.properties["tileId"]}_"
f"{id_parts[2]}_{asset_name}_{res_select}m.jp2"
).replace("/eodata/", "/eo/")
if float(item.properties["processorVersion"]) >= 4.00:
offset = CDSE_SENITNEL_2_OFFSET_400[asset_name]
else:
offset = 0
item.assets[asset_name] = pystac.Asset(
href_mod,
asset_name,
media_type="image/jp2",
roles=["data"],
extra_fields={
"cdse": True,
"raster:bands": [
dict(
nodata=CDSE_SENITNEL_2_NO_DATA,
scale=1 / CDSE_SENITNEL_2_SCALE[asset_name],
offset=offset / CDSE_SENITNEL_2_SCALE[asset_name],
)
],
},
)
        # add asset for the granule metadata (contains the angle information)
item.assets["granule_metadata"] = pystac.Asset(
f"{href_base}/GRANULE/MTD_TL.xml",
"granule_metadata",
media_type="application/xml",
roles=["metadata"],
extra_fields={"cdse": True},
)
return item

def get_data_access_params(self, item: pystac.Item, **open_params) -> dict:
processing_level = open_params.pop("processing_level", "L2A")
asset_names = open_params.get(
"asset_names", CDSE_SENITNEL_2_BANDS[processing_level]
)
data_access_params = {}
for asset_name in asset_names:
protocol = "file"
href_components = item.assets[asset_name].href.split("/")
root = href_components[0]
instrument = href_components[1]
format_id = MAP_CDSE_COLLECTION_FORMAT[instrument]
fs_path = "/".join(href_components[1:])
storage_options = {}
data_access_params[asset_name] = dict(
name=asset_name,
protocol=protocol,
root=root,
fs_path=fs_path,
storage_options=storage_options,
format_id=format_id,
href=item.assets[asset_name].href,
)
return data_access_params

def get_protocols(self, item: pystac.Item, **open_params) -> list[str]:
return ["file"]

def get_format_ids(self, item: pystac.Item, **open_params) -> list[str]:
return ["jp2"]

def is_mldataset_available(self, item: pystac.Item, **open_params) -> bool:
return True

def search_items(
self,
catalog: Union[pystac.Catalog, pystac_client.client.Client],
searchable: bool,
**search_params,
) -> Iterator[pystac.Item]:
processing_level = search_params.pop("processing_level", "L2A")
if "sortby" not in search_params:
search_params["sortby"] = "+datetime"
items = search_items(catalog, searchable, **search_params)
for item in items:
            if processing_level[1:] not in item.properties["processingLevel"]:
continue
yield item
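
To make the path handling in get_data_access_params above concrete, here is a small illustration with a made-up local href; it assumes an /eodata-style layout on the VM, and the granule name is a placeholder rather than a real CDSE path.

# Illustration only: how a local CDSE-style href is split into root and fs_path.
href = "eodata/Sentinel-2/MSI/L2A/2024/01/01/EXAMPLE.SAFE/GRANULE/R10m/B02_10m.jp2"
href_components = href.split("/")
root = href_components[0]                # "eodata" -> mounted archive root
instrument = href_components[1]          # "Sentinel-2" -> key for MAP_CDSE_COLLECTION_FORMAT
fs_path = "/".join(href_components[1:])  # path below the root, opened via the "file" protocol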
7 changes: 1 addition & 6 deletions xcube_stac/mldataset.py
@@ -92,15 +92,10 @@ class Jp2MultiLevelDataset(LazyMultiLevelDataset):

def __init__(
self,
access_params: dict,
file_path: str,
**open_params: dict[str, Any],
):
file_path = (
f"{access_params["protocol"]}://{access_params["root"]}"
f"/{access_params["fs_path"]}"
)
self._file_path = file_path
self._access_params = access_params
self._open_params = open_params
super().__init__(ds_id=file_path)

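With this change the multi-level dataset is constructed from a ready-made path instead of the access-parameter dictionary. A hedged sketch of the new call follows; the path is a placeholder and the get_dataset call assumes xcube's standard MultiLevelDataset interface.

# Sketch only; the path is a placeholder.
from xcube_stac.mldataset import Jp2MultiLevelDataset

mlds = Jp2MultiLevelDataset("s3://eodata/Sentinel-2/example_granule/B02_10m.jp2")
ds = mlds.get_dataset(0)  # level 0 = full resolution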
9 changes: 7 additions & 2 deletions xcube_stac/store.py
@@ -49,6 +49,7 @@
from .helper import Helper
from .helper import HelperXcube
from .helper import HelperCdse
from .helper import HelperCdseCreodiasVM
from ._utils import (
assert_valid_data_type,
assert_valid_opener_id,
@@ -291,7 +292,8 @@ class StacCdseDataStore(StacDataStore):

def __init__(
self,
stack_mode: Union[bool, str] = False,
stack_mode: bool = False,
creodias_vm: bool = False,
**storage_options_s3,
):
storage_options_s3 = update_dict(
@@ -301,7 +303,10 @@ def __init__(
client_kwargs=dict(endpoint_url=CDSE_S3_ENDPOINT),
),
)
self._helper = HelperCdse(**storage_options_s3)
if creodias_vm:
self._helper = HelperCdseCreodiasVM()
else:
self._helper = HelperCdse(**storage_options_s3)
super().__init__(url=CDSE_STAC_URL, stack_mode=stack_mode, **storage_options_s3)

@classmethod
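A hedged usage sketch of the new creodias_vm switch; it is not taken from the commit, the data ID and asset names are placeholders, and the call assumes the store follows the usual xcube data store interface. With creodias_vm=True the Sentinel-2 jp2 files are read from the locally mounted archive instead of via S3.

# Sketch only; data_id and asset_names are placeholder values.
from xcube_stac.store import StacCdseDataStore

store = StacCdseDataStore(creodias_vm=True)

ds = store.open_data(
    "collections/sentinel-2-l2a/items/EXAMPLE_ITEM_ID",  # placeholder data ID
    asset_names=["B02", "B03", "B04"],
)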
25 changes: 24 additions & 1 deletion xcube_stac/store_mode.py
@@ -107,6 +107,7 @@ def __init__(
self._helper = helper
self._https_accessor = None
self._s3_accessor = None
self._file_accessor = None

def access_item(self, data_id: str) -> pystac.Item:
"""Access item for a given data ID.
@@ -261,10 +262,18 @@ def build_dataset_from_item(
data_type=data_type,
**open_params_asset,
)
elif params["protocol"] == "file":
opener = self._get_file_accessor(params)
ds_asset = opener.open_data(
params,
opener_id=opener_id,
data_type=data_type,
**open_params_asset,
)
else:
url = get_url_from_pystac_object(item)
raise DataStoreError(
f"Only 's3' and 'https' protocols are supported, not "
f"Only 'file', 's3' and 'https' protocols are supported, not "
f"{params["protocol"]!r}. The asset {asset_key!r} has a href "
f"{params["href"]!r}. The item's url is given by {url!r}."
)
Expand Down Expand Up @@ -297,6 +306,20 @@ def build_dataset_from_item(
##########################################################################
# Implementation helpers

    def _get_file_accessor(self, access_params: dict) -> S3DataAccessor:
        """This function returns the file data accessor associated with the
        root directory *root*. It creates the file data accessor only if it
        is not set up yet.

        Args:
            access_params: dictionary containing access parameters for one asset

        Returns:
            file data accessor
        """
        if self._file_accessor is None:
            self._file_accessor = self._helper.file_accessor(access_params["root"])
        return self._file_accessor

def _get_s3_accessor(self, access_params: dict) -> S3DataAccessor:
"""This function returns the S3 data accessor associated with the
bucket *root*. It creates the S3 data accessor only if it is not