Skip to content

Commit

Permalink
Merge pull request #54 from mobidata-bw/sharing-changes
Browse files Browse the repository at this point in the history
Sharing Asset changes
  • Loading branch information
hbruch authored Dec 19, 2023
2 parents ae74d7f + 7a50716 commit 1bf6a66
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
4 changes: 3 additions & 1 deletion pipeline/assets/sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pandas as pd
from dagster import (
AutoMaterializePolicy,
DefaultScheduleStatus,
DefaultSensorStatus,
DynamicPartitionsDefinition,
Expand All @@ -22,7 +23,8 @@
io_manager_key='pg_gpd_io_manager',
compute_kind='Lamassu',
group_name='sharing',
freshness_policy=FreshnessPolicy(maximum_lag_minutes=1),
freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def sharing_stations(context, lamassu: LamassuResource) -> pd.DataFrame:
"""
Expand Down
8 changes: 4 additions & 4 deletions pipeline/resources/postgis_geopandas_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ def handle_output(self, context: OutputContext, obj: geopandas.GeoDataFrame):
schema, table = self._get_schema_table(context.asset_key)

if isinstance(obj, geopandas.GeoDataFrame):
len(obj)
with connect_postgresql(config=self._config) as con:
self._create_schema_if_not_exists(schema, con)
if context.has_partition_key:
# add additional column (name? for now just partition)
# to the frame and initialize with partition_name
Expand All @@ -189,9 +189,9 @@ def handle_output(self, context: OutputContext, obj: geopandas.GeoDataFrame):
# All data can be replaced (e.g. deleted before insertion).
# geopandas will take care of this.
if_exists_action = 'replace'

self._create_schema_if_not_exists(schema, con)
obj.to_postgis(con=con, name=table, schema=schema, if_exists=if_exists_action, chunksize=self.chunksize)
obj.to_postgis(
con=con, name=table, index=True, schema=schema, if_exists=if_exists_action, chunksize=self.chunksize
)
context.add_output_metadata({'num_rows': len(obj), 'table_name': f'{schema}.{table}'})
else:
super().handle_output(context, obj)
Expand Down
8 changes: 5 additions & 3 deletions pipeline/sources/lamassu.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
'vehicle_id': pd.StringDtype(),
'form_factor': pd.StringDtype(),
'name': pd.StringDtype(),
'is_reserved': pd.BooleanDtype(),
'propulsion_type': pd.StringDtype(),
'current_fuel_percent': pd.Float32Dtype(),
'current_range_meters': pd.Int32Dtype(),
'max_range_meters': pd.Int32Dtype(),
'rental_uris_android': pd.StringDtype(),
'rental_uris_ios': pd.StringDtype(),
Expand Down Expand Up @@ -254,8 +255,9 @@ def _postprocess_columns_and_types(
"""
df = df.reset_index()
df['feed_id'] = feed_id
# convert seconds since epoch into datetime
df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', utc=True)
# convert seconds since epoch into datetime, if available (for vehicles, it's optional)
if 'last_reported' in df.columns:
df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', utc=True, errors='coerce')
df_with_enforced_columns = Lamassu._enforce_columns(df, enforced_columns)
return df_with_enforced_columns.set_index(index)

Expand Down

0 comments on commit 1bf6a66

Please sign in to comment.