Interpolate: Support "limit" #147
Labels: enhancement (New feature or request)
Comments
If someone else is looking for this feature I can share the hacky way I implemented it so far (but it would definitely be better if it was included in tempo):

```python
import random
import string
from typing import List, Optional, Union

from pyspark.sql import functions as F
from pyspark.sql.window import Window


def filter_on_consecutive_equal_values(
    df,
    search_column: str,
    order_column: str,
    max_consecutive_rows: int,
    search_value,
    partition: Optional[Union[str, List[str]]] = None,
    include_debug_columns: bool = False,
):
    """Searches for consecutive occurrences of a value in a given column (optionally
    partitioned on the columns in ``partition``) and, if it finds streaks longer than
    ``max_consecutive_rows``, removes *all* rows in that streak.

    Parameters
    ----------
    df
        PySpark DataFrame to filter.
    search_column
        Column to search for long streaks in.
    order_column
        Finding streaks requires an ordering; this column defines that order.
    max_consecutive_rows
        Maximum allowed streak size. Any streak strictly larger than this is removed.
    search_value
        Value to search for.
    partition
        Optional column (or list of columns) to partition on.
    include_debug_columns : bool
        If true, keep the generated helper columns for debugging.
    """
    if partition is None:
        partition = []
    elif isinstance(partition, str):
        partition = [partition]

    has_value = generate_unique_column_name(df, "has_value")
    start_streak = generate_unique_column_name(df, "start_streak")
    streak_id = generate_unique_column_name(df, "streak_id")
    streak_count = generate_unique_column_name(df, "streak_count")

    order_window = Window.partitionBy(partition).orderBy(order_column)

    df = (
        # 1 if this row has the value we are limiting, 0 otherwise.
        df.withColumn(
            has_value, F.when(F.col(search_column) == search_value, 1).otherwise(0)
        )
        # 1 if this row starts a streak of the searched value within its partition,
        # 0 otherwise. Note: because lag() returns null on the first row of a
        # partition we must not just check whether the previous row is 0; coalesce
        # treats a missing previous row as "no value".
        .withColumn(
            start_streak,
            F.when(
                (F.col(has_value) == 1)
                & (F.coalesce(F.lag(has_value, 1).over(order_window), F.lit(0)) == 0),
                1,
            ).otherwise(0),
        )
        # The cumulative sum of start_streak flags (1 or 0) inside the partition
        # gives each streak its streak_id (a running number from 1 and up). All
        # rows without the value get streak_id == 0.
        .withColumn(
            streak_id,
            F.when(
                F.col(has_value) == 1,
                F.sum(start_streak).over(order_window),
            ).otherwise(0),
        )
        # Add the count of rows inside each partition-streak.
        .withColumn(
            streak_count,
            F.when(
                F.col(has_value) == 1,
                F.count("*").over(Window.partitionBy(partition + [streak_id])),
            ).otherwise(0),
        )
        # Filter away all rows that are part of an oversized streak.
        .where((F.col(has_value) == 0) | (F.col(streak_count) <= max_consecutive_rows))
    )

    if not include_debug_columns:
        df = df.drop(has_value, start_streak, streak_id, streak_count)
    return df


def generate_unique_column_name(df, base_name):
    """Generates a column name based on ``base_name`` that does not exist in ``df``.
    If ``base_name`` is not already in ``df.columns`` it is returned unchanged."""
    proposal = base_name
    while proposal in df.columns:
        random_suffix = "".join(
            random.choices(string.ascii_uppercase + string.digits, k=5)
        )
        proposal = base_name + "_" + random_suffix
    return proposal
```

And then after interpolating I filter the interpolated result:

```python
df = filter_on_consecutive_equal_values(
    df=resampled.df,
    search_column=f"is_interpolated_{value_col_name}",
    order_column=ts_col,
    max_consecutive_rows=max_consecutive_interpolated_rows,
    search_value=True,
    partition=partition_cols,
    include_debug_columns=False,
)
```
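For illustration, here is a minimal, self-contained sketch of the helper on a toy DataFrame (assuming a local SparkSession and made-up column names, not tempo itself):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

toy = spark.createDataFrame(
    [(1, 0), (2, 1), (3, 1), (4, 1), (5, 0), (6, 1), (7, 0)],
    ["ts", "is_interpolated"],
)

# The streak of three flagged rows (ts 2-4) exceeds max_consecutive_rows=2 and is
# dropped entirely, while the single flagged row at ts 6 is kept.
filtered = filter_on_consecutive_equal_values(
    df=toy,
    search_column="is_interpolated",
    order_column="ts",
    max_consecutive_rows=2,
    search_value=1,
)
filtered.show()  # rows with ts 1, 5, 6, 7 remain
```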
Pandas supports a parameter `limit`: "Maximum number of consecutive NaNs to fill. Must be greater than 0." It would be useful if tempo supported something similar.

The pandas version is a bit weird (imo) in that it still interpolates up to that number of NaNs and then just stops. So if you have a resampling of 1 min, a gap of 1.5 hours (90 buckets) and a limit of 1 hour (60 buckets), it will interpolate 60 buckets into the gap and leave 30 NaNs. To me the reasonable implementation of "limit" would avoid the whole stretch and interpolate none of the NaNs, since the reasonable interpretation of the limit is something like "if the gap is this large we have too little information".