
Internal functions of iter_data module now tested. Next step: testing iter_merged_pandas_parquet_file.
yohplala committed Nov 11, 2024
1 parent 7429172 commit a5c9417
Showing 3 changed files with 66 additions and 61 deletions.
25 changes: 16 additions & 9 deletions oups/store/iter_data.py
@@ -141,7 +141,9 @@ def _iter_pandas_dataframe(
     Yields
     ------
     DataFrame
-        Chunks of the DataFrame, each with size <= max_row_group_size.
+        Chunks of the DataFrame, each with size <= max_row_group_size, except
+        if distinct_bounds is True and there are more duplicates in the
+        'ordered_on' column than max_row_group_size.
 
     Returns
     -------
@@ -214,7 +216,9 @@ def _iter_resized_parquet_file(
     Yields
     ------
     DataFrame
-        Chunks of data with size <= max_row_group_size.
+        Chunks of data, each with size <= max_row_group_size, except if
+        distinct_bounds is True and there are more duplicates in the
+        'ordered_on' column than max_row_group_size.
 
     Returns
     -------
@@ -241,7 +245,7 @@ def _iter_resized_parquet_file(
         if buffer_num_rows >= max_row_group_size:
             data = pf[start_rg_idx:rg_idx].to_pandas()
             if remainder is not None:
-                data = concat([remainder, data], ignore_index=True, inplace=True)
+                data = concat([remainder, data], ignore_index=True)
                 del remainder
             chunk, end_idx = _get_next_chunk(
                 data,
@@ -251,12 +255,15 @@
                 distinct_bounds,
             )
             yield chunk
-            remainder = data.iloc[end_idx:].copy(deep=True) if buffer_num_rows > end_idx else None
-            del data
-            start_rg_idx = rg_idx - 1
-            buffer_num_rows = len(remainder) if remainder is not None else 0
-
-    if yield_remainder:
+            if buffer_num_rows > end_idx:
+                remainder = data.iloc[end_idx:]
+                buffer_num_rows = len(remainder)
+            else:
+                buffer_num_rows = 0
+                remainder = None
+            start_rg_idx = rg_idx
+
+    if yield_remainder and remainder is not None:
         yield remainder
     else:
         return remainder
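Two details in this diff are worth calling out. First, `pandas.concat` has no `inplace` keyword, so the removed call would have raised a `TypeError` the first time a remainder had to be merged back in; the fix simply drops the argument. Second, the rewritten loop tail makes the remainder bookkeeping explicit: leftover rows are carried into the next iteration, and `start_rg_idx` now points at the first unconsumed row group. The sketch below imitates that buffering pattern with plain pandas objects; `iter_fixed_chunks` and `row_groups` are illustrative stand-ins, not the oups API.

```python
from typing import Iterator, List, Optional

from pandas import DataFrame, concat


def iter_fixed_chunks(
    row_groups: List[DataFrame],  # stand-in for ParquetFile row groups
    max_row_group_size: int,
) -> Iterator[DataFrame]:
    """
    Yield chunks of exactly 'max_row_group_size' rows, buffering row groups
    until enough rows are available; the final short chunk is returned, not
    yielded, mirroring 'yield_remainder=False'.
    """
    buffer: List[DataFrame] = []
    buffer_num_rows = 0
    remainder: Optional[DataFrame] = None
    for rg in row_groups:
        buffer.append(rg)
        buffer_num_rows += len(rg)
        if buffer_num_rows >= max_row_group_size:
            # concat has no 'inplace' keyword: it always returns a new frame.
            data = concat(buffer, ignore_index=True)
            end_idx = (buffer_num_rows // max_row_group_size) * max_row_group_size
            for start in range(0, end_idx, max_row_group_size):
                yield data.iloc[start : start + max_row_group_size]
            if buffer_num_rows > end_idx:
                # Carry leftover rows over to the next iteration.
                remainder = data.iloc[end_idx:]
                buffer = [remainder]
                buffer_num_rows = len(remainder)
            else:
                remainder = None
                buffer = []
                buffer_num_rows = 0
    return remainder
```

Because the remainder is returned rather than yielded, a caller recovers it with `remainder = yield from iter_fixed_chunks(...)`, the same PEP 380 mechanism the new test helper relies on.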
4 changes: 2 additions & 2 deletions oups/store/writer.py
@@ -595,6 +595,8 @@ def write_ordered(
     """
     if not data.empty:
+        if cmidx_expand:
+            data.columns = to_midx(data.columns, cmidx_levels)
         if ordered_on not in data.columns:
             # Check 'ordered_on' column is within input dataframe.
             raise ValueError(f"column '{ordered_on}' does not exist in input data.")
@@ -686,8 +688,6 @@ def write_ordered(
         # Case initiating a new dataset.
         iter_data = iter_dataframe(data, max_row_group_size)
         chunk = next(iter_data)
-        if cmidx_expand:
-            chunk.columns = to_midx(chunk.columns, cmidx_levels)
         # In case multi-index is used, check that it complies with fastparquet
         # limitations.
         if isinstance(chunk.columns, MultiIndex):
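The writer.py change moves the column MultiIndex expansion from the first yielded chunk to the whole input frame, so it happens exactly once, before the `ordered_on` validation and before chunking, and every chunk sliced from `data` inherits the expanded columns. `to_midx` is oups-internal; the snippet below is only a hypothetical illustration of this kind of expansion, assuming flat names joined by a `'__'` separator (an assumption for the example, not necessarily the oups convention).

```python
from pandas import DataFrame, MultiIndex


def expand_to_multiindex(df: DataFrame, sep: str = "__") -> DataFrame:
    """
    Return a copy of 'df' whose flat column names, e.g. 'price__mean',
    are split on 'sep' into a two-level MultiIndex ('price', 'mean').
    """
    out = df.copy()
    out.columns = MultiIndex.from_tuples(tuple(col.split(sep, 1)) for col in out.columns)
    return out


df = DataFrame({"price__mean": [1.0], "price__max": [2.0]})
df = expand_to_multiindex(df)
# Expanding once, up front, means every chunk later sliced from 'df'
# carries the same MultiIndex, and column checks see the final schema.
```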
98 changes: 48 additions & 50 deletions tests/test_store/test_iter_data.py
@@ -7,6 +7,8 @@
 """
 
+from typing import Iterable
+
 import pytest
 from fastparquet import ParquetFile
 from fastparquet import write
@@ -87,6 +89,15 @@ def test_get_next_chunk(
     assert end_idx == expected_end_idx
 
 
+def yield_all(iterator: Iterable[DataFrame]) -> Iterable[DataFrame]:
+    """
+    Yield all chunks from an iterator, including any remainder.
+    """
+    remainder = yield from iterator
+    if remainder is not None:
+        yield remainder
+
+
 @pytest.mark.parametrize(
     "start_df, duplicates_on, yield_remainder",
     [
@@ -146,13 +157,7 @@ def test_iter_pandas_dataframe(
         all_chunks = list(iterator)
         yielded_chunks = all_chunks
     else:
-
-        def yield_all():
-            remainder = yield from iterator
-            if remainder is not None:
-                yield remainder
-
-        all_chunks = list(yield_all())
+        all_chunks = list(yield_all(iterator))
         # Do a 2nd time to check only yielded chunks.
         iterator2 = _iter_pandas_dataframe(
             sample_df,
@@ -164,14 +169,9 @@ def yield_all():
             yield_remainder=yield_remainder,
         )
         yielded_chunks = list(iterator2)
-    print("yielded_chunks")
-    print(yielded_chunks)
-
-    print("all_chunks")
-    print(all_chunks)
-    complete_chunks = all_chunks[:-1] if has_remainder else all_chunks
 
     # Verify chunk sizes
+    complete_chunks = all_chunks[:-1] if has_remainder else all_chunks
     for chunk in complete_chunks:
         assert len(chunk) == row_group_size
 
@@ -185,10 +185,6 @@
         expected_without_remainder = expected.iloc[
             : (len(expected) // row_group_size) * row_group_size
         ]
-        print("expected_without_remainder")
-        print(expected_without_remainder)
-        print("result")
-        print(result)
         assert_frame_equal(result, expected_without_remainder)
 

@@ -207,19 +203,19 @@ def _create_parquet(df):
 
 
 @pytest.mark.parametrize(
-    "start_df,yield_remainder,has_remainder",
+    "start_df,yield_remainder",
     [
-        (None, True, False),  # Basic case
-        (DataFrame({"ordered": [0], "values": ["z"]}), True, False),  # With start_df
-        (None, False, True),  # Return remainder
+        (None, True),  # Basic case
+        (DataFrame({"ordered": [0], "values": ["z"]}), True),  # With start_df
+        (None, False),  # Return remainder
+        (DataFrame({"ordered": [0], "values": ["z"]}), False),  # With start_df
     ],
 )
 def test_iter_resized_parquet_file(
     sample_df,
     create_parquet_file,
     start_df,
     yield_remainder,
-    has_remainder,
 ):
     """
     Test _iter_resized_parquet_file with various configurations.
@@ -234,15 +230,17 @@
         Optional starter DataFrame.
     yield_remainder : bool
         Whether to yield the last chunk.
-    has_remainder : bool
-        Whether a remainder is expected.
     """
-    pf = create_parquet_file(sample_df)
-    row_group_size = 3
+    sample_pf = create_parquet_file(sample_df)
+    row_group_size = 4
+    expected = (
+        concat([start_df, sample_df], ignore_index=True) if start_df is not None else sample_df
+    )
+    has_remainder = len(expected) % row_group_size > 0
 
     iterator = _iter_resized_parquet_file(
-        pf,
+        sample_pf,
         row_group_size,
         "ordered",
         start_df=start_df,
@@ -251,35 +249,35 @@
     )
 
     # Collect all chunks
-    chunks = []
-    returned_remainder = None
-    for chunk in iterator:
-        chunks.append(chunk)
-
-    # Get remainder if it was returned
-    if not yield_remainder:
-        returned_remainder = iterator.send(None)
+    if yield_remainder:
+        all_chunks = list(iterator)
+        yielded_chunks = all_chunks
+    else:
+        all_chunks = list(yield_all(iterator))
+        # Do a 2nd time to check only yielded chunks.
+        iterator2 = _iter_resized_parquet_file(
+            sample_pf,
+            row_group_size,
+            "ordered",
+            start_df=start_df.copy(deep=True) if start_df is not None else None,
+            distinct_bounds=False,
+            yield_remainder=yield_remainder,
+        )
+        yielded_chunks = list(iterator2)
 
     # Verify chunk sizes
-    for chunk in chunks[:-1]:  # All but last chunk
-        assert len(chunk) <= row_group_size
+    complete_chunks = all_chunks[:-1] if has_remainder else all_chunks
+    for chunk in complete_chunks:
+        assert len(chunk) == row_group_size
 
-    # Verify total data
-    result = concat(chunks, ignore_index=True)
-    expected = (
-        concat([start_df, sample_df], ignore_index=True) if start_df is not None else sample_df
-    )
+    # Verify yielded data
+    result = concat(yielded_chunks, ignore_index=True)
 
-    if not yield_remainder:
+    if yield_remainder:
+        assert_frame_equal(result, expected)
+    else:
         # When not yielding last chunk, expected data should exclude remainder
         expected_without_remainder = expected.iloc[
             : (len(expected) // row_group_size) * row_group_size
         ]
-        n_expected_rows = len(expected_without_remainder)
-        expected_remainder = expected.iloc[n_expected_rows:]
-        assert returned_remainder is not None
-        assert_frame_equal(returned_remainder, expected_remainder)
         assert_frame_equal(result, expected_without_remainder)
-    else:
-        assert returned_remainder is None
-        assert_frame_equal(result, expected)

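The new module-level `yield_all` helper replaces the generator previously nested in `test_iter_pandas_dataframe` and is now shared with `test_iter_resized_parquet_file`. It hinges on a PEP 380 detail: a `yield from` expression evaluates to the generator's return value, which travels on `StopIteration.value`. That is also why the old `iterator.send(None)` approach could not retrieve the remainder: `send` resumes a generator and returns its next yielded item, and on an exhausted generator it just raises `StopIteration`. A minimal, self-contained demonstration:

```python
from typing import Iterator


def chunks_then_remainder() -> Iterator[str]:
    yield "chunk_1"
    yield "chunk_2"
    return "remainder"  # becomes StopIteration.value, not a yielded item


def yield_all(iterator):
    # 'yield from' re-yields every item AND evaluates to the return value.
    remainder = yield from iterator
    if remainder is not None:
        yield remainder


print(list(chunks_then_remainder()))             # ['chunk_1', 'chunk_2']
print(list(yield_all(chunks_then_remainder())))  # ['chunk_1', 'chunk_2', 'remainder']
```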