Merge pull request #50 from mobidata-bw/stations_availability
New table sharing_station_status and various fixes
hbruch authored Dec 18, 2023
2 parents aa4c6ed + e895b7b commit ae74d7f
Showing 10 changed files with 273 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .env.EXAMPLE
@@ -3,7 +3,7 @@ DAGSTER_POSTGRES_PASSWORD=postgres_password
 DAGSTER_POSTGRES_DB=postgres_db
 DAGSTER_CURRENT_IMAGE=dagster_pipeline_image
 
-IPL_LAMASSU_BASE_URL=http://localhost:8080
+IPL_LAMASSU_INTERNAL_BASE_URL=http://localhost:8080/
 
 IPL_POSTGIS_VERSION_TAG=15-3.3-alpine
 IPL_POSTGRES_HOST=localhost
2 changes: 1 addition & 1 deletion README.md
@@ -22,7 +22,7 @@ dependencies for the different dagster services. `pipeline.Dockerfile` and `dags
 In addition, you need a postgres database into which the datasets are loaded.
 This database can be started via
 
-`docker compose up -f docker-compose.dev.yml`
+`docker compose -f docker-compose.dev.yml up`
 
 ### Running
 
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -54,7 +54,7 @@ services:
       - DAGSTER_POSTGRES_USER
       - DAGSTER_POSTGRES_PASSWORD
       - DAGSTER_POSTGRES_DB
-      - IPL_LAMASSU_BASE_URL
+      - IPL_LAMASSU_INTERNAL_BASE_URL
       - IPL_POSTGRES_HOST
       - IPL_POSTGRES_USER
       - IPL_POSTGRES_PASSWORD
4 changes: 2 additions & 2 deletions pipeline/__init__.py
@@ -15,9 +15,9 @@

 defs = Definitions(
     assets=assets,
-    schedules=[sharing.update_stations_and_vehicles_minutely],
+    schedules=[sharing.update_sharing_station_status_and_vehicles_minutely],
     resources={
-        'lamassu': LamassuResource(lamassu_base_url=EnvVar('IPL_LAMASSU_BASE_URL')),
+        'lamassu': LamassuResource(lamassu_base_url=EnvVar('IPL_LAMASSU_INTERNAL_BASE_URL')),
         'pg_gpd_io_manager': PostGISGeoPandasIOManager(
             host=EnvVar('IPL_POSTGRES_HOST'),
             user=EnvVar('IPL_POSTGRES_USER'),
76 changes: 59 additions & 17 deletions pipeline/assets/sharing.py
@@ -1,8 +1,11 @@
+import logging
+
 import pandas as pd
 from dagster import (
     DefaultScheduleStatus,
     DefaultSensorStatus,
     DynamicPartitionsDefinition,
+    FreshnessPolicy,
     RunRequest,
     ScheduleDefinition,
     SensorResult,
@@ -19,8 +22,9 @@
     io_manager_key='pg_gpd_io_manager',
     compute_kind='Lamassu',
     group_name='sharing',
+    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1),
 )
-def stations(context, lamassu: LamassuResource) -> pd.DataFrame:
+def sharing_stations(context, lamassu: LamassuResource) -> pd.DataFrame:
     """
     Pushes stations published by lamassu
     to table in a postgis database.
@@ -33,11 +37,44 @@ def stations(context, lamassu: LamassuResource) -> pd.DataFrame:
     systems = lamassu.get_systems()
     data_frames = []
     for system in systems:
-        system_id = system['id']
-        feeds = lamassu.get_system_feeds(system_id)
-        stations = lamassu.get_stations_as_frame(feeds, system_id)
-        if stations:
-            data_frames.append(stations)
+        try:
+            system_id = system['id']
+            feeds = lamassu.get_system_feeds(system_id)
+            stations = lamassu.get_stations_as_frame(feeds, system_id)
+            if stations is not None:
+                data_frames.append(stations)
+        except Exception:
+            logging.exception(f'Error retrieving stations for system {system}')
     return pd.concat(data_frames)
+
+
+@asset(
+    io_manager_key='pg_gpd_io_manager',
+    compute_kind='Lamassu',
+    group_name='sharing',
+)
+def sharing_station_status(context, lamassu: LamassuResource) -> pd.DataFrame:
+    """
+    Pushes station_status published by lamassu
+    to table in a postgis database.
+    """
+
+    # Instead of handling each system as a partition, we iterate over all of them
+    # and insert them as one large batch.
+    # This works around dagster's inefficient job handling for many small sized, frequently updated partitions.
+    # See also this discussion in dagster slack: https://dagster.slack.com/archives/C01U954MEER/p1694188602187579
+    systems = lamassu.get_systems()
+    data_frames = []
+    for system in systems:
+        try:
+            system_id = system['id']
+            feeds = lamassu.get_system_feeds(system_id)
+            station_status = lamassu.get_station_status_by_form_factor_as_frame(feeds, system_id)
+            if station_status is not None:
+                data_frames.append(station_status)
+        except Exception:
+            logging.exception(f'Error retrieving sharing_station_status for system {system}')
+
+    return pd.concat(data_frames)


@@ -58,11 +95,14 @@ def vehicles(context, lamassu: LamassuResource) -> pd.DataFrame:
     systems = lamassu.get_systems()
     data_frames = []
     for system in systems:
-        system_id = system['id']
-        feeds = lamassu.get_system_feeds(system_id)
-        vehicles = lamassu.get_vehicles_as_frame(feeds, system_id)
-        if vehicles:
-            data_frames.append(vehicles)
+        try:
+            system_id = system['id']
+            feeds = lamassu.get_system_feeds(system_id)
+            vehicles = lamassu.get_vehicles_as_frame(feeds, system_id)
+            if vehicles is not None:
+                data_frames.append(vehicles)
+        except Exception:
+            logging.exception(f'Error retrieving vehicles for system {system}')
     return pd.concat(data_frames)


@@ -78,16 +118,18 @@ def vehicles(context, lamassu: LamassuResource) -> pd.DataFrame:
 '''
 Define asset job grouping update of stations and vehicles asset.
 '''
-stations_and_vehicles_job = define_asset_job(
-    'stations_and_vehicles_job',
-    selection=[stations, vehicles],
+sharing_station_status_and_vehicles_job = define_asset_job(
+    'sharing_station_status_and_vehicles_job',
+    selection=[sharing_station_status, vehicles],
     config=in_process_job_config,
-    description='Pushes stations and vehicles from Lamassu to PostGIS',
+    description='Pushes sharing_station_status and vehicles from Lamassu to PostGIS',
 )
 
 
-@schedule(job=stations_and_vehicles_job, cron_schedule='* * * * *', default_status=DefaultScheduleStatus.RUNNING)
-def update_stations_and_vehicles_minutely(context):
+@schedule(
+    job=sharing_station_status_and_vehicles_job, cron_schedule='* * * * *', default_status=DefaultScheduleStatus.RUNNING
+)
+def update_sharing_station_status_and_vehicles_minutely(context):
     """
     Run stations_and_vehicles_job in process on the provided schedule (minutely).
     """
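
All three assets now share the same per-system fetch pattern: each system's feeds are fetched inside a try/except, so a single broken feed is logged and skipped rather than failing the whole batch, and results are checked with `is not None` because truth-testing a pandas DataFrame raises a ValueError. A minimal standalone sketch of that pattern follows; the `fetch_frame` callable is hypothetical, standing in for the `get_*_as_frame` accessors:

```python
import logging
from typing import Callable, Optional

import pandas as pd


def collect_system_frames(
    systems: list[dict],
    fetch_frame: Callable[[str], Optional[pd.DataFrame]],
) -> pd.DataFrame:
    """Fetch one frame per system and concatenate the successful ones."""
    data_frames = []
    for system in systems:
        try:
            frame = fetch_frame(system['id'])
            # `is not None` is deliberate: `if frame:` on a DataFrame raises
            # ValueError ("truth value ... is ambiguous"), so an explicit
            # None check is required.
            if frame is not None:
                data_frames.append(frame)
        except Exception:
            # One failing system must not abort the batch; log and continue.
            logging.exception(f'Error retrieving data for system {system}')
    return pd.concat(data_frames)
```

As in the committed assets, `pd.concat` still raises if every system fails, so a fully broken upstream surfaces as a failed run rather than an empty table.
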
3 changes: 3 additions & 0 deletions pipeline/resources/lamassu.py
@@ -31,3 +31,6 @@ def get_vehicles_as_frame(self, feed: dict, system_id: str) -> Optional[DataFram

     def get_stations_as_frame(self, feed: dict, system_id: str) -> Optional[DataFrame]:
         return self._lamassu().get_stations_as_frame(feed, system_id)
+
+    def get_station_status_by_form_factor_as_frame(self, feed: dict, system_id: str) -> Optional[DataFrame]:
+        return self._lamassu().get_station_status_by_form_factor_as_frame(feed, system_id)
7 changes: 3 additions & 4 deletions pipeline/resources/postgis_geopandas_io_manager.py
@@ -79,14 +79,13 @@ def handle_output(self, context: OutputContext, obj: pandas.DataFrame):
                 schema=schema,
                 if_exists='replace',
             )
-            obj_without_index = obj.reset_index()
+            obj.reset_index()
             sio = StringIO()
-            writer = csv.writer(sio, delimiter='\t')
-            writer.writerows(obj_without_index.values)
+            obj.to_csv(sio, sep='\t', na_rep='', header=False)
             sio.seek(0)
             c = con.connection.cursor()
             # ignore mypy attribute check, as postgres cursor has custom extension to DBAPICursor: copy_expert
-            c.copy_expert(f"COPY {schema}.{table} FROM STDIN WITH (FORMAT csv, DELIMITER '\t', NULL 'nan')", sio)  # type: ignore[attr-defined]
+            c.copy_expert(f"COPY {schema}.{table} FROM STDIN WITH (FORMAT csv, DELIMITER '\t')", sio)  # type: ignore[attr-defined]
             con.connection.commit()
             context.add_output_metadata({'num_rows': len(obj), 'table_name': f'{schema}.{table}'})
         elif obj is None:
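
The rewritten `handle_output` delegates CSV serialization to pandas: `DataFrame.to_csv` quotes embedded delimiters and newlines correctly and writes missing values as empty fields, which COPY's csv format already reads as NULL, so the explicit `NULL 'nan'` clause could be dropped. A self-contained sketch of this COPY-based bulk load, assuming a psycopg2 connection and an existing target table whose columns match the frame (names here are illustrative, not the pipeline's API):

```python
from io import StringIO

import pandas as pd


def copy_frame(conn, df: pd.DataFrame, schema: str, table: str) -> None:
    """Bulk-load a DataFrame into schema.table via Postgres COPY FROM STDIN."""
    sio = StringIO()
    # Tab-delimited CSV without a header row; NaN becomes an empty field,
    # which the csv COPY format interprets as NULL by default. Note that
    # to_csv also writes the index as the leading column, so the target
    # table must account for it.
    df.to_csv(sio, sep='\t', na_rep='', header=False)
    sio.seek(0)  # rewind so COPY reads from the start of the buffer
    with conn.cursor() as cur:
        # copy_expert is a psycopg2 extension beyond the DBAPI cursor.
        cur.copy_expert(
            f"COPY {schema}.{table} FROM STDIN WITH (FORMAT csv, DELIMITER '\t')",
            sio,
        )
    conn.commit()
```

Streaming through an in-memory buffer this way avoids per-row INSERT round trips, which is what makes the minutely full-batch refresh cheap enough to run on the `* * * * *` schedule above.
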
