Skip to content

Commit

Permalink
Merge branch '31-connect-to-road-weather-data-on-odh' into 'main'
Browse files Browse the repository at this point in the history
Resolve "Connect to road weather data on ODH"

Closes noi-techpark#31

See merge request u-hopper/projects/industrial/open-data-hub-bz/bdp-elaborations!23
  • Loading branch information
Marco Angheben committed Sep 30, 2024
2 parents 19209dc + a2a3c1a commit 96d2ec3
Show file tree
Hide file tree
Showing 20 changed files with 386 additions and 110 deletions.
61 changes: 32 additions & 29 deletions pollution_v2/README.md

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions pollution_v2/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ x-airflow-common:
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_HOST: 'redis'
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_PORT: 6379
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_DB: 10
AIRFLOW_VAR_DAG_POLLUTION_EXECUTION_CRONTAB: 0 4 * * *
AIRFLOW_VAR_DAG_VALIDATION_EXECUTION_CRONTAB: 0 2 * * *
AIRFLOW_VAR_ROAD_WEATHER_CONFIG_FILE: "dags/config/road_weather.yaml"
AIRFLOW_VAR_DATATYPE_PREFIX: ""
# AIRFLOW__CORE__REMOTE_LOGGING: '<your_choice_on_remote_logging>'
# AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: '<your_bucket_for_remote_logging>'
Expand Down Expand Up @@ -304,5 +307,15 @@ services:
airflow-init:
condition: service_completed_successfully

metro:
image: road_weather:latest
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:80/"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always

volumes:
postgres-db-volume:
4 changes: 2 additions & 2 deletions pollution_v2/src/common/connector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from common.connector.history import HistoryODHConnector
from common.connector.pollution import PollutionODHConnector
from common.connector.road_weather import RoadWeatherObservationODHConnector
from common.connector.road_weather import RoadWeatherObservationODHConnector, RoadWeatherForecastODHConnector
from common.connector.traffic import TrafficODHConnector
from common.connector.validation import ValidationODHConnector
from common.settings import ODH_AUTHENTICATION_URL, ODH_USERNAME, ODH_PASSWORD, ODH_CLIENT_ID, \
Expand Down Expand Up @@ -49,7 +49,7 @@ def build_from_env() -> ConnectorCollector:
"validation": ValidationODHConnector,
"pollution": PollutionODHConnector,
"road_weather_observation": RoadWeatherObservationODHConnector,
"road_weather_forecast": RoadWeatherObservationODHConnector
"road_weather_forecast": RoadWeatherForecastODHConnector
}

connectors = {}
Expand Down
17 changes: 8 additions & 9 deletions pollution_v2/src/common/connector/road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
from typing import Optional, List

from common.connector.common import ODHBaseConnector
from common.data_model.road_weather import RoadWeatherObservationMeasure, RoadWeatherForecastMeasure, \
RoadWeatherObservationMeasureType, RoadWeatherForecastMeasureType
from common.data_model.traffic import TrafficSensorStation
from common.data_model import RoadWeatherObservationMeasure, RoadWeatherForecastMeasure, \
RoadWeatherObservationMeasureType, RoadWeatherForecastMeasureType, Station
from common.settings import PERIOD_1SEC


class RoadWeatherObservationODHConnector(ODHBaseConnector[RoadWeatherObservationMeasure, TrafficSensorStation]):
class RoadWeatherObservationODHConnector(ODHBaseConnector[RoadWeatherObservationMeasure, Station]):

def __init__(self,
base_reader_url: str,
Expand Down Expand Up @@ -55,15 +54,15 @@ def __init__(self,
period)

@staticmethod
def build_station(raw_station: dict) -> TrafficSensorStation:
return TrafficSensorStation.from_odh_repr(raw_station)
def build_station(raw_station: dict) -> Station:
return Station.from_odh_repr(raw_station)

@staticmethod
def build_measure(raw_measure: dict) -> RoadWeatherObservationMeasure:
return RoadWeatherObservationMeasure.from_odh_repr(raw_measure)


class RoadWeatherForecastODHConnector(ODHBaseConnector[RoadWeatherForecastMeasure, TrafficSensorStation]):
class RoadWeatherForecastODHConnector(ODHBaseConnector[RoadWeatherForecastMeasure, Station]):

def __init__(self,
base_reader_url: str,
Expand Down Expand Up @@ -104,8 +103,8 @@ def __init__(self,
period)

@staticmethod
def build_station(raw_station: dict) -> TrafficSensorStation:
return TrafficSensorStation.from_odh_repr(raw_station)
def build_station(raw_station: dict) -> Station:
return Station.from_odh_repr(raw_station)

@staticmethod
def build_measure(raw_measure: dict) -> RoadWeatherForecastMeasure:
Expand Down
6 changes: 5 additions & 1 deletion pollution_v2/src/common/data_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later
from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType, Station
from .traffic import TrafficSensorStation, TrafficMeasure, TrafficMeasureCollection, TrafficEntry
from .traffic import TrafficMeasure, TrafficMeasureCollection, TrafficEntry
from .pollution import PollutionEntry, PollutantClass, PollutionMeasure, PollutionMeasureCollection
from .road_weather import RoadWeatherObservationMeasureType, RoadWeatherForecastMeasureType, RoadWeatherObservationEntry, \
RoadWeatherObservationMeasure, RoadWeatherObservationMeasureCollection, RoadWeatherForecastEntry, \
RoadWeatherForecastMeasure, RoadWeatherForecastMeasureCollection
from .station import Station, StationType, TrafficSensorStation
63 changes: 59 additions & 4 deletions pollution_v2/src/common/data_model/road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from typing import Optional, List, Iterator, Dict

from common.data_model.common import MeasureCollection, Measure
from common.data_model.entry import GenericEntry
Expand All @@ -17,10 +17,11 @@

class RoadWeatherObservationMeasureType(Enum):

PREC_QTA = "prec_qta"
STATO_METEO = "stato_meteo"
TEMP_ARIA = "temp_aria"
TEMP_SOULO = "temp_suolo"
TEMP_RUGIADA = "temp_rugiada"
PREC_QTA = "prec_qta"
TEMP_SOULO = "temp_suolo"
VENTO_VEL = "vento_vel"


Expand All @@ -42,13 +43,14 @@ class RoadWeatherForecastMeasureType(Enum):
class RoadWeatherObservationEntry(GenericEntry):

def __init__(self, station: Station, valid_time: datetime, temp_aria: float, temp_suolo: float,
temp_rugiada: float, prec_qta: float, vento_vel: float, period: Optional[int]):
temp_rugiada: float, prec_qta: float, vento_vel: float, stato_meteo: int, period: Optional[int]):
super().__init__(station, valid_time, period)
self.temp_aria = temp_aria
self.temp_suolo = temp_suolo
self.temp_rugiada = temp_rugiada
self.prec_qta = prec_qta
self.vento_vel = vento_vel
self.stato_meteo = stato_meteo


class RoadWeatherObservationMeasure(Measure):
Expand All @@ -65,6 +67,59 @@ class RoadWeatherObservationMeasureCollection(MeasureCollection[RoadWeatherObser
"""
pass

def get_entries(self) -> List[RoadWeatherObservationEntry]:
    """
    Materialize all the road weather observation entries derivable from the available measures.

    :return: a list of road weather observation entries
    """
    return [*self._get_entries_iterator()]

def _get_entries_iterator(self) -> Iterator[RoadWeatherObservationEntry]:
    """
    Lazily walk the observation entries built from the available measures.

    Entries are yielded station by station, following the order of the dictionary
    returned by ``_build_entries_dictionary``.

    :return: an iterator of road weather observation entries
    """
    for entries_by_time in self._build_entries_dictionary().values():
        yield from entries_by_time.values()

def _build_entries_dictionary(self) -> Dict[str, Dict[datetime, RoadWeatherObservationEntry]]:
    """
    Group the measures by station code and valid time, then turn each group into an entry.

    Each (station, valid time) group must hold a value for every one of ``temp_aria``,
    ``temp_suolo``, ``temp_rugiada``, ``prec_qta``, ``vento_vel`` and ``stato_meteo``
    (looked up by data type name); a missing one raises ``KeyError``.

    :return: station code -> (measure valid time -> RoadWeatherObservationEntry)
    """
    # First pass: collect the raw values, keyed by station code, valid time and data type name.
    values_by_station: Dict[str, Dict[datetime, dict]] = {}
    station_by_code: Dict[str, Station] = {}
    for measure in self.measures:
        code = measure.station.code
        # The first station object seen for a code is the one attached to its entries.
        station_by_code.setdefault(code, measure.station)
        values_at_time = values_by_station.setdefault(code, {}).setdefault(measure.valid_time, {})
        values_at_time[measure.data_type.name] = measure.value

    # Second pass: convert every group of raw values into an observation entry.
    entries: Dict[str, Dict[datetime, RoadWeatherObservationEntry]] = {}
    for code, values_by_time in values_by_station.items():
        station_entries = entries.setdefault(code, {})
        for valid_time, values in values_by_time.items():
            station_entries[valid_time] = RoadWeatherObservationEntry(
                station=station_by_code[code],
                valid_time=valid_time,
                period=1,  # the period of the built entries is fixed to 1
                temp_aria=values['temp_aria'],
                temp_suolo=values['temp_suolo'],
                temp_rugiada=values['temp_rugiada'],
                prec_qta=values['prec_qta'],
                vento_vel=values['vento_vel'],
                stato_meteo=values['stato_meteo']
            )

    return entries


@dataclass
class RoadWeatherForecastEntry(GenericEntry):
Expand Down
7 changes: 6 additions & 1 deletion pollution_v2/src/common/data_model/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
class Station:

code: str
wrf_code: Optional[str]
active: bool
available: bool
coordinates: dict
Expand Down Expand Up @@ -50,6 +51,7 @@ def sensor_type(self) -> float:
def from_odh_repr(cls, raw_data: dict):
return cls(
code=raw_data["scode"],
wrf_code=raw_data.get("wrf_code"),
active=raw_data["sactive"],
available=raw_data["savailable"],
coordinates=raw_data["scoordinate"],
Expand All @@ -62,6 +64,7 @@ def from_odh_repr(cls, raw_data: dict):
def to_json(self) -> dict:
return {
"code": self.code,
"wrf_code": self.wrf_code,
"active": self.active,
"available": self.available,
"coordinates": self.coordinates,
Expand All @@ -73,8 +76,9 @@ def to_json(self) -> dict:

@classmethod
def from_json(cls, dict_data) -> Station:
return Station(
res = Station(
code=dict_data["code"],
wrf_code=dict_data.get("wrf_code"),
active=dict_data["active"],
available=dict_data["available"],
coordinates=dict_data["coordinates"],
Expand All @@ -83,6 +87,7 @@ def from_json(cls, dict_data) -> Station:
station_type=dict_data["station_type"],
origin=dict_data["origin"]
)
return res


StationType = TypeVar("StationType", bound=Station)
Expand Down
17 changes: 14 additions & 3 deletions pollution_v2/src/common/manager/station.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from abc import ABC
from typing import List, Optional
from typing import List, Optional, Type

from common.cache.computation_checkpoint import ComputationCheckpointCache
from common.connector.collector import ConnectorCollector
from common.connector.common import ODHBaseConnector
from common.data_model import Station, Provenance


class StationManager(ABC):
"""
Manager in charge of the retrieval of the stations.
"""
station_list_connector: ODHBaseConnector = None

def __init__(self, connector_collector: ConnectorCollector, provenance: Provenance,
checkpoint_cache: Optional[ComputationCheckpointCache] = None):
Expand All @@ -19,5 +25,10 @@ def __init__(self, connector_collector: ConnectorCollector, provenance: Provenan
self._create_data_types = True

def get_station_list(self) -> List[Station]:
    """
    Retrieve the list of stations through the connector set in ``station_list_connector``.

    :return: the stations returned by the connector's ``get_station_list``
    :raises NotImplementedError: if ``station_list_connector`` has not been defined
    """
    if self.station_list_connector is None:
        raise NotImplementedError("The station_list_connector must be defined")

    return self.station_list_connector.get_station_list()
5 changes: 3 additions & 2 deletions pollution_v2/src/common/manager/traffic_station.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Type

from common.cache.computation_checkpoint import ComputationCheckpointCache, ComputationCheckpoint
from common.connector.collector import ConnectorCollector
Expand Down Expand Up @@ -35,6 +35,7 @@ def __init__(self, connector_collector: ConnectorCollector, provenance: Provenan
checkpoint_cache: Optional[ComputationCheckpointCache] = None):
super().__init__(connector_collector, provenance, checkpoint_cache)
self._traffic_stations: List[TrafficSensorStation] = []
self.station_list_connector = connector_collector.traffic

@abstractmethod
def _get_manager_code(self) -> str:
Expand Down Expand Up @@ -269,7 +270,7 @@ def __get_station_list(self) -> List[TrafficSensorStation]:
"""
Retrieve the list of all the available stations.
"""
return self._connector_collector.traffic.get_station_list()
return self.station_list_connector.get_station_list()

def _download_traffic_data(self,
from_date: datetime,
Expand Down
28 changes: 27 additions & 1 deletion pollution_v2/src/common/model/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import json
import logging

from common.data_model import TrafficSensorStation
from common.data_model import TrafficSensorStation, RoadWeatherObservationEntry
from common.data_model.history import HistoryEntry
from common.data_model.traffic import TrafficEntry

Expand Down Expand Up @@ -81,6 +81,32 @@ def get_traffic_dataframe(traffic_entries: Iterable[TrafficEntry],

return pd.DataFrame(temp)

@staticmethod
def get_observation_dataframe(observation_entries: Iterable[RoadWeatherObservationEntry]) -> pd.DataFrame:
"""
Get a dataframe from the given observation entries. The resulting dataframe will have the following columns:
time,station_code,prec_qta,stato_meteo,temp_aria,temp_rugiada,temp_suolo,vento_vel
:param observation_entries: the observation entries
:return: the observation dataframe
"""
temp = []
for entry in observation_entries:
temp.append({
"date": entry.valid_time.date().isoformat(),
"time": entry.valid_time.time().isoformat(),
"Location": entry.station.id_strada,
"Station": entry.station.id_stazione,
"Lane": entry.station.id_corsia,
"Category": entry.vehicle_class.value,
"Transits": entry.nr_of_vehicles,
"Speed": entry.average_speed,
"Period": entry.period,
"KM": km
})

return pd.DataFrame(temp)

@staticmethod
def get_traffic_dataframe_for_validation(traffic_entries: Iterable[TrafficEntry], date: date) -> pd.DataFrame:
"""
Expand Down
10 changes: 9 additions & 1 deletion pollution_v2/src/common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@
# use it not empty to add a test prefix to datatype
DATATYPE_PREFIX = Variable.get("DATATYPE_PREFIX", "")

ROAD_WEATHER_CONFIG_FILE = Variable.get("ROAD_WEATHER_CONFIG_FILE", "src/config/road_weather.yaml")
ROAD_WEATHER_CONFIG_FILE = Variable.get("ROAD_WEATHER_CONFIG_FILE", "config/road_weather.yaml")
VALIDATOR_CONFIG_FILE = Variable.get("VALIDATOR_CONFIG_FILE", "src/config/validator.yaml")

# Working directory for temporary files, rooted at MAIN_DIR (current directory by default).
MAIN_DIR = os.getenv("MAIN_DIR", ".")
TMP_DIR = f'{MAIN_DIR}/tmp'
# exist_ok avoids the check-then-create race when several processes import this module at the same time
os.makedirs(TMP_DIR, exist_ok=True)

METRO_WS_PREDICTION_ENDPOINT = Variable.get("METRO_WS_PREDICTION_ENDPOINT", "http://metro:80/predict/?station_code=")
12 changes: 8 additions & 4 deletions pollution_v2/src/config/road_weather.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

whitelist:
- 2021
- 2022
- 1888
- 2023

mappings:
2021: 01
2022: 02
1888: 03
2023: 04
2021: "101"
2022: "102"
1888: "103"
2023: "104"
Loading

0 comments on commit 96d2ec3

Please sign in to comment.