Skip to content

Commit

Permalink
Add initial unsupported datasets check
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yogesh committed Nov 15, 2024
1 parent a91bd5c commit 8aa6014
Showing 1 changed file with 121 additions and 0 deletions.
121 changes: 121 additions & 0 deletions xcube_clms/clms.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ def open_dataset(
self.refresh_token()
self._fetch_all_datasets()
item = self.access_item(data_id)
# TODO: Check for other unsupported datasets (non-EEA) for now.
path = item.get(DOWNLOADABLE_FILES).get(ITEMS)[0].get("path", "")
source = item.get(DOWNLOADABLE_FILES).get(ITEMS)[0].get("source", "")

assert (
path is not "" and source is not ""
), f"This data product: {item["title"]} is not yet supported in this plugin yet."

request_exists: bool = False
while True:
Expand Down Expand Up @@ -474,3 +481,117 @@ def _current_requests(self, dataset_id: str) -> tuple:
return COMPLETE, key

return UNDEFINED, ""

def _queue_download(self, data_request: dict) -> str:
if not self._credentials:
raise Exception(
"You need credentials to open the data. Please provide credentials using set_credentials()."
)
self.refresh_token()
self._fetch_all_datasets()
data_id = data_request.get("data_id", "")
spatial_coverage = data_request.get("spatial_coverage", "")
resolution = data_request.get("resolution", "")
format = data_request.get("format", "")

item = self.access_item(data_id)

# This is to make sure that there are pre-packaged files available for download.
# Without this, the API throws the following error: Error, the FileID is not valid.
# We check for path and source based on the API code here:
# https://github.com/eea/clms.downloadtool/blob/master/clms/downloadtool/api/services/datarequest_post/post.py#L177-L196

path = item.get(DOWNLOADABLE_FILES).get(ITEMS).get("path", "")
source = item.get(DOWNLOADABLE_FILES).get(ITEMS).get("source", "")

assert (
path is not "" and source is not ""
), f"This data product: {item["title"]} is not yet supported in this plugin yet."

request_exists: bool = False
while True:
status, task_id = self._current_requests(item[UID])
if status == COMPLETE:
LOG.info(
f"Download request with task id {task_id} already completed for data id: {data_id}"
)
request_exists = True
break
if status == UNDEFINED:
LOG.info(f"No download request exists for data id: {data_id}")
break
if status == PENDING:
LOG.info(
f"Download request with task id {task_id} already exists for data id: {data_id}. Status check again in 60 seconds"
)
time.sleep(60)

if not request_exists:
download_request_url, headers, json = self._prepare_download_request(
item, data_id, spatial_coverage, resolution
)

response_data = make_api_request(
method="POST", url=download_request_url, headers=headers, json=json
)
response = get_response_of_type(response_data, JSON_TYPE)
LOG.info(f"Download Requested with Task ID : {response}")

while True:
status, task_id = self._current_requests(item[UID])
if status == COMPLETE:
LOG.info(
f"Download request with task id {task_id} completed for data id: {data_id}"
)
break
if status == PENDING:
LOG.info(
f"Download request with task id {task_id} for data id: {data_id}. Status check again in 60 seconds"
)
time.sleep(60)

if self.download_url:
LOG.info(f"Downloading zip file from {self.download_url}")
headers = ACCEPT_HEADER.copy()
headers.update(CONTENT_TYPE_HEADER)
response_data = make_api_request(
self.download_url, headers=headers, stream=True
)
response = get_response_of_type(response_data, BYTES_TYPE)

with io.BytesIO(response.content) as zip_file_in_memory:
fs = fsspec.filesystem("zip", fo=zip_file_in_memory)
zip_contents = fs.ls(RESULTS)
actual_zip_file = None
if len(zip_contents) == 1:
if ".zip" in zip_contents[0][FILENAME]:
actual_zip_file = zip_contents[0]
elif len(zip_contents) > 1:
LOG.warn("Cannot handle more than one zip files at the moment.")
else:
LOG.info("No downloadable zip file found inside.")
if actual_zip_file:
LOG.info(f"Found one zip file {actual_zip_file}.")
with fs.open(actual_zip_file[NAME], "rb") as f:
zip_fs = fsspec.filesystem("zip", fo=f)
geo_file = self._find_geo_in_dir(
"/",
zip_fs,
)
if geo_file:
with zip_fs.open(geo_file, "rb") as geo_f:
if "tif" in geo_file:
return rioxarray.open_rasterio(geo_f)
else:
return xr.open_dataset(geo_f)
else:
raise Exception("No GeoTiff file found")

else:
raise Exception(f"No DownloadURL found for data_id {data_id}")

def preload_data(self, data_requests: list[dict]):
task_ids = []
for data_request in data_requests:
task_id = self._queue_download(data_request)
task_ids.append(task_id)

0 comments on commit 8aa6014

Please sign in to comment.