Skip to content

Commit

Permalink
Merge pull request #234 from NCAS-CMS/fix_pyfive_branch
Browse files Browse the repository at this point in the history
Fix `pyfive` branch with latest Pyfive branch `h5netcdf`
  • Loading branch information
valeriupredoi authored Jan 21, 2025
2 parents 5f67387 + 41c9ce9 commit bd1f06f
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 34 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/run-test-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ jobs:
use-mamba: true
- run: conda --version
- run: python -V
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- run: pip install -e .
- run: conda list
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ jobs:
use-mamba: true
- run: conda --version
- run: python -V
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- run: conda list
- run: pip install -e .
Expand All @@ -66,12 +66,12 @@ jobs:
use-mamba: true
- run: conda --version
- run: python -V
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- run: conda list
- run: mamba install -c conda-forge git
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test_s3_minio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ jobs:
python-version: ${{ matrix.python-version }}
miniforge-version: "latest"
use-mamba: true
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- name: Install PyActiveStorage
run: |
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test_s3_remote_reductionist.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ jobs:
python-version: ${{ matrix.python-version }}
miniforge-version: "latest"
use-mamba: true
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- name: Install PyActiveStorage
run: |
Expand Down
29 changes: 16 additions & 13 deletions activestorage/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import urllib
import pyfive
import time
from pyfive.h5d import StoreInfo

import s3fs

Expand Down Expand Up @@ -307,8 +308,8 @@ def _get_selection(self, *args):
name = self.ds.name
dtype = np.dtype(self.ds.dtype)
# hopefully fix pyfive to get a dtype directly
array = pyfive.ZarrArrayStub(self.ds.shape, self.ds.chunks)
ds = self.ds._dataobjects
array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
ds = self.ds.id

self.metric_data['args'] = args
self.metric_data['dataset shape'] = self.ds.shape
Expand All @@ -318,7 +319,7 @@ def _get_selection(self, *args):
else:
compressor, filters = decode_filters(ds.filter_pipeline , dtype.itemsize, name)

indexer = pyfive.OrthogonalIndexer(*args, array)
indexer = pyfive.indexing.OrthogonalIndexer(*args, array)
out_shape = indexer.shape
#stripped_indexer = [(a, b, c) for a,b,c in indexer]
drop_axes = indexer.drop_axes and keepdims
Expand All @@ -334,7 +335,7 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
out = []
counts = []
else:
out = np.empty(out_shape, dtype=out_dtype, order=ds.order)
out = np.empty(out_shape, dtype=out_dtype, order=ds._order)
counts = None # should never get touched with no method!

# Create a shared session object.
Expand Down Expand Up @@ -364,10 +365,10 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f

if ds.chunks is not None:
t1 = time.time()
ds._get_chunk_addresses()
# ds._get_chunk_addresses()
t2 = time.time() - t1
self.metric_data['indexing time (s)'] = t2
self.metric_data['chunk number'] = len(ds._zchunk_index)
# self.metric_data['chunk number'] = len(ds._zchunk_index)
chunk_count = 0
t1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:
Expand Down Expand Up @@ -464,15 +465,17 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
#FIXME: Do, we, it's not actually used?
"""

offset, size, filter_mask = ds.get_chunk_details(chunk_coords)

# retrieve coordinates from chunk index
storeinfo = ds.get_chunk_info_from_chunk_coord(chunk_coords)
offset, size = storeinfo.byte_offset, storeinfo.size
self.data_read += size

if self.storage_type == 's3' and self._version == 1:

tmp, count = reduce_opens3_chunk(ds.fh, offset, size, compressor, filters,
tmp, count = reduce_opens3_chunk(ds._fh, offset, size, compressor, filters,
self.missing, ds.dtype,
chunks, ds.order,
chunks, ds._order,
chunk_selection, method=self.method
)

Expand All @@ -499,7 +502,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
size, compressor, filters,
self.missing, np.dtype(ds.dtype),
chunks,
ds.order,
ds._order,
chunk_selection,
operation=self._method)
else:
Expand All @@ -518,7 +521,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
size, compressor, filters,
self.missing, np.dtype(ds.dtype),
chunks,
ds.order,
ds._order,
chunk_selection,
operation=self._method)
elif self.storage_type=='ActivePosix' and self.version==2:
Expand All @@ -531,7 +534,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
# although we will version changes.
tmp, count = reduce_chunk(self.filename, offset, size, compressor, filters,
self.missing, ds.dtype,
chunks, ds.order,
chunks, ds._order,
chunk_selection, method=self.method)

if self.method is not None:
Expand Down
2 changes: 1 addition & 1 deletion activestorage/hdf2numcodec.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def decode_filters(filter_pipeline, itemsize, name):
for filter in filter_pipeline:

filter_id=filter['filter_id']
properties = filter['client_data_values']
properties = filter['client_data']


    # We support the following
Expand Down
11 changes: 6 additions & 5 deletions tests/test_reductionist_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ def __init__(self, f, v):
self.f = pyfive.File(f)
ds = self.f[v]
self.dtype = np.dtype(ds.dtype)
self.array = pyfive.ZarrArrayStub(ds.shape, ds.chunks or ds.shape)
self.array = pyfive.indexing.ZarrArrayStub(ds.shape, ds.chunks or ds.shape)
self.missing = get_missing_attributes(ds)
ds = ds._dataobjects
ds = ds.id
self.ds = ds
def __getitem__(self, args):
if self.ds.filter_pipeline is None:
Expand All @@ -30,12 +30,13 @@ def __getitem__(self, args):
if self.ds.chunks is not None:
self.ds._get_chunk_addresses()

indexer = pyfive.OrthogonalIndexer(args, self.array)
indexer = pyfive.indexing.OrthogonalIndexer(args, self.array)
for chunk_coords, chunk_selection, out_selection in indexer:
offset, size, filter_mask = self.ds.get_chunk_details(chunk_coords)
storeinfo = self.ds.get_chunk_info_from_chunk_coord(chunk_coords)
offset, size = storeinfo.byte_offset, storeinfo.size
jd = reductionist.build_request_data('a','b','c',
offset, size, compressor, filters, self.missing, self.dtype,
self.array._chunks,self.ds.order,chunk_selection)
self.array._chunks,self.ds._order,chunk_selection)
js = json.dumps(jd)
return None

Expand Down

0 comments on commit bd1f06f

Please sign in to comment.