
Interpolate: Support "limit" #147

Open · epa095 opened this issue Feb 10, 2022 · 1 comment
Labels: enhancement (New feature or request)

epa095 commented Feb 10, 2022

Pandas' interpolate supports a limit parameter: "Maximum number of consecutive NaNs to fill. Must be greater than 0."

It would be useful if tempo supported something similar. The pandas version is a bit weird (imo) in that it still interpolates up to that number of NaNs and then just stops. So with a resampling frequency of 1 min, a gap of 1.5 hours (90 buckets) and a limit of 1 hour (60 buckets), it will interpolate 60 buckets into the gap and leave 30 NaNs (see the sketch below). To me the reasonable implementation of "limit" would skip the whole stretch and interpolate none of the NaNs, since the reasonable interpretation of the limit is something like "if the gap is this large, we have too little information".
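
For reference, a minimal pandas sketch of the current behaviour (toy numbers, just for illustration):

import numpy as np
import pandas as pd

# A 4-NaN gap interpolated with limit=2: pandas fills the first 2 NaNs of the
# gap and leaves the remaining 2 as NaN, rather than skipping the oversized
# gap entirely.
s = pd.Series([1.0, np.nan, np.nan, np.nan, np.nan, 6.0])
print(s.interpolate(limit=2))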

epa095 (Author) commented Feb 14, 2022

If someone else is looking for this feature, I can share the hacky way I have implemented it so far (it would definitely be better if it were included in tempo):

import random
import string
from typing import List, Optional, Union

import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.window import Window


def filter_on_consecutive_equal_values(
    df: DataFrame,
    search_column: str,
    order_column: str,
    max_consecutive_rows: int,
    search_value,
    partition: Optional[Union[str, List[str]]] = None,
    include_debug_columns: bool = False,
):
    """This searches for consecutive occurrences of a value in a given column (
    possible partitioned on columns in "partition"), and if it finds streaks longer than
    max_consecutive_rows it removes *all* rows in that streak.

    Parameters
    ----------
    df
        PySpark dataframe to filter
    search_column
        Column to search for large streaks in
    order_column
        To find streaks we need a order, and this column describes that order.
    max_consecutive_rows
        Max allowed streak size. Any streak strictly larger than this will be removed.
    search_value
        Value to search for
    partition
        Optional column to partition on.
    include_debug_columns : bool
        If true then keep generated columns for debug
    """
    if partition is None:
        partition = []
    elif isinstance(partition, str):
        partition = [partition]

    has_value = generate_unique_column_name(df, "has_value")
    start_streak = generate_unique_column_name(df, "start_streak")
    streak_id = generate_unique_column_name(df, "streak_id")
    streak_count = generate_unique_column_name(df, "streak_count")
    df = (
        # Indicating if this row has the value we are limiting.
        df.withColumn(
            has_value, F.when(F.col(search_column) == search_value, 1).otherwise(0)
        )
        # 1 if this is the start of a streak of the value we are searching for in
        # this partition, 0 otherwise. The previous value is coalesced to 0 so that
        # a streak starting on the first row of a partition (where lag is null) is
        # also detected.
        .withColumn(
            start_streak,
            F.when(
                (F.col(has_value) == 1)
                & (
                    F.coalesce(
                        F.lag(has_value, 1).over(
                            Window.partitionBy(partition).orderBy(order_column)
                        ),
                        F.lit(0),
                    )
                    == 0
                ),
                1,
            ).otherwise(0),
        )
        # The cumulative sum of start_streak (1 or 0) inside this partition gives
        # this streak's streak_id (a running number from 1 and up). All rows
        # without the value have streak_id == 0.
        .withColumn(
            streak_id,
            F.when(
                (F.col(has_value) == 1),
                F.sum(start_streak).over(
                    Window.partitionBy(partition).orderBy(order_column)
                ),
            ).otherwise(0),
        )
        # Now we add the count of rows inside each partition-streak
        .withColumn(
            streak_count,
            F.when(
                (F.col(has_value) == 1),
                F.count("*").over(Window.partitionBy(partition + [streak_id])),
            ).otherwise(0),
        )
        # We filter away all rows which are part of an oversized streak.
        .where((F.col(has_value) == 0) | (F.col(streak_count) <= max_consecutive_rows))
    )

    if not include_debug_columns:
        df = df.drop(
            *[
                has_value,
                start_streak,
                streak_id,
                streak_count,
            ]
        )
    return df


def generate_unique_column_name(df, base_name):
    """Generates a column name based on base_name that does not exist in df. If
    base_name is not already in df.columns it returns base_name unchanged."""
    proposal = base_name
    while proposal in df.columns:
        random_suffix = "".join(
            random.choices(string.ascii_uppercase + string.digits, k=5)
        )
        proposal = base_name + "_" + random_suffix
    return proposal

And then, after interpolating into resampled with show_interpolated=True, I filter like this:

df = filter_on_consecutive_equal_values(
    df=resampled.df,
    search_column=f"is_interpolated_{value_col_name}",
    order_column=ts_col,
    max_consecutive_rows=max_consecutive_interpolated_rows,
    search_value=True,
    partition=partition_cols,
    include_debug_columns=False,
)
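
For anyone who wants to see the filtering in isolation, here is a minimal toy example outside of tempo (column names, values and the threshold are made up for illustration; it assumes a local SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

toy = spark.createDataFrame(
    [
        (1, False), (2, True), (3, True),             # streak of 2 -> kept
        (4, False), (5, True), (6, True), (7, True),  # streak of 3 -> removed
    ],
    ["ts", "is_interpolated_value"],
)

filtered = filter_on_consecutive_equal_values(
    df=toy,
    search_column="is_interpolated_value",
    order_column="ts",
    max_consecutive_rows=2,
    search_value=True,
)
filtered.orderBy("ts").show()
# The streak of 3 consecutive True rows (ts 5-7) is dropped; everything else stays.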
