Skip to content

Commit

Permalink
feat: Adding selected_partitions parameter to collection.load() and v…
Browse files Browse the repository at this point in the history
…iew.load() functions.

refactor: collection.partitions() and view.partitions() now handle indexer and selected_partitions parameters.
  • Loading branch information
Thomas Zilio committed Dec 7, 2024
1 parent d4662f8 commit 7fbef18
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 111 deletions.
26 changes: 17 additions & 9 deletions zcollection/collection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
storage,
sync,
)
from .abc import PartitionFilter, ReadOnlyCollection
from .abc import Indexer, PartitionFilter, ReadOnlyCollection
from .callable_objects import UpdateCallable, WrappedPartitionCallable
from .detail import (
PartitionSlice,
Expand All @@ -45,6 +45,12 @@
_wrap_update_func_with_overlap,
)

__all__ = ('dask_utils', 'dataset', 'fs_utils', 'merging', 'meta',
'partitioning', 'storage', 'sync', 'Indexer', 'PartitionFilter',
'ReadOnlyCollection', 'UpdateCallable', 'WrappedPartitionCallable',
'PartitionSlice', '_insert', '_try_infer_callable',
'_wrap_update_func', '_wrap_update_func_with_overlap')

#: Module logger.
_LOGGER: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -874,6 +880,14 @@ def validate_partitions(self,

invalid_partitions: list[str] = []

def _validity_check(_partition, _valid):
"""Check partition validity and add it to invalid partitions if not
valid."""
if not _valid:
warnings.warn(f'Invalid partition: {_partition}',
category=RuntimeWarning)
invalid_partitions.append(_partition)

if distributed:
client: dask.distributed.Client = dask_utils.get_client()
futures: list[dask.distributed.Future] = client.map(
Expand All @@ -884,20 +898,14 @@ def validate_partitions(self,

for item in dask.distributed.as_completed(futures):
partition, valid = item.result() # type: ignore
if not valid:
warnings.warn(f'Invalid partition: {partition}',
category=RuntimeWarning)
invalid_partitions.append(partition)
_validity_check(_partition=partition, _valid=valid)
else:
for partition in partitions:
partition, valid = _check_partition(
partition,
fs=self.fs,
partitioning_strategy=self.partitioning)
if not valid:
warnings.warn(f'Invalid partition: {partition}',
category=RuntimeWarning)
invalid_partitions.append(partition)
_validity_check(_partition=partition, _valid=valid)

if fix and invalid_partitions:
for item in invalid_partitions:
Expand Down
Loading

0 comments on commit 7fbef18

Please sign in to comment.