From c7bb4307946876a93d5c7edc7496f8b1085cfd8b Mon Sep 17 00:00:00 2001
From: yohplala
Date: Sun, 9 Feb 2025 21:37:05 +0100
Subject: [PATCH] WIP in '_get_atomic_merge_regions()'.

---
 oups/store/ordered_merge_info.py            | 223 ++++++++++++++++----
 tests/test_store/test_ordered_merge_info.py | 146 ++++++++++++-
 2 files changed, 328 insertions(+), 41 deletions(-)

diff --git a/oups/store/ordered_merge_info.py b/oups/store/ordered_merge_info.py
index 0930721..b95ccc5 100644
--- a/oups/store/ordered_merge_info.py
+++ b/oups/store/ordered_merge_info.py
@@ -11,11 +11,13 @@
 from fastparquet import ParquetFile
 from numpy import any as np_any
+from numpy import arange
 from numpy import array
 from numpy import column_stack
 from numpy import cumsum
 from numpy import diff
 from numpy import flatnonzero
+from numpy import insert
 from numpy import int8
 from numpy import nonzero
 from numpy import r_
@@ -229,7 +231,7 @@ def is_incomplete(self) -> NDArray:
         """
         return self.rg_n_rows < self.min_size

-    def get_fragmentation_risk(
+    def get_risk_of_exceeding_rgst(
         self,
         indices_overlap: NDArray,
         indices_enlarged: NDArray,
@@ -242,19 +244,32 @@
         creating a complete row group that would split a region of contiguous
         incomplete row groups.

+        The rationale of this logic is to prevent the addition of new data
+        after existing incomplete row groups from creating an isolated set of
+        incomplete row groups followed by complete row groups.
+
+        Example:
+        - Before addition:
+          - rg [crg] [crg] [irg] [irg]
+          - df                   .....
+        - Not desired after addition:
+          - rg [crg] [crg] [irg] [crg] [irg]
+        - Desired after addition:
+          - rg [crg] [crg] [crg] [crg] [irg]
+
         Parameters
         ----------
         indices_overlap : NDArray
-            Array of shape (m, 2) containing start and end indices of overlap
+            Array of shape (o, 2) containing start and end indices of overlap
             regions.
         indices_enlarged : NDArray
-            Array of shape (n, 2) containing start and end indices of enlarged
-            regions (n <= m).
+            Array of shape (e, 2) containing start and end indices of enlarged
+            regions (e <= o).

         Returns
         -------
         NDArray
-            Boolean array of length n indicating which enlarged regions contain
+            Boolean array of shape (e,) indicating which enlarged regions contain
             overlap regions that risk creating fragmentation when merged.

         """
@@ -264,10 +279,19 @@
             m_values=cumsum(self.rg_n_rows),
             indices=indices_overlap,
         )
+        print("rg_rows_per_overlap_regions")
+        print(rg_rows_per_overlap_regions)
+        # TODO:
+        # Should include all rows of df that are over the overlap regions,
+        # as well as the df chunks neighboring these regions and incomplete
+        # row groups.
+        # Reword 'fragmentation_risk' into "exceed_rgs_target".
         df_rows_per_overlap_regions = get_region_start_end_delta(
             m_values=self.df_idx_tmrg_ends_excl - self.df_idx_tmrg_starts - 1,
             indices=indices_overlap,
         )
+        print("df_rows_per_overlap_regions")
+        print(df_rows_per_overlap_regions)
         # 'overlap_has_risk' is an array whose length is the number of overlap
         # regions.
         overlap_has_risk = (
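
For intuition, the cumulative-sum bookkeeping used just above can be replayed standalone. The body of `get_region_start_end_delta` is not part of this patch, so the sketch below only mirrors, with plain numpy and toy values, the idea of differencing a cumulative sum at region boundaries:

    import numpy as np

    # Row counts of five row groups, and two [start, end_excl) regions over them.
    rg_n_rows = np.array([100, 40, 120, 30, 50])
    regions = np.array([[1, 3], [3, 5]])
    # Prepend 0 so that csum[i] is the number of rows strictly before row group i.
    csum = np.r_[0, np.cumsum(rg_n_rows)]
    rows_per_region = csum[regions[:, 1]] - csum[regions[:, 0]]
    # rows_per_region -> [160, 80]

Differencing the padded cumulative sum gives each region's row count in one vectorized step, whatever the number of regions.
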
@@ -418,15 +442,14 @@
     include neighbor incomplete row groups depending on two conditions:
     - if the total number of existing incomplete row groups in the enlarged
       merge region is greater than `max_n_irgs`.
-    - if the possibly resulting row group(s) from the merge between Dataframe
-      chunks and corresponding overlapping row groups may result in complete
-      row groups, i.e. may result in a fragmentation of contiguous incomplete
-      row group regions.
+    - if the row group(s) resulting from the addition of the DataFrame chunks
+      to the corresponding overlapping row groups and neighbor incomplete row
+      groups are possibly complete row groups.

     Parameters
     ----------
     is_overlap : NDArray
-        A 1d numpy array of dtype `bool`.
+        Boolean array of shape (o,).
     max_n_irgs : int
         Maximum number of incomplete row groups allowed in a merge region.
@@ -441,16 +464,25 @@
     # regions.
     indices_overlap = get_region_indices_of_true_values(is_overlap)
     is_incomplete = rg_size_pattern.is_incomplete()
-    indices_enlarged = get_region_indices_of_true_values(is_overlap | is_incomplete)
+    is_enlarged = is_overlap | is_incomplete
+    indices_enlarged = get_region_indices_of_true_values(is_enlarged)
+    # Split regions are regions of complete row groups, not overlapping with
+    # any DataFrame chunks.
+    # indices_split = get_region_indices_of_true_values(~is_enlarged)
+
     # Step 2: keep only enlarged regions that overlap with at least one overlap
     # region.
-    is_overlapping_with_overlap = np_any(
-        (indices_enlarged[:, None, 0] <= indices_overlap[None, :, 1])
-        & (indices_enlarged[:, None, 1] >= indices_overlap[None, :, 0]),
-        axis=1,  # Note: axis=1 to reduce along columns (overlap regions)
-    )
-    indices_enlarged = indices_enlarged[is_overlapping_with_overlap]
+    # Should not do that, to keep incomplete row groups that are neighbors to
+    # a large df chunk to be written together, even if they are not overlapping.
+
+    # is_overlapping_with_overlap = np_any(
+    #     (indices_enlarged[:, None, 0] <= indices_overlap[None, :, 1])
+    #     & (indices_enlarged[:, None, 1] >= indices_overlap[None, :, 0]),
+    #     axis=1,  # Note: axis=1 to reduce along columns (overlap regions)
+    # )
+    # indices_enlarged = indices_enlarged[is_overlapping_with_overlap]

     # Step 3: Filter out enlarged candidates based on multiple criteria.
     # Get number of incomplete row groups at region boundaries.
@@ -461,7 +493,7 @@
         cumsum_incomplete[indices_enlarged[:, 0]] - cumsum_incomplete[indices_enlarged[0, 0]]
     )
     # Get which enlarged regions contain overlap regions that risk fragmentation.
-    fragmentation_risk = rg_size_pattern.get_fragmentation_risk(
+    fragmentation_risk = rg_size_pattern.get_risk_of_exceeding_rgst(
         indices_overlap=indices_overlap,
         indices_enlarged=indices_enlarged,
     )
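
The region bookkeeping above leans on `get_region_indices_of_true_values`; its body is not part of this patch, but from its use here it presumably returns the [start, end_excl) bounds of each run of True values. A standalone numpy sketch of that presumed behavior, on toy values:

    import numpy as np

    is_true = np.array([False, True, True, False, True])
    # Pad with zeros so that transitions at both ends are caught by diff.
    edges = np.diff(np.r_[0, is_true.astype(np.int8), 0])
    starts = np.flatnonzero(edges == 1)      # run starts -> [1, 4]
    ends_excl = np.flatnonzero(edges == -1)  # run ends (excluded) -> [3, 5]
    regions = np.column_stack((starts, ends_excl))
    # regions -> [[1, 3], [4, 5]]
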
@@ -484,6 +516,108 @@
     return vstack((indices_overlap, indices_enlarged))


+def _get_atomic_merge_regions(
+    rg_mins: List,
+    rg_maxs: List,
+    df_ordered_on: Series,
+    drop_duplicates: bool,
+) -> Tuple[NDArray, NDArray, NDArray]:
+    """
+    Get atomic merge regions.
+
+    An atomic merge region is defined
+    - either by an existing row group in ParquetFile and, if existing, its
+      corresponding overlapping DataFrame chunk,
+    - or by a DataFrame chunk that is not overlapping with any row group in
+      ParquetFile.
+
+    Returned arrays provide, for each of these atomic merge regions, the start
+    and end (excluded) indices in row groups, and the end (excluded) indices
+    in DataFrame.
+    These arrays are of the same size.
+    There is no need to provide the start indices in DataFrame, as they can be
+    inferred from the end (excluded) indices in DataFrame of the previous
+    atomic merge region (no part of the DataFrame is omitted for the write).
+
+    Parameters
+    ----------
+    rg_mins : List
+        Minimum values of 'ordered_on' in each row group.
+    rg_maxs : List
+        Maximum values of 'ordered_on' in each row group.
+    df_ordered_on : Series[Timestamp]
+        Values of 'ordered_on' column in DataFrame.
+    drop_duplicates : bool
+        Flag defining how overlap boundaries are managed.
+        More exactly, 'pf' is considered the first data, and 'df' the second
+        data, coming after. In case 'pf' is leading 'df' and the last value in
+        'pf' is a duplicate of the first value in 'df', then
+        - if True, the overlap starts at this index,
+        - if False, there is no overlap at this index.
+
+    Returns
+    -------
+    Tuple[NDArray, NDArray, NDArray]
+        - First NDArray: start indices in ParquetFile of each row group to
+          merge with the corresponding DataFrame chunk.
+        - Second NDArray: end (excluded) indices in ParquetFile of each row
+          group to merge with the corresponding DataFrame chunk.
+        - Third NDArray: end (excluded) indices in DataFrame of each DataFrame
+          chunk to merge with the corresponding row group.
+
+    """
+    # Find regions in DataFrame overlapping with row groups.
+    if drop_duplicates:
+        print("drop_duplicates")
+        # Determine overlap start/end indices in row groups.
+        df_idx_tmrg_starts = searchsorted(df_ordered_on, rg_mins, side=LEFT)
+        df_idx_tmrg_ends_excl = searchsorted(df_ordered_on, rg_maxs, side=RIGHT)
+    else:
+        print("no drop_duplicates")
+        df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
+            df_ordered_on,
+            vstack((rg_mins, rg_maxs)),
+            side=LEFT,
+        )
+    print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
+    print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
+    # Find regions in DataFrame not overlapping with any row group.
+    # `amr` for atomic merge region.
+    df_idxs_enlarged = r_[
+        df_idx_tmrg_starts[0],  # gap at start (0 to first start)
+        df_idx_tmrg_starts[1:] - df_idx_tmrg_ends_excl[:-1],
+        len(df_ordered_on) - df_idx_tmrg_ends_excl[-1],  # gap at end
+    ]
+    print(f"df_idxs_enlarged: {df_idxs_enlarged}")
+    amr_idx_non_overlapping = flatnonzero(df_idxs_enlarged)
+    print(f"amr_idx_non_overlapping: {amr_idx_non_overlapping}")
+    rg_idxs = arange(len(rg_mins) + 1)
+    print(f"rg_idxs: {rg_idxs}")
+    if len(amr_idx_non_overlapping) == 0:
+        # No non-overlapping regions in DataFrame.
+        return rg_idxs[:-1], rg_idxs[1:], df_idx_tmrg_ends_excl
+    else:
+        # Get insert positions accounting for previous insertions.
+        # insert_positions = amr_idx_non_overlapping + arange(len(amr_idx_non_overlapping))
+        # print(f"insert_positions: {insert_positions}")
+        # Fill arrays.
+        rg_idx_to_insert = rg_idxs[amr_idx_non_overlapping]
+        print(f"rg_idx_to_insert: {rg_idx_to_insert}")
+        rg_idxs_with_inserts = insert(rg_idxs, amr_idx_non_overlapping, rg_idx_to_insert)
+        print(f"rg_idxs_with_inserts: {rg_idxs_with_inserts}")
+        if amr_idx_non_overlapping[-1] == len(df_ordered_on):
+            df_idx_to_insert = df_idx_tmrg_starts[amr_idx_non_overlapping]
+        else:
+            df_idx_to_insert = r_[df_idx_tmrg_starts, len(df_ordered_on)][amr_idx_non_overlapping]
+        df_idx_with_inserts = insert(
+            df_idx_tmrg_ends_excl,
+            amr_idx_non_overlapping,
+            df_idx_to_insert,
+        )
+        print(f"df_idx_with_inserts: {df_idx_with_inserts}")
+        return rg_idxs_with_inserts[:-1], rg_idxs_with_inserts[1:], df_idx_with_inserts
+
+
 def _rgst_as_str__irgs_analysis(
     rg_mins: NDArray,
     rg_idx_merge_start: int,
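
For intuition, the overlap bookkeeping of `_get_atomic_merge_regions` can be replayed on toy values with plain numpy (a standalone sketch of the drop_duplicates=True branch; the values mirror the 'gap_in_middle_df_not_overlapping_rg' test case added below):

    import numpy as np

    rg_mins = [10, 30]                # per-row-group min of 'ordered_on'
    rg_maxs = [15, 35]                # per-row-group max of 'ordered_on'
    df_vals = np.array([12, 22, 32])  # DataFrame 'ordered_on' values

    starts = np.searchsorted(df_vals, rg_mins, side="left")      # -> [0, 2]
    ends_excl = np.searchsorted(df_vals, rg_maxs, side="right")  # -> [1, 3]
    # Gap sizes: df rows before the first row group, between consecutive
    # row groups, and after the last row group.
    gaps = np.r_[starts[0], starts[1:] - ends_excl[:-1], len(df_vals) - ends_excl[-1]]
    # gaps -> [0, 1, 0]: value 22 forms a df-only chunk between the two row groups.
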
@@ -798,24 +932,32 @@
     rg_n_rows = [rg.num_rows for rg in pf.row_groups]
     n_rgs = len(rg_n_rows)
     # df_min = df.loc[:, ordered_on].iloc[0]
-    # Find overlapping regions in dataframe
-    rg_mins = pf.statistics[MIN][ordered_on]
-    rg_maxs = pf.statistics[MAX][ordered_on]
-    if drop_duplicates:
-        print("drop_duplicates")
-        # Determine overlap start/end indices in row groups
-        df_idx_tmrg_starts = searchsorted(df.loc[:, ordered_on], rg_mins, side=LEFT)
-        df_idx_tmrg_ends_excl = searchsorted(df.loc[:, ordered_on], rg_maxs, side=RIGHT)
-    else:
-        print("no drop_duplicates")
-        df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
-            df.loc[:, ordered_on],
-            vstack((rg_mins, rg_maxs)),
-            side=LEFT,
-        )
+    # # Find overlapping regions in dataframe
+    # rg_mins = pf.statistics[MIN][ordered_on]
+    # rg_maxs = pf.statistics[MAX][ordered_on]
+    # if drop_duplicates:
+    #     print("drop_duplicates")
+    #     # Determine overlap start/end indices in row groups
+    #     df_idx_tmrg_starts = searchsorted(df.loc[:, ordered_on], rg_mins, side=LEFT)
+    #     df_idx_tmrg_ends_excl = searchsorted(df.loc[:, ordered_on], rg_maxs, side=RIGHT)
+    # else:
+    #     print("no drop_duplicates")
+    #     df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
+    #         df.loc[:, ordered_on],
+    #         vstack((rg_mins, rg_maxs)),
+    #         side=LEFT,
+    #     )
+    pf_statistics = pf.statistics
+    rg_idx_starts, rg_idx_ends_excl, df_idx_ends_excl = _get_atomic_merge_regions(
+        rg_mins=pf_statistics[MIN][ordered_on],
+        rg_maxs=pf_statistics[MAX][ordered_on],
+        df_ordered_on=df.loc[:, ordered_on],
+        drop_duplicates=drop_duplicates,
+    )
     # Get overlap status per row group.
-    is_overlap = df_idx_tmrg_starts == df_idx_tmrg_ends_excl - 1
+    is_overlap = 1
+    # is_overlap = df_idx_tmrg_starts == df_idx_tmrg_ends_excl - 1

     # if not df_idx_tmrg_ends_excl[-1]:
     #     # df after last row group in pf.
@@ -872,8 +1014,8 @@
     print("")
     print("before irgs analysis")
     print(f"df_n_rows: {df_n_rows}")
-    print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
-    print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
+    # print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
+    # print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
     print(f"rg_idx_merge_start: {rg_idx_merge_starts}")
     print(f"rg_idx_merge_end_excl: {rg_idx_merge_ends_excl}")
     # Assess if neighbor incomplete row groups in ParquetFile have to be
@@ -920,8 +1062,8 @@
             n_missing_rows=n_missing_rows,
         )
     print("after max_n_irgs=0 analysis")
-    print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
-    print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")
+    # print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}")
+    # print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}")

     # Row groups have to be sorted after write step
     # - either if there is a merge region (new row groups are written first,
@@ -949,11 +1091,12 @@
             if max_n_irgs == 0
             else row_group_size_target,
             df_n_rows=df_n_rows,
-            df_idx_tmrg_ends_excl=df_idx_tmrg_ends_excl,
+            # df_idx_tmrg_ends_excl=df_idx_tmrg_ends_excl,
+            df_idx_tmrg_ends_excl=df_idx_ends_excl,
         )
         if isinstance(row_group_size_target, int)
         else _rgst_as_str__merge_plan(
-            rg_maxs=rg_maxs,
+            # rg_maxs=rg_maxs,
             row_group_period=row_group_size_target,
             df_ordered_on=df.loc[:, ordered_on],
         )
diff --git a/tests/test_store/test_ordered_merge_info.py b/tests/test_store/test_ordered_merge_info.py
index 72c2fcb..a12ba6c 100644
--- a/tests/test_store/test_ordered_merge_info.py
+++ b/tests/test_store/test_ordered_merge_info.py
@@ -5,7 +5,7 @@
 @author: yoh
 """
-from typing import List
+from typing import List, Tuple

 import pytest
 from numpy import array
@@ -19,6 +19,7 @@
 from pandas import date_range

 from oups.store.ordered_merge_info import NRowsPattern
+from oups.store.ordered_merge_info import _get_atomic_merge_regions
 from oups.store.ordered_merge_info import _rgst_as_str__merge_plan
 from oups.store.ordered_merge_info import analyze_chunks_to_merge
 from oups.store.ordered_merge_info import get_merge_regions
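
Before the new parametrized cases below, here is a worked trace of the index construction they exercise, continuing the toy values of the searchsorted sketch above (plain numpy, standalone; not the module's code):

    import numpy as np

    # Two row groups, with gap sizes [0, 1, 0] per slot: one df-only chunk
    # sits between them, at slot 1.
    rg_idxs = np.arange(3)  # row-group boundaries [0, 1, 2]
    non_overlap = np.flatnonzero(np.array([0, 1, 0]))  # -> [1]
    with_inserts = np.insert(rg_idxs, non_overlap, rg_idxs[non_overlap])
    # with_inserts -> [0, 1, 1, 2]
    rg_starts, rg_ends_excl = with_inserts[:-1], with_inserts[1:]
    # rg_starts -> [0, 1, 1]; rg_ends_excl -> [1, 1, 2]
    # The empty pair (1, 1) is the row-group-less slot holding the df-only
    # chunk, matching 'gap_in_middle_df_not_overlapping_rg' below.
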
@@ -31,6 +32,149 @@
 REF_D = "2020/01/01 "


+@pytest.mark.parametrize(
+    "test_id, rg_mins, rg_maxs, df_ordered_on, drop_duplicates, expected",
+    [
+        (
+            "no_gaps_rg_df",
+            [10, 20, 30],  # rg_mins
+            [15, 25, 35],  # rg_maxs
+            Series([12, 22, 32]),  # df_ordered_on
+            True,
+            (
+                array([0, 1, 2]),  # rg_idx_starts
+                array([1, 2, 3]),  # rg_idx_ends_excl
+                array([1, 2, 3]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "gap_at_start_df_leading_rg",
+            [20, 30],  # rg_mins
+            [25, 35],  # rg_maxs
+            Series([5, 22, 32]),  # df_ordered_on
+            True,
+            (
+                array([0, 0, 1]),  # rg_idx_starts
+                array([0, 1, 2]),  # rg_idx_ends_excl
+                array([1, 2, 3]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "gap_in_middle_df_not_overlapping_rg",
+            [10, 30],  # rg_mins
+            [15, 35],  # rg_maxs
+            Series([12, 22, 32]),  # df_ordered_on
+            True,
+            (
+                array([0, 1, 1]),  # rg_idx_starts
+                array([1, 1, 2]),  # rg_idx_ends_excl
+                array([1, 2, 3]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "gap_at_end_df_trailing_rg",
+            [10, 20],  # rg_mins
+            [15, 25],  # rg_maxs
+            Series([12, 22, 32]),  # df_ordered_on
+            True,
+            (
+                array([0, 1, 2]),  # rg_idx_starts
+                array([1, 2, 2]),  # rg_idx_ends_excl
+                array([1, 2, 3]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "gap_at_start_rg_leading_df",
+            [0, 20, 30],  # rg_mins
+            [5, 23, 33],  # rg_maxs
+            Series([22, 32]),  # df_ordered_on
+            True,
+            (
+                array([0, 1, 2]),  # rg_idx_starts
+                array([1, 2, 3]),  # rg_idx_ends_excl
+                array([0, 1, 2]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "gap_in_middle_rg_not_overlapping_df",
+            [0, 10, 30],  # rg_mins
+            [5, 15, 35],  # rg_maxs
+            Series([2, 32]),  # df_ordered_on
+            True,
+            (
+                array([0, 1, 2]),  # rg_idx_starts
+                array([1, 2, 3]),  # rg_idx_ends_excl
+                array([1, 1, 2]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        # TODO: work in progress here
+        (
+            "gap_at_end_rg_trailing_df",
+            [10, 20, 30],  # rg_mins
+            [15, 25, 35],  # rg_maxs
+            Series([12, 22]),  # df_ordered_on
+            True,
+            (
+                array([0, 1, 2]),  # rg_idx_starts
+                array([1, 2, 2]),  # rg_idx_ends_excl
+                array([1, 2, 2]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "multiple_gaps_non_overlapping_rg",
+            [20, 40, 43],  # rg_mins
+            [25, 43, 45],  # rg_maxs
+            Series([5, 22, 32, 42, 46, 52]),  # df_ordered_on
+            True,
+            (
+                array([0, 0, 1, 1, 2, 2, 3]),  # rg_idx_starts
+                array([0, 1, 1, 2, 2, 3, 3]),  # rg_idx_ends_excl
+                array([1, 2, 3, 4, 5, 6, 7]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+        (
+            "no_drop_duplicates",
+            [10, 20],  # rg_mins
+            [15, 25],  # rg_maxs
+            Series([15, 22, 32]),  # df_ordered_on - note 15 is duplicate
+            False,
+            (
+                array([0, 1, 2]),  # rg_idx_starts
+                array([1, 2, 2]),  # rg_idx_ends_excl
+                array([1, 2, 3]),  # df_idx_tmrg_ends_excl
+            ),
+        ),
+    ],
+)
+def test_get_atomic_merge_regions(
+    test_id: str,
+    rg_mins: List,
+    rg_maxs: List,
+    df_ordered_on: Series,
+    drop_duplicates: bool,
+    expected: Tuple[NDArray, NDArray, NDArray],
+) -> None:
+    """
+    Test _get_atomic_merge_regions with various scenarios.
+
+    Test cases cover:
+    - No gaps between regions
+    - Gap at start of DataFrame
+    - Gap in middle
+    - Gap at end
+    - Multiple gaps
+    - Behavior with drop_duplicates=False
+
+    """
+    result = _get_atomic_merge_regions(rg_mins, rg_maxs, df_ordered_on, drop_duplicates)
+    for res, exp in zip(result, expected):
+        print("res:")
+        print(res)
+        print("exp:")
+        print(exp)
+        assert array_equal(res, exp)
+
+
 @pytest.mark.parametrize(
     "mask, expected",
     [