Feat message management #378
base: main
dcas/functions.py (filename inferred from the `from dcas.functions import filter_messages_by_weeks` line in the second file below):

@@ -10,6 +10,7 @@
 from dcas.rules.rule_engine import DCASRuleEngine
 from dcas.rules.variables import DCASData
 from dcas.service import GrowthStageService
+from dcas.utils import read_grid_crop_data


 def calculate_growth_stage(
@@ -126,3 +127,100 @@
         var_name = f'message_{idx + 1}' if idx > 0 else 'message'
         row[var_name] = code
     return row
+
+
+def get_last_message_date(
+    farm_id: int,
+    crop_id: int,
+    message_code: str,
+    historical_parquet_path: str
+) -> pd.Timestamp:
+    """
+    Get the last date a message code was sent for a specific farm and crop.
+
+    :param farm_id: ID of the farm
+    :type farm_id: int
+    :param crop_id: ID of the crop
+    :type crop_id: int
+    :param message_code: The message code to check
+    :type message_code: str
+    :param historical_parquet_path: Path to the historical message parquet file
+    :type historical_parquet_path: str
+    :return: Timestamp of the last message occurrence or None if not found
+    :rtype: pd.Timestamp or None
+    """
+    # Read historical messages
+    historical_data = read_grid_crop_data(
+        historical_parquet_path, [], [crop_id],
+    )
+
+    # Filter messages for the given farm, crop, and message code
+    filtered_data = historical_data[
+        (historical_data['farm_id'] == farm_id) &
+        (historical_data['crop_id'] == crop_id) &
+        (
+            (historical_data['message'] == message_code) |
+            (historical_data['message_2'] == message_code) |
+            (historical_data['message_3'] == message_code) |
+            (historical_data['message_4'] == message_code) |
+            (historical_data['message_5'] == message_code)
+        )
+    ]
+
+    # If no record exists, return None
+    if filtered_data.empty:
+        return None
+
+    # Return the most recent message date
+    return filtered_data['message_date'].max()
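
For orientation, a minimal self-contained sketch of the lookup this function performs, using a toy frame with the column layout it assumes (farm_id, crop_id, message…message_5, message_date); all values here are invented for illustration:

    import pandas as pd

    history = pd.DataFrame({
        'farm_id': [1, 1, 2],
        'crop_id': [10, 10, 10],
        'message': ['M01', 'M02', 'M01'],
        'message_2': [None, 'M01', None],
        'message_3': [None] * 3,
        'message_4': [None] * 3,
        'message_5': [None] * 3,
        'message_date': pd.to_datetime(
            ['2025-01-01', '2025-01-20', '2025-01-05']
        ),
    })

    message_cols = ['message', 'message_2', 'message_3', 'message_4', 'message_5']
    mask = (
        (history['farm_id'] == 1) &
        (history['crop_id'] == 10) &
        history[message_cols].eq('M01').any(axis=1)
    )
    # Most recent date 'M01' went to farm 1 / crop 10 -> 2025-01-20
    print(history.loc[mask, 'message_date'].max())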
+
+
+def filter_messages_by_weeks(
+    df: pd.DataFrame,
+    historical_parquet_path: str,
+    weeks_constraint: int
+) -> pd.DataFrame:
+    """
+    Remove messages that have been sent within the last X weeks.
+
+    :param df: DataFrame containing new messages to be sent
+    :type df: pd.DataFrame
+    :param historical_parquet_path: Path to historical message parquet file
+    :type historical_parquet_path: str
+    :param weeks_constraint: Number of weeks to check for duplicate messages
+    :type weeks_constraint: int
+    :return: DataFrame with duplicate messages removed
+    :rtype: pd.DataFrame
+    """
+    print("Available columns in df:", df.columns)  # Debugging line
+
+    if 'farm_id' not in df.columns:
+        # Fall back to grid_id when farm_id is absent
+        df["farm_id"] = df["grid_id"]
+    min_allowed_date = (
+        pd.Timestamp.now() - pd.Timedelta(weeks=weeks_constraint)
+    )
+
+    for idx, row in df.iterrows():
+        for message_column in [
+            'message',
+            'message_2',
+            'message_3',
+            'message_4',
+            'message_5'
+        ]:
+            message_code = row[message_column]
+
+            if pd.isna(message_code):
+                continue  # Skip empty messages
+
+            last_sent_date = get_last_message_date(
+                row['farm_id'],
+                row['crop_id'],
+                message_code,
+                historical_parquet_path)
+
+            if last_sent_date and last_sent_date >= min_allowed_date:
+                df.at[idx, message_column] = None  # Remove duplicate message
+
+    return df

Review comment on the get_last_message_date call: can we query once to get all messages for a given farm from min_allowed_date, then do the check against that list? This is to avoid issuing one query per message code per farm.
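
One possible shape for that suggestion, as a sketch only: it assumes read_grid_crop_data accepts the same (path, grid_ids, crop_ids) arguments as the call above and returns one row per historical message batch; the single read is then filtered by date and turned into an in-memory set before the per-row loop:

    def filter_messages_by_weeks_batched(
        df: pd.DataFrame,
        historical_parquet_path: str,
        weeks_constraint: int
    ) -> pd.DataFrame:
        """Sketch: one historical read, then an in-memory membership check."""
        message_cols = [
            'message', 'message_2', 'message_3', 'message_4', 'message_5'
        ]
        min_allowed_date = (
            pd.Timestamp.now() - pd.Timedelta(weeks=weeks_constraint)
        )

        # One read covering every crop in this batch, then one date filter.
        history = read_grid_crop_data(
            historical_parquet_path, [], df['crop_id'].unique().tolist(),
        )
        history = history[history['message_date'] >= min_allowed_date]

        # Set of (farm_id, crop_id, code) triples sent within the window.
        recent = {
            (rec.farm_id, rec.crop_id, getattr(rec, col))
            for rec in history.itertuples()
            for col in message_cols
            if pd.notna(getattr(rec, col))
        }

        for idx, row in df.iterrows():
            for col in message_cols:
                code = row[col]
                if pd.notna(code) and (
                    (row['farm_id'], row['crop_id'], code) in recent
                ):
                    df.at[idx, col] = None  # Duplicate within the window
        return df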
dcas/pipeline.py (filename inferred; this is the file that defines the pipeline's run()):
@@ -31,6 +31,7 @@
 from dcas.queries import DataQuery
 from dcas.outputs import DCASPipelineOutput, OutputType
 from dcas.inputs import DCASPipelineInput
+from dcas.functions import filter_messages_by_weeks
 from dcas.service import GrowthStageService


@@ -321,6 +322,13 @@
         grid_crop_df_meta = self.data_query.grid_data_with_crop_meta(
            self.farm_registry_group_ids
         )
+        # add farm_id
+        if "farm_id" not in grid_crop_df_meta.columns:
+            grid_crop_df_meta = grid_crop_df_meta.assign(
+                farm_id=pd.Series(dtype='Int64')
+            )
+        # Ensure the column order in `meta` matches the expected DataFrame
+        grid_crop_df_meta = grid_crop_df_meta[grid_crop_df.columns]

         # Process gdd cumulative
         # for Total GDD, we use date from planting_date to request_date - 1
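
Why this matters, briefly: downstream Dask operations expect each partition to match the meta frame, including column order, so the placeholder column has to exist and sit in the same position. A tiny sketch of what the added lines do, with toy values:

    import pandas as pd

    meta_df = pd.DataFrame({'crop_id': [10], 'grid_id': [7]})
    meta_df = meta_df.assign(farm_id=pd.Series(dtype='Int64'))
    print(meta_df['farm_id'].tolist())  # [<NA>]: all-missing, nullable Int64
    meta_df = meta_df[['farm_id', 'crop_id', 'grid_id']]  # reorder to match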
@@ -465,13 +473,62 @@ | |
|
||
return file_path | ||
|
||
def filter_message_output(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be done in process_farm_registry_data, right before writing to final parquet file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also need to add a config from Preferences whether to run the message checks or not. |
||
"""Filter messages before extracting CSV.""" | ||
# Read Parquet file (processed farm crop data) | ||
df = dd.read_parquet(self.data_output.grid_crop_data_path) | ||
|
||
if "farm_id" not in df.columns: | ||
df["farm_id"] = df["grid_id"] | ||
|
||
df["farm_id"] = df["farm_id"].astype(int) | ||
df["crop_id"] = df["crop_id"].astype(int) | ||
|
||
meta = { | ||
"farm_id": np.int64, | ||
"crop_id": np.int64, | ||
"growth_stage_id": np.int64, | ||
"message": "object", | ||
"message_2": "object", | ||
"message_3": "object", | ||
"message_4": "object", | ||
"message_5": "object", | ||
"message_date": "datetime64[ns]", | ||
} | ||
|
||
# Apply message filtering | ||
df = df.map_partitions( | ||
filter_messages_by_weeks, | ||
self.data_output.grid_crop_data_path, | ||
2, # Weeks constraint (default: 2 weeks) | ||
meta=meta | ||
) | ||
|
||
parquet_path = self.data_output._get_directory_path( | ||
self.data_output.DCAS_OUTPUT_DIR | ||
) + '/iso_a3=*/year=*/month=*/day=*/*.parquet' | ||
|
||
# Save the filtered Parquet file (overwrite previous Parquet) | ||
df.to_parquet( | ||
parquet_path, | ||
write_index=False, | ||
storage_options=self.data_output.s3_options | ||
) | ||
|
||
def run(self): | ||
"""Run data pipeline.""" | ||
self.setup() | ||
start_time = time.time() | ||
self.data_collection() | ||
self.process_grid_crop_data() | ||
self.process_farm_registry_data() | ||
|
||
self.filter_message_output() | ||
|
||
self.extract_csv_output() | ||
|
||
self.cleanup_gdd_matrix() | ||
|
||
print(f'Finished {time.time() - start_time} seconds.') | ||
|
||
def cleanup(self): | ||
|
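
A hedged sketch of how that Preferences flag might gate the filtering; the import path, the load() accessor, and the enable_message_filtering field are all assumptions for illustration, not existing code, and meta is the same dtype mapping built in filter_message_output:

    from gap.models import Preferences  # assumed import path

    def _maybe_filter_messages(df, historical_parquet_path, meta):
        """Apply filter_messages_by_weeks only when enabled in Preferences."""
        preferences = Preferences.load()  # assumed singleton accessor
        if not getattr(preferences, 'enable_message_filtering', False):
            return df  # checks disabled: pass the frame through untouched
        return df.map_partitions(
            filter_messages_by_weeks,
            historical_parquet_path,
            2,  # weeks constraint
            meta=meta,
        )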
Review comment: this should read the parquet file in MinIO with a date filter, similar to
https://github.com/kartoza/tomorrownow_gap/blob/main/django_project/dcas/queries.py#L287
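
One way that could look, as a sketch: dd.read_parquet accepts a filters argument that pushes the predicate down to the parquet row groups, so only rows inside the look-back window are read from MinIO; the path and storage options mirror filter_message_output above:

    import dask.dataframe as dd
    import pandas as pd

    min_allowed_date = pd.Timestamp.now() - pd.Timedelta(weeks=2)

    # Inside filter_message_output: read only the recent rows rather
    # than the whole historical dataset.
    df = dd.read_parquet(
        self.data_output.grid_crop_data_path,
        filters=[('message_date', '>=', min_allowed_date)],
        storage_options=self.data_output.s3_options,
    )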