From 2e7dcf7c9b044cd69efd3b123d6fc089747be45e Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Sun, 3 Dec 2023 14:25:27 +0100 Subject: [PATCH 01/11] rename IPL_LAMASSU_BASE_URL to IPL_LAMASSU_INTERNAL_BASE --- .env.EXAMPLE | 2 +- docker-compose.yml | 2 +- pipeline/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.env.EXAMPLE b/.env.EXAMPLE index 4bb7d24..d1ed490 100644 --- a/.env.EXAMPLE +++ b/.env.EXAMPLE @@ -3,7 +3,7 @@ DAGSTER_POSTGRES_PASSWORD=postgres_password DAGSTER_POSTGRES_DB=postgres_db DAGSTER_CURRENT_IMAGE=dagster_pipeline_image -IPL_LAMASSU_BASE_URL=http://localhost:8080 +IPL_LAMASSU_INTERNAL_BASE_URL=http://localhost:8080/ IPL_POSTGIS_VERSION_TAG=15-3.3-alpine IPL_POSTGRES_HOST=localhost diff --git a/docker-compose.yml b/docker-compose.yml index 50b3be7..76f6785 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,7 +54,7 @@ services: - DAGSTER_POSTGRES_USER - DAGSTER_POSTGRES_PASSWORD - DAGSTER_POSTGRES_DB - - IPL_LAMASSU_BASE_URL + - IPL_LAMASSU_INTERNAL_BASE_URL - IPL_POSTGRES_HOST - IPL_POSTGRES_USER - IPL_POSTGRES_PASSWORD diff --git a/pipeline/__init__.py b/pipeline/__init__.py index 3feaa5e..34a6a69 100644 --- a/pipeline/__init__.py +++ b/pipeline/__init__.py @@ -17,7 +17,7 @@ assets=assets, schedules=[sharing.update_stations_and_vehicles_minutely], resources={ - 'lamassu': LamassuResource(lamassu_base_url=EnvVar('IPL_LAMASSU_BASE_URL')), + 'lamassu': LamassuResource(lamassu_base_url=EnvVar('IPL_LAMASSU_INTERNAL_BASE_URL')), 'pg_gpd_io_manager': PostGISGeoPandasIOManager( host=EnvVar('IPL_POSTGRES_HOST'), user=EnvVar('IPL_POSTGRES_USER'), From 35733602ed692ec31c7d430f04e6a01a81c3ae9e Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Sun, 3 Dec 2023 14:26:13 +0100 Subject: [PATCH 02/11] perform get requests with user-agent header --- pipeline/sources/lamassu.py | 10 +++++++--- pipeline/util/urllib.py | 10 +++++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py index d3ddc11..3d9abd8 100644 --- a/pipeline/sources/lamassu.py +++ b/pipeline/sources/lamassu.py @@ -9,6 +9,8 @@ import requests from sqlalchemy import create_engine, text from sqlalchemy.engine import URL, Connection +from urllib.parse import urljoin +from pipeline.util.urllib import get # TODO: Fragen MobiData-BW: # Schema für Stationen und Fahrzeuge? @@ -51,12 +53,14 @@ def __init__(self, lamassu_base_url: str): self.lamassu_base_url = lamassu_base_url def get_systems(self) -> dict: - resp = requests.get(f'{self.lamassu_base_url}/gbfs', timeout=self.timeout) + url = urljoin(self.lamassu_base_url, f'gbfs-internal') + resp = requests.get(url, timeout=self.timeout) resp.raise_for_status() return resp.json()['systems'] def get_system_feeds(self, system_id: str, preferred_feed_languages: list) -> dict: - resp = requests.get(f'{self.lamassu_base_url}/gbfs/{system_id}/gbfs.json', timeout=self.timeout) + url = urljoin(self.lamassu_base_url, f'gbfs-internal/{system_id}/gbfs.json') + resp = get(url, timeout=self.timeout) resp.raise_for_status() data = resp.json()['data'] @@ -154,7 +158,7 @@ def _load_feed_as_frame(self, url: str, element: Optional[str] = None): Loads a specific gbfs endpoint and returns the data node (or the data's node) as a denormalized flat pandas frame. 
""" - resp = requests.get(url, timeout=self.timeout) + resp = get(url, timeout=self.timeout) resp.raise_for_status() data = resp.json()['data'][element] if element else resp.json()['data'] diff --git a/pipeline/util/urllib.py b/pipeline/util/urllib.py index c8fd79c..ff31f83 100644 --- a/pipeline/util/urllib.py +++ b/pipeline/util/urllib.py @@ -4,12 +4,20 @@ import tempfile from datetime import datetime from email.utils import parsedate_to_datetime +from typing import Union import requests user_agent = 'IPL (MobiData-BW) +https://github.com/mobidata-bw/ipl-dagster-pipeline' +def get(url: str, timeout: int, headers: Union[dict[str, str], None] = None, stream: bool = False): + if headers is None: + headers = {} + headers['User-Agent'] = user_agent + return requests.get(url, headers=headers, timeout=timeout, stream=stream) + + def download( source: str, destination_path: str, @@ -40,7 +48,7 @@ def download( pre_existing_file_last_modified = datetime.utcfromtimestamp(os.path.getmtime(finalfilename)) headers['If-Modified-Since'] = pre_existing_file_last_modified.strftime('%a, %d %b %Y %H:%M:%S GMT') - response = requests.get(source, headers=headers, stream=True, timeout=timeout) + response = get(source, timeout=timeout, headers=headers, stream=True) if response.status_code == 304: # File not modified since last download return From 031166f9727880c544cb826c122a5a707b8550ac Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Sun, 3 Dec 2023 14:30:47 +0100 Subject: [PATCH 03/11] add sharing_station_status --- pipeline/__init__.py | 2 +- pipeline/assets/sharing.py | 46 ++++++++++--- pipeline/resources/lamassu.py | 3 + pipeline/sources/lamassu.py | 121 +++++++++++++++++++++++++++------- 4 files changed, 140 insertions(+), 32 deletions(-) diff --git a/pipeline/__init__.py b/pipeline/__init__.py index 34a6a69..df81a8b 100644 --- a/pipeline/__init__.py +++ b/pipeline/__init__.py @@ -15,7 +15,7 @@ defs = Definitions( assets=assets, - schedules=[sharing.update_stations_and_vehicles_minutely], + schedules=[sharing.update_sharing_station_status_and_vehicles_minutely], resources={ 'lamassu': LamassuResource(lamassu_base_url=EnvVar('IPL_LAMASSU_INTERNAL_BASE_URL')), 'pg_gpd_io_manager': PostGISGeoPandasIOManager( diff --git a/pipeline/assets/sharing.py b/pipeline/assets/sharing.py index 7f689aa..7dbf3da 100644 --- a/pipeline/assets/sharing.py +++ b/pipeline/assets/sharing.py @@ -20,7 +20,7 @@ compute_kind='Lamassu', group_name='sharing', ) -def stations(context, lamassu: LamassuResource) -> pd.DataFrame: +def sharing_stations(context, lamassu: LamassuResource) -> pd.DataFrame: """ Pushes stations published by lamassu to table in a postgis database. @@ -36,11 +36,37 @@ def stations(context, lamassu: LamassuResource) -> pd.DataFrame: system_id = system['id'] feeds = lamassu.get_system_feeds(system_id) stations = lamassu.get_stations_as_frame(feeds, system_id) - if stations: + if stations is not None: data_frames.append(stations) return pd.concat(data_frames) +@asset( + io_manager_key='pg_gpd_io_manager', + compute_kind='Lamassu', + group_name='sharing', +) +def sharing_station_status(context, lamassu: LamassuResource) -> pd.DataFrame: + """ + Pushes station_statuss published by lamassu + to table in a postgis database. + """ + + # Instead of handling each system as a partition, we iterate over all of them + # and insert them as one large batch. + # This works around dagster's inefficient job handling for many small sized, frequently updated partitions. 
+ # See also this discussion in dagster slack: https://dagster.slack.com/archives/C01U954MEER/p1694188602187579 + systems = lamassu.get_systems() + data_frames = [] + for system in systems: + system_id = system['id'] + feeds = lamassu.get_system_feeds(system_id) + station_status = lamassu.get_station_status_by_form_factor_as_frame(feeds, system_id) + if station_status is not None: + data_frames.append(station_status) + return pd.concat(data_frames) + + @asset( io_manager_key='pg_gpd_io_manager', compute_kind='Lamassu', @@ -61,7 +87,7 @@ def vehicles(context, lamassu: LamassuResource) -> pd.DataFrame: system_id = system['id'] feeds = lamassu.get_system_feeds(system_id) vehicles = lamassu.get_vehicles_as_frame(feeds, system_id) - if vehicles: + if vehicles is not None: data_frames.append(vehicles) return pd.concat(data_frames) @@ -78,16 +104,18 @@ def vehicles(context, lamassu: LamassuResource) -> pd.DataFrame: ''' Define asset job grouping update of stations and vehicles asset. ''' -stations_and_vehicles_job = define_asset_job( - 'stations_and_vehicles_job', - selection=[stations, vehicles], +sharing_station_status_and_vehicles_job = define_asset_job( + 'sharing_station_status_and_vehicles_job', + selection=[sharing_station_status, vehicles], config=in_process_job_config, - description='Pushes stations and vehicles from Lamassu to PostGIS', + description='Pushes sharing_station_status and vehicles from Lamassu to PostGIS', ) -@schedule(job=stations_and_vehicles_job, cron_schedule='* * * * *', default_status=DefaultScheduleStatus.RUNNING) -def update_stations_and_vehicles_minutely(context): +@schedule( + job=sharing_station_status_and_vehicles_job, cron_schedule='* * * * *', default_status=DefaultScheduleStatus.RUNNING +) +def update_sharing_station_status_and_vehicles_minutely(context): """ Run stations_and_vehicles_job in process on the provided schedule (minutely). """ diff --git a/pipeline/resources/lamassu.py b/pipeline/resources/lamassu.py index c4d5e87..e2c2f95 100644 --- a/pipeline/resources/lamassu.py +++ b/pipeline/resources/lamassu.py @@ -31,3 +31,6 @@ def get_vehicles_as_frame(self, feed: dict, system_id: str) -> Optional[DataFram def get_stations_as_frame(self, feed: dict, system_id: str) -> Optional[DataFrame]: return self._lamassu().get_stations_as_frame(feed, system_id) + + def get_station_status_by_form_factor_as_frame(self, feed: dict, system_id: str) -> Optional[DataFrame]: + return self._lamassu().get_station_status_by_form_factor_as_frame(feed, system_id) diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py index 3d9abd8..121766b 100644 --- a/pipeline/sources/lamassu.py +++ b/pipeline/sources/lamassu.py @@ -1,8 +1,6 @@ -# push_lamassu_feeds_to_postgis(EnvVar("DIP_LAMASSU_BASE_URL")) - - import traceback -from typing import Optional +from typing import List, Optional, Union +from urllib.parse import urljoin import geopandas as gpd import pandas as pd @@ -43,14 +41,27 @@ 'geometry', ] +STATION_BY_FORM_FACTOR_COLUMNS = [ + # 'station_id', is already part of index, don't add as column + 'num_bicycles_available', + 'num_cargo_bicycles_available', + 'num_cars_available', + 'num_scooters_seated_available', + 'num_scooters_standing_available', + 'num_mopeds_available', + 'num_others_available', +] class Lamassu: + # Feed ids of feeds, whose scooters are scooter_seated. 
    scooter_seated_feeds: List[str] = []
     lamassu_base_url: str
     # Timeout used for lamassu requests
     timeout: int = 2
 
-    def __init__(self, lamassu_base_url: str):
+    def __init__(self, lamassu_base_url: str, scooter_seated_feeds: Optional[List[str]] = None):
         self.lamassu_base_url = lamassu_base_url
+        self.scooter_seated_feeds = scooter_seated_feeds if scooter_seated_feeds else []
 
     def get_systems(self) -> dict:
         url = urljoin(self.lamassu_base_url, f'gbfs-internal')
@@ -71,30 +82,88 @@ def get_system_feeds(self, system_id: str, preferred_feed_languages: list) -> di
 
         return {}
 
-    def get_stations_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
-        if 'station_information' not in feed or 'station_status' not in feed:
+    def _group_and_pivot(self, dataframe, index, columns, values):
+        # unpack index/columns if they are lists, not just single column names
+        group_cols = [*(index if type(index)==list else [index]), *(columns if type(columns)==list else [columns])]
+        grouped_sums_df = dataframe.groupby(group_cols).sum(numeric_only=True).reset_index()
+        return grouped_sums_df.pivot(index=index, columns=columns, values=values)
+
+    def _availability_col_name_for_form_factor(self, feed_id: str, form_factor: str) -> str:
+        """
+        Maps form_factor to the corresponding `num_<form_factor>s_available` column name.
+        Note: as GBFSv2.3 still supports scooter, which can be scooter_seated or scooter_standing,
+        we map to scooter_standing by default. If a feed / provider's vehicles are scooter_seated,
+        these should be explicitly defined in scooter_seated_feeds.
+        """
+        if 'scooter_' in form_factor:
+            return 'num_scooters_' + form_factor[len('scooter_') :] + '_available'
+        if 'scooter' == form_factor:
+            # Note: for now most scooter vehicle_types correspond to scooter_standing.
            if feed_id in self.scooter_seated_feeds:
+                return 'num_scooters_seated_available'
+            return 'num_scooters_standing_available'
+        return 'num_' + form_factor + 's_available'
+
+
+    def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
+        if 'station_status' not in feed or 'vehicle_types' not in feed:
             return None
 
-        stations_infos_df = self._load_feed_as_frame(feed['station_information'], 'stations')
-        stations_status_df = self._load_feed_as_frame(feed['station_status'], 'stations')
-        if stations_infos_df.empty or stations_status_df.empty:
+        stations_status_df = self._load_feed_as_frame(
+            feed['station_status'],
+            'stations',
+            'vehicle_types_available',
+            ['station_id', 'num_bikes_available', 'is_renting', 'is_installed'],
+            [],
+        )
+        vehicle_types_df = self._load_feed_as_frame(feed['vehicle_types'], 'vehicle_types')
+
+        if vehicle_types_df.empty or stations_status_df.empty:
             return None
 
-        merged = pd.merge(stations_infos_df, stations_status_df, on=['station_id'])
-        # add feed name to do delete insert
-        merged['feed_id'] = feed_id
+        # merge station_status and vehicle_type, so we know form_factor for vehicle_types_available
+        merged = pd.merge(stations_status_df, vehicle_types_df, on=['vehicle_type_id'])
 
         # filter those not installed or not renting
         filtered = merged.loc[(merged['is_renting'] == True) & (merged['is_installed'] == True)]  # noqa: E712
+        # rename num_bikes_available to upcoming GBFS3 num_vehicles_available
+        filtered = filtered.rename(columns={'num_bikes_available': 'num_vehicles_available'})
+        # convert stations_status into a dataframe, one row per station, a column per form_factor
+        # reflecting the number of available vehicles of this form_factor
+        stations_availabilities_by_form_factor_df = self._group_and_pivot(
+            filtered, ['station_id', 'num_vehicles_available'], 'form_factor', 'count'
+        )
+        # rename form_factor cols to num_<form_factor>s_available
+        renamings = {
+            c: self._availability_col_name_for_form_factor(feed_id, c)
+            for c in stations_availabilities_by_form_factor_df.columns
+        }
+        stations_availabilities_by_form_factor_df = stations_availabilities_by_form_factor_df.rename(columns=renamings)
+        # add feed name
+        stations_availabilities_by_form_factor_df['feed_id'] = feed_id
+
+        return self._enforce_columns(stations_availabilities_by_form_factor_df, STATION_BY_FORM_FACTOR_COLUMNS)
+
+    def get_stations_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
+        if 'station_information' not in feed:
+            return None
+
+        stations_infos_df = self._load_feed_as_frame(feed['station_information'], 'stations')
+
+        if stations_infos_df.empty:
+            return None
+
+        # add feed name to do delete insert
+        stations_infos_df['feed_id'] = feed_id
+
         # Add geometry
-        filtered_with_geom = gpd.GeoDataFrame(
-            filtered, geometry=gpd.points_from_xy(filtered.lon, filtered.lat), crs='EPSG:4326'
+        stations_infos_df_with_geom = gpd.GeoDataFrame(
+            stations_infos_df, geometry=gpd.points_from_xy(stations_infos_df.lon, stations_infos_df.lat), crs='EPSG:4326'
         )
 
-        return self._enforce_columns(filtered_with_geom, STATION_COLUMNS)
+        return self._enforce_columns(stations_infos_df_with_geom, STATION_COLUMNS)
 
     # TODO add operator
-    # TODO add formfactor
     # TODO schema sharing
 
     def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
@@ -137,8 +206,6 @@ def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra
 
         return self._enforce_columns(filtered_with_geom, VEHICLE_COLUMNS)
 
-    # TODO add operator
-
     def _enforce_columns(self, df: pd.DataFrame, column_names: list) -> pd.DataFrame:
         """
         Make sure all intended columns exist in data frame.
         Unwanted columns are discarded. Intended but not yet
         existing columns are created with value "None".
@@ -153,7 +220,14 @@ def _enforce_columns(self, df: pd.DataFrame, column_names: list) -> pd.DataFrame
         # restrict to essential columns or provide defaults
         return df[column_names]
 
-    def _load_feed_as_frame(self, url: str, element: Optional[str] = None):
+    def _load_feed_as_frame(
+        self,
+        url: str,
+        element: Optional[str] = None,
+        record_path: Union[str, List[str], None] = None,
+        meta: Union[str, List[Union[str, List[str]]], None] = None,
+        default_record_path=None,
+    ):
         """
         Loads a specific gbfs endpoint and returns the data node
         (or the data's node) as a denormalized flat pandas frame.
@@ -162,5 +236,8 @@ def _load_feed_as_frame(self, url: str, element: Optional[str] = None):
         resp.raise_for_status()
 
         data = resp.json()['data'][element] if element else resp.json()['data']
-
-        return pd.json_normalize(data, sep='_')
+        if isinstance(record_path, str):
+            for record in data:
+                if record_path not in record:
+                    record[record_path] = default_record_path
+        return pd.json_normalize(data, record_path, meta, sep='_')

From c03b3f14c50061b0cea0d505db16b5f4be5a878d Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Sun, 3 Dec 2023 14:31:01 +0100
Subject: [PATCH 04/11] fix argument order

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 0b34d39..de4a63a 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ dependencies for the different dagster services. `pipeline.Dockerfile` and `dags
 
 In addition, you need a postgres database into which the datasets are loaded.
 This database can be started via
-`docker compose up -f docker-compose.dev.yml`
+`docker compose -f docker-compose.dev.yml up`
 
 ### Running

From f63438cbd2c579c8d02b1d33f0150525c604e407 Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Tue, 5 Dec 2023 18:27:02 +0100
Subject: [PATCH 05/11] create dataframes/db tables with intended types

---
 .../resources/postgis_geopandas_io_manager.py |   6 +-
 pipeline/sources/lamassu.py                   | 118 ++++++++++--------
 pyproject.toml                                |   2 +-
 3 files changed, 66 insertions(+), 60 deletions(-)

diff --git a/pipeline/resources/postgis_geopandas_io_manager.py b/pipeline/resources/postgis_geopandas_io_manager.py
index cafb177..dce5c0f 100644
--- a/pipeline/resources/postgis_geopandas_io_manager.py
+++ b/pipeline/resources/postgis_geopandas_io_manager.py
@@ -79,14 +79,12 @@ def handle_output(self, context: OutputContext, obj: pandas.DataFrame):
                 schema=schema,
                 if_exists='replace',
             )
-            obj_without_index = obj.reset_index()
             sio = StringIO()
-            writer = csv.writer(sio, delimiter='\t')
-            writer.writerows(obj_without_index.values)
+            obj.to_csv(sio, sep='\t', na_rep='', header=False)
             sio.seek(0)
             c = con.connection.cursor()
             # ignore mypy attribute check, as postgres cursor has custom extension to DBAPICursor: copy_expert
-            c.copy_expert(f"COPY {schema}.{table} FROM STDIN WITH (FORMAT csv, DELIMITER '\t', NULL 'nan')", sio)  # type: ignore[attr-defined]
+            c.copy_expert(f"COPY {schema}.{table} FROM STDIN WITH (FORMAT csv, DELIMITER '\t')", sio)  # type: ignore[attr-defined]
             con.connection.commit()
             context.add_output_metadata({'num_rows': len(obj), 'table_name': f'{schema}.{table}'})
         elif obj is None:
diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py
index 121766b..9382e7a 100644
--- a/pipeline/sources/lamassu.py
+++ b/pipeline/sources/lamassu.py
@@ -5,52 +5,51 @@
 
 import geopandas as gpd
 import pandas as pd
import requests +from geopandas.array import ExtensionDtype, GeometryDtype from sqlalchemy import create_engine, text from sqlalchemy.engine import URL, Connection -from urllib.parse import urljoin + from pipeline.util.urllib import get -# TODO: Fragen MobiData-BW: -# Schema für Stationen und Fahrzeuge? -# Stationen können unterschiedliche Form-Faktoren umfassen. Als welche sollen Sie zurückgegeben werden? -# Wenn keine Fahrzeuge verfügbar sind, anhand derer dies ermittelbar wäre, ist dies nicht beurteilbar. Vorschlag: Kreuzprodukt aus vehicle_types.form_factor? -# num_bikes_available heißt es heute, ich würde auf num_vehicles_available umstellen -STATION_COLUMNS = [ - 'feed_id', - 'station_id', - 'name', - 'num_bikes_available', - 'rental_uris_android', - 'rental_uris_ios', - 'rental_uris_web', - 'last_reported', - 'geometry', -] -VEHICLE_COLUMNS = [ - 'feed_id', - 'vehicle_id', - 'form_factor', - 'name', - 'is_reserved', - 'propulsion_type', - 'max_range_meters', - 'rental_uris_android', - 'rental_uris_ios', - 'rental_uris_web', - 'last_reported', - 'geometry', -] - -STATION_BY_FORM_FACTOR_COLUMNS = [ +# https://pandas.pydata.org/docs/user_guide/basics.html#basics-dtypes +STATION_COLUMNS = { + 'feed_id': pd.StringDtype(), + 'station_id': pd.StringDtype(), + 'name': pd.StringDtype(), + 'rental_uris_android': pd.StringDtype(), + 'rental_uris_ios': pd.StringDtype(), + 'rental_uris_web': pd.StringDtype(), + 'last_reported': pd.DatetimeTZDtype(tz='UTC'), + 'geometry': GeometryDtype(), +} + +VEHICLE_COLUMNS = { + 'feed_id': pd.StringDtype(), + 'vehicle_id': pd.StringDtype(), + 'form_factor': pd.StringDtype(), + 'name': pd.StringDtype(), + 'is_reserved': pd.BooleanDtype(), + 'propulsion_type': pd.StringDtype(), + 'max_range_meters': pd.Int32Dtype(), + 'rental_uris_android': pd.StringDtype(), + 'rental_uris_ios': pd.StringDtype(), + 'rental_uris_web': pd.StringDtype(), + 'last_reported': pd.DatetimeTZDtype(tz='UTC'), + 'geometry': GeometryDtype(), +} + +STATION_BY_FORM_FACTOR_COLUMNS = { # 'station_id', is already part of index, don't add as column - 'num_bicycles_available', - 'num_cargo_bicycles_available', - 'num_cars_available', - 'num_scooters_seated_available', - 'num_scooters_standing_available', - 'num_mopeds_available', - 'num_others_available', -] + 'num_bicycles_available': pd.Int32Dtype(), + 'num_cargo_bicycles_available': pd.Int32Dtype(), + 'num_cars_available': pd.Int32Dtype(), + 'num_scooters_seated_available': pd.Int32Dtype(), + 'num_scooters_standing_available': pd.Int32Dtype(), + 'num_mopeds_available': pd.Int32Dtype(), + 'num_others_available': pd.Int32Dtype(), + 'last_reported': pd.DatetimeTZDtype(tz='UTC'), +} + class Lamassu: # Feed ids of feeds, whose scooters are scooter_seated. 
@@ -64,13 +63,13 @@ def __init__(self, lamassu_base_url: str, scooter_seated_feeds: Optional[List[st self.scooter_seated_feeds = scooter_seated_feeds if scooter_seated_feeds else [] def get_systems(self) -> dict: - url = urljoin(self.lamassu_base_url, f'gbfs-internal') + url = urljoin(self.lamassu_base_url, 'gbfs-internal') resp = requests.get(url, timeout=self.timeout) resp.raise_for_status() return resp.json()['systems'] def get_system_feeds(self, system_id: str, preferred_feed_languages: list) -> dict: - url = urljoin(self.lamassu_base_url, f'gbfs-internal/{system_id}/gbfs.json') + url = urljoin(self.lamassu_base_url, f'gbfs-internal/{system_id}/gbfs.json') resp = get(url, timeout=self.timeout) resp.raise_for_status() @@ -84,8 +83,16 @@ def get_system_feeds(self, system_id: str, preferred_feed_languages: list) -> di def _group_and_pivot(self, dataframe, index, columns, values): # unpack index/columns if they are lists, not just single column names - group_cols = [*(index if type(index)==list else [index]), *(columns if type(columns)==list else [columns])] + group_cols = [ + *(index if isinstance(index, list) else [index]), + *(columns if isinstance(columns, list) else [columns]), + ] grouped_sums_df = dataframe.groupby(group_cols).sum(numeric_only=True).reset_index() + # convert int64 to Int32, which is sufficiently large and can represent NA, + # which avoids they are converted to float by pivot + for column_name in grouped_sums_df.columns: + if grouped_sums_df[column_name].dtype.name == 'int64': + grouped_sums_df[column_name] = grouped_sums_df[column_name].astype('Int32') return grouped_sums_df.pivot(index=index, columns=columns, values=values) def _availability_col_name_for_form_factor(self, feed_id: str, form_factor: str) -> str: @@ -117,7 +124,7 @@ def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) - [], ) vehicle_types_df = self._load_feed_as_frame(feed['vehicle_types'], 'vehicle_types') - + if vehicle_types_df.empty or stations_status_df.empty: return None @@ -140,25 +147,26 @@ def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) - stations_availabilities_by_form_factor_df = stations_availabilities_by_form_factor_df.rename(columns=renamings) # add feed name stations_availabilities_by_form_factor_df['feed_id'] = feed_id - - return self._enforce_columns(stations_availabilities_by_form_factor_df, STATION_BY_FORM_FACTOR_COLUMNS) + return self._enforce_columns(stations_availabilities_by_form_factor_df, STATION_BY_FORM_FACTOR_COLUMNS) def get_stations_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]: if 'station_information' not in feed: return None stations_infos_df = self._load_feed_as_frame(feed['station_information'], 'stations') - + if stations_infos_df.empty: return None # add feed name to do delete insert stations_infos_df['feed_id'] = feed_id - + # Add geometry stations_infos_df_with_geom = gpd.GeoDataFrame( - stations_infos_df, geometry=gpd.points_from_xy(stations_infos_df.lon, stations_infos_df.lat), crs='EPSG:4326' + stations_infos_df, + geometry=gpd.points_from_xy(stations_infos_df.lon, stations_infos_df.lat), + crs='EPSG:4326', ) return self._enforce_columns(stations_infos_df_with_geom, STATION_COLUMNS) @@ -206,19 +214,19 @@ def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra return self._enforce_columns(filtered_with_geom, VEHICLE_COLUMNS) - def _enforce_columns(self, df: pd.DataFrame, column_names: list) -> pd.DataFrame: + def _enforce_columns(self, df: pd.DataFrame, 
column_names: dict[str, ExtensionDtype]) -> pd.DataFrame: """ Make sure all intended columns exist in data frame. Unwanted colums are discarded. Intended, but not yet existing are created with value "None". """ - # - for column in column_names: + # Create missing columns with their appropriate dtype + for column, dtype in column_names.items(): if column not in df: - df[column] = None + df[column] = pd.Series(dtype=dtype) if dtype else None # restrict to essentiel columns or provide defaults - return df[column_names] + return df[column_names.keys()] def _load_feed_as_frame( self, diff --git a/pyproject.toml b/pyproject.toml index 21bbc6d..8aad8e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,5 +69,5 @@ docstring-quotes = "double" [[tool.mypy.overrides]] # See https://github.com/HBNetwork/python-decouple/issues/122 -module = ["geopandas", "osgeo", "osgeo_utils.samples", "dagster_docker"] +module = ["dagster_docker", "geopandas", "geopandas.array", "osgeo", "osgeo_utils.samples"] ignore_missing_imports = true From 9165a81740d77d136d9e3e19a861f97d2da09e81 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Tue, 5 Dec 2023 18:28:14 +0100 Subject: [PATCH 06/11] define freshness policy for stations --- pipeline/assets/sharing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipeline/assets/sharing.py b/pipeline/assets/sharing.py index 7dbf3da..f3df825 100644 --- a/pipeline/assets/sharing.py +++ b/pipeline/assets/sharing.py @@ -3,6 +3,7 @@ DefaultScheduleStatus, DefaultSensorStatus, DynamicPartitionsDefinition, + FreshnessPolicy, RunRequest, ScheduleDefinition, SensorResult, @@ -19,6 +20,7 @@ io_manager_key='pg_gpd_io_manager', compute_kind='Lamassu', group_name='sharing', + freshness_policy=FreshnessPolicy(maximum_lag_minutes=1), ) def sharing_stations(context, lamassu: LamassuResource) -> pd.DataFrame: """ From db33e3c98437f428206b89dc69169debc541240e Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Mon, 18 Dec 2023 09:42:08 +0100 Subject: [PATCH 07/11] catch exceptions per provider and log them Issues with individual providers should not fail the whole publication --- pipeline/assets/sharing.py | 42 ++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/pipeline/assets/sharing.py b/pipeline/assets/sharing.py index f3df825..0a7e4f5 100644 --- a/pipeline/assets/sharing.py +++ b/pipeline/assets/sharing.py @@ -1,3 +1,5 @@ +import logging + import pandas as pd from dagster import ( DefaultScheduleStatus, @@ -35,11 +37,14 @@ def sharing_stations(context, lamassu: LamassuResource) -> pd.DataFrame: systems = lamassu.get_systems() data_frames = [] for system in systems: - system_id = system['id'] - feeds = lamassu.get_system_feeds(system_id) - stations = lamassu.get_stations_as_frame(feeds, system_id) - if stations is not None: - data_frames.append(stations) + try: + system_id = system['id'] + feeds = lamassu.get_system_feeds(system_id) + stations = lamassu.get_stations_as_frame(feeds, system_id) + if stations is not None: + data_frames.append(stations) + except Exception: + logging.exception(f'Error retrieving stations for system {system}') return pd.concat(data_frames) @@ -61,11 +66,15 @@ def sharing_station_status(context, lamassu: LamassuResource) -> pd.DataFrame: systems = lamassu.get_systems() data_frames = [] for system in systems: - system_id = system['id'] - feeds = lamassu.get_system_feeds(system_id) - station_status = lamassu.get_station_status_by_form_factor_as_frame(feeds, system_id) - if station_status is not None: - 
data_frames.append(station_status) + try: + system_id = system['id'] + feeds = lamassu.get_system_feeds(system_id) + station_status = lamassu.get_station_status_by_form_factor_as_frame(feeds, system_id) + if station_status is not None: + data_frames.append(station_status) + except Exception: + logging.exception(f'Error retrieving sharing_station_status for system {system}') + return pd.concat(data_frames) @@ -86,11 +95,14 @@ def vehicles(context, lamassu: LamassuResource) -> pd.DataFrame: systems = lamassu.get_systems() data_frames = [] for system in systems: - system_id = system['id'] - feeds = lamassu.get_system_feeds(system_id) - vehicles = lamassu.get_vehicles_as_frame(feeds, system_id) - if vehicles is not None: - data_frames.append(vehicles) + try: + system_id = system['id'] + feeds = lamassu.get_system_feeds(system_id) + vehicles = lamassu.get_vehicles_as_frame(feeds, system_id) + if vehicles is not None: + data_frames.append(vehicles) + except Exception: + logging.exception(f'Error retrieving vehicles for system {system}') return pd.concat(data_frames) From bb1d3f04611163bdb585d44c4536a4afd35aae74 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Mon, 18 Dec 2023 11:47:13 +0100 Subject: [PATCH 08/11] fix: fix vehicle filtering --- pipeline/sources/lamassu.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py index 9382e7a..b10f202 100644 --- a/pipeline/sources/lamassu.py +++ b/pipeline/sources/lamassu.py @@ -204,8 +204,8 @@ def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra # filter those already reserved or disabled # Note: 'is False' results in boolean label can not be used without a boolean index filtered = merged.loc[ - merged.lon.notnull() & (merged['is_reserved'] is False) & (merged['is_disabled'] is False) - ] # noqa: E712 + merged.lon.notnull() & (merged['is_reserved'] == False) & (merged['is_disabled'] == False) # noqa: E712 + ] # Add geometry filtered_with_geom = gpd.GeoDataFrame( From 2395cee41b30258ce2088cfa209369798082c0c6 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Mon, 18 Dec 2023 11:47:40 +0100 Subject: [PATCH 09/11] fix: rename bike_id to vehicle_id --- pipeline/sources/lamassu.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py index b10f202..ba731a4 100644 --- a/pipeline/sources/lamassu.py +++ b/pipeline/sources/lamassu.py @@ -189,6 +189,7 @@ def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra free_vehicle_status_df = self._load_feed_as_frame(feed['free_bike_status'], 'bikes') cols_to_add = [col for col in ['lon', 'lat'] if col not in free_vehicle_status_df.columns] free_vehicle_status_df.loc[:, cols_to_add] = None + free_vehicle_status_df = free_vehicle_status_df.rename(columns={'bike_id': 'vehicle_id'}) vehicle_types_df = self._load_feed_as_frame(feed['vehicle_types'], 'vehicle_types') From 511d9fe53d75bb018bf38b174631ce99c5d69960 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Mon, 18 Dec 2023 11:55:44 +0100 Subject: [PATCH 10/11] fix: return sooner if vehicle_status_df empty --- pipeline/sources/lamassu.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py index ba731a4..c45a43d 100644 --- a/pipeline/sources/lamassu.py +++ b/pipeline/sources/lamassu.py @@ -187,15 +187,13 @@ def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra return None 
        free_vehicle_status_df = self._load_feed_as_frame(feed['free_bike_status'], 'bikes')
+        if free_vehicle_status_df.empty:
+            return None
         cols_to_add = [col for col in ['lon', 'lat'] if col not in free_vehicle_status_df.columns]
         free_vehicle_status_df.loc[:, cols_to_add] = None
         free_vehicle_status_df = free_vehicle_status_df.rename(columns={'bike_id': 'vehicle_id'})
 
         vehicle_types_df = self._load_feed_as_frame(feed['vehicle_types'], 'vehicle_types')
-
-        if free_vehicle_status_df.empty:
-            return None
-
         # Fix issues with duplicate vehicle_type_ids
         vehicle_types_df = vehicle_types_df.drop_duplicates(subset=['vehicle_type_id'], keep='last')

From e895b7b22091f25282a7a59f16d0ea131593b5b4 Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Mon, 18 Dec 2023 11:56:49 +0100
Subject: [PATCH 11/11] fix: enforce column types

---
 pipeline/sources/lamassu.py | 121 ++++++++++++++++++++++--------------
 1 file changed, 73 insertions(+), 48 deletions(-)

diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py
index c45a43d..391383d 100644
--- a/pipeline/sources/lamassu.py
+++ b/pipeline/sources/lamassu.py
@@ -1,5 +1,5 @@
 import traceback
-from typing import List, Optional, Union
+from typing import Dict, List, Optional, Union
 from urllib.parse import urljoin
 
 import geopandas as gpd
@@ -39,7 +39,8 @@
 }
 
 STATION_BY_FORM_FACTOR_COLUMNS = {
-    # 'station_id', is already part of index, don't add as column
+    'feed_id': pd.StringDtype(),
+    'station_id': pd.StringDtype(),
     'num_bicycles_available': pd.Int32Dtype(),
     'num_cargo_bicycles_available': pd.Int32Dtype(),
     'num_cars_available': pd.Int32Dtype(),
@@ -54,6 +55,7 @@
 class Lamassu:
     # Feed ids of feeds, whose scooters are scooter_seated.
     scooter_seated_feeds: List[str] = []
+    # Base URL to a lamassu instance.
     lamassu_base_url: str
     # Timeout used for lamassu requests
     timeout: int = 2
@@ -81,20 +83,6 @@ def get_system_feeds(self, system_id: str, preferred_feed_languages: list) -> di
 
         return {}
 
-    def _group_and_pivot(self, dataframe, index, columns, values):
-        # unpack index/columns if they are lists, not just single column names
-        group_cols = [
-            *(index if isinstance(index, list) else [index]),
-            *(columns if isinstance(columns, list) else [columns]),
-        ]
-        grouped_sums_df = dataframe.groupby(group_cols).sum(numeric_only=True).reset_index()
-        # convert int64 to Int32, which is sufficiently large and can represent NA,
-        # which avoids they are converted to float by pivot
-        for column_name in grouped_sums_df.columns:
-            if grouped_sums_df[column_name].dtype.name == 'int64':
-                grouped_sums_df[column_name] = grouped_sums_df[column_name].astype('Int32')
-        return grouped_sums_df.pivot(index=index, columns=columns, values=values)
-
     def _availability_col_name_for_form_factor(self, feed_id: str, form_factor: str) -> str:
         """
         Maps form_factor to the corresponding `num_<form_factor>s_available` column name.
@@ -110,7 +98,6 @@ def _availability_col_name_for_form_factor(self, feed_id: str, form_factor: str)
                 return 'num_scooters_seated_available'
             return 'num_scooters_standing_available'
         return 'num_' + form_factor + 's_available'
-
 
     def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
         if 'station_status' not in feed or 'vehicle_types' not in feed:
@@ -120,7 +107,8 @@ def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) -
             feed['station_status'],
             'stations',
             'vehicle_types_available',
-            ['station_id', 'num_bikes_available', 'is_renting', 'is_installed'],
+            # Note: in gbfs 2.3, num_bikes_available means num_vehicles_available, will be renamed in v3
+            ['station_id', 'num_bikes_available', 'is_renting', 'is_installed', 'last_reported'],
             [],
         )
         vehicle_types_df = self._load_feed_as_frame(feed['vehicle_types'], 'vehicle_types')
@@ -137,7 +125,7 @@ def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) -
         # convert stations_status into a dataframe, one row per station, a column per form_factor
         # reflecting the number of available vehicles of this form_factor
         stations_availabilities_by_form_factor_df = self._group_and_pivot(
-            filtered, ['station_id', 'num_vehicles_available'], 'form_factor', 'count'
+            filtered, ['station_id', 'num_vehicles_available', 'last_reported'], 'form_factor', 'count'
         )
         # rename form_factor cols to num_<form_factor>s_available
         renamings = {
@@ -145,10 +133,9 @@ def get_station_status_by_form_factor_as_frame(self, feed: dict, feed_id: str) -
             for c in stations_availabilities_by_form_factor_df.columns
         }
         stations_availabilities_by_form_factor_df = stations_availabilities_by_form_factor_df.rename(columns=renamings)
-        # add feed name
-        stations_availabilities_by_form_factor_df['feed_id'] = feed_id
-
-        return self._enforce_columns(stations_availabilities_by_form_factor_df, STATION_BY_FORM_FACTOR_COLUMNS)
+        return self._postprocess_columns_and_types(
+            stations_availabilities_by_form_factor_df, feed_id, STATION_BY_FORM_FACTOR_COLUMNS, 'station_id'
+        )
 
     def get_stations_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
         if 'station_information' not in feed:
@@ -159,9 +146,6 @@ def get_stations_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra
         if stations_infos_df.empty:
             return None
 
-        # add feed name to do delete insert
-        stations_infos_df['feed_id'] = feed_id
-
         # Add geometry
         stations_infos_df_with_geom = gpd.GeoDataFrame(
             stations_infos_df,
@@ -169,10 +153,7 @@ def get_stations_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra
             crs='EPSG:4326',
         )
 
-        return self._enforce_columns(stations_infos_df_with_geom, STATION_COLUMNS)
-
-    # TODO add operator
-    # TODO schema sharing
+        return self._postprocess_columns_and_types(stations_infos_df_with_geom, feed_id, STATION_COLUMNS, 'station_id')
 
     def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFrame]:
         """
@@ -199,33 +180,17 @@ def get_vehicles_as_frame(self, feed: dict, feed_id: str) -> Optional[pd.DataFra
 
         # Join vehicles and their type information
         merged = pd.merge(free_vehicle_status_df, vehicle_types_df, on=['vehicle_type_id'])
-        merged['feed_id'] = feed_id
 
         # filter those already reserved or disabled
         # Note: 'is False' results in boolean label can not be used without a boolean index
         filtered = merged.loc[
-            merged.lon.notnull() & (merged['is_reserved'] == False) & (merged['is_disabled'] == False)  # noqa: E712
+            merged.lon.notnull() & (merged['is_reserved'] == False) & (merged['is_disabled'] == False)  # noqa: E712
         ]
 
         # Add geometry
         filtered_with_geom = gpd.GeoDataFrame(
             filtered, geometry=gpd.points_from_xy(filtered.lon, filtered.lat), crs='EPSG:4326'
         )
-
-        return self._enforce_columns(filtered_with_geom, VEHICLE_COLUMNS)
+        return self._postprocess_columns_and_types(filtered_with_geom, feed_id, VEHICLE_COLUMNS, 'vehicle_id')
 
     def _load_feed_as_frame(
         self,
@@ -248,3 +213,63 @@ def _load_feed_as_frame(
                 if record_path not in record:
                     record[record_path] = default_record_path
         return pd.json_normalize(data, record_path, meta, sep='_')
+
+    @staticmethod
+    def _coerce_int64_to_int32(dataframe: pd.DataFrame) -> None:
+        """
+        convert int64 to Int32, which can represent NA, and is
+        sufficiently large to represent expected values.
+        """
+        for column_name in dataframe.columns:
+            if dataframe[column_name].dtype.name == 'int64':
+                dataframe[column_name] = dataframe[column_name].astype('Int32')
+
+    @staticmethod
+    def _group_and_pivot(
+        dataframe: pd.DataFrame, index: Union[list, str], columns: Union[list, str], values: Union[list, str]
+    ) -> pd.DataFrame:
+        """
+        Groups the dataframe by index and columns, summing up the value columns and returning them pivoted.
+        """
+        # unpack index/columns if they are lists, not just single column names
+        group_cols = [
+            *(index if isinstance(index, list) else [index]),
+            *(columns if isinstance(columns, list) else [columns]),
+        ]
+        grouped_sums_df = dataframe.groupby(group_cols).sum(numeric_only=True).reset_index()
+        Lamassu._coerce_int64_to_int32(grouped_sums_df)
+        return grouped_sums_df.pivot(index=index, columns=columns, values=values)
+
+    @staticmethod
+    def _postprocess_columns_and_types(
+        df: pd.DataFrame, feed_id: str, enforced_columns: Dict[str, ExtensionDtype], index: str
+    ) -> pd.DataFrame:
+        """
+        Performs some dataframe post-processing common to all feature frames:
+        * adds the given feed_id as column
+        * sets the given index column as new index
+        * converts last_reported from epoch to datetime column
+        * assures that the returned dataframe has exactly the enforced_columns. Columns not contained in enforced_columns
+          will be dropped, not yet existing columns created with their assigned type, existing coerced to the given type
+        """
+        df = df.reset_index()
+        df['feed_id'] = feed_id
+        # convert seconds since epoch into datetime
+        df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', utc=True)
+        df_with_enforced_columns = Lamassu._enforce_columns(df, enforced_columns)
+        return df_with_enforced_columns.set_index(index)
+
+    @staticmethod
+    def _enforce_columns(df: pd.DataFrame, column_names: dict[str, ExtensionDtype]) -> pd.DataFrame:
+        """
+        Make sure all intended columns exist in data frame.
+        Unwanted columns are discarded. Intended but not yet
+        existing columns are created with value "None".
+ """ + # Create missing columns with their appropriate dtype + for column, dtype in column_names.items(): + if column not in df: + df[column] = pd.Series(dtype=dtype) if dtype else None + + # restrict to essentiel columns or provide defaults + return df[column_names.keys()]