diff --git a/xcube_clms/clms.py b/xcube_clms/clms.py
index f86b587..e311f45 100644
--- a/xcube_clms/clms.py
+++ b/xcube_clms/clms.py
@@ -123,6 +123,13 @@ def open_dataset(
         self.refresh_token()
         self._fetch_all_datasets()
         item = self.access_item(data_id)
+        # TODO: Check for other unsupported (non-EEA) datasets.
+        path = item.get(DOWNLOADABLE_FILES).get(ITEMS)[0].get("path", "")
+        source = item.get(DOWNLOADABLE_FILES).get(ITEMS)[0].get("source", "")
+
+        assert (
+            path != "" and source != ""
+        ), f"This data product: {item['title']} is not supported in this plugin yet."
 
         request_exists: bool = False
         while True:
@@ -474,3 +481,117 @@ def _current_requests(self, dataset_id: str) -> tuple:
                 return COMPLETE, key
 
         return UNDEFINED, ""
+
+    def _queue_download(self, data_request: dict) -> str:
+        if not self._credentials:
+            raise Exception(
+                "You need credentials to open the data. Please provide credentials using set_credentials()."
+            )
+        self.refresh_token()
+        self._fetch_all_datasets()
+        data_id = data_request.get("data_id", "")
+        spatial_coverage = data_request.get("spatial_coverage", "")
+        resolution = data_request.get("resolution", "")
+        format = data_request.get("format", "")
+
+        item = self.access_item(data_id)
+
+        # This makes sure that there are pre-packaged files available for download.
+        # Without this, the API throws the following error: Error, the FileID is not valid.
+        # We check for path and source based on the API code here:
+        # https://github.com/eea/clms.downloadtool/blob/master/clms/downloadtool/api/services/datarequest_post/post.py#L177-L196
+
+        path = item.get(DOWNLOADABLE_FILES).get(ITEMS)[0].get("path", "")
+        source = item.get(DOWNLOADABLE_FILES).get(ITEMS)[0].get("source", "")
+
+        assert (
+            path != "" and source != ""
+        ), f"This data product: {item['title']} is not supported in this plugin yet."
+
+        request_exists: bool = False
+        while True:
+            status, task_id = self._current_requests(item[UID])
+            if status == COMPLETE:
+                LOG.info(
+                    f"Download request with task id {task_id} already completed for data id: {data_id}"
+                )
+                request_exists = True
+                break
+            if status == UNDEFINED:
+                LOG.info(f"No download request exists for data id: {data_id}")
+                break
+            if status == PENDING:
+                LOG.info(
+                    f"Download request with task id {task_id} already exists for data id: {data_id}. Checking status again in 60 seconds."
+                )
+                time.sleep(60)
+
+        if not request_exists:
+            download_request_url, headers, json = self._prepare_download_request(
+                item, data_id, spatial_coverage, resolution
+            )
+
+            response_data = make_api_request(
+                method="POST", url=download_request_url, headers=headers, json=json
+            )
+            response = get_response_of_type(response_data, JSON_TYPE)
+            LOG.info(f"Download requested with task ID: {response}")
+
+            while True:
+                status, task_id = self._current_requests(item[UID])
+                if status == COMPLETE:
+                    LOG.info(
+                        f"Download request with task id {task_id} completed for data id: {data_id}"
+                    )
+                    break
+                if status == PENDING:
+                    LOG.info(
+                        f"Download request with task id {task_id} for data id: {data_id} is still pending. Checking status again in 60 seconds."
+                    )
+                    time.sleep(60)
+
+        if self.download_url:
+            LOG.info(f"Downloading zip file from {self.download_url}")
+            headers = ACCEPT_HEADER.copy()
+            headers.update(CONTENT_TYPE_HEADER)
+            response_data = make_api_request(
+                self.download_url, headers=headers, stream=True
+            )
+            response = get_response_of_type(response_data, BYTES_TYPE)
+
+            with io.BytesIO(response.content) as zip_file_in_memory:
+                fs = fsspec.filesystem("zip", fo=zip_file_in_memory)
+                zip_contents = fs.ls(RESULTS)
+                actual_zip_file = None
+                if len(zip_contents) == 1:
+                    if ".zip" in zip_contents[0][FILENAME]:
+                        actual_zip_file = zip_contents[0]
+                elif len(zip_contents) > 1:
+                    LOG.warning("Cannot handle more than one zip file at the moment.")
+                else:
+                    LOG.info("No downloadable zip file found inside.")
+                if actual_zip_file:
+                    LOG.info(f"Found one zip file {actual_zip_file}.")
+                    with fs.open(actual_zip_file[NAME], "rb") as f:
+                        zip_fs = fsspec.filesystem("zip", fo=f)
+                        geo_file = self._find_geo_in_dir(
+                            "/",
+                            zip_fs,
+                        )
+                        if geo_file:
+                            with zip_fs.open(geo_file, "rb") as geo_f:
+                                if "tif" in geo_file:
+                                    return rioxarray.open_rasterio(geo_f)
+                                else:
+                                    return xr.open_dataset(geo_f)
+                        else:
+                            raise Exception("No GeoTiff file found")
+
+        else:
+            raise Exception(f"No DownloadURL found for data_id {data_id}")
+
+    def preload_data(self, data_requests: list[dict]):
+        task_ids = []
+        for data_request in data_requests:
+            task_id = self._queue_download(data_request)
+            task_ids.append(task_id)
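
Usage note: the sketch below illustrates how the new preload_data() API introduced in this diff could be called. It is only an illustration under assumptions: the class name Clms, its no-argument constructor, and the shape of the credentials dict are not shown in this diff; set_credentials(), preload_data(), and the data_request keys ("data_id", "spatial_coverage", "resolution", "format") are taken from the code above.

    # Hedged usage sketch. Class name and credential fields are assumptions;
    # only set_credentials(), preload_data(), and the request keys appear in the diff.
    from xcube_clms.clms import Clms  # assumed class name, not shown in this diff

    credentials = {}  # fill with CLMS API credentials; exact fields are not shown here
    clms = Clms()
    clms.set_credentials(credentials)
    clms.preload_data(
        [
            {
                "data_id": "<dataset id>",          # looked up via access_item()
                "spatial_coverage": "<coverage>",   # forwarded to _prepare_download_request()
                "resolution": "<resolution>",
                "format": "<format>",               # read but not yet used by _queue_download()
            }
        ]
    )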