diff --git a/.github/workflows/run-test-push.yml b/.github/workflows/run-test-push.yml index 84380bc7..33471813 100644 --- a/.github/workflows/run-test-push.yml +++ b/.github/workflows/run-test-push.yml @@ -29,6 +29,13 @@ jobs: use-mamba: true - run: conda --version - run: python -V + - name: Install development version of NCAS-CMS/Pyfive:wacasoft + run: | + cd .. + git clone https://github.com/NCAS-CMS/pyfive.git + cd pyfive + git checkout wacasoft + pip install -e . - run: pip install -e . - run: conda list - run: pytest -n 2 --junitxml=report-1.xml diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 24aeb60f..8a278038 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -4,6 +4,7 @@ on: push: branches: - main + - pyfive schedule: - cron: '0 0 * * *' # nightly @@ -33,6 +34,13 @@ jobs: use-mamba: true - run: conda --version - run: python -V + - name: Install development version of NCAS-CMS/Pyfive:wacasoft + run: | + cd .. + git clone https://github.com/NCAS-CMS/pyfive.git + cd pyfive + git checkout wacasoft + pip install -e . - run: conda list - run: pip install -e . - run: conda list @@ -58,6 +66,13 @@ jobs: use-mamba: true - run: conda --version - run: python -V + - name: Install development version of NCAS-CMS/Pyfive:wacasoft + run: | + cd .. + git clone https://github.com/NCAS-CMS/pyfive.git + cd pyfive + git checkout wacasoft + pip install -e . - run: conda list - run: mamba install -c conda-forge git - run: pip install -e . diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml index 66bebaed..cceb63d9 100644 --- a/.github/workflows/test_s3_minio.yml +++ b/.github/workflows/test_s3_minio.yml @@ -6,6 +6,7 @@ on: push: branches: - main # keep this at all times + - pyfive pull_request: schedule: - cron: '0 0 * * *' # nightly @@ -55,6 +56,13 @@ jobs: python-version: ${{ matrix.python-version }} miniforge-version: "latest" use-mamba: true + - name: Install development version of NCAS-CMS/Pyfive:wacasoft + run: | + cd .. + git clone https://github.com/NCAS-CMS/pyfive.git + cd pyfive + git checkout wacasoft + pip install -e . 
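(If an editable checkout of pyfive is not required, the clone/checkout/install sequence in the step above could likely be collapsed into a single run line such as pip install git+https://github.com/NCAS-CMS/pyfive.git@wacasoft; the editable clone is kept here as the workflow has it.)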
- name: Install PyActiveStorage run: | conda --version diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml deleted file mode 100644 index 03908a9c..00000000 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ /dev/null @@ -1,68 +0,0 @@ -# adapted GA workflow from https://github.com/stackhpc/reductionist-rs -# This runs Active with a remote Reductionist and S3 data stored elsewhere ---- -name: S3/Remote Reductionist - -on: - push: - branches: - - main # keep this at all times - pull_request: - schedule: - - cron: '0 0 * * *' # nightly - -# Required shell entrypoint to have properly configured bash shell -defaults: - run: - shell: bash -l {0} - -jobs: - linux-test: - runs-on: "ubuntu-latest" - strategy: - matrix: - python-version: ["3.10", "3.11", "3.12", "3.13"] - fail-fast: false - name: Linux Python ${{ matrix.python-version }} - steps: - - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - uses: conda-incubator/setup-miniconda@v3 - with: - python-version: ${{ matrix.python-version }} - miniforge-version: "latest" - use-mamba: true - - name: Get conda and Python versions - run: | - conda --version - python -V - - name: Export proxy - run: | - echo 'USE_S3 = True' >> activestorage/config.py - echo 'REMOTE_RED = True' >> activestorage/config.py - - name: Ping remote Reductionist - run: curl -k https://192.171.169.248:8080/.well-known/reductionist-schema - - uses: conda-incubator/setup-miniconda@v3 - with: - activate-environment: activestorage-minio - environment-file: environment.yml - python-version: ${{ matrix.python-version }} - miniforge-version: "latest" - use-mamba: true - - name: Install PyActiveStorage - run: | - conda --version - python -V - which python - pip install -e . - - name: Run one single test - run: | - pytest tests/test_compression_remote_reductionist.py - - name: Upload HTML report artifact - uses: actions/upload-artifact@v4 - with: - name: html-report - path: test-reports/ - overwrite: true - if: always() diff --git a/activestorage/__init__.py b/activestorage/__init__.py index 7e632ea9..f5a6f53d 100644 --- a/activestorage/__init__.py +++ b/activestorage/__init__.py @@ -1,4 +1,5 @@ from .active import Active - -__version__ = "0.0.1" +# 0.0.1 is the initial version using kerchunk +# 0.0.2 is testing out the use of pyfive instead +__version__ = "0.0.2" diff --git a/activestorage/active.py b/activestorage/active.py index 1cf03eef..761cf3b6 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -1,25 +1,20 @@ import concurrent.futures -import contextlib import os import numpy as np import pathlib import urllib +import pyfive +import time +from pyfive.h5d import StoreInfo -import h5netcdf import s3fs -#FIXME: Consider using h5py throughout, for more generality -from netCDF4 import Dataset -from zarr.indexing import ( - OrthogonalIndexer, -) from activestorage.config import * from activestorage import reductionist -from activestorage.storage import reduce_chunk -from activestorage import netcdf_to_zarr as nz +from activestorage.storage import reduce_chunk, reduce_opens3_chunk +from activestorage.hdf2numcodec import decode_filters -@contextlib.contextmanager def load_from_s3(uri, storage_options=None): """ Load a netCDF4-like object from S3. 
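The new read path replaces the h5netcdf context manager with an s3fs file handle that is opened once and handed to pyfive. A minimal sketch of that pattern, using an illustrative endpoint, bucket and variable name rather than values taken from this patch:

    import pyfive
    import s3fs

    # open the object store anonymously; endpoint, object and variable names are placeholders
    fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': 'https://object-store.example'})
    s3file = fs.open('mybucket/myfile.nc', 'rb')

    # pyfive reads the HDF5 b-tree and chunk data lazily through this handle,
    # so it must stay open for as long as the File (or any variable taken from it) is in use
    ds = pyfive.File(s3file)
    print(ds['tas'])

Because chunk reads happen lazily through the open handle, load_from_s3 now returns the dataset rather than yielding it from a with-block that would close the file on exit.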
@@ -43,11 +38,63 @@ def load_from_s3(uri, storage_options=None): client_kwargs={'endpoint_url': S3_URL}) # eg "http://localhost:9000" for Minio else: fs = s3fs.S3FileSystem(**storage_options) # use passed-in dictionary - with fs.open(uri, 'rb') as s3file: - ds = h5netcdf.File(s3file, 'r', invalid_netcdf=True) - print(f"Dataset loaded from S3 via h5netcdf: {ds}") - yield ds + + t1=time.time() + s3file = fs.open(uri, 'rb') + t2=time.time() + ds = pyfive.File(s3file) + t3=time.time() + print(f"Dataset loaded from S3 with s3fs and Pyfive: {uri} ({t2-t1:.2},{t3-t2:.2})") + return ds + +def _metricise(method): + """ Decorator for class methods that records the elapsed time into metric_data""" + def timed(self, *args, **kw): + ts = time.time() + metric_name='' + if '__metric_name' in kw: + metric_name = kw['__metric_name'] + del kw['__metric_name'] + result = method(self,*args, **kw) + te = time.time() + if metric_name: + self.metric_data[metric_name] = te-ts + return result + return timed + + +def get_missing_attributes(ds): + """ + Load all the missing-data attributes we need from a netcdf file + """ + def hfix(x): + ''' + return item if single element list/array + see https://github.com/h5netcdf/h5netcdf/issues/116 + ''' + if x is None: + return x + if not np.isscalar(x) and len(x) == 1: + return x[0] + return x + + _FillValue = hfix(ds.attrs.get('_FillValue')) + missing_value = ds.attrs.get('missing_value') + valid_min = hfix(ds.attrs.get('valid_min')) + valid_max = hfix(ds.attrs.get('valid_max')) + valid_range = hfix(ds.attrs.get('valid_range')) + if valid_max is not None or valid_min is not None: + if valid_range is not None: + raise ValueError( + "Invalid combination in the file of valid_min, " + "valid_max, valid_range: " + f"{valid_min}, {valid_max}, {valid_range}" + ) + elif valid_range is not None: + valid_min, valid_max = valid_range + + return _FillValue, missing_value, valid_min, valid_max class Active: """ @@ -83,7 +130,7 @@ def __init__( active_storage_url=None ): """ - Instantiate with a NetCDF4 dataset and the variable of interest within that file. + Instantiate with a NetCDF4 dataset URI and the variable of interest within that file. (We need the variable, because we need variable specific metadata from within that file, however, if that information is available at instantiation, it can be provided using keywords and avoid a metadata read.) @@ -94,7 +141,7 @@ def __init__( # Assume NetCDF4 for now self.uri = uri if self.uri is None: - raise ValueError(f"Must use a valid file for uri. Got {self.uri}") + raise ValueError(f"Must use a valid file for uri. 
Got {uri}") # still allow for a passable storage_type # for special cases eg "special-POSIX" ie DDN @@ -113,62 +160,75 @@ def __init__( self.ncvar = ncvar if self.ncvar is None: raise ValueError("Must set a netCDF variable name to slice") - self.zds = None self._version = 1 self._components = False self._method = None - self._lock = False self._max_threads = max_threads + self.missing = None + self.ds = None + self.metric_data = {} + self.data_read = 0 + + @_metricise + def __load_nc_file(self): + """ Get the netcdf file and it's b-tree""" + ncvar = self.ncvar + # in all cases we need an open netcdf file to get at attributes + # we keep it open because we need it's b-tree + if self.storage_type is None: + nc = pyfive.File(self.uri) + elif self.storage_type == "s3": + nc = load_from_s3(self.uri, self.storage_options) + self.filename = self.uri + + self.ds = nc[ncvar] + + def __get_missing_attributes(self): + if self.ds is None: + self.__load_nc_file() + return get_missing_attributes(self.ds) def __getitem__(self, index): """ Provides support for a standard get item. + #FIXME-BNL: Why is the argument index? """ - # In version one this is done by explicitly looping over each chunk in the file - # and returning the requested slice ourselves. In version 2, we can pass this - # through to the default method. - ncvar = self.ncvar + self.metric_data = {} + if self.ds is None: + self.__load_nc_file(__metric_name='load nc time') + #self.__metricise('Load','__load_nc_file') + + self.missing = self.__get_missing_attributes() + self.data_read = 0 + if self.method is None and self._version == 0: - # No active operation - lock = self.lock - if lock: - lock.acquire() - - if self.storage_type is None: - nc = Dataset(self.uri) - data = nc[ncvar][index] - nc.close() - elif self.storage_type == "s3": - with load_from_s3(self.uri, self.storage_options) as nc: - data = nc[ncvar][index] - data = self._mask_data(data, nc[ncvar]) - - if lock: - lock.release() - - return data + # No active operation + return self._get_vanilla(index, __metric_name='vanilla_time') + elif self._version == 1: - return self._via_kerchunk(index) + + #FIXME: is the difference between version 1 and 2 still honoured? + return self._get_selection(index, __metric_name='selection 1 time (s)') elif self._version == 2: - # No active operation either - lock = self.lock - if lock: - lock.acquire() - - data = self._via_kerchunk(index) - - if lock: - lock.release() - - return data + return self._get_selection(index, __metric_name='selection 2 time (s)') + else: raise ValueError(f'Version {self._version} not supported') + @_metricise + def _get_vanilla(self, index): + """ + Get the data without any active operation + """ + data = self.ds[index] + data = self._mask_data(data) + return data + @property def components(self): """Return or set the components flag. @@ -223,27 +283,6 @@ def ncvar(self): def ncvar(self, value): self._ncvar = value - @property - def lock(self): - """Return or set a lock that prevents concurrent file reads when accessing the data locally. - - The lock is either a `threading.Lock` instance, an object with - same API and functionality (such as - `dask.utils.SerializableLock`), or is `False` if no lock is - required. - - To be effective, the same lock instance must be used across - all process threads. 
- - """ - return self._lock - - @lock.setter - def lock(self, value): - if not value: - value = False - - self._lock = value def _get_active(self, method, *args): """ @@ -254,100 +293,53 @@ def _get_active(self, method, *args): an array returned via getitem. """ raise NotImplementedError - - def _via_kerchunk(self, index): - """ - The objective is to use kerchunk to read the slices ourselves. - """ - # FIXME: Order of calls is hardcoded' - if self.zds is None: - print(f"Kerchunking file {self.uri} with variable " - f"{self.ncvar} for storage type {self.storage_type}") - ds, zarray, zattrs = nz.load_netcdf_zarr_generic( - self.uri, - self.ncvar, - self.storage_type, - self.storage_options, - ) - # The following is a hangove from exploration - # and is needed if using the original doing it ourselves - # self.zds = make_an_array_instance_active(ds) - self.zds = ds - - # Retain attributes and other information - if zarray.get('fill_value') is not None: - zattrs['_FillValue'] = zarray['fill_value'] - - self.zarray = zarray - self.zattrs = zattrs - - # FIXME: We do not get the correct byte order on the Zarr - # Array's dtype when using S3, so capture it here. - self._dtype = np.dtype(zarray['dtype']) - - return self._get_selection(index) - + + @_metricise def _get_selection(self, *args): """ - First we need to convert the selection into chunk coordinates, - steps etc, via the Zarr machinery, then we get everything else we can - from zarr and friends and use simple dictionaries and tuples, then - we can go to the storage layer with no zarr. + At this point we have a Dataset object, but all the important information about + how to use it is in the attribute DataoobjectDataset class. Here we gather + metadata from the dataset instance and then continue with the dataobjects instance. """ - compressor = self.zds._compressor - filters = self.zds._filters - - # Get missing values - _FillValue = self.zattrs.get('_FillValue') - missing_value = self.zattrs.get('missing_value') - valid_min = self.zattrs.get('valid_min') - valid_max = self.zattrs.get('valid_max') - valid_range = self.zattrs.get('valid_range') - if valid_max is not None or valid_min is not None: - if valid_range is not None: - raise ValueError( - "Invalid combination in the file of valid_min, " - "valid_max, valid_range: " - f"{valid_min}, {valid_max}, {valid_range}" - ) - elif valid_range is not None: - valid_min, valid_max = valid_range + + # stick this here for later, to discuss with David + keepdims = True + + name = self.ds.name + dtype = np.dtype(self.ds.dtype) + # hopefully fix pyfive to get a dtype directly + array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks) + ds = self.ds.id + + self.metric_data['args'] = args + self.metric_data['dataset shape'] = self.ds.shape + self.metric_data['dataset chunks'] = self.ds.chunks + if ds.filter_pipeline is None: + compressor, filters = None, None + else: + compressor, filters = decode_filters(ds.filter_pipeline , dtype.itemsize, name) - missing = ( - _FillValue, - missing_value, - valid_min, - valid_max, - ) - - indexer = OrthogonalIndexer(*args, self.zds) + indexer = pyfive.indexing.OrthogonalIndexer(*args, array) out_shape = indexer.shape - out_dtype = self.zds._dtype - stripped_indexer = [(a, b, c) for a,b,c in indexer] - drop_axes = indexer.drop_axes # not sure what this does and why, yet. - - # yes this next line is bordering on voodoo ... 
- # this returns a nested dictionary with the full file FS reference - # ie all the gubbins: chunks, data structure, types, etc - # if using zarr<=2.13.3 call with _mutable_mapping ie - # fsref = self.zds.chunk_store._mutable_mapping.fs.references - fsref = self.zds.chunk_store.fs.references - - return self._from_storage(stripped_indexer, drop_axes, out_shape, - out_dtype, compressor, filters, missing, fsref) - - def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, - compressor, filters, missing, fsref): + #stripped_indexer = [(a, b, c) for a,b,c in indexer] + drop_axes = indexer.drop_axes and keepdims + + # we use array._chunks rather than ds.chunks, as the latter is none in the case of + # unchunked data, and we need to tell the storage the array dimensions in this case. + return self._from_storage(ds, indexer, array._chunks, out_shape, dtype, compressor, filters, drop_axes) + + def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, filters, drop_axes): method = self.method + if method is not None: out = [] counts = [] else: - out = np.empty(out_shape, dtype=out_dtype, order=self.zds._order) + out = np.empty(out_shape, dtype=out_dtype, order=ds._order) counts = None # should never get touched with no method! # Create a shared session object. - if self.storage_type == "s3": + if self.storage_type == "s3" and self._version==2: if self.storage_options is not None: key, secret = None, None if "key" in self.storage_options: @@ -367,16 +359,26 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, session = None # Process storage chunks using a thread pool. + # Because we do this, we need to read the dataset b-tree now, not as we go, so + # it is already in cache. If we remove the thread pool from here, we probably + # wouldn't need to do it before the first one. + + if ds.chunks is not None: + t1 = time.time() + # ds._get_chunk_addresses() + t2 = time.time() - t1 + self.metric_data['indexing time (s)'] = t2 + # self.metric_data['chunk number'] = len(ds._zchunk_index) + chunk_count = 0 + t1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor: futures = [] # Submit chunks for processing. - for chunk_coords, chunk_selection, out_selection in stripped_indexer: + for chunk_coords, chunk_selection, out_selection in indexer: future = executor.submit( self._process_chunk, - session, fsref, chunk_coords, chunk_selection, - counts, out_selection, - compressor, filters, missing, - drop_axes=drop_axes) + session, ds, chunks, chunk_coords, chunk_selection, + counts, out_selection, compressor, filters, drop_axes=drop_axes) futures.append(future) # Wait for completion. for future in concurrent.futures.as_completed(futures): @@ -385,6 +387,7 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, except Exception as exc: raise else: + chunk_count +=1 if method is not None: result, count = result out.append(result) @@ -430,6 +433,10 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, # size. 
out = out / np.sum(counts).reshape(shape1) + t2 = time.time() + self.metric_data['reduction time (s)'] = t2-t1 + self.metric_data['chunks processed'] = chunk_count + self.metric_data['storage read (B)'] = self.data_read return out def _get_endpoint_url(self): @@ -446,9 +453,8 @@ def _get_endpoint_url(self): return f"http://{urllib.parse.urlparse(self.filename).netloc}" - def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, - out_selection, compressor, filters, missing, - drop_axes=None): + def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, counts, + out_selection, compressor, filters, drop_axes=None): """ Obtain part or whole of a chunk. @@ -456,37 +462,47 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, the output array. Note the need to use counts for some methods + #FIXME: Do we? It's not actually used here. """ - coord = '.'.join([str(c) for c in chunk_coords]) - key = f"{self.ncvar}/{coord}" - rfile, offset, size = tuple(fsref[key]) - - # S3: pass in pre-configured storage options (credentials) - if self.storage_type == "s3": - print("S3 rfile is:", rfile) - parsed_url = urllib.parse.urlparse(rfile) + + # retrieve coordinates from chunk index + storeinfo = ds.get_chunk_info_from_chunk_coord(chunk_coords) + offset, size = storeinfo.byte_offset, storeinfo.size + self.data_read += size + + if self.storage_type == 's3' and self._version == 1: + + tmp, count = reduce_opens3_chunk(ds._fh, offset, size, compressor, filters, + self.missing, ds.dtype, + chunks, ds._order, + chunk_selection, method=self.method + ) + + elif self.storage_type == "s3" and self._version==2: + # S3: pass in pre-configured storage options (credentials) + # print("S3 rfile is:", self.filename) + parsed_url = urllib.parse.urlparse(self.filename) bucket = parsed_url.netloc object = parsed_url.path - # FIXME: We do not get the correct byte order on the Zarr Array's dtype - # when using S3, so use the value captured earlier. - dtype = self._dtype + # for certain S3 servers rfile needs to contain the bucket eg "bucket/filename" # as a result the parser above finds empty string bucket if bucket == "": bucket = os.path.dirname(object) object = os.path.basename(object) - print("S3 bucket:", bucket) - print("S3 file:", object) + # print("S3 bucket:", bucket) + # print("S3 file:", object) if self.storage_options is None: + # for the moment we need to force ds.dtype to be a numpy type tmp, count = reductionist.reduce_chunk(session, S3_ACTIVE_STORAGE_URL, S3_URL, bucket, object, offset, size, compressor, filters, - missing, dtype, - self.zds._chunks, - self.zds._order, + self.missing, np.dtype(ds.dtype), + chunks, + ds._order, chunk_selection, operation=self._method) else: @@ -503,47 +519,39 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, self._get_endpoint_url(), bucket, object, offset, size, compressor, filters, - missing, dtype, - self.zds._chunks, - self.zds._order, + self.missing, np.dtype(ds.dtype), + chunks, + ds._order, chunk_selection, operation=self._method) + elif self.storage_type=='ActivePosix' and self._version==2: + # This is where the DDN Fuse and Infinia wrappers go + raise NotImplementedError else: # note there is an ongoing discussion about this interface, and what it returns # see https://github.com/valeriupredoi/PyActiveStorage/issues/33 # so neither the returned data or the interface should be considered stable # although we will version changes. 
- tmp, count = reduce_chunk(rfile, offset, size, compressor, filters, - missing, self.zds._dtype, - self.zds._chunks, self.zds._order, + tmp, count = reduce_chunk(self.filename, offset, size, compressor, filters, + self.missing, ds.dtype, + chunks, ds._order, chunk_selection, method=self.method) if self.method is not None: - return tmp, count + return tmp, count else: if drop_axes: tmp = np.squeeze(tmp, axis=drop_axes) - return tmp, out_selection - - def _mask_data(self, data, ds_var): - """ppp""" - # TODO: replace with cfdm.NetCDFIndexer, hopefully. - attrs = ds_var.attrs - missing_value = attrs.get('missing_value') - _FillValue = attrs.get('_FillValue') - valid_min = attrs.get('valid_min') - valid_max = attrs.get('valid_max') - valid_range = attrs.get('valid_range') - - if valid_max is not None or valid_min is not None: - if valid_range is not None: - raise ValueError( - "Invalid combination in the file of valid_min, " - "valid_max, valid_range: " - f"{valid_min}, {valid_max}, {valid_range}" - ) - elif valid_range is not None: - valid_min, valid_max = valid_range + return tmp, out_selection + + def _mask_data(self, data): + """ + Missing values obtained at initial getitem, and are used here to + mask data, if necessary + """ + if self.missing is None: + self.missing = self.__get_missing_attributes() + _FillValue, missing_value, valid_min, valid_max = self.missing if _FillValue is not None: data = np.ma.masked_equal(data, _FillValue) diff --git a/activestorage/active_tools.py b/activestorage/active_tools.py deleted file mode 100644 index 59c05508..00000000 --- a/activestorage/active_tools.py +++ /dev/null @@ -1,351 +0,0 @@ -""" -Module to hold Zarr lift-up code. - -We're effectively subclassing zarr.core.Array, but not actually doing so, -instead we're providing tools to hack instances of it -""" -import numpy as np -import zarr - -from packaging import version - -from zarr.core import Array -# import other zarr gubbins used in the methods we override -from zarr.indexing import ( - OrthogonalIndexer, - PartialChunkIterator, - check_fields, - is_contiguous_selection, - -) -from zarr.util import ( - check_array_shape, - is_total_slice, - PartialReadBuffer, -) - -from numcodecs.compat import ensure_ndarray -from zarr.errors import ArrayIndexError - - -def make_an_array_instance_active(instance): - """ - Given a zarr array instance, override some key methods so - we can do active storage things. Note this only works for - normal and _ methods and would not work on __ methods. - - This an ugly hack for development to avoid having to hack - zarr internal in a fork. - """ - - instance.get_orthogonal_selection = as_get_orthogonal_selection.__get__(instance, Array) - instance._get_selection = as_get_selection.__get__(instance, Array) - instance._chunk_getitem = as_chunk_getitem.__get__(instance, Array) - instance._process_chunk = as_process_chunk.__get__(instance, Array) - instance._process_chunk_V = as_process_chunk_V.__get__(instance, Array) - - return instance - -def as_get_orthogonal_selection(self, selection, out=None, - fields=None): - """ - Retrieve data by making a selection for each dimension of the array. For - example, if an array has 2 dimensions, allows selecting specific rows and/or - columns. The selection for each dimension can be either an integer (indexing a - single item), a slice, an array of integers, or a Boolean array where True - values indicate a selection. - Parameters - ---------- - selection : tuple - A selection for each dimension of the array. 
May be any combination of int, - slice, integer array or Boolean array. - out : ndarray, optional - If given, load the selected data directly into this array. - fields : str or sequence of str, optional - For arrays with a structured dtype, one or more fields can be specified to - extract data for. - Returns - ------- - out : ndarray - A NumPy array containing the data for the requested selection. - Examples - -------- - Setup a 2-dimensional array:: - >>> import zarr - >>> import numpy as np - >>> z = zarr.array(np.arange(100).reshape(10, 10)) - Retrieve rows and columns via any combination of int, slice, integer array and/or - Boolean array:: - >>> z.get_orthogonal_selection(([1, 4], slice(None))) - array([[10, 11, 12, 13, 14, 15, 16, 17, 18, 19], - [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]]) - >>> z.get_orthogonal_selection((slice(None), [1, 4])) - array([[ 1, 4], - [11, 14], - [21, 24], - [31, 34], - [41, 44], - [51, 54], - [61, 64], - [71, 74], - [81, 84], - [91, 94]]) - >>> z.get_orthogonal_selection(([1, 4], [1, 4])) - array([[11, 14], - [41, 44]]) - >>> sel = np.zeros(z.shape[0], dtype=bool) - >>> sel[1] = True - >>> sel[4] = True - >>> z.get_orthogonal_selection((sel, sel)) - array([[11, 14], - [41, 44]]) - For convenience, the orthogonal selection functionality is also available via the - `oindex` property, e.g.:: - >>> z.oindex[[1, 4], :] - array([[10, 11, 12, 13, 14, 15, 16, 17, 18, 19], - [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]]) - >>> z.oindex[:, [1, 4]] - array([[ 1, 4], - [11, 14], - [21, 24], - [31, 34], - [41, 44], - [51, 54], - [61, 64], - [71, 74], - [81, 84], - [91, 94]]) - >>> z.oindex[[1, 4], [1, 4]] - array([[11, 14], - [41, 44]]) - >>> sel = np.zeros(z.shape[0], dtype=bool) - >>> sel[1] = True - >>> sel[4] = True - >>> z.oindex[sel, sel] - array([[11, 14], - [41, 44]]) - Notes - ----- - Orthogonal indexing is also known as outer indexing. - Slices with step > 1 are supported, but slices with negative step are not. - See Also - -------- - get_basic_selection, set_basic_selection, get_mask_selection, set_mask_selection, - get_coordinate_selection, set_coordinate_selection, set_orthogonal_selection, - vindex, oindex, __getitem__, __setitem__ - """ - - # refresh metadata - if not self._cache_metadata: - self._load_metadata() - - # check args - check_fields(fields, self._dtype) - - # setup indexer - indexer = OrthogonalIndexer(selection, self) - - return self._get_selection(indexer=indexer, out=out, - fields=fields) - - -def as_get_selection(self, indexer, out=None, - fields=None): - - # We iterate over all chunks which overlap the selection and thus contain data - # that needs to be extracted. Each chunk is processed in turn, extracting the - # necessary data and storing into the correct location in the output array. - - # N.B., it is an important optimisation that we only visit chunks which overlap - # the selection. This minimises the number of iterations in the main for loop. 
- - # check fields are sensible - out_dtype = check_fields(fields, self._dtype) - - # determine output shape - out_shape = indexer.shape - - # setup output array - if out is None: - out = np.empty(out_shape, dtype=out_dtype, order=self._order) - else: - check_array_shape('out', out, out_shape) - - chunks_info = [] - chunks_locs = [] - - # note: Zarr API has changed with zarr=2.15 - # hasattr(self.chunk_store, "getitems") = True for zarr >= 2.15 - zarr_version = zarr.__version__ - zarr_api_change_version = "2.15" - if version.parse(zarr_version) < version.parse(zarr_api_change_version): - att_getitems = not hasattr(self.chunk_store, "getitems") - elif version.parse(zarr_version) >= version.parse(zarr_api_change_version): - att_getitems = hasattr(self.chunk_store, "getitems") - # iterate over chunks - if att_getitems or any(map(lambda x: x == 0, self.shape)): - # sequentially get one key at a time from storage - for chunk_coords, chunk_selection, out_selection in indexer: - - if version.parse(zarr_version) < version.parse(zarr_api_change_version): - # load chunk selection into output array - pci = self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection, - drop_axes=indexer.drop_axes, fields=fields) - chunks_info.append(pci) - chunks_locs.append(chunk_coords) - elif version.parse(zarr_version) >= version.parse(zarr_api_change_version): - chunk_coords = [chunk_coords] - # load chunk selection into output array - pci = self._chunk_getitems(chunk_coords, chunk_selection, out, out_selection, - drop_axes=indexer.drop_axes, fields=fields) - chunks_info.append(pci) - chunks_locs.append(chunk_coords[0]) - else: - # allow storage to get multiple items at once - lchunk_coords, lchunk_selection, lout_selection = zip(*indexer) - self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection, - drop_axes=indexer.drop_axes, fields=fields) - - if out.shape: - return out, chunks_info, chunks_locs - else: - return out[()], chunks_info, chunks_locs - - -def as_chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, - drop_axes=None, fields=None): - """Obtain part or whole of a chunk. - Parameters - ---------- - chunk_coords : tuple of ints - Indices of the chunk. - chunk_selection : selection - Location of region within the chunk to extract. - out : ndarray - Array to store result in. - out_selection : selection - Location of region within output array to store results in. - drop_axes : tuple of ints - Axes to squeeze out of the chunk. 
- fields - TODO - """ - out_is_ndarray = True - try: - out = ensure_ndarray(out) - except TypeError: - out_is_ndarray = False - - assert len(chunk_coords) == len(self._cdata_shape) - - try: - pci_info = self._process_chunk_V(chunk_selection) - except KeyError: - # chunk not initialized - if self._fill_value is not None: - if fields: - fill_value = self._fill_value[fields] - else: - fill_value = self._fill_value - out[out_selection] = fill_value - pci_info = self._process_chunk_V(chunk_selection) - - return pci_info - - -def as_process_chunk( - self, - out, - cdata, - chunk_selection, - drop_axes, - out_is_ndarray, - fields, - out_selection, - partial_read_decode=False, -): - """Take binary data from storage and fill output array""" - if (out_is_ndarray and - not fields and - is_contiguous_selection(out_selection) and - is_total_slice(chunk_selection, self._chunks) and - not self._filters and - self._dtype != object): - - dest = out[out_selection] - write_direct = ( - dest.flags.writeable and - ( - (self._order == 'C' and dest.flags.c_contiguous) or - (self._order == 'F' and dest.flags.f_contiguous) - ) - ) - - if write_direct: - - # optimization: we want the whole chunk, and the destination is - # contiguous, so we can decompress directly from the chunk - # into the destination array - if self._compressor: - if isinstance(cdata, PartialReadBuffer): - cdata = cdata.read_full() - self._compressor.decode(cdata, dest) - else: - chunk = ensure_ndarray(cdata).view(self._dtype) - if np.prod(chunk.shape) > np.prod(self._chunks): - raise ValueError(f"Chunk shape {chunk.shape} exceeds " - f"data chunks shape {self._chunks}") - chunk = chunk.reshape(self._chunks, order=self._order) - np.copyto(dest, chunk) - return - - # decode chunk - try: - if partial_read_decode: - cdata.prepare_chunk() - # size of chunk - tmp = np.empty(self._chunks, dtype=self.dtype) - index_selection = PartialChunkIterator(chunk_selection, self.chunks) - for start, nitems, partial_out_selection in index_selection: - expected_shape = [ - len( - range(*partial_out_selection[i].indices(self.chunks[0] + 1)) - ) - if i < len(partial_out_selection) - else dim - for i, dim in enumerate(self.chunks) - ] - cdata.read_part(start, nitems) - chunk_partial = self._decode_chunk( - cdata.buff, - start=start, - nitems=nitems, - expected_shape=expected_shape, - ) - tmp[partial_out_selection] = chunk_partial - out[out_selection] = tmp[chunk_selection] - return - except ArrayIndexError: - cdata = cdata.read_full() - if self._compressor and isinstance(cdata, np.ndarray): - raise TypeError(f"cdata {cdata} is an ndarray, can not decompress.") - chunk = self._decode_chunk(cdata) - - # select data from chunk - if fields: - chunk = chunk[fields] - tmp = chunk[chunk_selection] - if drop_axes: - tmp = np.squeeze(tmp, axis=drop_axes) - - # store selected data in output - if np.prod(tmp.shape) > np.prod(out.shape): - raise ValueError(f"Storage chunk shape {tmp.shape} exceeds permitted " - f"output data shape {out.shape}.") - out[out_selection] = tmp - -def as_process_chunk_V(self, chunk_selection): - """Run an instance of PCI inside the engine.""" - index_selection = PartialChunkIterator(chunk_selection, self.chunks) - for _, _, _ in index_selection: - return self.chunks, chunk_selection, index_selection diff --git a/activestorage/hdf2numcodec.py b/activestorage/hdf2numcodec.py new file mode 100644 index 00000000..fc90cad3 --- /dev/null +++ b/activestorage/hdf2numcodec.py @@ -0,0 +1,102 @@ +import numcodecs + +def decode_filters(filter_pipeline, itemsize, 
name): + """ + + Convert HDF5 filter and compression instructions into instructions understood + by numcodecs. Input is a pyfive filter_pipeline object, the itemsize, and the + dataset name for error messages. + + We can only support things that are supported by our storage backends, which + is a rather limited range right now: + - gzip compression, and + - shuffle filter + + See also zarr kerchunk _decode_filters. We may need to add much more + support for BLOSC than currently supported. + + Useful notes on filters are here: + - https://docs.unidata.ucar.edu/netcdf-c/current/filters.html and + - https://docs.hdfgroup.org/hdf5/v1_8/group___h5_z.html + + In particular, note that we can only support things that go beyond native HDF5 + if _we_ support them directly. + + """ + compressors, filters = [], [] + + for filter in filter_pipeline: + + filter_id=filter['filter_id'] + properties = filter['client_data'] + + + # We support the following + if filter_id == GZIP_DEFLATE_FILTER: + compressors.append(numcodecs.Zlib(level=properties[0])) + elif filter_id == SHUFFLE_FILTER: + filters.append(numcodecs.Shuffle(elementsize=itemsize)) + else: + raise NotImplementedError(f'We cannot yet support filter id {filter_id}') + + # We might be able, in the future, to support the following + # At the moment the following code cannot be implemented, but we can move + # the loops up as we develop backend support. + + if 0: + + + if filter_id == 32001: + blosc_compressors = ( + "blosclz", + "lz4", + "lz4hc", + "snappy", + "zlib", + "zstd", + ) + ( + _1, + _2, + bytes_per_num, + total_bytes, + clevel, + shuffle, + compressor, + ) = properties + pars = dict( + blocksize=total_bytes, + clevel=clevel, + shuffle=shuffle, + cname=blosc_compressors[compressor], + ) + filters.append(numcodecs.Blosc(**pars)) + elif filter_id == 32015: + filters.append(numcodecs.Zstd(level=properties[0])) + elif filter_id == 32004: + raise RuntimeError( + f"{name} uses lz4 compression - not supported as yet" + ) + elif filter_id == 32008: + raise RuntimeError( + f"{name} uses bitshuffle compression - not supported as yet" + ) + else: + raise RuntimeError( + f"{name} uses filter id {filter_id} with properties {properties}," + f" not supported as yet" + ) + + if len(compressors) > 1: + raise ValueError('We only expected one compression algorithm') + return compressors[0], filters + +# These are from pyfive's HDF5 filter definitions +# IV.A.2.l The Data Storage - Filter Pipeline message +RESERVED_FILTER = 0 +GZIP_DEFLATE_FILTER = 1 +SHUFFLE_FILTER = 2 +FLETCH32_FILTER = 3 +SZIP_FILTER = 4 +NBIT_FILTER = 5 +SCALEOFFSET_FILTER = 6 diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py deleted file mode 100644 index 7bdc3d4d..00000000 --- a/activestorage/netcdf_to_zarr.py +++ /dev/null @@ -1,195 +0,0 @@ -import os -import numpy as np -import zarr -import ujson -import fsspec -import s3fs -import tempfile - -from activestorage.config import * -from kerchunk.hdf import SingleHdf5ToZarr - - -def _correct_compressor_and_filename(content, varname, bryan_bucket=False): - """ - Correct the compressor type as it comes out of Kerchunk (>=0.2.4; pinned). 
- Also correct file name as Kerchnk now prefixes it with "s3://" - and for special buckets like Bryan's bnl the correct file is bnl/file.nc - not s3://bnl/file.nc - """ - new_content = content.copy() - - # prelimniary assembly - try: - new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"]) - group = False - except KeyError: - new_zarray = ujson.loads(new_content['refs'][f"{varname} /{varname}/.zarray"]) - group = True - - # re-add the correct compressor if it's in the "filters" list - if new_zarray["compressor"] is None and new_zarray["filters"]: - for zfilter in new_zarray["filters"]: - if zfilter["id"] == "zlib": - new_zarray["compressor"] = zfilter - new_zarray["filters"].remove(zfilter) - - if not group: - new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray) - else: - new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray) - - # FIXME TODO this is an absolute nightmate: the type of bucket on UOR ACES - # this is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc" - # the problem: filename gets written to JSON as "s3://bnl/file.nc" but Reductionist doesn't - # find it since it needs url=bnl/file.nc, with endpoint URL being extracted from the - # endpoint_url of storage_options. BAH! - if bryan_bucket: - for key in new_content['refs'].keys(): - if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]: - new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "") - - return new_content - - -def gen_json(file_url, varname, outf, storage_type, storage_options): - """Generate a json file that contains the kerchunk-ed data for Zarr.""" - # S3 configuration presets - if storage_type == "s3" and storage_options is None: - fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, - secret=S3_SECRET_KEY, - client_kwargs={'endpoint_url': S3_URL}, - default_fill_cache=False, - default_cache_type="first" # best for HDF5 - ) - fs2 = fsspec.filesystem('') - with fs.open(file_url, 'rb') as s3file: - h5chunks = SingleHdf5ToZarr(s3file, file_url, - inline_threshold=0) - - # TODO absolute crap, this needs to go - # see comments in _correct_compressor_and_filename - bryan_bucket = False - if "bnl" in file_url: - bryan_bucket = True - - with fs2.open(outf, 'wb') as f: - content = h5chunks.translate() - content = _correct_compressor_and_filename(content, - varname, - bryan_bucket=bryan_bucket) - f.write(ujson.dumps(content).encode()) - - # S3 passed-in configuration - elif storage_type == "s3" and storage_options is not None: - storage_options = storage_options.copy() - storage_options['default_fill_cache'] = False - storage_options['default_cache_type'] = "first" # best for HDF5 - fs = s3fs.S3FileSystem(**storage_options) - fs2 = fsspec.filesystem('') - with fs.open(file_url, 'rb') as s3file: - - # Kerchunk wants the correct file name in S3 format - if not file_url.startswith("s3://"): - file_url = "s3://" + file_url - - # TODO absolute crap: this needs to go - # see comments in _correct_compressor_and_filename - bryan_bucket = False - if "bnl" in file_url: - bryan_bucket = True - - h5chunks = SingleHdf5ToZarr(s3file, file_url, - inline_threshold=0) - with fs2.open(outf, 'wb') as f: - content = h5chunks.translate() - content = _correct_compressor_and_filename(content, - varname, - bryan_bucket=bryan_bucket) - f.write(ujson.dumps(content).encode()) - # not S3 - else: - fs = fsspec.filesystem('') - with fs.open(file_url, 'rb') as local_file: - try: - h5chunks = 
SingleHdf5ToZarr(local_file, file_url, - inline_threshold=0) - except OSError as exc: - raiser_1 = f"Unable to open file {file_url}. " - raiser_2 = "Check if file is netCDF3 or netCDF-classic" - print(raiser_1 + raiser_2) - raise exc - - # inline threshold adjusts the Size below which binary blocks are - # included directly in the output - # a higher inline threshold can result in a larger json file but - # faster loading time - # for active storage, we don't want anything inline - with fs.open(outf, 'wb') as f: - content = h5chunks.translate() - content = _correct_compressor_and_filename(content, - varname, - bryan_bucket=False) - f.write(ujson.dumps(content).encode()) - - zarray = ujson.loads(content['refs'][f"{varname}/.zarray"]) - zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"]) - - return outf, zarray, zattrs - - -def open_zarr_group(out_json, varname): - """ - Do the magic opening - - Open a json file read and saved by the reference file system - into a Zarr Group, then extract the Zarr Array you need. - That Array is in the 'data' attribute. - """ - fs = fsspec.filesystem("reference", fo=out_json) - mapper = fs.get_mapper("") # local FS mapper - #mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array? - zarr_group = zarr.open_group(mapper) - - try: - zarr_array = getattr(zarr_group, varname) - except AttributeError as attrerr: - print(f"Zarr Group does not contain variable {varname}. " - f"Zarr Group info: {zarr_group.info}") - raise attrerr - #print("Zarr array info:", zarr_array.info) - - return zarr_array - - -def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, build_dummy=True): - """Pass a netCDF4 file to be shaped as Zarr file by kerchunk.""" - print(f"Storage type {storage_type}") - - # Write the Zarr group JSON to a temporary file. - with tempfile.NamedTemporaryFile() as out_json: - _, zarray, zattrs = gen_json(fileloc, - varname, - out_json.name, - storage_type, - storage_options) - - # open this monster - print(f"Attempting to open and convert {fileloc}.") - ref_ds = open_zarr_group(out_json.name, varname) - - return ref_ds, zarray, zattrs - - -#d = {'version': 1, -# 'refs': { -# '.zgroup': '{"zarr_format":2}', -# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. 
Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}', -# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":" requests.Session: """Create and return a client session object. @@ -53,7 +55,8 @@ def reduce_chunk(session, server, source, bucket, object, """ request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection) - print("Reductionist request data dictionary:", request_data) + if DEBUG: + print(f"Reductionist request data dictionary: {request_data}") api_operation = "sum" if operation == "mean" else operation or "select" url = f'{server}/v1/{api_operation}/' response = request(session, url, request_data) @@ -116,6 +119,8 @@ def encode_missing(missing): if missing_value: if isinstance(missing_value, collections.abc.Sequence): return {"missing_values": [encode_dvalue(v) for v in missing_value]} + elif isinstance(missing_value, np.ndarray): + return {"missing_values": [encode_dvalue(v) for v in missing_value]} else: return {"missing_value": encode_dvalue(missing_value)} if valid_min and valid_max: @@ -137,8 +142,8 @@ def build_request_data(source: str, bucket: str, object: str, offset: int, 'object': object, 'dtype': dtype.name, 'byte_order': encode_byte_order(dtype), - 'offset': offset, - 'size': size, + 'offset': int(offset), + 'size': int(size), 'order': order, } if shape: diff --git a/activestorage/storage.py b/activestorage/storage.py index 28d91f35..80a575ba 100644 --- a/activestorage/storage.py +++ b/activestorage/storage.py @@ -3,7 +3,9 @@ from numcodecs.compat import ensure_ndarray -def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, method=None): +def reduce_chunk(rfile, + offset, size, compression, filters, missing, dtype, shape, + order, chunk_selection, method=None): """ We do our own read of chunks and decoding etc rfile - the actual file with the data @@ -42,11 +44,12 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap if method: if missing != (None, None, None, None): tmp = remove_missing(tmp, missing) - # check on size of tmp; method(empty) returns nan - if tmp.any(): + # Check on size of tmp; method(empty) fails or gives incorrect + # results + if tmp.size: return method(tmp), tmp.size else: - return tmp, None + return tmp, 0 else: return tmp, None @@ -98,3 +101,36 @@ def read_block(open_file, offset, size): data = open_file.read(size) open_file.seek(place) return data + + +def reduce_opens3_chunk(fh, + offset, size, compression, filters, missing, dtype, shape, + order, chunk_selection, method=None): + """ + Same function as reduce_chunk, but this mimics what is done + deep 
in the bowels of H5py/pyfive. The reason for doing this is + so we can get per chunk metrics + """ + fh.seek(offset) + chunk_buffer = fh.read(size) + chunk = filter_pipeline(chunk_buffer, compression, filters) + # make it a numpy array of bytes + chunk = ensure_ndarray(chunk) + # convert to the appropriate data type + chunk = chunk.view(dtype) + # sort out ordering and convert to the parent hyperslab dimensions + chunk = chunk.reshape(-1, order='A') + chunk = chunk.reshape(shape, order=order) + + tmp = chunk[chunk_selection] + if method: + if missing != (None, None, None, None): + tmp = remove_missing(tmp, missing) + # check on size of tmp; method(empty) returns nan + if tmp.any(): + return method(tmp), tmp.size + else: + return tmp, None + else: + return tmp, None + diff --git a/bnl/bnl_s3test.py b/bnl/bnl_s3test.py new file mode 100644 index 00000000..938e98c0 --- /dev/null +++ b/bnl/bnl_s3test.py @@ -0,0 +1,81 @@ + +from activestorage.active import Active +import os +from pathlib import Path +from netCDF4 import Dataset +import numpy as np +import pyfive +import s3fs + +from activestorage.active import load_from_s3 + +S3_BUCKET = "bnl" + +def simple(filename, var): + + S3_URL = 'https://uor-aces-o.s3-ext.jc.rl.ac.uk/' + fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL}) + uri = 'bnl/'+filename + + with fs.open(uri,'rb') as s3file2: + f2 = pyfive.File(s3file2) + print(f2[var]) + + with fs.open(uri, 'rb') as s3file: + ds = pyfive.File(s3file) + print(ds[var]) + + #f2 = load_from_s3(uri, storage_options={'client_kwargs':{"endpoint_url":S3_URL}}) + +def ex_test(ncfile, var): + """ + Test use of datasets with compression and filters applied for a real + CMIP6 dataset (CMIP6-test.nc) - an IPSL file. + + This is for a special anon=True bucket connected to via valid key.secret + """ + storage_options = { + 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", + 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", + 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} + } + active_storage_url = "https://192.171.169.248:8080" + + + mypath = Path(__file__).parent + uri = str(mypath/ncfile) + with Dataset(uri) as nc_data: + nc_min = np.min(nc_data[var][0:2,4:6,7:9]) + print(f"Numpy min from compressed file {nc_min}") + + ofile = os.path.basename(uri) + test_file_uri = os.path.join( + S3_BUCKET, + ofile + ) + print("S3 Test file path:", test_file_uri) + + for av in [0,1,2]: + + active = Active(test_file_uri, var, storage_type="s3", + storage_options=storage_options, + active_storage_url=active_storage_url) + + active._version = av + if av > 0: + active._method = "min" + + result = active[0:2,4:6,7:9] + print(active.metric_data) + if av == 0: + result = np.min(result) + assert nc_min == result + assert result == 239.25946044921875 + + + +if __name__=="__main__": + ncfile, var = 'CMIP6-test.nc','tas' + #ncfile, var = 'test_partially_missing_data.nc','data' + simple(ncfile, var) + ex_test(ncfile, var) \ No newline at end of file diff --git a/bnl/bnl_test.py b/bnl/bnl_test.py new file mode 100644 index 00000000..fd9e4d29 --- /dev/null +++ b/bnl/bnl_test.py @@ -0,0 +1,48 @@ +import os +import pytest + +import numpy as np +from netCDF4 import Dataset +from pathlib import Path +import s3fs + +from activestorage.active import Active +from activestorage.config import * + + +def mytest(): + """ + Test again a native model, this time around netCDF4 loadable with h5py + Also, this has _FillValue and missing_value + """ + ncfile, v = 
"cesm2_native.nc","TREFHT" + ncfile, v = "CMIP6-test.nc", 'tas' + #ncfile, v = "chunked.hdf5", "dataset1" + #ncfile, v = 'daily_data.nc', 'ta' + mypath = Path(__file__).parent + uri = str(mypath/ncfile) + results = [] + for av in [0,1,2]: + + active = Active(uri, v, None) + active._version = av + if av > 0: + active.method="mean" + if v == "dataset1": + d = active[2,:] + else: + d = active[4:5, 1:2] + print(active.metric_data) + if av == 0: + d = np.mean(d) + results.append(d) + + + # check for active + np.testing.assert_allclose(results[0],results[1], rtol=1e-6) + np.testing.assert_allclose(results[1],results[2], rtol=1e-6) + + + +if __name__=="__main__": + mytest() \ No newline at end of file diff --git a/bnl/bnl_timeseries.py b/bnl/bnl_timeseries.py new file mode 100644 index 00000000..92d64238 --- /dev/null +++ b/bnl/bnl_timeseries.py @@ -0,0 +1,68 @@ +# Simple code to get hemispheric mean of the lowest level of air temperature from +# float UM_m01s30i204_vn1106(time, air_pressure_3, latitude_1, longitude_1) ; +# UM_m01s30i204_vn1106:long_name = "TEMPERATURE ON P LEV/UV GRID" ;" ; +# UM_m01s30i204_vn1106:units = "K" ; +# UM_m01s30i204_vn1106:cell_methods = "time: point" ; +# with +# time = 40 ; +# latitude_1 = 1921 ; +# latitude = 1920 ; +# longitude_1 = 2560 ; +# air_pressure_3 = 11 ; + +# This variable is held in an 18 GB file on S3 storage + +from activestorage.active import Active +import numpy as np +from time import time + +S3_BUCKET = "bnl" +S3_URL = "https://uor-aces-o.s3-ext.jc.rl.ac.uk" +ACTIVE_URL = "https://192.171.169.248:8080" + +def timeseries(location='uni', blocks_MB=1, version=2, threads=100): + + invoke = time() + storage_options = { + 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", + 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", + 'client_kwargs': {'endpoint_url': f"{S3_URL}"}, + 'default_fill_cache':False, + 'default_cache_type':"readahead", + 'default_block_size': blocks_MB * 2**20 + } + + filename = 'ch330a.pc19790301-def.nc' + uri = S3_BUCKET + '/' + filename + var = "UM_m01s30i204_vn1106" + + active = Active(uri, var, storage_type="s3", max_threads=threads, + storage_options=storage_options, + active_storage_url=ACTIVE_URL) + + # set active to use the remote reductionist. + # (we intend to change the invocation method + # to something more obvious and transparent.) 
+ active._version = version + + # and set the operation, again, the API will change + active._method = "mean" + + # get hemispheric mean timeseries: + # (this would be more elegant in cf-python) + ts = [] + md = [] + for i in range(40): + ts.append(active[i,0,0:960,:][0]) + # get some performance diagnostics from pyactive + print(active.metric_data) + if i == 0: + nct = active.metric_data['load nc time'] + md.append(active.metric_data['reduction time (s)']) + + result = np.array(ts) + print(result) + complete = time() + method = {1:'Local',2:'Active'}[version] + titlestring = f"{location}:{method} (T{threads},BS{blocks_MB}): {nct:.3}s,{sum(md):.4}s,{complete-invoke:.4}s" + print('Summary: ',titlestring) diff --git a/bnl/bnl_timeseries_plot.py b/bnl/bnl_timeseries_plot.py new file mode 100644 index 00000000..c654411a --- /dev/null +++ b/bnl/bnl_timeseries_plot.py @@ -0,0 +1,67 @@ +from pathlib import Path +import numpy as np +from matplotlib import pyplot as plt +import os + + +plt.rcParams["figure.figsize"] = (8,12) + +data = {'timeseries1':'Hm1, Active: 5 MB Blks', + 'timeseries2':'Hm1, Active: 1 MB Blks', + 'timeseries2v1':'Hm1, Local: 1 MB Blks', + 'timeseries2-uor-t1-a':'Uni, Active: T1, 1 MB Blks', + 'timeseries2-uor-t1-b':'Uni, Active: T1, 1 MB Blks', + 'timeseries2-uor-t1-c':'Uni, Active: T1, 1 MB Blks', + 'timeseries2-uor-t100-a':'Uni, Active: T100, 1 MB Blks', + 'timeseries2-uor-t100-b':'Uni, Active: T100, 1 MB Blks', + 'timeseries1-uor-t100-a':'Uni, Local: T100, 1 MB Blks', + 'timeseries1-uor-t100-b':'Uni, Local: T100, 1 MB Blks', + 'timeseries2v1-uor-t1':'Uni, Local: T1, 1 MB Blks', + 'timeseries1-uor-t1':'Uni, Active: T1, 5 MB Blks', + } +tt = [] + +mypath = Path(__file__).parent + +logfiles = mypath.glob('timeseries3-H*.log') + +#for d,t in data.items(): + +for ff in logfiles: + + #fname1 = mypath/f'{d}.log' + #fname2 = mypath/f'{d}.metrics.txt' + #os.system(f'grep -a "dataset shape" {fname1} > {fname2}') + #with open(fname2,'r') as f: + with open(ff,'r') as f: + lines = f.readlines() + dicts = [eval(l) for l in lines if l.startswith('{') ] + nct = dicts[0]['load nc time'] + rt = [v['reduction time (s)'] for v in dicts] + summary = lines[-1][9:] + overall_time = float(summary.split(',')[-1][:-2]) + tt.append((overall_time, rt, summary)) + #os.system(f'rm {fname2}') + +tts = sorted(tt) +curve = {k:v for i,v,k in tts} + +for k in curve.keys(): + + if 'Local' in k: + ls = 'dashed' + else: + ls = 'solid' + + plt.plot(curve[k], label=k, linestyle=ls) + + +plt.legend(bbox_to_anchor=(0.7, -0.1), fontsize=8) +plt.title('Comparison of reduction parameters') +plt.ylabel('Time to reduce each timestep (s)') +plt.tight_layout() +plt.savefig('home.png') +plt.show() + + + diff --git a/bnl/bnl_timeseries_runner.py b/bnl/bnl_timeseries_runner.py new file mode 100644 index 00000000..dda35478 --- /dev/null +++ b/bnl/bnl_timeseries_runner.py @@ -0,0 +1,15 @@ +from bnl_timeseries import timeseries +import sys + +location = 'Hm1' +blocks = [1,5] +version = [1,2] +threads = [1,100] +iterations = ['a','b','c'] + +for b in blocks: + for v in version: + for t in threads: + for i in iterations: + with open(f'timeseries3-{location}-{b}-{v}-{t}-{i}.log','w') as sys.stdout: + timeseries(location, b, v, t) diff --git a/bnl/import_test.py b/bnl/import_test.py new file mode 100644 index 00000000..e805f77c --- /dev/null +++ b/bnl/import_test.py @@ -0,0 +1,7 @@ +from tests import test_reductionist_json +from pathlib import Path + +mypath = Path(__file__).parent + 
+test_reductionist_json.test_build_request(mypath) + diff --git a/bnl/test_missing_gubbins.py b/bnl/test_missing_gubbins.py new file mode 100644 index 00000000..65f353a5 --- /dev/null +++ b/bnl/test_missing_gubbins.py @@ -0,0 +1,141 @@ + +from activestorage import dummy_data as dd +from activestorage import Active +import pyfive +import numpy.ma as ma +import numpy as np +import os +from activestorage.config import * +from pathlib import Path +import s3fs + + +def get_masked_data(ds): + """ This is buried in Active itself; we should pull it out for wider use in our testing.""" + attrs = ds.attrs + missing_value = attrs.get('missing_value') + _FillValue = attrs.get('_FillValue') + valid_min = attrs.get('valid_min') + valid_max = attrs.get('valid_max') + valid_range = attrs.get('valid_range') + + if valid_max is not None or valid_min is not None: + if valid_range is not None: + raise ValueError( + "Invalid combination in the file of valid_min, " + "valid_max, valid_range: " + f"{valid_min}, {valid_max}, {valid_range}" + ) + elif valid_range is not None: + valid_min, valid_max = valid_range + + data = ds[:] + + if _FillValue is not None: + data = np.ma.masked_equal(data, _FillValue) + + if missing_value is not None: + data = np.ma.masked_equal(data, missing_value) + + if valid_max is not None: + data = np.ma.masked_greater(data, valid_max) + + if valid_min is not None: + data = np.ma.masked_less(data, valid_min) + + return data + + + + +def upload_to_s3(server, username, password, bucket, object, rfile): + """Upload a file to an S3 object store.""" + s3_fs = s3fs.S3FileSystem(key=username, secret=password, client_kwargs={'endpoint_url': server}) + # Make sure s3 bucket exists + try: + s3_fs.mkdir(bucket) + except FileExistsError: + pass + + s3_fs.put_file(rfile, os.path.join(bucket, object)) + + +def get_storage_type(): + if USE_S3: + return "s3" + else: + return None + +def write_to_storage(ncfile): + """Write a file to storage and return an appropriate URI or path to access it.""" + if USE_S3: + object = os.path.basename(ncfile) + upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, object, ncfile) + return os.path.join("s3://", S3_BUCKET, object) + else: + return ncfile + + +def active_zero(testfile): + """Run Active with no active storage (version=0).""" + active = Active(testfile, "data", get_storage_type()) + active._version = 0 + d = active[0:2, 4:6, 7:9] + assert ma.is_masked(d) + + # FIXME: For the S3 backend, h5netcdf is used to read the metadata. It does + # not seem to load the missing data attributes (missing_value, _FillValue, + # valid_min, valid_max, valid_range, etc). 
+ assert ma.is_masked(d) + + return np.mean(d) + + +def active_two(testfile): + """Run Active with active storage (version=2).""" + active = Active(testfile, "data", get_storage_type()) + active._version = 2 + active.method = "mean" + active.components = True + result2 = active[0:2, 4:6, 7:9] + + active_mean = result2["sum"] / result2["n"] + + return active_mean + + +def load_dataset(testfile): + """Load data as netCDF4.Dataset.""" + ds = pyfive.File(testfile) + actual_data = get_masked_data(ds["data"]) + + ds.close() + + assert ma.is_masked(actual_data) + + return actual_data + + +def test_partially_missing_data(tmp_path): + testfile = str(tmp_path / 'test_partially_missing_data.nc') + r = dd.make_partially_missing_ncdata(testfile) + + # retrieve the actual numpy-ed result + actual_data = load_dataset(testfile) + unmasked_numpy_mean = actual_data[0:2, 4:6, 7:9].data.mean() + masked_numpy_mean = actual_data[0:2, 4:6, 7:9].mean() + assert unmasked_numpy_mean != masked_numpy_mean + print("Numpy masked result (mean)", masked_numpy_mean) + + # write file to storage + testfile = write_to_storage(testfile) + + # numpy masked to check for correct Active behaviour + no_active_mean = active_zero(testfile) + print("No active storage result (mean)", no_active_mean) + + active_mean = active_two(testfile) + print("Active storage result (mean)", active_mean) + + np.testing.assert_array_equal(masked_numpy_mean, active_mean) + np.testing.assert_array_equal(no_active_mean, active_mean) diff --git a/doc/data/home_experiments_bnl.txt b/doc/data/home_experiments_bnl.txt new file mode 100644 index 00000000..bc1af2f2 --- /dev/null +++ b/doc/data/home_experiments_bnl.txt @@ -0,0 +1,250 @@ +pyfive branch +default_fill_cache : False +default_cache_type : readahead +default_block_size : 1048576 + +r[0:3, 4:6, 7:9] +r[0:3, 400:600, 7:9] +r[0:3, 400:600, 800:1100] +r[5,:,800] +r[:,300:500,800:1000] +r[1,:,:] + +bnl/ch330a.pc19790301-def.nc, 64 chunk entries + +Reduction over 1 chunks took 0.4s +Overall time (version 1) - 5e+00s +6.5275819301605225 : Regular +6.11167573928833 : Active Regular +4.966563940048218 : Active Remote +5.777529239654541 : Regular +5.894297122955322 : Active Regular +4.513840913772583 : Active Remote +5.653490781784058 : Regular +5.954509019851685 : Active Regular +4.394351005554199 : Active Remote +5.651051044464111 : Regular +5.728168964385986 : Active Regular +4.569552898406982 : Active Remote +5.659423112869263 : Regular +5.652827024459839 : Active Regular +5.071002960205078 : Active Remote + +Reduction over 2 chunks took 0.4s +Overall time (version 1) - 4e+00s +7.908946990966797 : Regular +7.5908496379852295 : Active Regular +4.481859922409058 : Active Remote +8.732571840286255 : Regular +8.1283700466156 : Active Regular +4.558679103851318 : Active Remote +7.707890033721924 : Regular +7.721914052963257 : Active Regular +4.404280185699463 : Active Remote +7.809189796447754 : Regular +7.916913986206055 : Active Regular +4.436033010482788 : Active Remote +7.775968790054321 : Regular +7.735078811645508 : Active Regular +4.649733781814575 : Active Remote + +Reduction over 2 chunks took 0.5s +Overall time (version 1) - 5e+00s +7.754213809967041 : Regular +9.058350801467896 : Active Regular +4.2825469970703125 : Active Remote +7.6808648109436035 : Regular +7.936631202697754 : Active Regular +4.475581884384155 : Active Remote +7.986822843551636 : Regular +7.767880916595459 : Active Regular +5.64799165725708 : Active Remote +8.05326509475708 : Regular +8.558978080749512 : Active Regular 
+4.387536287307739 : Active Remote +8.05326509475708 : Regular +8.558978080749512 : Active Regular +4.387536287307739 : Active Remote + +Reduction over 4 chunks took 1e+00s +Overall time (version 1) - 5e+00s +11.737549781799316 : Regular +11.829929113388062 : Active Regular +5.3570029735565186 : Active Remote +12.96435260772705 : Regular +11.708273887634277 : Active Regular +4.427199125289917 : Active Remote +12.082044124603271 : Regular +11.663249969482422 : Active Regular +4.448379039764404 : Active Remote +12.160427808761597 : Regular +12.381748914718628 : Active Regular +4.864629030227661 : Active Remote +11.940709114074707 : Regular +12.424716234207153 : Active Regular +4.6097517013549805 : Active Remote + +Reduction over 8 chunks took 0.7s +Overall time (version 1) - 5e+00s +20.35632634162903 : Regular +18.701595783233643 : Active Regular +4.780594825744629 : Active Remote +18.91178870201111 : Regular +20.01336669921875 : Active Regular +4.806182861328125 : Active Remote +19.31538200378418 : Regular +19.239036321640015 : Active Regular +4.886173963546753 : Active Remote +18.862577199935913 : Regular +18.919509172439575 : Active Regular +4.527647972106934 : Active Remote +19.1742422580719 : Regular +19.013291835784912 : Active Regular +4.555526971817017 : Active Remote + +Reduction over 16 chunks took 1e+00s +Overall time (version 1) - 5e+00s +33.903298139572144 : Regular +33.95520091056824 : Active Regular +5.479094982147217 : Active Remote +33.47618818283081 : Regular +33.14509582519531 : Active Regular +4.9755659103393555 : Active Remote +34.320921182632446 : Regular +33.631829023361206 : Active Regular +4.777320861816406 : Active Remote +33.07414221763611 : Regular +33.10297989845276 : Active Regular +4.985634803771973 : Active Remote +33.271361112594604 : Regular +33.41872477531433 : Active Regular +5.1617209911346436 : Active Remote + +Reduction over 64 chunks took 3e+00s +Overall time (version 1) - 7e+00s +135.66737484931946 : Regular +125.51056480407715 : Active Regular +6.838850021362305 : Active Remote +125.06190586090088 : Regular +123.48610496520996 : Active Regular +7.4687440395355225 : Active Remote +124.92097496986389 : Regular +123.4994330406189 : Active Regular +6.4895899295806885 : Active Remote +127.3746292591095 : Regular +129.92956972122192 : Active Regular +7.572640895843506 : Active Remote +123.77195906639099 : Regular +124.58749127388 : Active Regular +7.2448132038116455 : Active Remote + + +bnl/ch330a.pc19790301-bnl.nc, 3400 chunk entries + +Reduction over 1 chunks took 0.4s +Overall time (version 1) - 4e+01s +54.81114602088928 : Regular +40.206737995147705 : Active Regular +39.09910821914673 : Active Remote +39.46443700790405 : Regular +39.027403116226196 : Active Regular +37.75058889389038 : Active Remote +37.9528648853302 : Regular +41.495553731918335 : Active Regular +38.07294011116028 : Active Remote +37.73332500457764 : Regular +40.39740324020386 : Active Regular +38.49332809448242 : Active Remote +39.069754123687744 : Regular +40.393052101135254 : Active Regular +39.88633894920349 : Active Remote + +Reduction over 3 chunks took 0.4s +Overall time (version 1) - 4e+01s +40.984943151474 : Regular +39.61718392372131 : Active Regular +39.1941978931427 : Active Remote +39.66099715232849 : Regular +39.43976306915283 : Active Regular +38.039302825927734 : Active Remote +39.62373924255371 : Regular +39.98037099838257 : Active Regular +38.41215372085571 : Active Remote +39.09483599662781 : Regular +39.73953199386597 : Active Regular +41.10827708244324 : Active 
Remote +40.30459427833557 : Regular +39.665311098098755 : Active Regular +39.48872089385986 : Active Remote + +Reduction over 9 chunks took 0.5s +Overall time (version 1) - 4e+01s +45.93427586555481 : Regular +51.21455001831055 : Active Regular +42.55308794975281 : Active Remote +43.54172086715698 : Regular +43.66517210006714 : Active Regular +38.02682185173035 : Active Remote +43.521965980529785 : Regular +46.44435095787048 : Active Regular +38.40240478515625 : Active Remote +43.9306001663208 : Regular +43.83222007751465 : Active Regular +38.353585958480835 : Active Remote +43.72931981086731 : Regular +45.22025394439697 : Active Regular +39.67265009880066 : Active Remote + +Reduction over 17 chunks took 0.9s +Overall time (version 1) - 4e+01s +47.17348670959473 : Regular +48.371257066726685 : Active Regular +39.67051911354065 : Active Remote +46.177210092544556 : Regular +48.33781385421753 : Active Regular +38.0770800113678 : Active Remote +48.514657974243164 : Regular +49.18296504020691 : Active Regular +39.39418125152588 : Active Remote +50.68878698348999 : Regular +45.65637493133545 : Active Regular +38.83580207824707 : Active Remote +47.00814175605774 : Regular +46.89705014228821 : Active Regular +38.41860294342041 : Active Remote + +Reduction over 60 chunks took 2e+00s +Overall time (version 1) - 4e+01s +71.09002494812012 : Regular +72.1630642414093 : Active Regular +39.93671202659607 : Active Remote +71.39048480987549 : Regular +69.36404490470886 : Active Regular +40.83954119682312 : Active Remote +73.05522775650024 : Regular +70.12043809890747 : Active Regular +42.23869585990906 : Active Remote +71.30378913879395 : Regular +71.10070180892944 : Active Regular +43.04809808731079 : Active Remote +72.75630617141724 : Regular +71.60838198661804 : Active Regular +40.027408838272095 : Active Remote + +Reduction over 340 chunks took 8e+00s +Overall time (version 1) - 5e+01s +216.78450202941895 : Regular +208.11211395263672 : Active Regular +45.26900100708008 : Active Remote +222.73459100723267 : Regular +208.41155195236206 : Active Regular +48.21009302139282 : Active Remote +207.01012992858887 : Regular +212.10812401771545 : Active Regular +47.88032793998718 : Active Remote +209.2855679988861 : Regular +208.8525538444519 : Active Regular +46.665223836898804 : Active Remote +203.70479106903076 : Regular +213.17324495315552 : Active Regular +49.18446397781372 : Active Remote diff --git a/doc/data/plot_bnl.py b/doc/data/plot_bnl.py new file mode 100644 index 00000000..fb76301d --- /dev/null +++ b/doc/data/plot_bnl.py @@ -0,0 +1,130 @@ +from pathlib import Path +from matplotlib import pyplot as plt +import numpy as np + +dimensions = np.array([40, 1920, 2560]) +requests = 3*2*2, 3*199*2,3*199*299,1920,40*199*199,1920*2560 +big_chunks = np.array([10,480,640]) +small_chunks = np.array([4, 113, 128]) + + +def get_data(fname, big=True): + + big, small = {}, {} + block = False + with open(fname,'r') as f: + for line in f: + + if line.startswith('bnl'): + if '-def' in line: + inplay = big + else: + inplay = small + + if line.startswith('Reduction over'): + block=True + r1,r2,r3 = [],[],[] + key = line[line.find('over')+5:line.find('chunks')] + print(key) + continue + if line.startswith('Overall'): + continue + if block: + try: + v, t = tuple([x.strip() for x in line.split(':')]) + if t.startswith('Active Regular'): + r2.append(v) + elif t.startswith('Regular'): + r1.append(v) + elif t.startswith('Active Remote'): + r3.append(v) + except: + inplay[key]=r1,r2,r3 + print('Skipping End of Block') + 
block = False + else: + print('Skipping') + continue + inplay[key]=r1,r2,r3 + print(big) + print(small) + print(big.keys(), small.keys()) + return small, big + +def do_all(hs,hb,ws,wb): + + titles = ['Home - Small Chunks', + 'Home - Big Chunks', + 'Uni - Small Chunks', + 'Uni - Big Chunks'] + + fig, axes = plt.subplots(nrows=2, ncols=2) + fig.set_size_inches(8, 8) + axes = axes.flatten() + + for a, d, t in zip(axes, [hs,hb,ws,wb], titles): + do_plot(a, d, t) + plt.tight_layout() + plt.show() + +def do_plot(ax, data, t): + + def stats4(d1): + dd = np.array([float(v) for v in d1]) + return [np.mean(dd),np.min(dd),np.max(dd)] + + keys = list(data.keys()) + + x = [] + regular, local, remote = [], [], [] + for k in keys: + # three time series for each key + reg, loc, rem = data[k] + sreg, sloc, srem = stats4(reg), stats4(loc), stats4(rem) + regular.append(sreg) + local.append(sloc) + remote.append(srem) + x.append(float(k)) + + delta = 0 + all = True + for d,c in zip([regular, local, remote],['g','r','b']): + x=np.array(x)+delta*0.2 + y = [r[0] for r in d] + err = [[r[0]-r[1] for r in d],[r[2]-r[0] for r in d]] + if c == 'b' or all: + ax.errorbar(x, y, fmt='o', yerr=err, color=c) + delta+=1 + ax.set_title(t) + ax.set_xlabel('Chunks Processed') + ax.set_ylabel('s') + + if t.find('Small') > 0: + cv = np.prod(small_chunks)*4/1e6 + else: + cv = np.prod(big_chunks)*4/1e6 + v = np.prod(dimensions)*4/1e6 + r = v/cv + print(f'Chunk volume {cv}Mb, Dataset volume {v}Mb - ratio {r}') + + + def c2v(x): + return x*cv + def v2c(x): + return x/cv + + + tax = ax.secondary_xaxis(-0.2, functions=(c2v,v2c)) + tax.set_xlabel('Reductionist Read (MB)') + + + +if __name__=="__main__": + mypath = Path(__file__).parent + fname = mypath/'work_experiments_bnl.txt' + ws, wb = get_data(fname) + fname = mypath/'home_experiments_bnl.txt' + hs, hb = get_data(fname) + do_all(hs, hb, ws, wb) + + diff --git a/doc/data/work_experiments_bnl.txt b/doc/data/work_experiments_bnl.txt new file mode 100644 index 00000000..18e2f1d5 --- /dev/null +++ b/doc/data/work_experiments_bnl.txt @@ -0,0 +1,266 @@ +pyfive branch +default_fill_cache : False +default_cache_type : readahead +default_block_size : 1048576 + +r[0:3, 4:6, 7:9] +r[0:3, 400:600, 7:9] +r[0:3, 400:600, 800:1100] +r[5,:,800] +r[:,300:500,800:1000] +r[1,:,:] + +bnl/ch330a.pc19790301-def.nc, 64 chunk entries + +Reduction over 1 chunks took 0.6s +Overall time (version 1) - 3e+00s +8.344115972518921 : Regular +4.220659971237183 : Active Regular +3.1975197792053223 : Active Remote +3.014167070388794 : Regular +3.355847120285034 : Active Regular +2.804471015930176 : Active Remote +2.9698619842529297 : Regular +2.705400228500366 : Active Regular +3.429996967315674 : Active Remote +2.8196191787719727 : Regular +2.7991769313812256 : Active Regular +2.6798598766326904 : Active Remote +2.8660669326782227 : Regular +3.903325080871582 : Active Regular +2.599932909011841 : Active Remote + +Reduction over 2 chunks took 0.3s +Overall time (version 1) - 3e+00s +2.8660669326782227 : Regular +3.903325080871582 : Active Regular +2.599932909011841 : Active Remote +3.29656720161438 : Regular +3.525721788406372 : Active Regular +2.75189471244812 : Active Remote +3.4454238414764404 : Regular +3.665536880493164 : Active Regular +2.571530818939209 : Active Remote +3.538390874862671 : Regular +4.367408990859985 : Active Regular +2.61018705368042 : Active Remote +3.660421848297119 : Regular +3.500704050064087 : Active Regular +2.6997482776641846 : Active Remote + +Reduction over 2 chunks took 0.4s 
+Overall time (version 1) - 3e+00s +4.397391080856323 : Regular +3.7289319038391113 : Active Regular +2.8331010341644287 : Active Remote +3.8689942359924316 : Regular +3.6623740196228027 : Active Regular +2.6047868728637695 : Active Remote +3.592682123184204 : Regular +3.7491321563720703 : Active Regular +2.693451166152954 : Active Remote +3.557407855987549 : Regular +4.505606174468994 : Active Regular +3.362746000289917 : Active Remote +4.008091926574707 : Regular +3.4869918823242188 : Active Regular +2.7830257415771484 : Active Remote + +Reduction over 4 chunks took 0.5s +Overall time (version 1) - 3e+00s +4.989245176315308 : Regular +5.01286506652832 : Active Regular +2.7912440299987793 : Active Remote +5.019068002700806 : Regular +5.267661094665527 : Active Regular +2.693160057067871 : Active Remote +5.072666883468628 : Regular +4.841257095336914 : Active Regular +3.7721190452575684 : Active Remote +4.9821202754974365 : Regular +5.039299726486206 : Active Regular +3.1108968257904053 : Active Remote +5.485536098480225 : Regular +4.931081295013428 : Active Regular +2.8403587341308594 : Active Remote + +Reduction over 8 chunks took 0.6s +Overall time (version 1) - 3e+00s +8.201773881912231 : Regular +8.027722120285034 : Active Regular +3.062329053878784 : Active Remote +8.066512107849121 : Regular +7.513127326965332 : Active Regular +2.738662004470825 : Active Remote +7.728527069091797 : Regular +8.574406147003174 : Active Regular +3.0061259269714355 : Active Remote +8.332377195358276 : Regular +7.553836107254028 : Active Regular +2.7933387756347656 : Active Remote +9.098789930343628 : Regular +8.0898118019104 : Active Regular +2.934377908706665 : Active Remote + +Reduction over 16 chunks took 0.9s +Overall time (version 1) - 3e+00s +13.034955978393555 : Regular +12.481261968612671 : Active Regular +3.220118999481201 : Active Remote +12.732249736785889 : Regular +14.228748798370361 : Active Regular +3.1314401626586914 : Active Remote +13.192391157150269 : Regular +12.807947874069214 : Active Regular +2.9496419429779053 : Active Remote +13.663172721862793 : Regular +12.501967191696167 : Active Regular +3.0187788009643555 : Active Remote +13.663172721862793 : Regular +12.501967191696167 : Active Regular +3.0187788009643555 : Active Remote + +Reduction over 64 chunks took 4e+00s +Overall time (version 1) - 7e+00s +48.40218806266785 : Regular +46.85928988456726 : Active Regular +6.921466827392578 : Active Remote +48.40218806266785 : Regular +46.85928988456726 : Active Regular +6.921466827392578 : Active Remote +46.171929121017456 : Regular +47.95656108856201 : Active Regular +5.700792074203491 : Active Remote +46.47747492790222 : Regular +46.345592975616455 : Active Regular +5.459475040435791 : Active Remote +51.872729778289795 : Regular +68.54537105560303 : Active Regular +8.05989122390747 : Active Remote + + +bnl/ch330a.pc19790301-bnl.nc, 3400 chunk entries + +Reduction over 1 chunks took 0.4s +Overall time (version 1) - 2e+01s +45.14613080024719 : Regular +27.162260055541992 : Active Regular +23.115849256515503 : Active Remote +23.882447004318237 : Regular +23.903882026672363 : Active Regular +21.457104921340942 : Active Remote +25.383798837661743 : Regular +24.16863775253296 : Active Regular +23.5748450756073 : Active Remote +22.512950897216797 : Regular +23.25707483291626 : Active Regular +25.665203094482422 : Active Remote +26.175257205963135 : Regular +22.014382123947144 : Active Regular +20.443835020065308 : Active Remote + +Reduction over 3 chunks took 0.6s +Overall time (version 1) 
- 2e+01s +20.11542320251465 : Regular +20.76677107810974 : Active Regular +22.79468083381653 : Active Remote +27.61694622039795 : Regular +26.036365032196045 : Active Regular +24.752694129943848 : Active Remote +24.694957971572876 : Regular +23.442779064178467 : Active Regular +25.0538170337677 : Active Remote +25.87760305404663 : Regular +25.108362913131714 : Active Regular +20.546348094940186 : Active Remote +22.047937870025635 : Regular +21.188127040863037 : Active Regular +22.21766209602356 : Active Remote + +Reduction over 9 chunks took 0.4s +Overall time (version 1) - 2e+01s +25.24668002128601 : Regular +28.922791957855225 : Active Regular +22.928779125213623 : Active Remote +22.633398056030273 : Regular +23.74671173095703 : Active Regular +21.821603775024414 : Active Remote +24.363435983657837 : Regular +27.717058897018433 : Active Regular +21.293541193008423 : Active Remote +25.705342292785645 : Regular +26.323739051818848 : Active Regular +25.679578065872192 : Active Remote +23.84442114830017 : Regular +33.383033990859985 : Active Regular +36.04991698265076 : Active Remote + +Reduction over 17 chunks took 0.6s +Overall time (version 1) - 2e+01s +48.07362198829651 : Regular +37.93207597732544 : Active Regular +22.77363395690918 : Active Remote +28.11515712738037 : Regular +28.385287046432495 : Active Regular +22.595709085464478 : Active Remote +27.156584978103638 : Regular +27.49553680419922 : Active Regular +23.837636947631836 : Active Remote +25.69541597366333 : Regular +25.940948009490967 : Active Regular +24.014315128326416 : Active Remote +27.273848056793213 : Regular +35.16158103942871 : Active Regular +23.360612869262695 : Active Remote + +Reduction over 60 chunks took 3e+00s +Overall time (version 1) - 2e+01s +41.20513677597046 : Regular +38.56750798225403 : Active Regular +24.23797583580017 : Active Remote +40.17050385475159 : Regular +40.017329931259155 : Active Regular +26.986143827438354 : Active Remote +40.651535987854004 : Regular +42.30823493003845 : Active Regular +25.00485610961914 : Active Remote +40.953128814697266 : Regular +40.59316611289978 : Active Regular +23.82962989807129 : Active Remote +39.43385100364685 : Regular +40.663326263427734 : Active Regular +23.716075897216797 : Active Remote + +Reduction over 340 chunks took 8e+00s +Overall time (version 1) - 3e+01s +116.074697971344 : Regular +116.11710214614868 : Active Regular +30.019917964935303 : Active Remote +119.91504836082458 : Regular +116.83721113204956 : Active Regular +29.05119490623474 : Active Remote +114.2572910785675 : Regular +124.5028350353241 : Active Regular +34.20670700073242 : Active Remote +118.08085703849792 : Regular +123.08248209953308 : Active Regular +28.80124282836914 : Active Remote +125.51857686042786 : Regular +143.63352608680725 : Active Regular +35.165050983428955 : Active Remote + + + + + + + + + + + + + + + + diff --git a/doc/figures/sequence.pu b/doc/figures/sequence.pu new file mode 100644 index 00000000..dfaeccfe --- /dev/null +++ b/doc/figures/sequence.pu @@ -0,0 +1,51 @@ +@startuml +skinparam backgroundColor #EEEBDC +'skinparam handwritten true +skinparam notebackgroundcolor white + +skinparam sequence { + participantBackgroundColor White + BackgroundColor White +} + + +hide footbox +title Key Actors in Active Storage + +box python #lightblue +participant Application +participant Active +end box +box server side #beige +participant Reductionist +participant S3 +end box +Application -> Active: Open File +activate Active +Active -> S3: Read file metadata +S3 -> Active: 
Metadata blocks +Application -> Active: Active(getitem)\ne.g. mean(X[1,:]) +Active -> S3: Read B-Tree +S3 -> Active: Chunk Index +activate Active +Active -> Active: Identify Chunks +loop +Active -> Reductionist: Reduce Chunk +Reductionist -> S3 : Read chunk +Reductionist -> Active: f(chunk) +end +Active -> Active: f(chunks) +Active -> Application: return\nresult=\nf(getitem) +note left of Application +Multiple getitems +and function calls +can reuse index, +until: +end note +Application -> Active: Close File +deactivate Active + + + + +@enduml \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 728ce2d3..1cfa061e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,6 +3,7 @@ addopts = # --doctest-modules --ignore=old_code/ --ignore=tests/s3_exploratory + --ignore=bnl --cov=activestorage --cov-report=xml:test-reports/coverage.xml --cov-report=html:test-reports/coverage_html diff --git a/tests/s3_exploratory/test_s3_arrange_files.py b/tests/s3_exploratory/test_s3_arrange_files.py index 18b1015e..8117ce17 100644 --- a/tests/s3_exploratory/test_s3_arrange_files.py +++ b/tests/s3_exploratory/test_s3_arrange_files.py @@ -8,10 +8,6 @@ from activestorage.active import Active from activestorage.dummy_data import make_vanilla_ncdata -import activestorage.storage as st -from activestorage.reductionist import reduce_chunk as reductionist_reduce_chunk -from activestorage.netcdf_to_zarr import gen_json -from kerchunk.hdf import SingleHdf5ToZarr from numpy.testing import assert_allclose, assert_array_equal from pathlib import Path diff --git a/tests/s3_exploratory/test_s3_performance.py b/tests/s3_exploratory/test_s3_performance.py index 41537c4b..c47d2ab7 100644 --- a/tests/s3_exploratory/test_s3_performance.py +++ b/tests/s3_exploratory/test_s3_performance.py @@ -4,10 +4,8 @@ import ujson from pathlib import Path -from kerchunk.hdf import SingleHdf5ToZarr from activestorage.active import Active -from activestorage.netcdf_to_zarr import open_zarr_group from config_minio import * @@ -17,6 +15,7 @@ def test_data_path(): return Path(__file__).resolve().parent / 'test_data' +@pytest.mark.xfail(reason="Pyfive don't use Kerchunk") def test_s3_SingleHdf5ToZarr(): """Check Kerchunk's SingleHdf5ToZarr when S3.""" # SingleHdf5ToZarr is VERY quick and MEM-light @@ -33,6 +32,7 @@ def test_s3_SingleHdf5ToZarr(): inline_threshold=0) +@pytest.mark.xfail(reason="Pyfive don't use Kerchunk") def test_local_SingleHdf5ToZarr(test_data_path): """Check Kerchunk's SingleHdf5ToZarr when NO S3.""" # SingleHdf5ToZarr is VERY quick and MEM-light @@ -44,6 +44,7 @@ def test_local_SingleHdf5ToZarr(test_data_path): inline_threshold=0) +@pytest.mark.xfail(reason="Pyfive don't use Kerchunk") def test_s3_kerchunk_to_json(): """Check Kerchunk's SingleHdf5ToZarr dumped to JSON, when S3.""" s3_file = "s3://pyactivestorage/s3_test_bizarre_large.nc" @@ -64,6 +65,7 @@ def test_s3_kerchunk_to_json(): f.write(ujson.dumps(h5chunks.translate()).encode()) +@pytest.mark.xfail(reason="Pyfive don't use Kerchunk") def test_local_kerchunk_to_json(test_data_path): """Check Kerchunk's SingleHdf5ToZarr dumped to JSON, when NO S3.""" local_file = str(test_data_path / "test_bizarre.nc") @@ -78,6 +80,7 @@ def test_local_kerchunk_to_json(test_data_path): f.write(ujson.dumps(h5chunks.translate()).encode()) +@pytest.mark.xfail(reason="Pyfive don't use Kerchunk") def test_s3_kerchunk_openZarrGroup(): """Check Kerchunk's SingleHdf5ToZarr dumped to JSON, when S3.""" s3_file = "s3://pyactivestorage/s3_test_bizarre_large.nc" @@ -99,6 +102,7 @@ def 
test_s3_kerchunk_openZarrGroup(): ref_ds = open_zarr_group(outf, "data") +@pytest.mark.xfail(reason="Pyfive don't use Kerchunk") def test_local_kerchunk_openZarrGroup(test_data_path): """Check Kerchunk's SingleHdf5ToZarr dumped to JSON, when NO S3.""" local_file = str(test_data_path / "test_bizarre.nc") diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py index 3546a528..f2e11071 100644 --- a/tests/s3_exploratory/test_s3_reduction.py +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -19,13 +19,13 @@ def make_tempfile(): """Make dummy data.""" temp_folder = tempfile.mkdtemp() s3_testfile = os.path.join(temp_folder, - 's3_test_bizarre.nc') # Bryan likes this name + 's3_test_bizarre.nc') print(f"S3 Test file is {s3_testfile}") if not os.path.exists(s3_testfile): make_vanilla_ncdata(filename=s3_testfile) local_testfile = os.path.join(temp_folder, - 'local_test_bizarre.nc') # Bryan again + 'local_test_bizarre.nc') print(f"Local Test file is {local_testfile}") if not os.path.exists(local_testfile): make_vanilla_ncdata(filename=local_testfile) diff --git a/tests/test_bigger_data.py b/tests/test_bigger_data.py index 2f38bddb..2173558d 100644 --- a/tests/test_bigger_data.py +++ b/tests/test_bigger_data.py @@ -9,6 +9,7 @@ from activestorage.active import Active from activestorage.config import * +from pyfive.core import InvalidHDF5File as InvalidHDF5Err import utils @@ -162,28 +163,17 @@ def test_native_emac_model_fails(test_data_path): """ ncfile = str(test_data_path / "emac.nc") uri = utils.write_to_storage(ncfile) - # Use local file to avoid h5py - active = Active(ncfile, "aps_ave") - active._version = 0 - d = active[4:5, 1:2] - if len(d): - mean_result = np.mean(d) - else: - # as it happens it is is possible for a slice to be - # all missing, so for the purpose of this test we - # ignore it, but the general case should not. - pass if USE_S3: active = Active(uri, "aps_ave", utils.get_storage_type()) - with pytest.raises(OSError): + with pytest.raises(InvalidHDF5Err): active[...] else: active = Active(uri, "aps_ave") active._version = 2 active.method = "mean" active.components = True - with pytest.raises(OSError): + with pytest.raises(InvalidHDF5Err): result2 = active[4:5, 1:2] diff --git a/tests/test_compression.py b/tests/test_compression.py index 1cba14b1..20d41704 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -16,9 +16,9 @@ def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: # Sanity check that test data is compressed and filtered as expected. 
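The tests/test_compression.py hunk that follows switches the S3 branch of check_dataset_filters from h5netcdf to the pyfive object handed back by load_from_s3, which exposes h5py-style compression and shuffle properties, while the local branch keeps netCDF4's filters() dict. A minimal sketch of the two inspection paths, with a placeholder file and variable name and assuming a locally readable compressed netCDF file:

```python
import pyfive
from netCDF4 import Dataset

ncfile, ncvar = "example.nc", "tas"   # placeholders for a compressed test file

# local path: netCDF4 reports filters as a dict
with Dataset(ncfile) as nc:
    filters = nc.variables[ncvar].filters()
    print(filters["zlib"], filters["shuffle"])

# S3 path (opened locally here): pyfive reports h5py-style properties,
# so zlib compression shows up as "gzip"
p5 = pyfive.File(ncfile)
print(p5[ncvar].compression, p5[ncvar].shuffle)
p5.close()
```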
if USE_S3: with load_from_s3(temp_file) as test_data: - # NOTE: h5netcdf thinks zlib is gzip - assert test_data.variables[ncvar].compression == "gzip" - assert test_data.variables[ncvar].shuffle == shuffle + print("File attrs", test_data.attrs) + assert test_data[ncvar].compression == "gzip" + assert test_data[ncvar].shuffle == shuffle else: with Dataset(temp_file) as test_data: test_data_filters = test_data.variables[ncvar].filters() @@ -46,18 +46,14 @@ def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool): 'client_kwargs': {'endpoint_url': S3_URL}, } S3_ACTIVE_URL_MINIO = S3_ACTIVE_STORAGE_URL -S3_ACTIVE_URL_Bryan = "https://192.171.169.248:8080" # TODO include all supported configuration types # so far test three possible configurations for storage_options: # - storage_options = None, active_storage_url = None (Minio and local Reductionist, preset credentials from config.py) # - storage_options = CLASSIC, active_storage_url = CLASSIC (Minio and local Reductionist, preset credentials from config.py but folded in storage_options and active_storage_url) -# - storage_options = CLASSIC, active_storage_url = Bryan's machine (Minio BUT Reductionist moved on Bryan's machine) -# (this invariably fails due to data URL being //localhost:9000 closed to outside Reductionist storage_options_paramlist = [ (None, None), (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_MINIO), -# (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_Bryan) ] @@ -91,6 +87,10 @@ def test_compression_and_filters_cmip6_data(storage_options, active_storage_url) check_dataset_filters(test_file, "tas", "zlib", False) + print("Test file and storage options", test_file, storage_options) + if not utils.get_storage_type(): + storage_options = None + active_storage_url = None active = Active(test_file, 'tas', utils.get_storage_type(), storage_options=storage_options, active_storage_url=active_storage_url) @@ -117,6 +117,10 @@ def test_compression_and_filters_obs4mips_data(storage_options, active_storage_u check_dataset_filters(test_file, "rlut", "zlib", False) + print("Test file and storage options", test_file, storage_options) + if not utils.get_storage_type(): + storage_options = None + active_storage_url = None active = Active(test_file, 'rlut', utils.get_storage_type(), storage_options=storage_options, active_storage_url=active_storage_url) diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py deleted file mode 100644 index 4beb957b..00000000 --- a/tests/test_compression_remote_reductionist.py +++ /dev/null @@ -1,193 +0,0 @@ -import os -import numpy as np -import pytest - -from netCDF4 import Dataset -from pathlib import Path - -from activestorage.active import Active, load_from_s3 -from activestorage.config import * -from activestorage.dummy_data import make_compressed_ncdata -from activestorage.reductionist import ReductionistError as RedErr - -import utils - - -# Bryan's S3 machine + Bryan's reductionist -STORAGE_OPTIONS_Bryan = { - 'anon': True, - 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}, -} -S3_ACTIVE_URL_Bryan = "https://192.171.169.248:8080" -# TODO include all supported configuration types -storage_options_paramlist = [ - (STORAGE_OPTIONS_Bryan, S3_ACTIVE_URL_Bryan) -] -# bucket needed too for this test only -# otherwise, bucket is extracted automatically from full file uri -S3_BUCKET = "bnl" - - -# CMIP6_test.nc keeps being unavailable due to BNL bucket unavailable -@pytest.mark.xfail(reason='JASMIN messing about with SOF.') 
-@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist) -def test_compression_and_filters_cmip6_data(storage_options, active_storage_url): - """ - Test use of datasets with compression and filters applied for a real - CMIP6 dataset (CMIP6-test.nc) - an IPSL file. - - This test will always pass when USE_S3 = False; equally, it will always - fail if USE_S3 = True until Reductionist supports anon=True S3 buckets. - See following test below with a forced storage_type="s3" that mimicks - locally the fail, and catches it. Equally, we catch the same exception when USE_S3=True - - Important info on session data: - S3 Storage options to Reductionist: {'anon': True, 'client_kwargs': {'endpoint_url': 'https://uor-aces-o.s3-ext.jc.rl.ac.uk'}} - S3 anon=True Bucket and File: bnl CMIP6-test.nc - Reductionist request data dictionary: {'source': 'https://uor-aces-o.s3-ext.jc.rl.ac.uk', 'bucket': 'bnl', 'object': 'CMIP6-test.nc', 'dtype': 'float32', 'byte_order': 'little', 'offset': 29385, 'size': 942518, 'order': 'C', 'shape': (15, 143, 144), 'selection': [[0, 2, 1], [4, 6, 1], [7, 9, 1]], 'compression': {'id': 'zlib'}} - """ - test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6-test.nc') - with Dataset(test_file) as nc_data: - nc_min = np.min(nc_data["tas"][0:2,4:6,7:9]) - print(f"Numpy min from compressed file {nc_min}") - - # TODO remember that the special case for "anon=True" buckets is that - # the actual file uri = "bucket/filename" - if USE_S3: - ofile = os.path.basename(test_file) - test_file_uri = os.path.join(S3_BUCKET, ofile) - else: - test_file_uri = test_file - active = Active(test_file_uri, 'tas', utils.get_storage_type(), - storage_options=storage_options, - active_storage_url=active_storage_url) - active._version = 1 - active._method = "min" - - if USE_S3: - # for now anon=True S3 buckets are not supported by Reductionist - with pytest.raises(RedErr) as rederr: - result = active[0:2,4:6,7:9] - access_denied_err = 'code: \\"AccessDenied\\"' - assert access_denied_err in str(rederr.value) - # assert nc_min == result - # assert result == 239.25946044921875 - else: - result = active[0:2,4:6,7:9] - assert nc_min == result - assert result == 239.25946044921875 - - -# CMIP6_test.nc keeps being unavailable due to BNL bucket unavailable -@pytest.mark.xfail(reason='JASMIN messing about with SOF.') -@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist) -def test_compression_and_filters_cmip6_forced_s3_from_local(storage_options, active_storage_url): - """ - Test use of datasets with compression and filters applied for a real - CMIP6 dataset (CMIP6-test.nc) - an IPSL file. - - This is for a special anon=True bucket ONLY. 
- """ - test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6-test.nc') - with Dataset(test_file) as nc_data: - nc_min = np.min(nc_data["tas"][0:2,4:6,7:9]) - print(f"Numpy min from compressed file {nc_min}") - - # TODO remember that the special case for "anon=True" buckets is that - # the actual file uri = "bucket/filename" - ofile = os.path.basename(test_file) - test_file_uri = os.path.join(S3_BUCKET, ofile) - active = Active(test_file_uri, 'tas', storage_type="s3", - storage_options=storage_options, - active_storage_url=active_storage_url) - - active._version = 1 - active._method = "min" - - # for now anon=True S3 buckets are not supported by Reductionist - with pytest.raises(RedErr) as rederr: - result = active[0:2,4:6,7:9] - access_denied_err = 'code: \\"AccessDenied\\"' - assert access_denied_err in str(rederr.value) - # assert nc_min == result - # assert result == 239.25946044921875 - - -# CMIP6_test.nc keeps being unavailable due to BNL bucket unavailable -@pytest.mark.xfail(reason='JASMIN messing about with SOF.') -def test_compression_and_filters_cmip6_forced_s3_from_local_2(): - """ - Test use of datasets with compression and filters applied for a real - CMIP6 dataset (CMIP6-test.nc) - an IPSL file. - - This is for a special anon=True bucket connected to via valid key.secret - """ - storage_options = { - 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", - 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", - 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} - } - active_storage_url = "https://192.171.169.248:8080" - test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6-test.nc') - with Dataset(test_file) as nc_data: - nc_min = np.min(nc_data["tas"][0:2,4:6,7:9]) - print(f"Numpy min from compressed file {nc_min}") - - ofile = os.path.basename(test_file) - test_file_uri = os.path.join( - S3_BUCKET, - ofile - ) - print("S3 Test file path:", test_file_uri) - active = Active(test_file_uri, 'tas', storage_type="s3", - storage_options=storage_options, - active_storage_url=active_storage_url) - - active._version = 1 - active._method = "min" - - result = active[0:2,4:6,7:9] - assert nc_min == result - assert result == 239.25946044921875 - - -# CMIP6_test.nc keeps being unavailable due to BNL bucket unavailable -@pytest.mark.xfail(reason='JASMIN messing about with SOF.') -@pytest.mark.skipif(not USE_S3, reason="we need only localhost Reductionist in GA CI") -@pytest.mark.skipif(REMOTE_RED, reason="we need only localhost Reductionist in GA CI") -def test_compression_and_filters_cmip6_forced_s3_using_local_Reductionist(): - """ - Test use of datasets with compression and filters applied for a real - CMIP6 dataset (CMIP6-test.nc) - an IPSL file. - - This is for a special anon=True bucket connected to via valid key.secret - and uses the locally deployed Reductionist via container. 
- """ - print("Reductionist URL", S3_ACTIVE_STORAGE_URL) - storage_options = { - 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", - 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", - 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} - } - - test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6-test.nc') - with Dataset(test_file) as nc_data: - nc_min = np.min(nc_data["tas"][0:2,4:6,7:9]) - print(f"Numpy min from compressed file {nc_min}") - - ofile = os.path.basename(test_file) - test_file_uri = os.path.join( - S3_BUCKET, - ofile - ) - print("S3 Test file path:", test_file_uri) - active = Active(test_file_uri, 'tas', storage_type="s3", - storage_options=storage_options, - active_storage_url=S3_ACTIVE_STORAGE_URL) - - active._version = 1 - active._method = "min" - - result = active[0:2,4:6,7:9] - assert nc_min == result - assert result == 239.25946044921875 diff --git a/tests/test_data/zero_chunked.nc b/tests/test_data/zero_chunked.nc new file mode 100644 index 00000000..aad62297 Binary files /dev/null and b/tests/test_data/zero_chunked.nc differ diff --git a/tests/test_harness.py b/tests/test_harness.py index f7b9db47..315a3b46 100644 --- a/tests/test_harness.py +++ b/tests/test_harness.py @@ -24,7 +24,7 @@ def create_test_dataset(tmp_path): return test_file -@pytest.mark.xfail(USE_S3, reason="descriptor 'flatten' for 'numpy.ndarray' objects doesn't apply to a 'memoryview' object") +# @pytest.mark.xfail(USE_S3, reason="descriptor 'flatten' for 'numpy.ndarray' objects doesn't apply to a 'memoryview' object") def test_read0(tmp_path): """ Test a normal read slicing the data an interesting way, using version 0 (native interface) @@ -32,8 +32,10 @@ def test_read0(tmp_path): test_file = create_test_dataset(tmp_path) active = Active(test_file, 'data', utils.get_storage_type()) active._version = 0 - d = active[0:2,4:6,7:9] - nda = np.ndarray.flatten(d.data) + d = active[0:2, 4:6, 7:9] + # d.data is a memoryview object in both local POSIX and remote S3 storages + # keep the current behaviour of the test to catch possible type changes + nda = np.ndarray.flatten(np.asarray(d.data)) assert np.array_equal(nda,np.array([740.,840.,750.,850.,741.,841.,751.,851.])) def test_read1(tmp_path): diff --git a/tests/test_missing.py b/tests/test_missing.py index 722358af..66aa4830 100644 --- a/tests/test_missing.py +++ b/tests/test_missing.py @@ -7,11 +7,15 @@ import tempfile import unittest +# TODO remove in stable +import h5py import h5netcdf +import pyfive + from netCDF4 import Dataset -from activestorage.active import Active +from activestorage.active import Active, load_from_s3 from activestorage.config import * from activestorage import dummy_data as dd @@ -194,9 +198,28 @@ def test_validmax(tmp_path): assert unmasked_numpy_mean != masked_numpy_mean print("Numpy masked result (mean)", masked_numpy_mean) + # load files via external protocols + y = Dataset(testfile) + z = h5py.File(testfile) + a = h5netcdf.File(testfile) + # write file to storage testfile = utils.write_to_storage(testfile) + # load file via our protocols + if USE_S3: + x = load_from_s3(testfile) + else: + x = pyfive.File(testfile) + + # print stuff + print('y-valid-max', y['data'].getncattr('valid_max')) + print('x-valid-max', x['data'].attrs.get('valid_max')) + print('z-valid-max', z['data'].attrs.get('valid_max')) + print('a-valid-max', a['data'].attrs.get('valid_max')) + + + # numpy masked to check for correct Active behaviour no_active_mean = active_zero(testfile) @@ -247,58 
+270,33 @@ def test_validrange(tmp_path): def test_active_mask_data(tmp_path): testfile = str(tmp_path / 'test_partially_missing_data.nc') - # with valid min - r = dd.make_validmin_ncdata(testfile, valid_min=500) - - # retrieve the actual numpy-ed result - actual_data = load_dataset(testfile) + def check_masking(testfile, testname): - # dataset variable - ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True) - dsvar = ds["data"] + valid_masked_data = load_dataset(testfile) + ds = pyfive.File(testfile) + dsvar = ds["data"] + dsdata = dsvar[:] + ds.close() + a = Active(testfile, "data") + data = a._mask_data(dsdata) + np.testing.assert_array_equal(data, valid_masked_data,f'Failed masking for {testname}') - # test the function - data = Active._mask_data(None, actual_data, dsvar) - ds.close() + # with valid min + r = dd.make_validmin_ncdata(testfile, valid_min=500) + check_masking(testfile, "valid min") # with valid range r = dd.make_validrange_ncdata(testfile, valid_range=[750., 850.]) + check_masking(testfile, "valid range") # retrieve the actual numpy-ed result actual_data = load_dataset(testfile) - # dataset variable - ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True) - dsvar = ds["data"] - - # test the function - data = Active._mask_data(None, actual_data, dsvar) - ds.close() - # with missing r = dd.make_missing_ncdata(testfile) - - # retrieve the actual numpy-ed result - actual_data = load_dataset(testfile) - - # dataset variable - ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True) - dsvar = ds["data"] - - # test the function - data = Active._mask_data(None, actual_data, dsvar) - ds.close() + check_masking(testfile,'missing') # with _FillValue r = dd.make_fillvalue_ncdata(testfile) + check_masking(testfile,"_FillValue") - # retrieve the actual numpy-ed result - actual_data = load_dataset(testfile) - - # dataset variable - ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True) - dsvar = ds["data"] - - # test the function - data = Active._mask_data(None, actual_data, dsvar) - ds.close() diff --git a/tests/test_package.py b/tests/test_package.py index 3ee2f41c..e6f00c37 100644 --- a/tests/test_package.py +++ b/tests/test_package.py @@ -6,7 +6,7 @@ # test version def test_version(): assert hasattr(activestorage, "__version__") - assert activestorage.__version__ == "0.0.1" + assert activestorage.__version__ == "0.0.2" print(activestorage.__version__) # check activestorage class @@ -14,7 +14,6 @@ def test_activestorage_class_attrs(): assert hasattr(activestorage, "Active") assert hasattr(activestorage, "active") assert hasattr(activestorage, "storage") - assert hasattr(activestorage, "netcdf_to_zarr") # check Active class def test_active_class_attrs(): @@ -25,7 +24,6 @@ def test_active_class_attrs(): assert hasattr(act, "_get_active") assert hasattr(act, "_get_selection") assert hasattr(act, "_process_chunk") - assert hasattr(act, "_via_kerchunk") assert hasattr(act, "components") assert hasattr(act, "method") assert hasattr(act, "ncvar") diff --git a/tests/test_reductionist_json.py b/tests/test_reductionist_json.py new file mode 100644 index 00000000..c7cc09c0 --- /dev/null +++ b/tests/test_reductionist_json.py @@ -0,0 +1,48 @@ +import pyfive +from activestorage.active import Active, get_missing_attributes +from activestorage.hdf2numcodec import decode_filters +import numpy as np + +from activestorage import reductionist +from activestorage.active import load_from_s3 +from activestorage.config import * +from test_bigger_data import save_cl_file_with_a + +import json + +class 
MockActive: + def __init__(self, f, v): + if USE_S3: + self.f = load_from_s3(f) + else: + self.f = pyfive.File(f) + ds = self.f[v] + self.dtype = np.dtype(ds.dtype) + self.array = pyfive.indexing.ZarrArrayStub(ds.shape, ds.chunks or ds.shape) + self.missing = get_missing_attributes(ds) + ds = ds.id + self.ds = ds + def __getitem__(self, args): + if self.ds.filter_pipeline is None: + compressor, filters = None, None + else: + compressor, filters = decode_filters(self.ds.filter_pipeline , self.dtype.itemsize, 'a') + if self.ds.chunks is not None: + self.ds._get_chunk_addresses() + + indexer = pyfive.indexing.OrthogonalIndexer(args, self.array) + for chunk_coords, chunk_selection, out_selection in indexer: + storeinfo = self.ds.get_chunk_info_from_chunk_coord(chunk_coords) + offset, size = storeinfo.byte_offset, storeinfo.size + jd = reductionist.build_request_data('a','b','c', + offset, size, compressor, filters, self.missing, self.dtype, + self.array._chunks,self.ds._order,chunk_selection) + js = json.dumps(jd) + return None + +def test_build_request(tmp_path): + + ncfile, v = save_cl_file_with_a(tmp_path), 'cl' + A = MockActive(ncfile,v) + x = A[4:5, 1:2] + # not interested in what is returned, checking that the request builds ok diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py index 2f95fdee..25f988cc 100644 --- a/tests/unit/test_active.py +++ b/tests/unit/test_active.py @@ -81,6 +81,7 @@ def test_active(): init = active.__init__(uri=uri, ncvar=ncvar) +@pytest.mark.xfail(reason="We don't employ locks with Pyfive anymore, yet.") def test_lock(): """Unit test for class:Active.""" uri = "tests/test_data/cesm2_native.nc" diff --git a/tests/unit/test_active_tools.py b/tests/unit/test_active_tools.py deleted file mode 100644 index 2e1102f5..00000000 --- a/tests/unit/test_active_tools.py +++ /dev/null @@ -1,191 +0,0 @@ -import os -import numpy as np -import pytest -import zarr - -from activestorage import active_tools as at -from numcodecs import Blosc -from zarr.indexing import ( - OrthogonalIndexer, - is_contiguous_selection, -) -from zarr.util import is_total_slice - - -def assemble_zarr(): - """Create a test zarr object.""" - compressor = Blosc(cname='zstd', clevel=1, shuffle=Blosc.BITSHUFFLE) - z = zarr.create((10000, 10000), chunks=(1000, 1000), dtype='i1', order='C', - compressor=compressor) - - return z - - -def assemble_zarr_uncompressed(): - """Create a test zarr object.""" - z = zarr.create((1000, 1000), chunks=(2, 8), dtype='i1', order='C', - compressor=None) - - return z - - -def assemble_zarr_with_fillvalue(): - """Create a test zarr object.""" - compressor = Blosc(cname='zstd', clevel=1, shuffle=Blosc.BITSHUFFLE) - z = zarr.create((10000, 10000), chunks=(1000, 1000), dtype='i1', order='C', - compressor=compressor) - z._fill_value = -99 - - return z - - -def assemble_zarr_uncompressed_2(): - """Create a test zarr object.""" - z = zarr.create((100, 100), chunks=(10, 10), dtype='i1', order='C', - compressor=None) - - return z - - -def test_as_get_orthogonal_selection(): - """Test Zarr Orthogonal selection.""" - z = assemble_zarr() - z._cache_metadata = None - selection = (slice(0, 2, 1), slice(4, 6, 1)) - sel = at.as_get_orthogonal_selection(z, selection=selection, out=None, - fields=None) - expected_shape = (2, 2) - np.testing.assert_array_equal(sel.shape, expected_shape) - expected_elem = [0, 0] - np.testing.assert_array_equal(sel[0], expected_elem) - - -def test_as_get_selection(): - """Test chunk iterator.""" - z = assemble_zarr() - selection = (slice(0, 2, 1), 
slice(4, 6, 1)) - indexer = OrthogonalIndexer(selection, z) - ch = at.as_get_selection(z, indexer, out=None, - fields=None) - np.testing.assert_array_equal(ch[0][0], [0, 0]) - np.testing.assert_array_equal(ch[1][0], None) - np.testing.assert_array_equal(ch[2][0], (0, 0)) - - -def test_as_chunk_getitem(): - """Test chunk get item.""" - z = assemble_zarr() - z = at.make_an_array_instance_active(z) - chunk_coords = (0, 3) - chunk_selection = [slice(0, 2, 1)] - out = np.array((2, 2, 2)) - out_selection = slice(0, 2, 1) - ch = at.as_chunk_getitem(z, chunk_coords, chunk_selection, out, out_selection, - drop_axes=None, fields=None) - np.testing.assert_array_equal(ch[0], (1000, 1000)) - np.testing.assert_array_equal(ch[1], [slice(0, 2, 1)]) - # PCI - assert list(ch[2]) == [(0, 2000, (slice(0, 2, 1),))] - - -def test_as_chunk_getitem_with_fillvalue(): - """Test chunk get item.""" - z = assemble_zarr_with_fillvalue() - z = at.make_an_array_instance_active(z) - chunk_coords = (0, 3) - chunk_selection = [slice(0, 2, 1)] - out = np.array((2, 2, 2)) - out_selection = slice(0, 2, 1) - ch = at.as_chunk_getitem(z, chunk_coords, chunk_selection, out, out_selection, - drop_axes=None, fields=None) - np.testing.assert_array_equal(ch[0], (1000, 1000)) - np.testing.assert_array_equal(ch[1], [slice(0, 2, 1)]) - # PCI - assert list(ch[2]) == [(0, 2000, (slice(0, 2, 1),))] - - -def test_process_chunk_uncompressed(): - """Test for processing chunk engine for uncompressed data""" - z = assemble_zarr_uncompressed() - z = at.make_an_array_instance_active(z) - out = np.ones((1, 8)) - cdata = np.ones((1, 2)) - chunk_selection = slice(0, 1, 1) - out_selection = np.array((0)) - ch = at.as_process_chunk(z, - out, - cdata, - chunk_selection, - drop_axes=False, - out_is_ndarray=True, - fields=None, - out_selection=out_selection, - partial_read_decode=False) - - assert is_contiguous_selection(out_selection) - assert not is_total_slice(chunk_selection, z._chunks) - assert out.any() - assert out.shape == (1, 8) - - -def test_process_chunk_uncompressed_write_direct(): - """Test for processing chunk engine for uncompressed data""" - z = assemble_zarr_uncompressed_2() - z = at.make_an_array_instance_active(z) - out = np.ones((10, 10)) - cdata = np.ones((10, 10)) - chunk_selection = (slice(0, 10, 1), slice(10, 20, 1)) - out_selection = np.array((0)) - with pytest.raises(ValueError) as exc: - ch = at.as_process_chunk(z, - out, - cdata, - chunk_selection, - drop_axes=False, - out_is_ndarray=True, - fields=None, - out_selection=out_selection, - partial_read_decode=False) - assert str(exc.value) == "Chunk shape (10, 80) exceeds data chunks shape (10, 10)" - - assert is_contiguous_selection(out_selection) - assert is_total_slice(chunk_selection, z._chunks) - - z._chunks = (2, 8) - out = np.ones((1, 8)) - cdata = np.ones((1, 2)) - chunk_selection = slice(0, 8, 1) - out_selection = np.array((0)) - with pytest.raises(ValueError) as exc: - ch = at.as_process_chunk(z, - out, - cdata, - chunk_selection, - drop_axes=False, - out_is_ndarray=True, - fields=None, - out_selection=out_selection, - partial_read_decode=False) - assert str(exc.value) == "Storage chunk shape (2, 8) exceeds permitted output data shape (1, 8)." 
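Relating to tests/test_reductionist_json.py above: test_build_request only cares that the dictionary assembled by reductionist.build_request_data serialises cleanly. For reference, the deleted remote-Reductionist test documented a concrete instance of that dictionary; the sketch below reproduces it and the JSON round-trip being exercised (values copied from that docstring, not newly measured):

```python
import json

request_data = {
    'source': 'https://uor-aces-o.s3-ext.jc.rl.ac.uk',
    'bucket': 'bnl',
    'object': 'CMIP6-test.nc',
    'dtype': 'float32',
    'byte_order': 'little',
    'offset': 29385,
    'size': 942518,
    'order': 'C',
    'shape': (15, 143, 144),                     # tuples become JSON lists
    'selection': [[0, 2, 1], [4, 6, 1], [7, 9, 1]],
    'compression': {'id': 'zlib'},
}

payload = json.dumps(request_data)               # the body posted to Reductionist
assert json.loads(payload)['shape'] == [15, 143, 144]
```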
- - -def test_process_chunk_uncompressed_with_compressor(): - """Test for processing chunk engine for uncompressed data""" - z = assemble_zarr() - z = at.make_an_array_instance_active(z) - out = np.ones((1, 8)) - cdata = np.ones((1, 2)) - chunk_selection = slice(0, 1, 1) - out_selection = np.array((0)) - raised = f'cdata {cdata} is an ndarray, can not decompress.' - with pytest.raises(TypeError) as exc: - ch = at.as_process_chunk(z, - out, - cdata, - chunk_selection, - drop_axes=False, - out_is_ndarray=True, - fields=None, - out_selection=out_selection, - partial_read_decode=False) - assert str(exc.value) == raised diff --git a/tests/unit/test_reductionist.py b/tests/unit/test_reductionist.py index 417bad8b..043ccbd6 100644 --- a/tests/unit/test_reductionist.py +++ b/tests/unit/test_reductionist.py @@ -36,8 +36,8 @@ def test_reduce_chunk_defaults(mock_request): s3_url = "https://active.example.com" bucket = "fake-bucket" object = "fake-object" - offset = None - size = None + offset = 0 + size = 0 compression = None filters = None missing = (None, None, None, None) @@ -67,6 +67,8 @@ def test_reduce_chunk_defaults(mock_request): "bucket": bucket, "object": object, "dtype": "int32", + 'offset':0, + 'size':0, "byte_order": sys.byteorder, } mock_request.assert_called_once_with(session, expected_url, expected_data) diff --git a/tests/unit/test_storage.py b/tests/unit/test_storage.py index 22e6dfbb..8e279725 100644 --- a/tests/unit/test_storage.py +++ b/tests/unit/test_storage.py @@ -60,7 +60,7 @@ def test_reduced_chunk_fully_masked_data_fill(): order="C", chunk_selection=ch_sel, method=np.mean) assert rc[0].size == 0 - assert rc[1] is None + assert rc[1] == 0 def test_reduced_chunk_fully_masked_data_missing(): @@ -79,7 +79,7 @@ def test_reduced_chunk_fully_masked_data_missing(): order="C", chunk_selection=ch_sel, method=np.mean) assert rc[0].size == 0 - assert rc[1] is None + assert rc[1] == 0 def test_reduced_chunk_fully_masked_data_vmin(): @@ -98,7 +98,7 @@ def test_reduced_chunk_fully_masked_data_vmin(): order="C", chunk_selection=ch_sel, method=np.mean) assert rc[0].size == 0 - assert rc[1] is None + assert rc[1] == 0 def test_reduced_chunk_fully_masked_data_vmax(): @@ -117,4 +117,23 @@ def test_reduced_chunk_fully_masked_data_vmax(): order="C", chunk_selection=ch_sel, method=np.mean) assert rc[0].size == 0 - assert rc[1] is None + assert rc[1] == 0 + + +def test_zero_data(): + """Test method with zero data.""" + rfile = "tests/test_data/zero_chunked.nc" + offset = 8760 + size = 48 + + # no compression + ch_sel = (slice(0, 3, 1), slice(0, 4, 1)) + rc = st.reduce_chunk(rfile, offset, size, + compression=None, filters=None, + missing=(None, None, None, None), + dtype="float32", shape=(3, 4), + order="C", chunk_selection=ch_sel, + method=np.mean) + assert rc[0].size == 1 + assert rc[0] == 0 + assert rc[1] is 12 diff --git a/tests/unit/test_storage_types.py b/tests/unit/test_storage_types.py index 561c36ff..70b23559 100644 --- a/tests/unit/test_storage_types.py +++ b/tests/unit/test_storage_types.py @@ -3,10 +3,9 @@ # interaction and replace with local file operations. 
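Stepping back to the tests/unit/test_storage.py hunks above: the second element of reduce_chunk's return value is a per-chunk count of contributing values, and it is now 0 rather than None when a chunk is entirely masked (and 12 for the all-zero chunk in zero_chunked.nc). A tiny illustration of why a numeric count is the friendlier contract (the counts here are invented):

```python
# per-chunk counts as reduce_chunk now reports them; two chunks fully masked
per_chunk_counts = [12, 0, 4, 0]
n = sum(per_chunk_counts)     # 16 -- aggregates without special-casing None
print(n)                      # this n is what a "sum"/"n" mean divides by
```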
import botocore -import contextlib import os -import h5netcdf import numpy as np +import pyfive import pytest import requests.exceptions from unittest import mock @@ -16,31 +15,23 @@ from activestorage.active import Active from activestorage.config import * from activestorage.dummy_data import make_vanilla_ncdata -from activestorage import netcdf_to_zarr import activestorage.reductionist import activestorage.storage -# Capture the real function before it is mocked. -old_netcdf_to_zarr = netcdf_to_zarr.load_netcdf_zarr_generic @mock.patch.object(activestorage.active, "load_from_s3") -@mock.patch.object(activestorage.netcdf_to_zarr, "load_netcdf_zarr_generic") @mock.patch.object(activestorage.active.reductionist, "reduce_chunk") -def test_s3(mock_reduce, mock_nz, mock_load, tmp_path): +def test_s3(mock_reduce, mock_load, tmp_path): """Test stack when call to Active contains storage_type == s3.""" # Since this is a unit test, we can't assume that an S3 object store or # active storage server is available. Therefore, we mock out the remote # service interaction and replace with local file operations. - @contextlib.contextmanager - def load_from_s3(uri): - yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) - - def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): - return old_netcdf_to_zarr(test_file, ncvar, None, None) + def load_from_s3(uri, storage_options=None): + return pyfive.File(test_file) def reduce_chunk( session, @@ -74,7 +65,6 @@ def reduce_chunk( ) mock_load.side_effect = load_from_s3 - mock_nz.side_effect = load_netcdf_zarr_generic mock_reduce.side_effect = reduce_chunk uri = "s3://fake-bucket/fake-object" @@ -82,17 +72,21 @@ def reduce_chunk( make_vanilla_ncdata(test_file) active = Active(uri, "data", "s3") - active._version = 1 + active._version = 2 active._method = "max" + print("This test has severe flakiness:") + print("Either fails with AssestionError - bTREE stuff,") + print("or it fails with a multitude of KeyErrors.") + print(active) result = active[::] assert result == 999.0 - # S3 loading is not done from Active anymore - mock_load.assert_not_called() + # S3 loading is done from Active + # but we should delegate that outside at some point + # mock_load.assert_not_called() - mock_nz.assert_called_once_with(uri, "data", "s3", None) # NOTE: This gets called multiple times with various arguments. Match on # the common ones. 
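The mocking pattern above (and in the version-0 test that follows) also works outside pytest: patch load_from_s3 to hand back a local pyfive.File and the "s3" code path in Active runs against ordinary files. A condensed, hedged sketch, building the dummy file the same way these tests build theirs:

```python
import os
import tempfile
from unittest import mock

import pyfive

import activestorage.active
from activestorage.active import Active
from activestorage.dummy_data import make_vanilla_ncdata

# a small local netCDF file, created exactly as the unit tests create theirs
test_file = os.path.join(tempfile.mkdtemp(), "fake-object.nc")
make_vanilla_ncdata(test_file)


def fake_load_from_s3(uri, storage_options=None):
    # ignore the s3:// uri and open the local copy with pyfive instead
    return pyfive.File(test_file)


with mock.patch.object(activestorage.active, "load_from_s3",
                       side_effect=fake_load_from_s3):
    active = Active("s3://fake-bucket/fake-object", "data", "s3")
    active._version = 0                # plain read path, no Reductionist needed
    print(active[0:2, 4:6, 7:9])
```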
mock_reduce.assert_called_with( @@ -118,9 +112,8 @@ def reduce_chunk( def test_reductionist_version_0(mock_load, tmp_path): """Test stack when call to Active contains storage_type == s3 using version 0.""" - @contextlib.contextmanager def load_from_s3(uri, storage_options=None): - yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) + return pyfive.File(test_file) mock_load.side_effect = load_from_s3 @@ -149,20 +142,14 @@ def test_s3_load_failure(mock_load): @mock.patch.object(activestorage.active, "load_from_s3") -@mock.patch.object(activestorage.netcdf_to_zarr, "load_netcdf_zarr_generic") @mock.patch.object(activestorage.active.reductionist, "reduce_chunk") -def test_reductionist_connection(mock_reduce, mock_nz, mock_load, tmp_path): +def test_reductionist_connection(mock_reduce, mock_load, tmp_path): """Test stack when call to Active contains storage_type == s3.""" - @contextlib.contextmanager - def load_from_s3(uri): - yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) - - def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): - return old_netcdf_to_zarr(test_file, ncvar, None, None) + def load_from_s3(uri, storage_options=None): + return pyfive.File(test_file) mock_load.side_effect = load_from_s3 - mock_nz.side_effect = load_netcdf_zarr_generic mock_reduce.side_effect = requests.exceptions.ConnectTimeout() uri = "s3://fake-bucket/fake-object" @@ -170,7 +157,7 @@ def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): make_vanilla_ncdata(test_file) active = Active(uri, "data", "s3") - active._version = 1 + active._version = 2 active._method = "max" with pytest.raises(requests.exceptions.ConnectTimeout): @@ -178,20 +165,14 @@ def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): @mock.patch.object(activestorage.active, "load_from_s3") -@mock.patch.object(activestorage.netcdf_to_zarr, "load_netcdf_zarr_generic") @mock.patch.object(activestorage.active.reductionist, "reduce_chunk") -def test_reductionist_bad_request(mock_reduce, mock_nz, mock_load, tmp_path): +def test_reductionist_bad_request(mock_reduce, mock_load, tmp_path): """Test stack when call to Active contains storage_type == s3.""" - @contextlib.contextmanager - def load_from_s3(uri): - yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) - - def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): - return old_netcdf_to_zarr(test_file, ncvar, None, None) + def load_from_s3(uri, storage_options=None): + return pyfive.File(test_file) mock_load.side_effect = load_from_s3 - mock_nz.side_effect = load_netcdf_zarr_generic mock_reduce.side_effect = activestorage.reductionist.ReductionistError(400, "Bad request") uri = "s3://fake-bucket/fake-object" @@ -199,7 +180,7 @@ def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): make_vanilla_ncdata(test_file) active = Active(uri, "data", "s3") - active._version = 1 + active._version = 2 active._method = "max" with pytest.raises(activestorage.reductionist.ReductionistError):
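To close the loop on the two failure-mode tests above: both a requests-level timeout and a ReductionistError bubble out of the getitem, so a caller is free to catch them and retry or drop back to a plain read. One way that might look (a sketch under those assumptions, not how PyActiveStorage itself recovers):

```python
import numpy as np
import requests.exceptions

from activestorage.active import Active
from activestorage.reductionist import ReductionistError


def reduced_max(uri, ncvar, storage_options=None):
    """Try an active (version 2) max, falling back to a plain client-side read."""
    active = Active(uri, ncvar, "s3", storage_options=storage_options)
    active._version = 2
    active._method = "max"
    try:
        return active[::]
    except (ReductionistError, requests.exceptions.RequestException):
        # the failure modes simulated by the two tests above
        fallback = Active(uri, ncvar, "s3", storage_options=storage_options)
        fallback._version = 0
        return np.max(fallback[::])
```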