From 3b2370ec9b49113bb725b474eab0588f5e65c0ad Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Tue, 19 Dec 2023 00:31:03 +0100
Subject: [PATCH 1/4] update sharing stations hourly

---
 pipeline/assets/sharing.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pipeline/assets/sharing.py b/pipeline/assets/sharing.py
index 0a7e4f5..37dff90 100644
--- a/pipeline/assets/sharing.py
+++ b/pipeline/assets/sharing.py
@@ -2,6 +2,7 @@
 import pandas as pd
 
 from dagster import (
+    AutoMaterializePolicy,
     DefaultScheduleStatus,
     DefaultSensorStatus,
     DynamicPartitionsDefinition,
@@ -22,7 +23,8 @@
     io_manager_key='pg_gpd_io_manager',
     compute_kind='Lamassu',
     group_name='sharing',
-    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1),
+    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
+    auto_materialize_policy=AutoMaterializePolicy.eager(),
 )
 def sharing_stations(context, lamassu: LamassuResource) -> pd.DataFrame:
     """

From c2bb2781142f798b26a4e12922278190c54be139 Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Tue, 19 Dec 2023 00:32:47 +0100
Subject: [PATCH 2/4] fix: handle nullable last_reported for vehicles

---
 pipeline/sources/lamassu.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py
index 391383d..c98ac23 100644
--- a/pipeline/sources/lamassu.py
+++ b/pipeline/sources/lamassu.py
@@ -254,8 +254,9 @@ def _postprocess_columns_and_types(
         """
         df = df.reset_index()
         df['feed_id'] = feed_id
-        # convert seconds since epoch into datetime
-        df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', utc=True)
+        # convert seconds since epoch into datetime, if available (for vehicles, it's optional)
+        if 'last_reported' in df.columns:
+            df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', utc=True, errors='coerce')
 
         df_with_enforced_columns = Lamassu._enforce_columns(df, enforced_columns)
         return df_with_enforced_columns.set_index(index)

From 4a39ded1d048b3d01a854d316896f4f2b9be804b Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Tue, 19 Dec 2023 00:33:58 +0100
Subject: [PATCH 3/4] add range properties for vehicles, remove is_reserved

---
 pipeline/sources/lamassu.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pipeline/sources/lamassu.py b/pipeline/sources/lamassu.py
index c98ac23..4e4047f 100644
--- a/pipeline/sources/lamassu.py
+++ b/pipeline/sources/lamassu.py
@@ -28,8 +28,9 @@
     'vehicle_id': pd.StringDtype(),
     'form_factor': pd.StringDtype(),
     'name': pd.StringDtype(),
-    'is_reserved': pd.BooleanDtype(),
     'propulsion_type': pd.StringDtype(),
+    'current_fuel_percent': pd.Float32Dtype(),
+    'current_range_meters': pd.Int32Dtype(),
     'max_range_meters': pd.Int32Dtype(),
     'rental_uris_android': pd.StringDtype(),
     'rental_uris_ios': pd.StringDtype(),

From 7a507164354103445f87a7469a848a9bb359cbe6 Mon Sep 17 00:00:00 2001
From: Holger Bruch
Date: Tue, 19 Dec 2023 00:35:29 +0100
Subject: [PATCH 4/4] fix: include index columns for geodataframes

---
 pipeline/resources/postgis_geopandas_io_manager.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pipeline/resources/postgis_geopandas_io_manager.py b/pipeline/resources/postgis_geopandas_io_manager.py
index dce5c0f..9fe011a 100644
--- a/pipeline/resources/postgis_geopandas_io_manager.py
+++ b/pipeline/resources/postgis_geopandas_io_manager.py
@@ -171,8 +171,8 @@
     def handle_output(self, context: OutputContext, obj: geopandas.GeoDataFrame):
         schema, table = self._get_schema_table(context.asset_key)
         if isinstance(obj, geopandas.GeoDataFrame):
-            len(obj)
             with connect_postgresql(config=self._config) as con:
+                self._create_schema_if_not_exists(schema, con)
                 if context.has_partition_key:
                     # add additional column (name? for now just partition)
                     # to the frame and initialize with partition_name
@@ -189,9 +189,9 @@ def handle_output(self, context: OutputContext, obj: geopandas.GeoDataFrame):
                     # All data can be replaced (e.g. deleted before insertion).
                     # geopandas will take care of this.
                     if_exists_action = 'replace'
-
-            self._create_schema_if_not_exists(schema, con)
-            obj.to_postgis(con=con, name=table, schema=schema, if_exists=if_exists_action, chunksize=self.chunksize)
+                obj.to_postgis(
+                    con=con, name=table, index=True, schema=schema, if_exists=if_exists_action, chunksize=self.chunksize
+                )
             context.add_output_metadata({'num_rows': len(obj), 'table_name': f'{schema}.{table}'})
         else:
             super().handle_output(context, obj)