From 2c91be144c52ae1e2740c72adc8792ed27fbd53f Mon Sep 17 00:00:00 2001 From: Marco Angheben Date: Wed, 25 Sep 2024 15:33:08 +0200 Subject: [PATCH] added metro container to docker compose; added main module for road_weather calculation; further development on creating observations csv and metro ws invocation - issue #31 --- pollution_v2/docker-compose.yaml | 7 ++ .../src/common/data_model/road_weather.py | 64 +++++++++++++++++-- pollution_v2/src/common/manager/station.py | 4 ++ pollution_v2/src/common/model/helper.py | 28 +++++++- pollution_v2/src/common/settings.py | 7 +- pollution_v2/src/config/road_weather.yaml | 4 ++ pollution_v2/src/main_road_weather.py | 63 ++++++++++++++++++ .../src/road_weather/manager/_forecast.py | 4 ++ .../src/road_weather/manager/road_weather.py | 13 ++-- .../road_weather/model/road_weather_model.py | 59 +++++++++++++++-- pollution_v2/src/tmp/.gitignore | 2 + 11 files changed, 238 insertions(+), 17 deletions(-) create mode 100644 pollution_v2/src/main_road_weather.py create mode 100644 pollution_v2/src/tmp/.gitignore diff --git a/pollution_v2/docker-compose.yaml b/pollution_v2/docker-compose.yaml index 0508aef..4e4b633 100644 --- a/pollution_v2/docker-compose.yaml +++ b/pollution_v2/docker-compose.yaml @@ -90,6 +90,9 @@ x-airflow-common: AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_HOST: 'redis' AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_PORT: 6379 AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_DB: 10 + AIRFLOW_VAR_DAG_POLLUTION_EXECUTION_CRONTAB: 0 4 * * * + AIRFLOW_VAR_DAG_VALIDATION_EXECUTION_CRONTAB: 0 2 * * * + AIRFLOW_VAR_ROAD_WEATHER_CONFIG_FILE: "dags/config/road_weather.yaml" AIRFLOW_VAR_DATATYPE_PREFIX: "" # AIRFLOW__CORE__REMOTE_LOGGING: '' # AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: '' @@ -304,5 +307,9 @@ services: airflow-init: condition: service_completed_successfully + metro: + image: road_weather:latest + restart: always + volumes: postgres-db-volume: diff --git a/pollution_v2/src/common/data_model/road_weather.py 
b/pollution_v2/src/common/data_model/road_weather.py index 4dc7fef..5709133 100644 --- a/pollution_v2/src/common/data_model/road_weather.py +++ b/pollution_v2/src/common/data_model/road_weather.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from datetime import datetime -from typing import Optional +from typing import Optional, List, Iterator, Dict from common.data_model.common import MeasureCollection, Measure from common.data_model.entry import GenericEntry @@ -17,10 +17,11 @@ class RoadWeatherObservationMeasureType(Enum): + PREC_QTA = "prec_qta" + STATO_METEO = "stato_meteo" TEMP_ARIA = "temp_aria" - TEMP_SOULO = "temp_suolo" TEMP_RUGIADA = "temp_rugiada" - PREC_QTA = "prec_qta" + TEMP_SOULO = "temp_suolo" VENTO_VEL = "vento_vel" @@ -42,13 +43,14 @@ class RoadWeatherForecastMeasureType(Enum): class RoadWeatherObservationEntry(GenericEntry): def __init__(self, station: Station, valid_time: datetime, temp_aria: float, temp_suolo: float, - temp_rugiada: float, prec_qta: float, vento_vel: float, period: Optional[int]): + temp_rugiada: float, prec_qta: float, vento_vel: float, stato_meteo: int, period: Optional[int]): super().__init__(station, valid_time, period) self.temp_aria = temp_aria self.temp_suolo = temp_suolo self.temp_rugiada = temp_rugiada self.prec_qta = prec_qta self.vento_vel = vento_vel + self.stato_meteo = stato_meteo class RoadWeatherObservationMeasure(Measure): @@ -65,6 +67,60 @@ class RoadWeatherObservationMeasureCollection(MeasureCollection[RoadWeatherObser """ pass + def get_entries(self) -> List[RoadWeatherObservationEntry]: + """ + Build and retrieve the list of observation entries from the available measures + + :return: a list of observation entries + """ + return list(self._get_entries_iterator()) + + def _get_entries_iterator(self) -> Iterator[RoadWeatherObservationEntry]: + """ + Build and retrieve the iterator for list of observation entries from the available measures + + :return: an iterator of observation entries + """ + for station_dict in 
self._build_entries_dictionary().values(): + for entry in station_dict.values(): + yield entry + + def _build_entries_dictionary(self) -> Dict[str, Dict[datetime, RoadWeatherObservationEntry]]: + # A temporary dictionary used for faster aggregation of the results + # The dictionary will have the following structure + # StationCode -> (measure valid time -> (data type name -> measure value)) + tmp: Dict[str, Dict[datetime, dict]] = {} + stations: Dict[str, Station] = {} + for measure in self.measures: + if measure.station.code not in stations: + stations[measure.station.code] = measure.station + if measure.station.code not in tmp: + tmp[measure.station.code] = {} + if measure.valid_time not in tmp[measure.station.code]: + tmp[measure.station.code][measure.valid_time] = {} + tmp[measure.station.code][measure.valid_time][measure.data_type.name] = measure.value + + result: Dict[str, Dict[datetime, RoadWeatherObservationEntry]] = {} + for group_by_station in tmp: + if group_by_station not in result: + result[group_by_station] = {} + for group_by_time in tmp[group_by_station]: + entry = tmp[group_by_station][group_by_time] + result[group_by_station][group_by_time] = RoadWeatherObservationEntry( + station=stations[group_by_station], + valid_time=group_by_time, + # TODO check + period=-1, + temp_aria=entry['temp_aria'], + temp_suolo=entry['temp_suolo'], + temp_rugiada=entry['temp_rugiada'], + prec_qta=entry['prec_qta'], + vento_vel=entry['vento_vel'], + stato_meteo=entry['stato_meteo'] + ) + + return result + @dataclass class RoadWeatherForecastEntry(GenericEntry): diff --git a/pollution_v2/src/common/manager/station.py b/pollution_v2/src/common/manager/station.py index 88022ea..b62b62b 100644 --- a/pollution_v2/src/common/manager/station.py +++ b/pollution_v2/src/common/manager/station.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later + from abc import ABC from typing import List, Optional, Type diff --git 
a/pollution_v2/src/common/model/helper.py b/pollution_v2/src/common/model/helper.py index 1783478..68b2279 100644 --- a/pollution_v2/src/common/model/helper.py +++ b/pollution_v2/src/common/model/helper.py @@ -9,7 +9,7 @@ import json import logging -from common.data_model import TrafficSensorStation +from common.data_model import TrafficSensorStation, RoadWeatherObservationEntry from common.data_model.history import HistoryEntry from common.data_model.traffic import TrafficEntry @@ -81,6 +81,32 @@ def get_traffic_dataframe(traffic_entries: Iterable[TrafficEntry], return pd.DataFrame(temp) + @staticmethod + def get_observation_dataframe(observation_entries: Iterable[RoadWeatherObservationEntry]) -> pd.DataFrame: + """ + Get a dataframe from the given observation entries. The resulting dataframe will have the following columns: + time,station_code,prec_qta,stato_meteo,temp_aria,temp_rugiada,temp_suolo,vento_vel + + :param observation_entries: the observation entries + :return: the observation dataframe + """ + temp = [] + for entry in observation_entries: + # same columns and time format as the observations CSV written by + # RoadWeatherModel.compute_data + temp.append({ + "time": entry.valid_time.strftime("%Y-%m-%d %H:%M:%S"), + "station_code": entry.station.code, + "prec_qta": entry.prec_qta, + "stato_meteo": entry.stato_meteo, + "temp_aria": entry.temp_aria, + "temp_rugiada": entry.temp_rugiada, + "temp_suolo": entry.temp_suolo, + "vento_vel": entry.vento_vel + }) + + return pd.DataFrame(temp) + @staticmethod def get_traffic_dataframe_for_validation(traffic_entries: Iterable[TrafficEntry], date: date) -> pd.DataFrame: """ diff --git a/pollution_v2/src/common/settings.py b/pollution_v2/src/common/settings.py index eca718b..25932c3 100644 --- a/pollution_v2/src/common/settings.py +++ b/pollution_v2/src/common/settings.py @@ -71,5 +71,10 @@ # use it not empty to add a test prefix to datatype DATATYPE_PREFIX = Variable.get("DATATYPE_PREFIX", "") -ROAD_WEATHER_CONFIG_FILE = 
Variable.get("ROAD_WEATHER_CONFIG_FILE", "src/config/road_weather.yaml") +ROAD_WEATHER_CONFIG_FILE = Variable.get("ROAD_WEATHER_CONFIG_FILE", "config/road_weather.yaml") VALIDATOR_CONFIG_FILE = Variable.get("VALIDATOR_CONFIG_FILE", "src/config/validator.yaml") + +MAIN_DIR = os.getenv("MAIN_DIR", ".") +TMP_DIR = f'{MAIN_DIR}/tmp' +# exist_ok: several processes may import this module at the same time +os.makedirs(TMP_DIR, exist_ok=True) diff --git a/pollution_v2/src/config/road_weather.yaml b/pollution_v2/src/config/road_weather.yaml index 4d881d7..639a518 100644 --- a/pollution_v2/src/config/road_weather.yaml +++ b/pollution_v2/src/config/road_weather.yaml @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later + whitelist: - 2021 - 2022 diff --git a/pollution_v2/src/main_road_weather.py b/pollution_v2/src/main_road_weather.py new file mode 100644 index 0000000..c363436 --- /dev/null +++ b/pollution_v2/src/main_road_weather.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +from __future__ import absolute_import, annotations + +import argparse +import logging.config +from datetime import datetime +from typing import Optional + +import dateutil.parser +import sentry_sdk +from redis.client import Redis + +from common.cache.computation_checkpoint import ComputationCheckpointCache +from common.connector.collector import ConnectorCollector +from common.data_model.common import Provenance +from common.logging import get_logging_configuration +from common.settings import (DEFAULT_TIMEZONE, SENTRY_SAMPLE_RATE, ODH_MINIMUM_STARTING_DATE, + COMPUTATION_CHECKPOINT_REDIS_HOST, COMPUTATION_CHECKPOINT_REDIS_PORT, PROVENANCE_ID, + PROVENANCE_LINEAGE, PROVENANCE_NAME_VALIDATION, PROVENANCE_VERSION, + COMPUTATION_CHECKPOINT_REDIS_DB, ODH_COMPUTATION_BATCH_SIZE_VALIDATION) +from road_weather.manager.road_weather import RoadWeatherManager +from validator.manager.validation import ValidationManager 
+logging.config.dictConfig(get_logging_configuration("pollution_v2")) + +logger = logging.getLogger("pollution_v2.main_road_weather") + +sentry_sdk.init( + traces_sample_rate=SENTRY_SAMPLE_RATE, + integrations=[] +) + + +# entry point for manual runs (called from the __main__ block below) +def compute_data() -> None: + """ + Run the road weather computation for every station returned by the manager's station list. + Stations are processed one at a time through RoadWeatherManager.run_computation_for_single_station. + """ + + collector_connector = ConnectorCollector.build_from_env() + provenance = Provenance(PROVENANCE_ID, PROVENANCE_LINEAGE, PROVENANCE_NAME_VALIDATION, PROVENANCE_VERSION) + manager = RoadWeatherManager(collector_connector, provenance) + for station in manager.get_station_list(): + manager.run_computation_for_single_station(station) + + +if __name__ == "__main__": + + arg_parser = argparse.ArgumentParser(description="Manually run a road weather forecast") + + arg_parser.add_argument("--run-async", action="store_true", help="If set it run the task in the celery cluster") + + compute_data() + '''if args.run_async: + task: AsyncResult = compute_data.delay(min_from_date=from_date, max_to_date=to_date) + logger.info(f"Scheduled async pollution computation. 
Task ID: [{task.task_id}]") + else: + logger.info("Staring pollution computation") + compute_data(min_from_date=from_date, max_to_date=to_date)''' diff --git a/pollution_v2/src/road_weather/manager/_forecast.py b/pollution_v2/src/road_weather/manager/_forecast.py index 131c67a..11ce6ed 100644 --- a/pollution_v2/src/road_weather/manager/_forecast.py +++ b/pollution_v2/src/road_weather/manager/_forecast.py @@ -1,3 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- + +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later diff --git a/pollution_v2/src/road_weather/manager/road_weather.py b/pollution_v2/src/road_weather/manager/road_weather.py index 6521775..44dc48e 100644 --- a/pollution_v2/src/road_weather/manager/road_weather.py +++ b/pollution_v2/src/road_weather/manager/road_weather.py @@ -15,7 +15,7 @@ from common.data_model import TrafficSensorStation, Station, RoadWeatherObservationMeasureCollection, Provenance from common.data_model.entry import GenericEntry from common.manager.station import StationManager -from common.settings import ROAD_WEATHER_CONFIG_FILE +from common.settings import ROAD_WEATHER_CONFIG_FILE, TMP_DIR from road_weather.manager._forecast import Forecast from road_weather.model.road_weather_model import RoadWeatherModel @@ -86,10 +86,10 @@ def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]: forecast.interpolate_hourly() forecast.negative_radiation_filter() roadcast_start = forecast.start - print('* forecast - XML processed correctly') - forecast_filename = f"data/forecast/forecast_{wrf_station_code}_{roadcast_start}.xml" + logger.info('forecast - XML processed correctly') + forecast_filename = f"{TMP_DIR}/forecast_{wrf_station_code}_{roadcast_start}.xml" forecast.to_xml(forecast_filename) - print('* forecast - XML saved in ', forecast_filename) + logger.info(f'forecast - XML saved in {forecast_filename} ') return forecast_filename, roadcast_start def 
_compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[datetime, datetime]: @@ -107,8 +107,7 @@ def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[dat end_date = forecast_start + timedelta(hours=8) return start_date, end_date - - def _download_data_and_compute(self, station: TrafficSensorStation) -> List[GenericEntry]: + def _download_data_and_compute(self, station: Station) -> List[GenericEntry]: if not station: logger.error(f"Cannot compute road condition on empty station") @@ -135,7 +134,7 @@ def _download_data_and_compute(self, station: TrafficSensorStation) -> List[Gene return [] - def run_computation_for_single_station(self, station: TrafficSensorStation) -> None: + def run_computation_for_single_station(self, station: Station) -> None: """ Run the computation for a single station. diff --git a/pollution_v2/src/road_weather/model/road_weather_model.py b/pollution_v2/src/road_weather/model/road_weather_model.py index e29daf2..be6deaa 100644 --- a/pollution_v2/src/road_weather/model/road_weather_model.py +++ b/pollution_v2/src/road_weather/model/road_weather_model.py @@ -6,11 +6,18 @@ import datetime import logging +import time +import urllib from typing import List from common.data_model import TrafficSensorStation, RoadWeatherObservationMeasureCollection from common.model.helper import ModelHelper +import urllib.request +import mimetypes + +from common.settings import TMP_DIR + logger = logging.getLogger("pollution_v2.road_weather.model.road_weather_model") @@ -19,6 +26,24 @@ class RoadWeatherModel: The model for computing road condition. 
""" + @classmethod + def create_multipart_formdata(cls, files): + + boundary = '----------Boundary' + lines = [] + for filename in files: + content_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream' + lines.append(f'--{boundary}'.encode()) + lines.append(f'Content-Disposition: form-data; name="files"; filename="{filename}"'.encode()) + lines.append(f'Content-Type: {content_type}'.encode()) + lines.append(''.encode()) + with open(filename, 'rb') as f: + lines.append(f.read()) + lines.append(f'--{boundary}--'.encode()) + lines.append(''.encode()) + body = b'\r\n'.join(lines) + return body, boundary + def compute_data(self, observation: RoadWeatherObservationMeasureCollection, forecast_filename: str, # TODO: change with RoadWeatherForecastMeasureCollection forecast_start: str, # TODO: check if needed @@ -32,12 +57,38 @@ def compute_data(self, observation: RoadWeatherObservationMeasureCollection, :return: The list of road conditions """ - logger.info(f"Computing road condition for station [{station.code}]") - logger.info(f"Observation measures available: {len(observation.measures)}") - logger.info(f"Forecast file name: {forecast_filename}") + logger.info(f"Creating observations file from {len(observation.measures)} measures") + + entries = observation.get_entries() + observation_filename = f"{TMP_DIR}/observations_{round(time.time() * 1000)}.csv" + with open(observation_filename, 'a') as tmp_csv: + tmp_csv.write('"time","station_code","prec_qta","stato_meteo",' + '"temp_aria","temp_rugiada","temp_suolo","vento_vel"\n') + for entry in entries: + tmp_csv.write(f'{entry.valid_time.strftime("%Y-%m-%d %H:%M:%S")},{entry.station.code},{entry.prec_qta},' + f'{entry.stato_meteo},{entry.temp_aria},{entry.temp_rugiada},{entry.temp_suolo},' + f'{entry.vento_vel}\n') + + logger.info(f"Computing road condition for station [{station.code}] with observations from " + f"{observation_filename} and forecast from {forecast_filename}") logger.info(f"Forecast start: 
{forecast_start}") - # TODO: convert to dataframe and compute road condition + # TODO: the station code in the URL is hard-coded + url = "http://metro:80/predict/?station_code=101" + + # List of files to upload + files_to_upload = [forecast_filename, observation_filename] + + # Create multipart form data + body, boundary = RoadWeatherModel.create_multipart_formdata(files_to_upload) + + # Create a request object + req = urllib.request.Request(url, data=body) + req.add_header('Content-Type', f'multipart/form-data; boundary={boundary}') + + with urllib.request.urlopen(req) as response: + response_data = response.read() + logger.info(f"Metro response: {response_data.decode('utf-8')}") # Log the response from the server # validation_data_types = {str(measure.data_type) for measure in validation.measures} # traffic_data_types = {str(measure.data_type) for measure in traffic.measures} diff --git a/pollution_v2/src/tmp/.gitignore b/pollution_v2/src/tmp/.gitignore new file mode 100644 index 0000000..27b4278 --- /dev/null +++ b/pollution_v2/src/tmp/.gitignore @@ -0,0 +1,2 @@ +*.xml +*.csv