Skip to content

Commit

Permalink
Wip in '_get_atomic_merge_regions()'.
Browse files Browse the repository at this point in the history
  • Loading branch information
yohplala committed Feb 9, 2025
1 parent fe46471 commit c7bb430
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 41 deletions.
223 changes: 183 additions & 40 deletions oups/store/ordered_merge_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

from fastparquet import ParquetFile
from numpy import any as np_any
from numpy import arange
from numpy import array
from numpy import column_stack
from numpy import cumsum
from numpy import diff
from numpy import flatnonzero
from numpy import insert
from numpy import int8
from numpy import nonzero
from numpy import r_
Expand Down Expand Up @@ -229,7 +231,7 @@ def is_incomplete(self) -> NDArray:
"""
return self.rg_n_rows < self.min_size

def get_fragmentation_risk(
def get_risk_of_exceeding_rgst(
self,
indices_overlap: NDArray,
indices_enlarged: NDArray,
Expand All @@ -242,19 +244,32 @@ def get_fragmentation_risk(
creating a complete row group that would split a region of contiguous
incomplete row groups.
Rational of this logic is to prevent that the addition of new data after
existing incomplete row groups creates an isolated set of incomplete row
groups followed by complete row groups.
Example:
- Before addition:
- rg [crg] [crg] [irg] [irg]
- df .....
- Not desired after addition:
- rg [crg] [crg] [irg] [crg] [irg]
- Desired after addition:
- rg [crg] [crg] [crg] [crg] [irg]
Parameters
----------
indices_overlap : NDArray
Array of shape (m, 2) containing start and end indices of overlap
Array of shape (o, 2) containing start and end indices of overlap
regions.
indices_enlarged : NDArray
Array of shape (n, 2) containing start and end indices of enlarged
regions (n <= m).
Array of shape (e, 2) containing start and end indices of enlarged
regions (e <= o).
Returns
-------
NDArray
Boolean array of length n indicating which enlarged regions contain
Boolean array of shape (e) indicating which enlarged regions contain
overlap regions that risk creating fragmentation when merged.
"""
Expand All @@ -264,10 +279,19 @@ def get_fragmentation_risk(
m_values=cumsum(self.rg_n_rows),
indices=indices_overlap,
)
print("rg_rows_per_overlap_regions")
print(rg_rows_per_overlap_regions)
# TODO:
# should include all rows of df that are over the overlap regions
# as well as df chunk neighbor to these regions and incomplete row
# groups
# reword fragmentation_risk into "exceed_rgs_target"
df_rows_per_overlap_regions = get_region_start_end_delta(
m_values=self.df_idx_tmrg_ends_excl - self.df_idx_tmrg_starts - 1,
indices=indices_overlap,
)
print("df_rows_per_overlap_regions")
print(df_rows_per_overlap_regions)
# 'overlap_has_risk' is an array of length the number of overlap
# regions.
overlap_has_risk = (
Expand Down Expand Up @@ -418,15 +442,14 @@ def get_merge_regions(
include neighbor incomplete row groups depending two conditions,
- if the total number of existing incomplete row groups in the enlarged
merge region is greater than `max_n_irgs`.
- if the possibly resulting row group(s) from the merge between Dataframe
chunks and corresponding overlapping row groups may result in complete
row groups, i.e. may result in a fragmentation of contiguous incomplete
row group regions.
- if the resulting row group(s) from the addition of the Dataframe chunks
and corresponding overlapping row groups and neighbor incomplete row
groups are possibly complete row groups.
Parameters
----------
is_overlap : NDArray
A 1d numpy array of dtype `bool`.
Boolean array of shape (o).
max_n_irgs : int
Maximum number of incomplete row groups allowed in a merge region.
Expand All @@ -441,16 +464,25 @@ def get_merge_regions(
# regions.
indices_overlap = get_region_indices_of_true_values(is_overlap)
is_incomplete = rg_size_pattern.is_incomplete()
indices_enlarged = get_region_indices_of_true_values(is_overlap | is_incomplete)
is_enlarged = is_overlap | is_incomplete
indices_enlarged = get_region_indices_of_true_values(is_enlarged)
# Split regions are regions of complete row group, not overlapping with any
# DataFrame chunks.
# indices_split = get_region_indices_of_true_values(~is_enlarged)

# Step 2: keep only enlarged regions that overlap with at least one overlap

# Step 2: keep only enlarged regions that overlap with at least one overlap
# region.
is_overlapping_with_overlap = np_any(
(indices_enlarged[:, None, 0] <= indices_overlap[None, :, 1])
& (indices_enlarged[:, None, 1] >= indices_overlap[None, :, 0]),
axis=1, # Note: axis=1 to reduce along columns (overlap regions)
)
indices_enlarged = indices_enlarged[is_overlapping_with_overlap]
# Should not do that, to keep incomplete row groups that are neighbors to a
# a large df chunk to be written together, even if they are not overlapping.

# is_overlapping_with_overlap = np_any(
# (indices_enlarged[:, None, 0] <= indices_overlap[None, :, 1])
# & (indices_enlarged[:, None, 1] >= indices_overlap[None, :, 0]),
# axis=1, # Note: axis=1 to reduce along columns (overlap regions)
# )
# indices_enlarged = indices_enlarged[is_overlapping_with_overlap]

# Step 3: Filter out enlarged candidates based on multiple criteria.
# Get number of incomplete row groups at region boundaries.
Expand All @@ -461,7 +493,7 @@ def get_merge_regions(
cumsum_incomplete[indices_enlarged[:, 0]] - cumsum_incomplete[indices_enlarged[0, 0]]
)
# Get which enlarged regions contain overlap regions that risk fragmentation.
fragmentation_risk = rg_size_pattern.get_fragmentation_risk(
fragmentation_risk = rg_size_pattern.get_risk_of_exceeding_rgst(
indices_overlap=indices_overlap,
indices_enlarged=indices_enlarged,
)
Expand All @@ -484,6 +516,108 @@ def get_merge_regions(
return vstack((indices_overlap, indices_enlarged))


def _get_atomic_merge_regions(
rg_mins: List,
rg_maxs: List,
df_ordered_on: Series,
drop_duplicates: bool,
) -> Tuple[NDArray, NDArray, NDArray]:
"""
Get atomic merge regions.
An atomic merge region is
- either defined by an existing row group in
ParquetFile and if existing, its corresponding overlapping DataFrame chunk,
- or a DataFrame chunk that is not overlapping with any row group in
ParquetFile.
Returned arrays provide the start and end (excluded) indices in row groups
and end (excluded) indices in DataFrame for each of these atomic merge
regions.
These arrays are of same size.
There is no need to provide the start indices in DataFrame, as they can be
inferred from the end (excluded) indices in DataFrame of the previous atomic
merge region (no part of the DataFrame is omitted for the write).
Parameters
----------
rg_mins : List
Minimum values of 'ordered_on' in each row group.
rg_maxs : List
Maximum values of 'ordered_on' in each row group.
df_ordered_on : Series[Timestamp]
Values of 'ordered_on' column in DataFrame.
drop_duplicates : bool
Flag impacting how overlap boundaries have to be managed.
More exactly, 'pf' is considered as first data, and 'df' as second
data, coming after. In case of 'pf' leading 'df', if last value in
'pf' is a duplicate of the first in 'df', then
- If True, at this index, overlap starts
- If False, no overlap at this index
Returns
-------
Tuple[NDArray, NDArray, NDArray]
- First NDArray: indices in ParquetFile containing the starts of each
row group to merge with corresponding DataFrame chunk.
- Second NDArray: indices in ParquetFile containing the ends (excluded)
of each row group to merge with corresponding DataFrame chunk.
- Third NDArray: indices in DataFrame containing the ends (excluded)
of each DataFrame chunk to merge with corresponding row group.
"""
# Find regions in DataFrame overlapping with row groups.
if drop_duplicates:
print("drop_duplicates")
# Determine overlap start/end indices in row groups
df_idx_tmrg_starts = searchsorted(df_ordered_on, rg_mins, side=LEFT)
df_idx_tmrg_ends_excl = searchsorted(df_ordered_on, rg_maxs, side=RIGHT)
else:
print("no drop_duplicates")
df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
df_ordered_on,
vstack((rg_mins, rg_maxs)),
side=LEFT,
)
print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
# Find regions in DataFrame not overlapping with any row group.
# `amr` for atomic merge region.
df_idxs_enlarged = r_[
df_idx_tmrg_starts[0], # gap at start (0 to first start)
df_idx_tmrg_starts[1:] - df_idx_tmrg_ends_excl[:-1],
len(df_ordered_on) - df_idx_tmrg_ends_excl[-1], # gap at end
]
print(f"df_idxs_enlarged: {df_idxs_enlarged}")
amr_idx_non_overlapping = flatnonzero(df_idxs_enlarged)
print(f"amr_idx_non_overlapping: {amr_idx_non_overlapping}")
rg_idxs = arange(len(rg_mins) + 1)
print(f"rg_idxs: {rg_idxs}")
if len(amr_idx_non_overlapping) == 0:
# No non-overlapping regions in DataFrame
return rg_idxs[:-1], rg_idxs[1:], df_idx_tmrg_ends_excl
else:
# Get insert accounting for previous insertions
# insert_positions = amr_idx_non_overlapping + arange(len(amr_idx_non_overlapping))
# print(f"insert_positions: {insert_positions}")
# Fill arrays
rg_idx_to_insert = rg_idxs[amr_idx_non_overlapping]
print(f"rg_idx_to_insert: {rg_idx_to_insert}")
rg_idxs_with_inserts = insert(rg_idxs, amr_idx_non_overlapping, rg_idx_to_insert)
print(f"rg_idxs_with_inserts: {rg_idxs_with_inserts}")
if amr_idx_non_overlapping[-1] == len(df_ordered_on):
df_idx_to_insert = df_idx_tmrg_starts[amr_idx_non_overlapping]
else:
df_idx_to_insert = r_[df_idx_tmrg_starts, len(df_ordered_on)][amr_idx_non_overlapping]
df_idx_with_inserts = insert(
df_idx_tmrg_ends_excl,
amr_idx_non_overlapping,
df_idx_to_insert,
)
print(f"df_idx_with_inserts: {df_idx_with_inserts}")
return rg_idxs_with_inserts[:-1], rg_idxs_with_inserts[1:], df_idx_with_inserts


def _rgst_as_str__irgs_analysis(
rg_mins: NDArray,
rg_idx_merge_start: int,
Expand Down Expand Up @@ -798,24 +932,32 @@ def analyze_chunks_to_merge(
rg_n_rows = [rg.num_rows for rg in pf.row_groups]
n_rgs = len(rg_n_rows)
# df_min = df.loc[:, ordered_on].iloc[0]
# Find overlapping regions in dataframe
rg_mins = pf.statistics[MIN][ordered_on]
rg_maxs = pf.statistics[MAX][ordered_on]
if drop_duplicates:
print("drop_duplicates")
# Determine overlap start/end indices in row groups
df_idx_tmrg_starts = searchsorted(df.loc[:, ordered_on], rg_mins, side=LEFT)
df_idx_tmrg_ends_excl = searchsorted(df.loc[:, ordered_on], rg_maxs, side=RIGHT)
else:
print("no drop_duplicates")
df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
df.loc[:, ordered_on],
vstack((rg_mins, rg_maxs)),
side=LEFT,
)
# # Find overlapping regions in dataframe
# rg_mins = pf.statistics[MIN][ordered_on]
# rg_maxs = pf.statistics[MAX][ordered_on]
# if drop_duplicates:
# print("drop_duplicates")
# Determine overlap start/end indices in row groups
# df_idx_tmrg_starts = searchsorted(df.loc[:, ordered_on], rg_mins, side=LEFT)
# df_idx_tmrg_ends_excl = searchsorted(df.loc[:, ordered_on], rg_maxs, side=RIGHT)
# else:
# print("no drop_duplicates")
# df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
# df.loc[:, ordered_on],
# vstack((rg_mins, rg_maxs)),
# side=LEFT,
# )
pf_statistics = pf.statistics
rg_idx_starts, rg_idx_ends_excl, df_idx_ends_excl = _get_atomic_merge_regions(
rg_mins=pf_statistics[MIN][ordered_on],
rg_maxs=pf_statistics[MAX][ordered_on],
df_ordered_on=df.loc[:, ordered_on],
drop_duplicates=drop_duplicates,
)

# Get overlap status per row groups.
is_overlap = df_idx_tmrg_starts == df_idx_tmrg_ends_excl - 1
is_overlap = 1
# is_overlap = df_idx_tmrg_starts == df_idx_tmrg_ends_excl - 1

# if not df_idx_tmrg_ends_excl[-1]:
# # df after last row group in pf.
Expand Down Expand Up @@ -872,8 +1014,8 @@ def analyze_chunks_to_merge(
print("")
print("before irgs analysis")
print(f"df_n_rows: {df_n_rows}")
print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
# print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
# print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
print(f"rg_idx_merge_start: {rg_idx_merge_starts}")
print(f"rg_idx_merge_end_excl: {rg_idx_merge_ends_excl}")
# Assess if neighbor incomplete row groups in ParquetFile have to be
Expand Down Expand Up @@ -920,8 +1062,8 @@ def analyze_chunks_to_merge(
n_missing_rows=n_missing_rows,
)
print("after max_n_irgs=0 analysis")
print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
# print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
# print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")

# Row groups have to be sorted after write step
# - either if there is a merge region (new row groups are written first,
Expand Down Expand Up @@ -949,11 +1091,12 @@ def analyze_chunks_to_merge(
if max_n_irgs == 0
else row_group_size_target,
df_n_rows=df_n_rows,
df_idx_tmrg_ends_excl=df_idx_tmrg_ends_excl,
# df_idx_tmrg_ends_excl=df_idx_tmrg_ends_excl,
df_idx_tmrg_ends_excl=df_idx_ends_excl,
)
if isinstance(row_group_size_target, int)
else _rgst_as_str__merge_plan(
rg_maxs=rg_maxs,
# rg_maxs=rg_maxs,
row_group_period=row_group_size_target,
df_ordered_on=df.loc[:, ordered_on],
)
Expand Down
Loading

0 comments on commit c7bb430

Please sign in to comment.