Commit
A different 'max_row_group_size' can be used for bins and snapshots results.
yohplala committed Oct 27, 2024
1 parent 69176ce commit 16973e7
Showing 3 changed files with 76 additions and 18 deletions.
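
In short, when a key records both bins and snapshots results, its 'max_row_group_size' setting may now be a 2-tuple: first element for the bins results, second for the snapshots results. A hedged usage sketch (a minimal config dict in the style of the test changes below, assuming the tests' TimeGrouper is pandas' Grouper; the surrounding AggStream setup is omitted):

from pandas import Grouper

key1_cf = {
    "bin_by": Grouper(key="ts", freq="10T", closed="left", label="right"),
    # New in this commit: a 2-tuple sets one row-group size for the bins
    # results (first element) and one for the snapshots results (second
    # element); a plain int keeps applying to both.
    "max_row_group_size": (8, 5),
}
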
68 changes: 54 additions & 14 deletions oups/aggstream/aggstream.py
@@ -8,6 +8,7 @@
from collections import ChainMap
from collections import namedtuple
from dataclasses import dataclass
from enum import Enum
from itertools import chain
from multiprocessing import cpu_count
from typing import Callable, Iterable, List, Optional, Tuple, Union
@@ -38,6 +39,7 @@
from oups.store import ParquetSet
from oups.store.router import ParquetHandle
from oups.store.writer import KEY_DUPLICATES_ON
from oups.store.writer import KEY_MAX_ROW_GROUP_SIZE
from oups.store.writer import OUPS_METADATA
from oups.store.writer import OUPS_METADATA_KEY
from oups.store.writer import write
@@ -63,6 +65,7 @@
KEY_AGG_IN_MEMORY_SIZE = "agg_in_memory_size"
KEY_MAX_IN_MEMORY_SIZE_B = "max_in_memory_size_b"
KEY_MAX_IN_MEMORY_SIZE_MB = "max_in_memory_size"
KEY_AGG_RES_TYPE = "agg_res_type"
KEY_SEG_CONFIG = "seg_config"
# Filters
NO_FILTER_ID = "_"
@@ -89,6 +92,7 @@


FilterApp = namedtuple("FilterApp", "keys n_jobs")
AggResType = Enum("AggResType", ["BINS", "SNAPS", "BOTH"])


def _is_aggstream_result(handle: ParquetHandle) -> bool:
@@ -153,8 +157,10 @@ def _init_keys_config(
in bytes
'write_config' : {'ordered_on' : str,
'duplicates_on' : str or list,
'max_row_group_size' : Union[str, int, tuple]
...
},
'agg_res_type' : AggResType, either 'BINS', 'SNAPS', or 'BOTH'.
},
}``
- ``self.agg_pd``, dict, specifying per key the aggregation
@@ -238,12 +244,34 @@ def _init_keys_config(
key_conf_in[KEY_DUPLICATES_ON] = (
bin_on_out if bin_on_out else key_conf_in[KEY_ORDERED_ON]
)
if seg_config[KEY_SNAP_BY] is None:
# Snapshots not requested, aggregation results are necessarily
# bins.
agg_res_type = AggResType.BINS
elif isinstance(key, tuple):
# 2 keys are provided, aggregation results are necessarily both
# bins and snapshots.
agg_res_type = AggResType.BOTH
else:
# Otherwise, a single aggregation result is expected, and it is
# created from both bins and snapshots. Hence it is snapshot-like.
agg_res_type = AggResType.SNAPS
if agg_res_type is AggResType.BOTH:
if KEY_MAX_ROW_GROUP_SIZE in key_conf_in:
if not isinstance(key_conf_in[KEY_MAX_ROW_GROUP_SIZE], tuple):
key_conf_in[KEY_MAX_ROW_GROUP_SIZE] = (
key_conf_in[KEY_MAX_ROW_GROUP_SIZE],
key_conf_in[KEY_MAX_ROW_GROUP_SIZE],
)
else:
key_conf_in[KEY_MAX_ROW_GROUP_SIZE] = (None, None)
consolidated_keys_config[key] = {
KEY_SEG_CONFIG: seg_config,
KEY_BIN_ON_OUT: bin_on_out,
KEY_MAX_IN_MEMORY_SIZE_B: key_conf_in.pop(KEY_MAX_IN_MEMORY_SIZE_B),
KEY_POST: key_conf_in.pop(KEY_POST),
KEY_WRITE_CONFIG: key_conf_in,
KEY_AGG_RES_TYPE: agg_res_type,
}
return consolidated_keys_config, agg_pd
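
Read in isolation, the normalization added in '_init_keys_config' amounts to the sketch below ('_normalize_max_row_group_size' is a hypothetical helper name; the real logic sits inline, as shown above):

from enum import Enum

AggResType = Enum("AggResType", ["BINS", "SNAPS", "BOTH"])

def _normalize_max_row_group_size(key_conf_in: dict, agg_res_type) -> dict:
    if agg_res_type is AggResType.BOTH:
        if "max_row_group_size" in key_conf_in:
            if not isinstance(key_conf_in["max_row_group_size"], tuple):
                # A single value is duplicated: same row-group size for
                # bins and snapshots results.
                size = key_conf_in["max_row_group_size"]
                key_conf_in["max_row_group_size"] = (size, size)
        else:
            # Not set: placeholder tuple, one slot per result stream.
            key_conf_in["max_row_group_size"] = (None, None)
    return key_conf_in

# A single int is broadcast to both result streams.
assert _normalize_max_row_group_size({"max_row_group_size": 5}, AggResType.BOTH) == {
    "max_row_group_size": (5, 5)
}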

@@ -611,6 +639,7 @@ def _concat_agg_res(

def _post_n_write_agg_chunks(
agg_buffers: dict,
agg_res_type: Enum,
append_last_res: bool,
store: ParquetSet,
key: Union[dataclass, Tuple[dataclass, dataclass]],
@@ -658,6 +687,8 @@ def _post_n_write_agg_chunks(
It is NOT reset after writing. It is however required to be
written in metadata.
agg_res_type : Enum
Either 'BINS', 'SNAPS', or 'BOTH'.
append_last_res : bool
If 'agg_res' should be appended to 'agg_res_buffer' and if 'bin_res'
should be appended to 'bin_res_buffers'.
@@ -705,7 +736,7 @@
# When there is no result, 'agg_res' is None.
if isinstance((bin_res := agg_buffers[KEY_BIN_RES]), DataFrame):
# To keep track there has been res in the 1st place.
not_null_res = True
initial_agg_res = True
# Concat list of aggregation results.
bin_res = _concat_agg_res(
agg_buffers[KEY_BIN_RES_BUFFER],
@@ -714,8 +745,7 @@
index_name,
)
# Same if needed with 'snap_res_buffer'.
snap_res = agg_buffers[KEY_SNAP_RES]
if snap_res is not None:
if isinstance((snap_res := agg_buffers[KEY_SNAP_RES]), DataFrame):
snap_res = _concat_agg_res(
agg_buffers[KEY_SNAP_RES_BUFFER],
snap_res,
@@ -729,15 +759,15 @@
# number of rows before outputting results (warm-up).
main_res = (
post(buffer=post_buffer, bin_res=bin_res)
if snap_res is None
if agg_res_type is AggResType.BINS
else post(buffer=post_buffer, bin_res=bin_res, snap_res=snap_res)
)
if isinstance(main_res, tuple):
if agg_res_type is AggResType.BOTH:
# First result, recorded with 'bin_key', is considered main
# result.
main_res, snap_res = main_res
else:
if isinstance(key, tuple):
try:
main_res, snap_res = main_res
except ValueError:
raise ValueError(
f"not possible to have key '{key[0]}' for bins and "
f"key '{key[1]}' for snapshots but 'post()' function "
@@ -747,7 +777,7 @@
# mistake in 'key' parameter (finally commented out).
# snap_res = None
# bin_res = None
elif snap_res is None or isinstance(key, tuple):
elif agg_res_type is not AggResType.SNAPS:
# Case only 'bin_res' is recorded or both 'bin_res' and 'snap_res'.
# main_res, bin_res = bin_res, None
main_res = bin_res
Expand All @@ -756,7 +786,7 @@ def _post_n_write_agg_chunks(
# main_res, bin_res, snap_res = snap_res, None, None
main_res = snap_res
else:
not_null_res = False
initial_agg_res = False
main_res = None
main_key, snap_key = key if isinstance(key, tuple) else (key, None)
if last_seed_index:
@@ -782,9 +812,17 @@
# When there is no result, 'main_res' is None.
if isinstance(main_res, DataFrame):
# Record data (with metadata possibly updated).
store[main_key] = write_config, main_res
if snap_key is not None:
store[snap_key] = write_config, snap_res
if agg_res_type is AggResType.BOTH:
store[main_key] = (
write_config | {KEY_MAX_ROW_GROUP_SIZE: write_config[KEY_MAX_ROW_GROUP_SIZE][0]},
main_res,
)
store[snap_key] = (
write_config | {KEY_MAX_ROW_GROUP_SIZE: write_config[KEY_MAX_ROW_GROUP_SIZE][1]},
snap_res,
)
else:
store[main_key] = write_config, main_res
elif last_seed_index:
# If no result, metadata is possibly to be written, as this is the
# flag indicating the last 'aggstream' local iteration.
@@ -794,7 +832,7 @@
# In case no Parquet file exist yet, need to initiate one to start
# storing metadata.
store[main_key] = DataFrame()
if not_null_res:
if initial_agg_res:
# If there have been results, they have been processed (either written
# directly or through 'post()'). Time to reset aggregation buffers and
# counters.
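
The write step above dispatches one tuple element per target. The dict merge '|' (Python 3.9+) returns a new dict, so 'write_config' itself keeps the full tuple; a toy re-run of that override with made-up values:

write_config = {"ordered_on": "ts", "max_row_group_size": (8, 5)}

bins_config = write_config | {"max_row_group_size": write_config["max_row_group_size"][0]}
snaps_config = write_config | {"max_row_group_size": write_config["max_row_group_size"][1]}

assert bins_config["max_row_group_size"] == 8        # bins target
assert snaps_config["max_row_group_size"] == 5       # snapshots target
assert write_config["max_row_group_size"] == (8, 5)  # left untouched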
@@ -866,6 +904,7 @@ def agg_iter(
# iteration.
_post_n_write_agg_chunks(
agg_buffers=agg_buffers,
agg_res_type=keys_config[KEY_AGG_RES_TYPE],
append_last_res=False,
store=store,
key=key,
@@ -1478,6 +1517,7 @@ def agg(
store=self.store,
key=key,
agg_buffers=agg_res,
agg_res_type=self.keys_config[key][KEY_AGG_RES_TYPE],
append_last_res=True,
write_config=self.keys_config[key][KEY_WRITE_CONFIG],
index_name=self.keys_config[key][KEY_BIN_ON_OUT],
15 changes: 11 additions & 4 deletions tests/test_aggstream/test_aggstream_advanced.py
@@ -37,6 +37,7 @@
from oups import toplevel
from oups.aggstream.aggstream import KEY_AGG
from oups.aggstream.aggstream import KEY_AGGSTREAM
from oups.aggstream.aggstream import KEY_MAX_ROW_GROUP_SIZE
from oups.aggstream.aggstream import KEY_PRE_BUFFER
from oups.aggstream.aggstream import KEY_RESTART_INDEX
from oups.aggstream.aggstream import SeedPreException
@@ -1114,7 +1115,8 @@ def post_only_snap(buffer: dict, bin_res: DataFrame = None, snap_res: DataFrame
return snap_res

val = "val"
max_row_group_size = 5
max_row_group_size_5 = 5
max_row_group_size_8 = 8
snap_duration = "5T"
common_key_params = {
KEY_SNAP_BY: TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"),
@@ -1124,6 +1126,7 @@ def post_only_snap(buffer: dict, bin_res: DataFrame = None, snap_res: DataFrame
key1_sst = Indexer("agg_10T_sst")
key1_cf = {
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"),
KEY_MAX_ROW_GROUP_SIZE: (max_row_group_size_8, max_row_group_size_5),
}
key2_sst = Indexer("agg_20T_sst")
key2_cf = {
@@ -1152,7 +1155,7 @@ def post_only_snap(buffer: dict, bin_res: DataFrame = None, snap_res: DataFrame
filter1: [(filter_on, "==", True)],
filter2: [(filter_on, "==", False)],
},
max_row_group_size=max_row_group_size,
max_row_group_size=max_row_group_size_5,
**common_key_params,
parallel=True,
post=post_bin_snap if with_post else None,
@@ -1176,6 +1179,9 @@ def post_only_snap(buffer: dict, bin_res: DataFrame = None, snap_res: DataFrame
seed_list = [seed_df.iloc[:17], seed_df.iloc[17:31]]
as1.agg(seed=seed_list, trim_start=False, discard_last=False, final_write=True)
del as1
# Check that both 'max_row_group_size' values have been used.
assert [rg.num_rows for rg in store[key1_sst].pf.row_groups] == [5, 5, 3]
assert [rg.num_rows for rg in store[key1_bin].pf.row_groups] == [7]
# New aggregation.
as2 = AggStream(
ordered_on=ordered_on,
@@ -1191,13 +1197,14 @@ def post_only_snap(buffer: dict, bin_res: DataFrame = None, snap_res: DataFrame
filter1: [(filter_on, "==", True)],
filter2: [(filter_on, "==", False)],
},
max_row_group_size=max_row_group_size,
max_row_group_size=max_row_group_size_5,
**common_key_params,
parallel=True,
post=post_bin_snap if with_post else None,
)
as2.agg(seed=seed_df.iloc[31:], trim_start=False, discard_last=False, final_write=True)
# Reference results by continuous aggregation.
del key1_cf[KEY_MAX_ROW_GROUP_SIZE]
key1_bin_ref, key1_sst_ref = cumsegagg(
data=seed_df.loc[seed_df[filter_on], :],
**(key1_cf | common_key_params),
@@ -1234,7 +1241,7 @@ def post_only_snap(buffer: dict, bin_res: DataFrame = None, snap_res: DataFrame


def test_exception_two_keys_but_single_result_from_post(store, seed_path):
# A key is provided for bins and one for snapshots, byt 'post()' only
# A key is provided for bins and one for snapshots, but 'post()' only
# returns one result.

def post(buffer: dict, bin_res: DataFrame, snap_res: DataFrame):
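
The two assertions added above ([5, 5, 3] row groups for snapshots, [7] for bins) follow from capping group size at the configured maximum; a sketch of that arithmetic, assuming a greedy split (the actual writer may balance groups differently):

def expected_row_groups(n_rows: int, max_size: int) -> list:
    # Greedy split: full groups of 'max_size', then the remainder.
    full, rest = divmod(n_rows, max_size)
    return [max_size] * full + ([rest] if rest else [])

# Matches the assertions above: 13 snapshot rows with max 5,
# 7 bin rows with max 8.
assert expected_row_groups(13, 5) == [5, 5, 3]
assert expected_row_groups(7, 8) == [7]
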
11 changes: 11 additions & 0 deletions tests/test_aggstream/test_aggstream_init.py
@@ -28,6 +28,7 @@
from oups.aggstream.aggstream import FIRST
from oups.aggstream.aggstream import KEY_AGG
from oups.aggstream.aggstream import KEY_AGG_IN_MEMORY_SIZE
from oups.aggstream.aggstream import KEY_AGG_RES_TYPE
from oups.aggstream.aggstream import KEY_BIN_BY
from oups.aggstream.aggstream import KEY_BIN_ON
from oups.aggstream.aggstream import KEY_BIN_ON_OUT
@@ -52,6 +53,7 @@
from oups.aggstream.aggstream import LAST
from oups.aggstream.aggstream import NO_FILTER_ID
from oups.aggstream.aggstream import SUM
from oups.aggstream.aggstream import AggResType
from oups.aggstream.aggstream import FilterApp
from oups.store.writer import KEY_MAX_ROW_GROUP_SIZE

@@ -113,6 +115,7 @@ def always_false(**kwargs):
KEY_DUPLICATES_ON: "ts",
},
KEY_MAX_IN_MEMORY_SIZE_B: 146800640,
KEY_AGG_RES_TYPE: AggResType.BINS,
},
},
# ref_agg_pd
@@ -184,6 +187,7 @@ def always_false(**kwargs):
KEY_ORDERED_ON: "ts_dflt",
KEY_DUPLICATES_ON: "ts_dflt",
},
KEY_AGG_RES_TYPE: AggResType.BINS,
},
Indexer("key2_only_specific"): {
KEY_BIN_ON_OUT: None,
@@ -195,6 +199,7 @@
KEY_ORDERED_ON: "ts_spec",
KEY_DUPLICATES_ON: "ts_spec",
},
KEY_AGG_RES_TYPE: AggResType.BINS,
},
Indexer("key3_only_default"): {
KEY_BIN_ON_OUT: "bin_out_spec",
@@ -206,6 +211,7 @@
KEY_ORDERED_ON: "ts_dflt",
KEY_DUPLICATES_ON: "bin_out_spec",
},
KEY_AGG_RES_TYPE: AggResType.BINS,
},
Indexer("key4_most_default"): {
KEY_BIN_ON_OUT: "ts_dflt",
@@ -217,6 +223,7 @@
KEY_ORDERED_ON: "ts_spec",
KEY_DUPLICATES_ON: "ts_dflt",
},
KEY_AGG_RES_TYPE: AggResType.BINS,
},
},
# ref_agg_pd
@@ -309,6 +316,7 @@ def always_false(**kwargs):
KEY_ORDERED_ON: "ts_dflt",
KEY_DUPLICATES_ON: "ts_dflt",
},
KEY_AGG_RES_TYPE: AggResType.SNAPS,
},
Indexer("key2_only_specific"): {
KEY_BIN_ON_OUT: None,
@@ -320,6 +328,7 @@
KEY_ORDERED_ON: "ts_spec",
KEY_DUPLICATES_ON: "ts_spec",
},
KEY_AGG_RES_TYPE: AggResType.SNAPS,
},
Indexer("key3_only_default"): {
KEY_BIN_ON_OUT: "bin_out_spec",
@@ -331,6 +340,7 @@
KEY_ORDERED_ON: "ts_dflt",
KEY_DUPLICATES_ON: "bin_out_spec",
},
KEY_AGG_RES_TYPE: AggResType.SNAPS,
},
Indexer("key4_most_default"): {
KEY_BIN_ON_OUT: "ts_dflt",
@@ -342,6 +352,7 @@
KEY_ORDERED_ON: "ts_spec",
KEY_DUPLICATES_ON: "ts_dflt",
},
KEY_AGG_RES_TYPE: AggResType.SNAPS,
},
},
# ref_agg_pd
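
The expected KEY_AGG_RES_TYPE values asserted above (BINS when no 'snap_by' is set, SNAPS when snapshots are requested under a single key) follow the selection rule added in '_init_keys_config'; a hedged restatement, with 'agg_res_type_for' as a hypothetical helper name:

from enum import Enum

AggResType = Enum("AggResType", ["BINS", "SNAPS", "BOTH"])

def agg_res_type_for(snap_by, key):
    if snap_by is None:
        # Snapshots not requested: results are necessarily bins.
        return AggResType.BINS
    if isinstance(key, tuple):
        # Two keys provided: bins and snapshots are both recorded.
        return AggResType.BOTH
    # Single key with snapshots: the single result is snapshot-like.
    return AggResType.SNAPS

assert agg_res_type_for(None, "key1") is AggResType.BINS
assert agg_res_type_for("5T", "key1") is AggResType.SNAPS
assert agg_res_type_for("5T", ("bins_key", "snaps_key")) is AggResType.BOTH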
