
finally made it work all through
valeriupredoi committed Feb 29, 2024
1 parent 0677ca4 commit f3dc818
Showing 1 changed file, activestorage/netcdf_to_zarr.py, with 19 additions and 63 deletions.
@@ -13,8 +13,14 @@
import h5py


def _correct_compressor(content, varname):
    """Correct the compressor type as it comes out of Kerchunk."""
def _correct_compressor_and_filename(content, varname):
    """
    Correct the compressor type as it comes out of Kerchunk.

    Also correct the file name, since Kerchunk now prefixes it with
    "s3://"; for special buckets like Bryan's bnl, the correct file is
    bnl/file.nc, not s3://bnl/file.nc.
    """
    tc1 = time.time()
    new_content = content.copy()
    try:
        new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"])
@@ -35,6 +41,14 @@ def _correct_compressor(content, varname):
    else:
        new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray)

    # FIXME TODO this is an absolute nightmare caused by the type of bucket
    # on UOR ACES: this is a HACK, and it works only with the crazy Bryan
    # S3 bucket, where files are referenced as "bnl/file.nc"
    for key in new_content['refs'].keys():
        if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]:
            new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "")

    tc2 = time.time()
    print("Time to manipulate Kerchunk Zarr output", tc2 - tc1)
    return new_content

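For intuition, here is a minimal standalone sketch of the "s3://" stripping done above, run on a made-up refs dict; the variable name "tas", the path, and the byte ranges are illustrative only, not taken from the repository:

# Toy demonstration of the prefix stripping in
# _correct_compressor_and_filename; all values are made up.
toy_refs = {
    "refs": {
        "tas/.zarray": '{"chunks": [10], "zarr_format": 2}',
        "tas/0": ["s3://bnl/file.nc", 26477, 560],
    }
}
for key, value in toy_refs["refs"].items():
    if "tas" in key and isinstance(value, list) and "s3://" in value[0]:
        value[0] = value[0].replace("s3://", "")
print(toy_refs["refs"]["tas/0"])  # ['bnl/file.nc', 26477, 560]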

@@ -57,71 +71,13 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
        f.write(ujson.dumps(content).encode())

    # S3 passed-in configuration
    #### this implementation works with a minimally changed kerchunk/hdf.py #####
    #############################################################################
    ### def __init__(
    ###     self,
    ###     h5f: "BinaryIO | str",
    ###     url: str = None,
    ###     spec=1,
    ###     inline_threshold=500,
    ###     storage_options=None,
    ###     error="warn",
    ###     vlen_encode="embed",
    ### ):
    ###
    ###     # Open HDF5 file in read mode...
    ###     lggr.debug(f"HDF5 file: {h5f}")
    ###
    ###     if isinstance(h5f, str):
    ###         fs, path = fsspec.core.url_to_fs(h5f, **(storage_options or {}))
    ###         self.input_file = fs.open(path, "rb")
    ###         url = h5f
    ###         self._h5f = h5py.File(self.input_file, mode="r")
    ###     elif isinstance(h5f, io.IOBase):
    ###         self.input_file = h5f
    ###         self._h5f = h5py.File(self.input_file, mode="r")
    ###     elif isinstance(h5f, (h5py.File, h5py.Group)):
    ###         self._h5f = h5f
    ###
    ###     self.spec = spec
    ###     self.inline = inline_threshold
    ###     if vlen_encode not in ["embed", "null", "leave", "encode"]:
    ###         raise NotImplementedError
    ###     self.vlen = vlen_encode
    ###
    ###     self.store = {}
    ###     self._zroot = zarr.group(store=self.store, overwrite=True)
    ###
    ###     self._uri = url
    ###     self.error = error
    ###     lggr.debug(f"HDF5 file URI: {self._uri}")
    ###############################################################################
    elif storage_type == "s3" and storage_options is not None:
        storage_options = storage_options.copy()
        storage_options['default_fill_cache'] = False
        storage_options['default_cache_type'] = "first"
        fs = s3fs.S3FileSystem(**storage_options)
        fs2 = fsspec.filesystem('')
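        # fs targets the remote S3 store; fs2 is a plain local filesystem
        # used further below to write the Kerchunk reference JSON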
        tk1 = time.time()
        print("Storage options dict", storage_options)
        print("File url", file_url)
        with fs.open(file_url, 'rb') as s3file:
            s3file = h5py.File(s3file, mode="r")
            if isinstance(s3file[varname], h5py.Dataset):
@@ -116,20 +88,19 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
            elif isinstance(s3file[varname], h5py.Group):
                print("Looking only at a single Group", s3file[varname])
                s3file = s3file[varname]
            # Kerchunk wants the correct file name in S3 format
            if not file_url.startswith("s3://"):
                file_url = "s3://" + file_url
            print("File_url to Kerchunk", file_url)
            h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                        inline_threshold=0,
                                        storage_options=storage_options)
            tk2 = time.time()
            print("Time to set up Kerchunk", tk2 - tk1)
            with fs2.open(outf, 'wb') as f:
                content = h5chunks.translate()
                content = _correct_compressor(content, varname)
                content = _correct_compressor_and_filename(content, varname)
                f.write(ujson.dumps(content).encode())
            tk3 = time.time()
            print("Time to Translate and Dump Kerchunks to json file", tk3 - tk2)
            print("Time to Kerchunk and write JSON file", tk3 - tk2)
    # not S3
    else:
        fs = fsspec.filesystem('')
@@ -182,7 +153,6 @@ def open_zarr_group(out_json, varname):
print(f"Zarr Group does not contain variable {varname}. "
f"Zarr Group info: {zarr_group.info}")
raise attrerr
#print("Zarr array info:", zarr_array.info)

return zarr_array

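A hedged usage sketch of the two helpers above; the file and variable names are hypothetical, and it assumes that passing storage_type=None selects the non-S3 branch of gen_json:

# Hypothetical end-to-end call: Kerchunk a local netCDF file to JSON,
# then open the referenced variable as a Zarr array.
out_json = "refs.json"
gen_json("3h_example.nc", "tas", out_json, None, None)
zarr_array = open_zarr_group(out_json, "tas")
print(zarr_array.shape, zarr_array.chunks)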
@@ -209,17 +179,3 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu
    ref_ds = open_zarr_group(out_json.name, varname + " ")

    return ref_ds, zarray, zattrs


#d = {'version': 1,
# 'refs': {
# '.zgroup': '{"zarr_format":2}',
# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"[email protected]","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[324],"zarr_format":2}', 'lat/.zattrs': '{"_ARRAY_DIMENSIONS":["lat"],"axis":"Y","long_name":"Latitude","standard_name":"latitude","units":"degrees_north"}',
# 'lat/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 26477, 560],
# 'lon/.zarray': '{"chunks":[432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[432],"zarr_format":2}',
# 'lon/.zattrs': '{"_ARRAY_DIMENSIONS":["lon"],"axis":"X","long_name":"Longitude","standard_name":"longitude","units":"degrees_east"}',
# 'lon/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 27037, 556],
# 'm01s00i507_10/.zarray': '{"chunks":[1,324,432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":-1073741824.0,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[80,324,432],"zarr_format":2}',
# 'm01s00i507_10/.zattrs': '{"_ARRAY_DIMENSIONS":["time_counter","lat","lon"],"cell_methods":"time: mean (interval: 900 s)","coordinates":"time_centered","interval_offset":"0ts","interval_operation":"900 s","interval_write":"3 h","long_name":"OPEN SEA SURFACE TEMP AFTER TIMESTEP","missing_value":-1073741824.0,"online_operation":"average","standard_name":"surface_temperature","units":"K"}',
# }}
