diff --git a/django_project/dcas/functions.py b/django_project/dcas/functions.py
index a0c3917e..7c668645 100644
--- a/django_project/dcas/functions.py
+++ b/django_project/dcas/functions.py
@@ -10,6 +10,7 @@
 from dcas.rules.rule_engine import DCASRuleEngine
 from dcas.rules.variables import DCASData
 from dcas.service import GrowthStageService
+from dcas.utils import read_grid_crop_data
 
 
 def calculate_growth_stage(
@@ -126,3 +127,145 @@ def calculate_message_output(
         var_name = f'message_{idx + 1}' if idx > 0 else 'message'
         row[var_name] = code
     return row
+
+
+def get_last_message_dates(
+    farm_id: int,
+    min_allowed_date: pd.Timestamp,
+    historical_parquet_path: str
+) -> pd.DataFrame:
+    """
+    Get all messages for a given farm after min_allowed_date.
+
+    :param farm_id: ID of the farm
+    :type farm_id: int
+    :param min_allowed_date: Minimum date for filtering messages
+    :type min_allowed_date: pd.Timestamp
+    :param historical_parquet_path: Path to historical message parquet file
+    :type historical_parquet_path: str
+    :return: Filtered DataFrame containing relevant messages
+    :rtype: pd.DataFrame
+    """
+    # Read historical messages
+    historical_data = read_grid_crop_data(historical_parquet_path, [], [])
+
+    # Filter messages for the given farm and min_allowed_date
+    filtered_data = historical_data[
+        (historical_data['farm_id'] == farm_id) &
+        (historical_data['message_date'] >= min_allowed_date)
+    ].copy()
+
+    return filtered_data
+
+
+def get_last_message_date(
+    farm_messages: pd.DataFrame,
+    crop_id: int,
+    message_code: str
+) -> pd.Timestamp:
+    """
+    Get the last date a message code was sent for a specific crop.
+
+    :param farm_messages: Pre-filtered messages for a farm
+    :type farm_messages: pd.DataFrame
+    :param crop_id: ID of the crop
+    :type crop_id: int
+    :param message_code: The message code to check
+    :type message_code: str
+    :return: Timestamp of the last message occurrence or None if not found
+    :rtype: pd.Timestamp or None
+    """
+    # Further filter for the specific crop and message_code
+    # across all five message columns
+    filtered_data = farm_messages[
+        (farm_messages['crop_id'] == crop_id) &
+        (
+            (farm_messages['message'] == message_code) |
+            (farm_messages['message_2'] == message_code) |
+            (farm_messages['message_3'] == message_code) |
+            (farm_messages['message_4'] == message_code) |
+            (farm_messages['message_5'] == message_code)
+        )
+    ]
+
+    if filtered_data.empty:
+        return None
+
+    # Return the most recent message date
+    return filtered_data['message_date'].max()
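+
+
+# Illustrative usage of the two helpers above (a sketch; the parquet path
+# and the farm/crop IDs below are hypothetical):
+#
+#   min_date = pd.Timestamp.now() - pd.Timedelta(weeks=2)
+#   farm_messages = get_last_message_dates(
+#       farm_id=1,
+#       min_allowed_date=min_date,
+#       historical_parquet_path='s3://dcas/output/*.parquet'
+#   )
+#   last_sent = get_last_message_date(farm_messages, 100, 'MSG1')
+#   # -> None if 'MSG1' was never sent to farm 1 / crop 100 in the window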
+
+
+def filter_messages_by_weeks(
+    df: pd.DataFrame,
+    historical_parquet_path: str,
+    weeks_constraint: int
+) -> pd.DataFrame:
+    """
+    Remove messages that were already sent within the last `weeks_constraint` weeks.
+
+    :param df: DataFrame containing new messages to be sent
+    :type df: pd.DataFrame
+    :param historical_parquet_path: Path to historical message parquet file
+    :type historical_parquet_path: str
+    :param weeks_constraint: Number of weeks to check for duplicate messages
+    :type weeks_constraint: int
+    :return: DataFrame with duplicate messages removed
+    :rtype: pd.DataFrame
+    """
+    if 'farm_id' not in df.columns:
+        df['farm_id'] = df['grid_id']
+
+    min_allowed_date = (
+        pd.Timestamp.now() - pd.Timedelta(weeks=weeks_constraint)
+    )
+
+    # Load historical messages once for all farms in df
+    unique_farm_ids = df['farm_id'].unique()
+    historical_data = read_grid_crop_data(historical_parquet_path, [], [])
+
+    # Filter historical messages for relevant farms and min_allowed_date
+    historical_data = historical_data[
+        (historical_data['farm_id'].isin(unique_farm_ids)) &
+        (historical_data['message_date'] >= min_allowed_date)
+    ]
+
+    # Create a lookup dictionary keyed by (farm_id, crop_id, message_code),
+    # keeping the most recent send date per key
+    message_lookup = {}
+    for _, row in historical_data.iterrows():
+        farm_id, crop_id = row['farm_id'], row['crop_id']
+        for message_column in [
+            'message',
+            'message_2',
+            'message_3',
+            'message_4',
+            'message_5'
+        ]:
+            message_code = row[message_column]
+            if pd.notna(message_code):
+                message_lookup[(farm_id, crop_id, message_code)] = max(
+                    message_lookup.get(
+                        (farm_id, crop_id, message_code), pd.Timestamp.min),
+                    row['message_date']
+                )
+
+    # Remove messages that have already been sent recently
+    for idx, row in df.iterrows():
+        farm_id, crop_id = row['farm_id'], row['crop_id']
+        for message_column in [
+            'message',
+            'message_2',
+            'message_3',
+            'message_4',
+            'message_5'
+        ]:
+            message_code = row[message_column]
+
+            if pd.isna(message_code):
+                continue  # Skip empty messages
+
+            last_sent_date = message_lookup.get(
+                (farm_id, crop_id, message_code), None)
+
+            if last_sent_date and last_sent_date >= min_allowed_date:
+                df.at[idx, message_column] = None  # Remove duplicate message
+
+    return df
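+
+
+# A minimal usage sketch for the per-partition filter (the path is
+# hypothetical; the real wiring happens in
+# DCASDataPipeline.filter_message_output via dask map_partitions):
+#
+#   cleaned = filter_messages_by_weeks(
+#       new_messages_df, '/tmp/dcas_output/*.parquet', 2
+#   )
+#   # message cells repeated within the 2-week window become None;
+#   # all other cells are returned unchanged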
diff --git a/django_project/dcas/outputs.py b/django_project/dcas/outputs.py
index da248d85..28fe336d 100644
--- a/django_project/dcas/outputs.py
+++ b/django_project/dcas/outputs.py
@@ -81,6 +81,12 @@ def grid_crop_data_path(self):
         """Return full path to grid with crop data."""
         return self.grid_crop_data_dir_path + '/*.parquet'
 
+    @property
+    def farm_crop_data_path(self):
+        """Return full path to the farm crop data parquet file."""
+        return self._get_directory_path(
+            self.DCAS_OUTPUT_DIR) + '/*.parquet'
+
     @property
     def output_csv_file_path(self):
         """Return full path to output csv file."""
diff --git a/django_project/dcas/pipeline.py b/django_project/dcas/pipeline.py
index d14df91c..f4d34e4e 100644
--- a/django_project/dcas/pipeline.py
+++ b/django_project/dcas/pipeline.py
@@ -17,7 +17,10 @@
 from django.contrib.gis.db.models import Union
 from sqlalchemy import create_engine
 
-from gap.models import FarmRegistry, Grid, CropGrowthStage
+from gap.models import (
+    FarmRegistry, Grid, CropGrowthStage,
+    Preferences
+)
 from dcas.models import DCASConfig, DCASConfigCountry
 from dcas.partitions import (
     process_partition_total_gdd,
@@ -31,6 +34,7 @@
 from dcas.queries import DataQuery
 from dcas.outputs import DCASPipelineOutput, OutputType
 from dcas.inputs import DCASPipelineInput
+from dcas.functions import filter_messages_by_weeks
 from dcas.service import GrowthStageService
 
 
@@ -321,6 +325,13 @@ def process_grid_crop_data(self):
         grid_crop_df_meta = self.data_query.grid_data_with_crop_meta(
            self.farm_registry_group_ids
         )
+        # Add farm_id column if it is missing from the meta DataFrame
+        if "farm_id" not in grid_crop_df_meta.columns:
+            grid_crop_df_meta = grid_crop_df_meta.assign(
+                farm_id=pd.Series(dtype='Int64')
+            )
+        # Ensure the column order in `meta` matches the expected DataFrame
+        grid_crop_df_meta = grid_crop_df_meta[grid_crop_df.columns]
 
         # Process gdd cumulative
         # for Total GDD, we use date from planting_date to request_date - 1
@@ -434,6 +445,8 @@ def process_farm_registry_data(self):
             self.duck_db_num_threads,
             meta=farm_df_meta
         )
+        if Preferences.load().enable_message_filtering:
+            self.filter_message_output()
 
         self.data_output.save(OutputType.FARM_CROP_DATA, farm_df)
 
@@ -465,6 +478,51 @@
 
         return file_path
 
+    def filter_message_output(self):
+        """Filter messages before extracting CSV."""
+        # Read the processed grid crop data parquet
+        df = dd.read_parquet(self.data_output.grid_crop_data_path)
+
+        if "farm_id" not in df.columns:
+            df["farm_id"] = df["grid_id"]
+
+        df["farm_id"] = df["farm_id"].astype(int)
+        df["crop_id"] = df["crop_id"].astype(int)
+
+        meta = {
+            "farm_id": np.int64,
+            "crop_id": np.int64,
+            "growth_stage_id": np.int64,
+            "message": "object",
+            "message_2": "object",
+            "message_3": "object",
+            "message_4": "object",
+            "message_5": "object",
+            "message_date": "datetime64[ns]",
+        }
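+        # Note: in dask, `meta` is a schema hint, not a validator. It tells
+        # map_partitions which columns/dtypes to expect in the lazy output;
+        # mismatches only surface at compute time. The dtypes above are this
+        # pipeline's assumptions about the processed message schema.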
committed transaction.""" diff --git a/django_project/dcas/tests/test_pipeline_functions.py b/django_project/dcas/tests/test_pipeline_functions.py index 7c1b5c7b..e339a29c 100644 --- a/django_project/dcas/tests/test_pipeline_functions.py +++ b/django_project/dcas/tests/test_pipeline_functions.py @@ -8,9 +8,13 @@ import numpy as np import pandas as pd from mock import patch +from datetime import datetime, timedelta from dcas.tests.base import DCASPipelineBaseTest -from dcas.functions import calculate_growth_stage +from dcas.functions import ( + calculate_growth_stage, get_last_message_date, + filter_messages_by_weeks +) def set_cache_dummy(cache_key, growth_stage_matrix, timeout): @@ -217,3 +221,176 @@ def test_calculate_growth_stage_na_value(self, mock_cache): self.assertIn('growth_stage_start_date', row) self.assertEqual(row['growth_stage_id'], 13) self.assertEqual(row['growth_stage_start_date'], 125) + + @patch("dcas.functions.read_grid_crop_data") + def test_get_last_message_date_exists(self, mock_read_grid_crop_data): + """ + Test when a message exists in history. + + It should return the latest timestamp among all message columns. + """ + now = datetime.now() + mock_data = pd.DataFrame({ + 'farm_id': [1, 1, 1, 2, 2, 3], + 'crop_id': [100, 100, 100, 101, 101, 102], + 'message': ['MSG1', 'MSG2', 'MSG1', 'MSG3', 'MSG1', 'MSG4'], + 'message_2': [None, 'MSG1', None, None, 'MSG3', None], + 'message_3': [None, None, 'MSG1', None, None, None], + 'message_4': [None, None, None, None, None, None], + 'message_5': [None, None, None, 'MSG1', None, 'MSG4'], + 'message_date': [ + now - timedelta(days=15), # MSG1 - Oldest farm 1, crop 100 + now - timedelta(days=10), # MSG2 + now - timedelta(days=5), # MSG1 - More recent + now - timedelta(days=12), # MSG3 + now - timedelta(days=3), # MSG1 - Most recent + now - timedelta(days=20) # MSG4 - Oldest + ] + }) + + # Simulate `read_grid_crop_data` returning the dataset + mock_read_grid_crop_data.return_value = mock_data + + # Pre-filter messages for farm 2 + farm_messages_farm_2 = mock_data[mock_data["farm_id"] == 2] + + # Latest MSG1 for farm 2, crop 101 should be at index 4 (3 days ago) + result = get_last_message_date(farm_messages_farm_2, 101, "MSG1") + self.assertEqual(result, mock_data['message_date'].iloc[4]) + + # Latest MSG3 for farm 2, crop 101 should be at index 3 (12 days ago) + result = get_last_message_date(farm_messages_farm_2, 101, "MSG3") + self.assertEqual(result, mock_data['message_date'].iloc[4]) + + # Pre-filter messages for farm 1 + farm_messages_farm_1 = mock_data[mock_data["farm_id"] == 1] + + # Latest MSG2 for farm 1, crop 100 should be at index 1 (10 days ago) + result = get_last_message_date(farm_messages_farm_1, 100, "MSG2") + self.assertEqual(result, mock_data['message_date'].iloc[1]) + + # Latest MSG1 for farm 1, crop 100 should be at index 2 (5 days ago) + result = get_last_message_date(farm_messages_farm_1, 100, "MSG1") + self.assertEqual(result, mock_data['message_date'].iloc[2]) + + # MSG5 does not exist in the dataset for farm 2, crop 101 + result = get_last_message_date(farm_messages_farm_2, 101, "MSG5") + self.assertIsNone(result) + + @patch("dcas.functions.read_grid_crop_data") + def test_get_last_message_date_not_exists(self, mock_read_grid_crop_data): + """Test when the message does not exist in history.""" + now = pd.Timestamp(datetime.now()) + + # Mock DataFrame with different messages, but not "MSG1" + mock_data = pd.DataFrame({ + 'farm_id': [1, 1, 2], + 'crop_id': [100, 100, 101], + 'message': ['MSG2', 'MSG3', 
+            'message_2': ['MSG5', None, None],  # Different message
+            'message_3': [None, 'MSG6', None],  # Different message
+            'message_4': [None, None, 'MSG7'],  # Different message
+            'message_5': [None, None, None],    # No relevant messages
+            'message_date': [
+                now - timedelta(days=10),  # MSG2
+                now - timedelta(days=5),   # MSG3
+                now - timedelta(days=3)    # MSG4
+            ]
+        })
+
+        mock_read_grid_crop_data.return_value = mock_data
+
+        # Attempting to get "MSG1", which is not present in the history
+        result = get_last_message_date(mock_data, 100, "MSG1")
+
+        # Ensure that the function correctly returns None
+        self.assertIsNone(result)
+
+    @patch("dcas.functions.read_grid_crop_data")
+    def test_get_last_message_date_multiple_messages(
+        self, mock_read_grid_crop_data
+    ):
+        """
+        Test when the same message appears multiple times.
+
+        It should return the most recent timestamp.
+        """
+        # Mock DataFrame representing historical messages
+        mock_data = pd.DataFrame({
+            'farm_id': [1, 1, 1],
+            'crop_id': [100, 100, 100],
+            'message': ['MSG1', 'MSG1', 'MSG1'],
+            'message_2': [None, None, None],
+            'message_3': [None, None, None],
+            'message_4': [None, None, None],
+            'message_5': [None, None, None],
+            'message_date': [
+                pd.Timestamp(datetime.now() - timedelta(days=15)),  # Oldest
+                pd.Timestamp(datetime.now() - timedelta(days=7)),   # Middle
+                pd.Timestamp(datetime.now() - timedelta(days=2))    # Most recent
+            ]
+        })
+
+        # Mock return value for read_grid_crop_data
+        mock_read_grid_crop_data.return_value = mock_data
+
+        # Pre-filter data to simulate getting farm messages
+        farm_messages = mock_data[mock_data["farm_id"] == 1]
+
+        # Call function with the updated parameters
+        result = get_last_message_date(farm_messages, 100, "MSG1")
+
+        # Expected result: most recent message date
+        expected_result = mock_data['message_date'].max()
+
+        # Assertions
+        self.assertEqual(
+            result,
+            expected_result,
+            f"Expected {expected_result}, but got {result}"
+        )
+
+    @patch("dcas.functions.read_grid_crop_data")
+    def test_filter_messages_by_weeks(self, mock_read_grid_crop_data):
+        """Test filtering messages based on the time constraint (weeks)."""
+        test_weeks = 2  # Remove messages sent within the last 2 weeks
+        current_date = pd.Timestamp(datetime.now())  # Fixed datetime
+
+        # Mock input DataFrame (new messages)
+        df = pd.DataFrame({
+            'farm_id': [1, 2, 3],
+            'crop_id': [100, 200, 300],
+            'message': ['MSG1', 'MSG2', 'MSG3'],
+            'message_2': [None, None, None],
+            'message_3': [None, None, None],
+            'message_4': [None, None, None],
+            'message_5': [None, None, None],
+        })
+
+        # Mock historical messages (parquet data)
+        historical_df = pd.DataFrame({
+            'farm_id': [1, 2],  # Only farms 1 and 2 have historical messages
+            'crop_id': [100, 200],
+            'message': ['MSG1', 'MSG2'],
+            'message_2': [None, None],
+            'message_3': [None, None],
+            'message_4': [None, None],
+            'message_5': [None, None],
+            'message_date': [
+                current_date - timedelta(weeks=1),   # Recent
+                current_date - timedelta(weeks=3)],  # Older
+        })
+
+        # Mock `read_grid_crop_data` to return the historical messages
+        mock_read_grid_crop_data.return_value = historical_df
+
+        # Run function
+        filtered_df = filter_messages_by_weeks(df, "/fake/path", test_weeks)
+
+        # Assertions: MSG1 was sent 1 week ago, so it is removed; MSG2 is
+        # older than the 2-week window and MSG3 has no history, so both stay
+        self.assertIsNone(filtered_df.loc[0, 'message'])
+        self.assertEqual(filtered_df.loc[1, 'message'], 'MSG2')
+        self.assertEqual(filtered_df.loc[2, 'message'], 'MSG3')
+
+        # Ensure `read_grid_crop_data` was called once
+        mock_read_grid_crop_data.assert_called_once_with("/fake/path", [], [])
diff --git a/django_project/gap/models/preferences.py b/django_project/gap/models/preferences.py
index b0963dfe..f792f496 100644
--- a/django_project/gap/models/preferences.py
+++ b/django_project/gap/models/preferences.py
@@ -77,6 +77,7 @@ def default_dcas_config() -> dict:
         'farm_npartitions': None,
         'grid_crop_npartitions': None,
         'farm_registries': [],
+        'enable_message_filtering': False,
         'store_csv_to_minio': False,
         'store_csv_to_sftp': False,
     }
@@ -200,6 +201,11 @@ class Preferences(SingletonModel):
         )
     )
 
+    @property
+    def enable_message_filtering(self) -> bool:
+        """Check if message filtering should be enabled."""
+        # Default matches default_dcas_config: filtering is opt-in
+        return self.dcas_config.get('enable_message_filtering', False)
+
     class Meta:  # noqa: D106
         verbose_name_plural = "preferences"
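
Illustrative way to enable the new behavior (a sketch; the key comes from
default_dcas_config above, the rest is standard singleton-model usage):

    from gap.models import Preferences

    preferences = Preferences.load()
    preferences.dcas_config['enable_message_filtering'] = True
    preferences.save()
    # process_farm_registry_data() will now call filter_message_output()
    # before saving the farm crop data output.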