From e0b63c61c94a693637aa822f93e3b5da01dc3a2b Mon Sep 17 00:00:00 2001 From: luutuankiet Date: Mon, 26 Feb 2024 17:48:18 +0700 Subject: [PATCH] fix new dagster asset materilization --- .github/workflows/deployment.sh | 2 +- app/ETL/EL.py | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/deployment.sh b/.github/workflows/deployment.sh index 48dd26c..c7c30a8 100755 --- a/.github/workflows/deployment.sh +++ b/.github/workflows/deployment.sh @@ -4,7 +4,7 @@ . ./env_init.sh . ./.env -pip install --upgrade -q -r requirements.txt +# pip install --upgrade -q -r requirements.txt # setup sessions for service STREAMLIT="streamlit" diff --git a/app/ETL/EL.py b/app/ETL/EL.py index d2a9197..5e8651f 100644 --- a/app/ETL/EL.py +++ b/app/ETL/EL.py @@ -1,5 +1,5 @@ import os -from dagster import asset,AssetExecutionContext,AssetOut, multi_asset, load_assets_from_modules +from dagster import AssetMaterialization, asset,AssetExecutionContext,AssetOut, multi_asset, AssetKey from dagster_dbt import get_asset_keys_by_output_name_for_source import duckdb import pandas as pd @@ -30,11 +30,16 @@ def dump_to_motherduck(context: AssetExecutionContext): cur = con.cursor() cur.sql(f"CREATE OR REPLACE TABLE {entity}_raw as SELECT * FROM entity_df") context.log.info(f'loaded {len(entity_df)} rows to {entity}_raw table.') + asset_key = AssetKey(name=f"{entity}_raw") + + # yield the materialization result + yield AssetMaterialization(asset_key=asset_key, + metadata={'num_rows': len(entity_df)}, + description=f'Successfully loaded {len(entity_df)} rows to {entity}_raw table.' + ) finally: con.close() cur.close() - return None -