Skip to content

Commit

Permalink
fix new dagster asset materilization
Browse files Browse the repository at this point in the history
  • Loading branch information
luutuankiet committed Feb 26, 2024
1 parent 409855f commit e0b63c6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deployment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
. ./env_init.sh
. ./.env

pip install --upgrade -q -r requirements.txt
# pip install --upgrade -q -r requirements.txt

# setup sessions for service
STREAMLIT="streamlit"
Expand Down
11 changes: 8 additions & 3 deletions app/ETL/EL.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from dagster import asset,AssetExecutionContext,AssetOut, multi_asset, load_assets_from_modules
from dagster import AssetMaterialization, asset,AssetExecutionContext,AssetOut, multi_asset, AssetKey
from dagster_dbt import get_asset_keys_by_output_name_for_source
import duckdb
import pandas as pd
Expand Down Expand Up @@ -30,11 +30,16 @@ def dump_to_motherduck(context: AssetExecutionContext):
cur = con.cursor()
cur.sql(f"CREATE OR REPLACE TABLE {entity}_raw as SELECT * FROM entity_df")
context.log.info(f'loaded {len(entity_df)} rows to {entity}_raw table.')
asset_key = AssetKey(name=f"{entity}_raw")

# yield the materialization result
yield AssetMaterialization(asset_key=asset_key,
metadata={'num_rows': len(entity_df)},
description=f'Successfully loaded {len(entity_df)} rows to {entity}_raw table.'
)
finally:
con.close()
cur.close()
return None




0 comments on commit e0b63c6

Please sign in to comment.