Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Re-enable eager parquet reading #1273

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,12 @@ def from_parquet(
             io.IOBase,
         )

+        if isinstance(file, dict):
+            file, filespec_treepath = next(iter(file.items()))
+            if filespec_treepath is not None:
+                warnings.warn(
+                    f'For parquet file="{file}", treepath="{filespec_treepath}" is ignored when opening files'
+                )
         if (
             delayed
             and not isinstance(schemaclass, FunctionType)
Expand Down
30 changes: 27 additions & 3 deletions src/coffea/nanoevents/mapping/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,35 @@ def preload_column_source(self, uuid, path_in_source, source):
         key = self.key_root() + tuple_to_key((uuid, path_in_source))
         self._cache[key] = source

-    def get_column_handle(self, columnsource, name):
+    def get_column_handle(self, columnsource, name, allow_missing):
+        if allow_missing:
+            return (
+                ParquetSourceMapping.UprootLikeShim(columnsource, name)
+                if name in columnsource.file.schema_arrow.names
+                else None
+            )
         return ParquetSourceMapping.UprootLikeShim(columnsource, name)

-    def extract_column(self, columnhandle, start, stop, **kwargs):
-        return columnhandle.array(entry_start=start, entry_stop=stop)
+    def extract_column(self, columnhandle, start, stop, allow_missing, **kwargs):
+        if allow_missing and columnhandle is None:
+            return awkward.contents.IndexedOptionArray(
+                awkward.index.Index64(numpy.full(stop - start, -1, dtype=numpy.int64)),
+                awkward.contents.NumpyArray(numpy.array([], dtype=bool)),
+            )
+        elif not allow_missing and columnhandle is None:
+            raise RuntimeError(
+                "Received columnhandle of None when missing column in file is not allowed!"
+            )
+
+        the_array = columnhandle.array(entry_start=start, entry_stop=stop)
+
+        if allow_missing:
+            the_array = awkward.contents.IndexedOptionArray(
+                awkward.index.Index64(numpy.arange(stop - start, dtype=numpy.int64)),
+                awkward.contents.NumpyArray(the_array),
+            )
+
+        return the_array

     def __len__(self):
         return self._stop - self._start
Expand Down
Binary file modified tests/samples/nano_dimuon.parquet
Binary file not shown.
Binary file added tests/samples/nano_dimuon_semver.parquet
Binary file not shown.
Binary file modified tests/samples/nano_dy.parquet
Binary file not shown.
Binary file added tests/samples/nano_dy_semver.parquet
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/test_nanoevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def crossref(events):

 suffixes = [
     "root",
-    # "parquet",
+    "parquet",
 ]


Expand Down
Loading