From 1824ad5d8248fb6946fa302af3a09427e4335f39 Mon Sep 17 00:00:00 2001
From: konstntokas
Date: Mon, 25 Nov 2024 16:20:36 +0100
Subject: [PATCH] file data access for creodias vm added

---
 xcube_stac/accessor.py   |  48 ++++++++++++--
 xcube_stac/helper.py     | 137 +++++++++++++++++++++++++++++++++++++++
 xcube_stac/mldataset.py  |   7 +-
 xcube_stac/store.py      |   9 ++-
 xcube_stac/store_mode.py |  25 ++++++-
 5 files changed, 212 insertions(+), 14 deletions(-)

diff --git a/xcube_stac/accessor.py b/xcube_stac/accessor.py
index d6ad490..2beda2c 100644
--- a/xcube_stac/accessor.py
+++ b/xcube_stac/accessor.py
@@ -72,6 +72,43 @@ def open_data(
         )
 
 
+class Sentinel2DataAccessor:
+    """Implementation of the data accessor supporting
+    the jp2 format of Sentinel-2 data.
+    """
+
+    def __init__(self, root):
+        self._root = root
+
+    @property
+    def root(self) -> str:
+        return self._root
+
+    def open_data(
+        self,
+        access_params: dict,
+        opener_id: str = None,
+        data_type: DataTypeLike = None,
+        **open_params,
+    ) -> Union[xr.Dataset, MultiLevelDataset]:
+        if opener_id is None:
+            opener_id = ""
+        if "tile_size" in open_params:
+            LOG.info(
+                "The parameter tile_size is set to (1024, 1024), which is the "
+                "native chunk size of the jp2 files in the Sentinel-2 archive."
+            )
+        file_path = f"{access_params["root"]}/{access_params["fs_path"]}"
+        if is_valid_ml_data_type(data_type) or opener_id.split(":")[0] == "mldataset":
+            return Jp2MultiLevelDataset(file_path, **open_params)
+        else:
+            return rioxarray.open_rasterio(
+                file_path,
+                chunks=dict(x=1024, y=1024),
+                band_as_variable=True,
+            )
+
+
 class S3DataAccessor:
     """Implementation of the data accessor supporting
     the zarr, geotiff and netcdf format via the AWS S3 protocol.
@@ -160,14 +197,15 @@ def open_data(
                 "The parameter tile_size is set to (1024, 1024), which is the "
                 "native chunk size of the jp2 files in the Sentinel-2 archive."
             )
+        file_path = (
+            f"{access_params["protocol"]}://{access_params["root"]}/"
+            f"{access_params["fs_path"]}"
+        )
         if is_valid_ml_data_type(data_type) or opener_id.split(":")[0] == "mldataset":
-            return Jp2MultiLevelDataset(access_params, **open_params)
+            return Jp2MultiLevelDataset(file_path, **open_params)
         else:
             return rioxarray.open_rasterio(
-                (
-                    f"{access_params["protocol"]}://{access_params["root"]}/"
-                    f"{access_params["fs_path"]}"
-                ),
+                file_path,
                 chunks=dict(x=1024, y=1024),
                 band_as_variable=True,
             )
diff --git a/xcube_stac/helper.py b/xcube_stac/helper.py
index ba39f8d..e29c53b 100644
--- a/xcube_stac/helper.py
+++ b/xcube_stac/helper.py
@@ -6,9 +6,11 @@
 from xcube.core.store import DataStoreError
 from xcube.util.jsonschema import JsonObjectSchema
 import s3fs
+import fsspec
 
 from .accessor import S3DataAccessor
 from .accessor import S3Sentinel2DataAccessor
+from .accessor import Sentinel2DataAccessor
 from .constants import MAP_CDSE_COLLECTION_FORMAT
 from .constants import MLDATASET_FORMATS
 from .constants import STAC_SEARCH_PARAMETERS
@@ -43,6 +45,7 @@ def __init__(self):
         self.schema_search_params = STAC_SEARCH_PARAMETERS
         self.schema_search_params_stack = STAC_SEARCH_PARAMETERS_STACK_MODE
         self.s3_accessor = S3DataAccessor
+        self.file_accessor = None
 
     def parse_item(self, item: pystac.Item, **open_params) -> pystac.Item:
         return item
@@ -317,3 +320,137 @@ def search_items(
             if not processing_level[1:] in item.properties["processingLevel"]:
                 continue
             yield item
+
+
+class HelperCdseCreodiasVM(Helper):
+
+    def __init__(self):
+        super().__init__()
+        self.supported_protocols = ["file"]
+        self.supported_format_ids = ["netcdf", "zarr", "geotiff", "jp2"]
+        self.schema_open_params = dict(
+            **STAC_OPEN_PARAMETERS, spatial_res=SCHEMA_SPATIAL_RES
+        )
+        open_params_stack = dict(
+            **STAC_OPEN_PARAMETERS_STACK_MODE, processing_level=SCHEMA_PROCESSING_LEVEL
+        )
+        del open_params_stack["query"]
+        self.schema_open_params_stack = open_params_stack
+        self.schema_search_params = dict(
+            **STAC_SEARCH_PARAMETERS_STACK_MODE,
+            collections=SCHEMA_COLLECTIONS,
+            processing_level=SCHEMA_PROCESSING_LEVEL,
+        )
+        self._fs = fsspec.filesystem("file")
+        self.file_accessor = Sentinel2DataAccessor
+
+    def parse_item(self, item: pystac.Item, **open_params) -> pystac.Item:
+        processing_level = open_params.pop("processing_level", "L2A")
+        open_params["asset_names"] = open_params.get(
+            "asset_names", CDSE_SENITNEL_2_BANDS[processing_level]
+        )
+        href_base = item.assets["PRODUCT"].extra_fields["alternate"]["s3"]["href"][1:]
+        res_want = open_params.get("spatial_res", CDSE_SENTINEL_2_MIN_RESOLUTIONS)
+        if "crs" in open_params:
+            target_crs = normalize_crs(open_params["crs"])
+            if target_crs.is_geographic:
+                res_want = open_params["spatial_res"] * 111320
+        time_end = None
+        for asset_name in open_params["asset_names"]:
+            res_avail = CDSE_SENTINEL_2_LEVEL_BAND_RESOLUTIONS[processing_level][
+                asset_name
+            ]
+            res_select = res_avail[np.argmin(abs(np.array(res_avail) - res_want))]
+            if time_end is None:
+                hrefs = self._fs.glob(
+                    f"{href_base}/**/*_{asset_name}_{res_select}m.jp2"
+                )
+                assert len(hrefs) == 1, "No unique jp2 file found"
+                href_mod = hrefs[0]
+                time_end = hrefs[0].split("/IMG_DATA/")[0][-15:]
+            else:
+                id_parts = item.id.split("_")
+                href_mod = (
+                    f"{href_base}/GRANULE/L2A_T{item.properties["tileId"]}_"
+                    f"A{item.properties["orbitNumber"]:06}_{time_end}/IMG_DATA/"
+                    f"R{res_select}m/T{item.properties["tileId"]}_"
+                    f"{id_parts[2]}_{asset_name}_{res_select}m.jp2"
+                ).replace("/eodata/", "/eo/")
+            if float(item.properties["processorVersion"]) >= 4.00:
+                offset = CDSE_SENITNEL_2_OFFSET_400[asset_name]
+            else:
+                offset = 0
+            item.assets[asset_name] = pystac.Asset(
+                href_mod,
+                asset_name,
+                media_type="image/jp2",
+                roles=["data"],
+                extra_fields={
+                    "cdse": True,
+                    "raster:bands": [
+                        dict(
+                            nodata=CDSE_SENITNEL_2_NO_DATA,
+                            scale=1 / CDSE_SENITNEL_2_SCALE[asset_name],
+                            offset=offset / CDSE_SENITNEL_2_SCALE[asset_name],
+                        )
+                    ],
+                },
+            )
+        # add asset for meta data for angles
+        item.assets["granule_metadata"] = pystac.Asset(
+            f"{href_base}/GRANULE/MTD_TL.xml",
+            "granule_metadata",
+            media_type="application/xml",
+            roles=["metadata"],
+            extra_fields={"cdse": True},
+        )
+        return item
+
+    def get_data_access_params(self, item: pystac.Item, **open_params) -> dict:
+        processing_level = open_params.pop("processing_level", "L2A")
+        asset_names = open_params.get(
+            "asset_names", CDSE_SENITNEL_2_BANDS[processing_level]
+        )
+        data_access_params = {}
+        for asset_name in asset_names:
+            protocol = "file"
+            href_components = item.assets[asset_name].href.split("/")
+            root = href_components[0]
+            instrument = href_components[1]
+            format_id = MAP_CDSE_COLLECTION_FORMAT[instrument]
+            fs_path = "/".join(href_components[1:])
+            storage_options = {}
+            data_access_params[asset_name] = dict(
+                name=asset_name,
+                protocol=protocol,
+                root=root,
+                fs_path=fs_path,
+                storage_options=storage_options,
+                format_id=format_id,
+                href=item.assets[asset_name].href,
+            )
+        return data_access_params
+
+    def get_protocols(self, item: pystac.Item, **open_params) -> list[str]:
+        return ["file"]
+
+    def get_format_ids(self, item: pystac.Item, **open_params) -> list[str]:
+        return ["jp2"]
+
+    def is_mldataset_available(self, item: pystac.Item, **open_params) -> bool:
+        return True
+
+    def search_items(
+        self,
+        catalog: Union[pystac.Catalog, pystac_client.client.Client],
+        searchable: bool,
+        **search_params,
+    ) -> Iterator[pystac.Item]:
+        processing_level = search_params.pop("processing_level", "L2A")
+        if "sortby" not in search_params:
+            search_params["sortby"] = "+datetime"
+        items = search_items(catalog, searchable, **search_params)
+        for item in items:
+            if not processing_level[1:] in item.properties["processingLevel"]:
+                continue
+            yield item
diff --git a/xcube_stac/mldataset.py b/xcube_stac/mldataset.py
index f79ae78..c65c7a5 100644
--- a/xcube_stac/mldataset.py
+++ b/xcube_stac/mldataset.py
@@ -92,15 +92,10 @@
 class Jp2MultiLevelDataset(LazyMultiLevelDataset):
 
     def __init__(
         self,
-        access_params: dict,
+        file_path: str,
         **open_params: dict[str, Any],
     ):
-        file_path = (
-            f"{access_params["protocol"]}://{access_params["root"]}"
-            f"/{access_params["fs_path"]}"
-        )
         self._file_path = file_path
-        self._access_params = access_params
         self._open_params = open_params
         super().__init__(ds_id=file_path)
diff --git a/xcube_stac/store.py b/xcube_stac/store.py
index 6a6d5e9..2522741 100644
--- a/xcube_stac/store.py
+++ b/xcube_stac/store.py
@@ -49,6 +49,7 @@
 from .helper import Helper
 from .helper import HelperXcube
 from .helper import HelperCdse
+from .helper import HelperCdseCreodiasVM
 from ._utils import (
     assert_valid_data_type,
     assert_valid_opener_id,
@@ -291,7 +292,8 @@ class StacCdseDataStore(StacDataStore):
 
     def __init__(
         self,
-        stack_mode: Union[bool, str] = False,
+        stack_mode: bool = False,
+        creodias_vm: bool = False,
         **storage_options_s3,
     ):
         storage_options_s3 = update_dict(
@@ -301,7 +303,10 @@ def __init__(
                 client_kwargs=dict(endpoint_url=CDSE_S3_ENDPOINT),
             ),
         )
-        self._helper = HelperCdse(**storage_options_s3)
+        if creodias_vm:
+            self._helper = HelperCdseCreodiasVM()
+        else:
+            self._helper = HelperCdse(**storage_options_s3)
         super().__init__(url=CDSE_STAC_URL, stack_mode=stack_mode, **storage_options_s3)
 
     @classmethod
diff --git a/xcube_stac/store_mode.py b/xcube_stac/store_mode.py
index 72ac8e5..00616f8 100644
--- a/xcube_stac/store_mode.py
+++ b/xcube_stac/store_mode.py
@@ -107,6 +107,7 @@ def __init__(
         self._helper = helper
         self._https_accessor = None
         self._s3_accessor = None
+        self._file_accessor = None
 
     def access_item(self, data_id: str) -> pystac.Item:
         """Access item for a given data ID.
@@ -261,10 +262,18 @@ def build_dataset_from_item(
                     data_type=data_type,
                     **open_params_asset,
                 )
+            elif params["protocol"] == "file":
+                opener = self._get_file_accessor(params)
+                ds_asset = opener.open_data(
+                    params,
+                    opener_id=opener_id,
+                    data_type=data_type,
+                    **open_params_asset,
+                )
             else:
                 url = get_url_from_pystac_object(item)
                 raise DataStoreError(
-                    f"Only 's3' and 'https' protocols are supported, not "
+                    f"Only 'file', 's3' and 'https' protocols are supported, not "
                     f"{params["protocol"]!r}. The asset {asset_key!r} has a href "
                     f"{params["href"]!r}. The item's url is given by {url!r}."
                 )
@@ -297,6 +306,20 @@ def build_dataset_from_item(
     ##########################################################################
     # Implementation helpers
 
+    def _get_file_accessor(self, access_params: dict) -> S3DataAccessor:
+        """This function returns the file data accessor associated with the
+        bucket *root*.
+        Args:
+            access_params: dictionary containing access parameter for one asset
+
+        Returns:
+            file data accessor
+        """
+
+        if self._file_accessor is None:
+            self._file_accessor = self._helper.file_accessor(access_params["root"])
+        return self._file_accessor
+
     def _get_s3_accessor(self, access_params: dict) -> S3DataAccessor:
         """This function returns the S3 data accessor associated with the
         bucket *root*. It creates the S3 data accessor only if it is not