From f3dc818fe75bc633d53f722241f38167a3a089a6 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 29 Feb 2024 15:38:08 +0000 Subject: [PATCH] finally made it work all through --- activestorage/netcdf_to_zarr.py | 82 ++++++++------------------------- 1 file changed, 19 insertions(+), 63 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 4ea71b33..ff32b7c8 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -13,8 +13,14 @@ import h5py -def _correct_compressor(content, varname): - """Correct the compressor type as it comes out of Kerchunk.""" +def _correct_compressor_and_filename(content, varname): + """ + Correct the compressor type as it comes out of Kerchunk. + Also correct file name as Kerchnk now prefixes it with "s3://" + and for special buckets like Bryan's bnl the correct file is bnl/file.nc + not s3://bnl/file.nc + """ + tc1 = time.time() new_content = content.copy() try: new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"]) @@ -35,6 +41,14 @@ def _correct_compressor(content, varname): else: new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray) + # FIXME TODO this is an absolute nightmate: the type of bucket on UOR ACES + # this is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc" + for key in new_content['refs'].keys(): + if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]: + new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "") + + tc2 = time.time() + print("Time to manipulate Kerchunk Zarr output", tc2 - tc1) return new_content @@ -57,46 +71,6 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): f.write(ujson.dumps(content).encode()) # S3 passed-in configuration -#### this implementation works with a minimally changed kerchunk/hdf.py ##### -############################################################################# -### def __init__( -### self, -### h5f: "BinaryIO | str", -### url: str = None, -### spec=1, -### inline_threshold=500, -### storage_options=None, -### error="warn", -### vlen_encode="embed", -### ): -### -### # Open HDF5 file in read mode... -### lggr.debug(f"HDF5 file: {h5f}") -### -### if isinstance(h5f, str): -### fs, path = fsspec.core.url_to_fs(h5f, **(storage_options or {})) -### self.input_file = fs.open(path, "rb") -### url = h5f -### self._h5f = h5py.File(self.input_file, mode="r") -### elif isinstance(h5f, io.IOBase): -### self.input_file = h5f -### self._h5f = h5py.File(self.input_file, mode="r") -### elif isinstance(h5f, (h5py.File, h5py.Group)): -### self._h5f = h5f -### -### self.spec = spec -### self.inline = inline_threshold -### if vlen_encode not in ["embed", "null", "leave", "encode"]: -### raise NotImplementedError -### self.vlen = vlen_encode -### -### self.store = {} -### self._zroot = zarr.group(store=self.store, overwrite=True) -### -### self._uri = url -### self.error = error -### lggr.debug(f"HDF5 file URI: {self._uri}") -############################################################################### elif storage_type == "s3" and storage_options is not None: storage_options = storage_options.copy() storage_options['default_fill_cache'] = False @@ -104,8 +78,6 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() - print("Storage options dict", storage_options) - print("File url", file_url) with fs.open(file_url, 'rb') as s3file: s3file = h5py.File(s3file, mode="w") if isinstance(s3file[varname], h5py.Dataset): @@ -116,20 +88,19 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): elif isinstance(s3file[varname], h5py.Group): print("Looking only at a single Group", s3file[varname]) s3file = s3file[varname] + # Kerchunk wants the correct file name in S3 format if not file_url.startswith("s3://"): file_url = "s3://" + file_url - print("File_url to Kerchunk", file_url) h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0, storage_options=storage_options) tk2 = time.time() - print("Time to set up Kerchunk", tk2 - tk1) with fs2.open(outf, 'wb') as f: content = h5chunks.translate() - content = _correct_compressor(content, varname) + content = _correct_compressor_and_filename(content, varname) f.write(ujson.dumps(content).encode()) tk3 = time.time() - print("Time to Translate and Dump Kerchunks to json file", tk3 - tk2) + print("Time to Kerchunk and write JSON file", tk3 - tk2) # not S3 else: fs = fsspec.filesystem('') @@ -182,7 +153,6 @@ def open_zarr_group(out_json, varname): print(f"Zarr Group does not contain variable {varname}. " f"Zarr Group info: {zarr_group.info}") raise attrerr - #print("Zarr array info:", zarr_array.info) return zarr_array @@ -209,17 +179,3 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu ref_ds = open_zarr_group(out_json.name, varname + " ") return ref_ds, zarray, zattrs - - -#d = {'version': 1, -# 'refs': { -# '.zgroup': '{"zarr_format":2}', -# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}', -# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"