Skip to content

Commit

Permalink
added metro container to docker compose; added main module for road_w…
Browse files Browse the repository at this point in the history
…eather calculation; further development on creating observations csv and metro ws invocation - issue noi-techpark#31
  • Loading branch information
Marco Angheben committed Sep 25, 2024
1 parent df43c24 commit 2c91be1
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 17 deletions.
7 changes: 7 additions & 0 deletions pollution_v2/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ x-airflow-common:
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_HOST: 'redis'
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_PORT: 6379
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_DB: 10
AIRFLOW_VAR_DAG_POLLUTION_EXECUTION_CRONTAB: 0 4 * * *
AIRFLOW_VAR_DAG_VALIDATION_EXECUTION_CRONTAB: 0 2 * * *
AIRFLOW_VAR_ROAD_WEATHER_CONFIG_FILE: "dags/config/road_weather.yaml"
AIRFLOW_VAR_DATATYPE_PREFIX: ""
# AIRFLOW__CORE__REMOTE_LOGGING: '<your_choice_on_remote_logging>'
# AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: '<your_bucket_for_remote_logging>'
Expand Down Expand Up @@ -304,5 +307,9 @@ services:
airflow-init:
condition: service_completed_successfully

metro:
image: road_weather:latest
restart: always

volumes:
postgres-db-volume:
64 changes: 60 additions & 4 deletions pollution_v2/src/common/data_model/road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from typing import Optional, List, Iterator, Dict

from common.data_model.common import MeasureCollection, Measure
from common.data_model.entry import GenericEntry
Expand All @@ -17,10 +17,11 @@

class RoadWeatherObservationMeasureType(Enum):
    """
    Identifiers of the measure types that make up a road weather observation.

    Every member is defined exactly once: an Enum body that assigns the same
    member name twice raises a TypeError when the class is created.
    """

    PREC_QTA = "prec_qta"
    STATO_METEO = "stato_meteo"
    TEMP_ARIA = "temp_aria"
    TEMP_RUGIADA = "temp_rugiada"
    # NOTE(review): the member name is spelled "SOULO" while its value is "temp_suolo";
    # the name is left untouched because code outside this block may reference it.
    TEMP_SOULO = "temp_suolo"
    VENTO_VEL = "vento_vel"


Expand All @@ -42,13 +43,14 @@ class RoadWeatherForecastMeasureType(Enum):
class RoadWeatherObservationEntry(GenericEntry):
    """
    A road weather observation of one station at one point in time.

    The station, valid time and period are handed to GenericEntry; the observed
    values are stored as plain attributes named after the constructor parameters.
    """

    def __init__(self, station: Station, valid_time: datetime, temp_aria: float, temp_suolo: float,
                 temp_rugiada: float, prec_qta: float, vento_vel: float, stato_meteo: int,
                 period: Optional[int]):
        """
        :param station: the station the observation belongs to
        :param valid_time: the time the observation refers to
        :param temp_aria: value of the "temp_aria" measure (Italian for air temperature)
        :param temp_suolo: value of the "temp_suolo" measure (Italian for ground temperature)
        :param temp_rugiada: value of the "temp_rugiada" measure (Italian for dew temperature)
        :param prec_qta: value of the "prec_qta" measure (presumably the precipitation quantity - TODO confirm)
        :param vento_vel: value of the "vento_vel" measure (presumably the wind speed - TODO confirm)
        :param stato_meteo: value of the "stato_meteo" measure (presumably a weather state code - TODO confirm)
        :param period: the period of the observation, forwarded unchanged to GenericEntry
        """
        super().__init__(station, valid_time, period)
        self.temp_aria = temp_aria
        self.temp_suolo = temp_suolo
        self.temp_rugiada = temp_rugiada
        self.prec_qta = prec_qta
        self.vento_vel = vento_vel
        self.stato_meteo = stato_meteo


class RoadWeatherObservationMeasure(Measure):
Expand All @@ -65,6 +67,60 @@ class RoadWeatherObservationMeasureCollection(MeasureCollection[RoadWeatherObser
"""
pass

def get_entries(self) -> List[RoadWeatherObservationEntry]:
    """
    Build the observation entries from the available measures and return them as a list.

    :return: a list holding every entry produced by ``_get_entries_iterator``
    """
    return [*self._get_entries_iterator()]

def _get_entries_iterator(self) -> Iterator[RoadWeatherObservationEntry]:
    """
    Lazily walk the observation entries built from the available measures.

    The entries come from ``_build_entries_dictionary`` and are yielded station by station,
    in the iteration order of that dictionary and of its per-station dictionaries.

    :return: an iterator over the observation entries
    """
    entries_by_station = self._build_entries_dictionary()
    for entries_by_time in entries_by_station.values():
        yield from entries_by_time.values()

def _build_entries_dictionary(self) -> Dict[str, Dict[datetime, RoadWeatherObservationEntry]]:
    """
    Group the measures by station code and valid time and turn every group into an observation entry.

    :return: a dictionary with the structure StationCode -> (measure valid time -> RoadWeatherObservationEntry)
    :raises KeyError: if, for a station and valid time, one of the keys read below (temp_aria, temp_suolo,
        temp_rugiada, prec_qta, vento_vel, stato_meteo) is not among the data type names of the grouped measures
    """
    # First pass: gather the raw values as StationCode -> (valid time -> (data type name -> value)).
    # The first station object met for each code is remembered so it can be attached to the entries.
    known_stations: Dict[str, Station] = {}
    values_by_station: Dict[str, Dict[datetime, dict]] = {}
    for measure in self.measures:
        code = measure.station.code
        known_stations.setdefault(code, measure.station)
        values_at_time = values_by_station.setdefault(code, {}).setdefault(measure.valid_time, {})
        # A later measure with the same station, time and data type overwrites the earlier one
        values_at_time[measure.data_type.name] = measure.value

    # Second pass: build one entry for each (station, valid time) pair
    entries: Dict[str, Dict[datetime, RoadWeatherObservationEntry]] = {}
    for code, values_by_time in values_by_station.items():
        station_entries = entries.setdefault(code, {})
        for valid_time, values in values_by_time.items():
            station_entries[valid_time] = RoadWeatherObservationEntry(
                station=known_stations[code],
                valid_time=valid_time,
                # TODO check
                period=-1,
                temp_aria=values['temp_aria'],
                temp_suolo=values['temp_suolo'],
                temp_rugiada=values['temp_rugiada'],
                prec_qta=values['prec_qta'],
                vento_vel=values['vento_vel'],
                stato_meteo=values['stato_meteo']
            )

    return entries


@dataclass
class RoadWeatherForecastEntry(GenericEntry):
Expand Down
4 changes: 4 additions & 0 deletions pollution_v2/src/common/manager/station.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from abc import ABC
from typing import List, Optional, Type

Expand Down
28 changes: 27 additions & 1 deletion pollution_v2/src/common/model/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import json
import logging

from common.data_model import TrafficSensorStation
from common.data_model import TrafficSensorStation, RoadWeatherObservationEntry
from common.data_model.history import HistoryEntry
from common.data_model.traffic import TrafficEntry

Expand Down Expand Up @@ -81,6 +81,32 @@ def get_traffic_dataframe(traffic_entries: Iterable[TrafficEntry],

return pd.DataFrame(temp)

@staticmethod
def get_observation_dataframe(observation_entries: Iterable[RoadWeatherObservationEntry]) -> pd.DataFrame:
    """
    Get a dataframe from the given observation entries. The resulting dataframe will have the following columns:
    time,station_code,prec_qta,stato_meteo,temp_aria,temp_rugiada,temp_suolo,vento_vel

    The "time" column holds the entry valid time formatted as "YYYY-MM-DD HH:MM:SS" and "station_code" holds
    the code of the entry station. The columns are always present, even when no entry is given.

    :param observation_entries: the observation entries
    :return: the observation dataframe, one row per entry
    """
    columns = ["time", "station_code", "prec_qta", "stato_meteo",
               "temp_aria", "temp_rugiada", "temp_suolo", "vento_vel"]
    rows = [
        {
            "time": entry.valid_time.strftime("%Y-%m-%d %H:%M:%S"),
            "station_code": entry.station.code,
            "prec_qta": entry.prec_qta,
            "stato_meteo": entry.stato_meteo,
            "temp_aria": entry.temp_aria,
            "temp_rugiada": entry.temp_rugiada,
            "temp_suolo": entry.temp_suolo,
            "vento_vel": entry.vento_vel,
        }
        for entry in observation_entries
    ]
    # Passing the columns explicitly fixes their order and keeps them on an empty dataframe
    return pd.DataFrame(rows, columns=columns)

@staticmethod
def get_traffic_dataframe_for_validation(traffic_entries: Iterable[TrafficEntry], date: date) -> pd.DataFrame:
"""
Expand Down
7 changes: 6 additions & 1 deletion pollution_v2/src/common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,10 @@
# use it not empty to add a test prefix to datatype
DATATYPE_PREFIX = Variable.get("DATATYPE_PREFIX", "")

ROAD_WEATHER_CONFIG_FILE = Variable.get("ROAD_WEATHER_CONFIG_FILE", "src/config/road_weather.yaml")
ROAD_WEATHER_CONFIG_FILE = Variable.get("ROAD_WEATHER_CONFIG_FILE", "config/road_weather.yaml")
VALIDATOR_CONFIG_FILE = Variable.get("VALIDATOR_CONFIG_FILE", "src/config/validator.yaml")

# Base directory of the application; defaults to the current working directory
MAIN_DIR = os.getenv("MAIN_DIR", ".")
# Directory used for temporary files, created when this module is imported
TMP_DIR = f'{MAIN_DIR}/tmp'
# exist_ok avoids the check-then-create race: when several processes import the settings at the same
# time, a separate "exists" test followed by makedirs can fail with FileExistsError
os.makedirs(TMP_DIR, exist_ok=True)
4 changes: 4 additions & 0 deletions pollution_v2/src/config/road_weather.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

whitelist:
- 2021
- 2022
Expand Down
63 changes: 63 additions & 0 deletions pollution_v2/src/main_road_weather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from __future__ import absolute_import, annotations

import argparse
import logging.config
from datetime import datetime
from typing import Optional

import dateutil.parser
import sentry_sdk
from redis.client import Redis

from common.cache.computation_checkpoint import ComputationCheckpointCache
from common.connector.collector import ConnectorCollector
from common.data_model.common import Provenance
from common.logging import get_logging_configuration
from common.settings import (DEFAULT_TIMEZONE, SENTRY_SAMPLE_RATE, ODH_MINIMUM_STARTING_DATE,
COMPUTATION_CHECKPOINT_REDIS_HOST, COMPUTATION_CHECKPOINT_REDIS_PORT, PROVENANCE_ID,
PROVENANCE_LINEAGE, PROVENANCE_NAME_VALIDATION, PROVENANCE_VERSION,
COMPUTATION_CHECKPOINT_REDIS_DB, ODH_COMPUTATION_BATCH_SIZE_VALIDATION)
from road_weather.manager.road_weather import RoadWeatherManager
from validator.manager.validation import ValidationManager

logging.config.dictConfig(get_logging_configuration("pollution_v2"))

logger = logging.getLogger("pollution_v2.main_road_weather")

sentry_sdk.init(
traces_sample_rate=SENTRY_SAMPLE_RATE,
integrations=[]
)


# not used anymore after refactoring from Celery to Airflow
def compute_data() -> None:
    """
    Run the road weather computation, one station at a time, for every station returned by the manager.

    The manager is built from a collector connector configured from the environment and from the
    provenance described by the PROVENANCE_* settings.
    """
    # NOTE(review): the provenance uses PROVENANCE_NAME_VALIDATION - confirm it is the intended name here
    road_weather_manager = RoadWeatherManager(
        ConnectorCollector.build_from_env(),
        Provenance(PROVENANCE_ID, PROVENANCE_LINEAGE, PROVENANCE_NAME_VALIDATION, PROVENANCE_VERSION),
    )
    for current_station in road_weather_manager.get_station_list():
        road_weather_manager.run_computation_for_single_station(current_station)


if __name__ == "__main__":

    arg_parser = argparse.ArgumentParser(description="Manually run a road weather forecast")

    # The flag is still accepted so that existing command lines keep working, but it has no effect:
    # compute_data() is always called directly by this script.
    arg_parser.add_argument("--run-async", action="store_true",
                            help="Deprecated and ignored: the computation always runs synchronously")

    # Parse the command line so that --help works and unknown options are reported
    args = arg_parser.parse_args()
    if args.run_async:
        logger.warning("Option --run-async is ignored: the computation runs synchronously")

    logger.info("Starting road weather computation")
    compute_data()
4 changes: 4 additions & 0 deletions pollution_v2/src/road_weather/manager/_forecast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

Expand Down
13 changes: 6 additions & 7 deletions pollution_v2/src/road_weather/manager/road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from common.data_model import TrafficSensorStation, Station, RoadWeatherObservationMeasureCollection, Provenance
from common.data_model.entry import GenericEntry
from common.manager.station import StationManager
from common.settings import ROAD_WEATHER_CONFIG_FILE
from common.settings import ROAD_WEATHER_CONFIG_FILE, TMP_DIR
from road_weather.manager._forecast import Forecast
from road_weather.model.road_weather_model import RoadWeatherModel

Expand Down Expand Up @@ -86,10 +86,10 @@ def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]:
forecast.interpolate_hourly()
forecast.negative_radiation_filter()
roadcast_start = forecast.start
print('* forecast - XML processed correctly')
forecast_filename = f"data/forecast/forecast_{wrf_station_code}_{roadcast_start}.xml"
logger.info('forecast - XML processed correctly')
forecast_filename = f"{TMP_DIR}/forecast_{wrf_station_code}_{roadcast_start}.xml"
forecast.to_xml(forecast_filename)
print('* forecast - XML saved in ', forecast_filename)
logger.info(f'forecast - XML saved in {forecast_filename} ')
return forecast_filename, roadcast_start

def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[datetime, datetime]:
Expand All @@ -107,8 +107,7 @@ def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[dat
end_date = forecast_start + timedelta(hours=8)
return start_date, end_date


def _download_data_and_compute(self, station: TrafficSensorStation) -> List[GenericEntry]:
def _download_data_and_compute(self, station: Station) -> List[GenericEntry]:

if not station:
logger.error(f"Cannot compute road condition on empty station")
Expand All @@ -135,7 +134,7 @@ def _download_data_and_compute(self, station: TrafficSensorStation) -> List[Gene

return []

def run_computation_for_single_station(self, station: TrafficSensorStation) -> None:
def run_computation_for_single_station(self, station: Station) -> None:
"""
Run the computation for a single station.
Expand Down
59 changes: 55 additions & 4 deletions pollution_v2/src/road_weather/model/road_weather_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@

import datetime
import logging
import time
import urllib
from typing import List

from common.data_model import TrafficSensorStation, RoadWeatherObservationMeasureCollection
from common.model.helper import ModelHelper

import urllib.request
import mimetypes

from common.settings import TMP_DIR

logger = logging.getLogger("pollution_v2.road_weather.model.road_weather_model")


Expand All @@ -19,6 +26,24 @@ class RoadWeatherModel:
The model for computing road condition.
"""

@classmethod
def create_multipart_formdata(cls, files):
    """
    Build the body of a ``multipart/form-data`` request uploading the given files.

    Every file becomes one part named ``files``; its ``filename`` is the path as given and its content type
    is guessed from the file name, falling back to ``application/octet-stream``.

    :param files: the paths of the files to upload
    :return: a tuple (body, boundary): the encoded request body as bytes and the boundary string to declare
        in the ``Content-Type`` header of the request
    :raises OSError: if one of the files cannot be opened or read
    """
    # Imported locally so that this method does not depend on the module import block
    import uuid

    # A boundary must not occur inside any of the parts: a random suffix generated for each call makes
    # a clash with the content of the files practically impossible (a constant string could be present
    # in an uploaded file and would truncate the request)
    boundary = f'----------Boundary{uuid.uuid4().hex}'

    parts = []
    for filename in files:
        content_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        with open(filename, 'rb') as f:
            content = f.read()
        parts.extend([
            f'--{boundary}'.encode(),
            f'Content-Disposition: form-data; name="files"; filename="{filename}"'.encode(),
            f'Content-Type: {content_type}'.encode(),
            b'',  # empty line separating the part headers from the part content
            content,
        ])
    # Closing delimiter, followed by an empty element so that the body ends with CRLF
    parts.extend([f'--{boundary}--'.encode(), b''])
    return b'\r\n'.join(parts), boundary

def compute_data(self, observation: RoadWeatherObservationMeasureCollection,
forecast_filename: str, # TODO: change with RoadWeatherForecastMeasureCollection
forecast_start: str, # TODO: check if needed
Expand All @@ -32,12 +57,38 @@ def compute_data(self, observation: RoadWeatherObservationMeasureCollection,
:return: The list of road conditions
"""

logger.info(f"Computing road condition for station [{station.code}]")
logger.info(f"Observation measures available: {len(observation.measures)}")
logger.info(f"Forecast file name: {forecast_filename}")
logger.info(f"Creating observations file from {len(observation.measures)} measures")

entries = observation.get_entries()
observation_filename = f"{TMP_DIR}/observations_{round(time.time() * 1000)}.csv"
with open(observation_filename, 'a') as tmp_csv:
tmp_csv.write('"time","station_code","prec_qta","stato_meteo",'
'"temp_aria","temp_rugiada","temp_suolo","vento_vel"\n')
for entry in entries:
tmp_csv.write(f'{entry.valid_time.strftime("%Y-%m-%d %H:%M:%S")},{entry.station.code},{entry.prec_qta},'
f'{entry.stato_meteo},{entry.temp_aria},{entry.temp_rugiada},{entry.temp_suolo},'
f'{entry.vento_vel}\n')

logger.info(f"Computing road condition for station [{station.code}] with observations from "
f"{observation_filename} and forecast from {forecast_filename}")
logger.info(f"Forecast start: {forecast_start}")

# TODO: convert to dataframe and compute road condition
# TODO cablato
url = "http://metro:80/predict/?station_code=101"

# List of files to upload
files_to_upload = [forecast_filename, observation_filename]

# Create multipart form data
body, boundary = RoadWeatherModel.create_multipart_formdata(files_to_upload)

# Create a request object
req = urllib.request.Request(url, data=body)
req.add_header('Content-Type', f'multipart/form-data; boundary={boundary}')

with urllib.request.urlopen(req) as response:
response_data = response.read()
print(response_data.decode('utf-8')) # Print the response from the server

# validation_data_types = {str(measure.data_type) for measure in validation.measures}
# traffic_data_types = {str(measure.data_type) for measure in traffic.measures}
Expand Down
2 changes: 2 additions & 0 deletions pollution_v2/src/tmp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.xml
*.csv

0 comments on commit 2c91be1

Please sign in to comment.