-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEL.py
30 lines (22 loc) · 925 Bytes
/
EL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import os
from dagster import asset,AssetExecutionContext
import duckdb
import pandas as pd
from helper.source_env import raw_path
@asset(compute_kind='Python')
def dump_to_motherduck(context: AssetExecutionContext):
    """Load the raw TickTick JSON dumps into tables in the MotherDuck ``ticktick_gtd`` database.

    For each entity in ``tasks``, ``lists`` and ``folders``:

    - read ``<raw_path>/<entity>.json`` into a pandas DataFrame, with every
      column as ``str``;
    - (re)create the table ``<entity>_raw`` from that DataFrame.

    ``raw_path`` is imported from ``helper.source_env``. The MotherDuck token
    is taken from the ``motherduck_token`` environment variable.

    Args:
        context: Dagster execution context, used only for logging.

    Returns:
        None.

    Raises:
        RuntimeError: If the ``motherduck_token`` environment variable is unset
            or empty.
    """
    motherduck_token = os.environ.get('motherduck_token')
    if not motherduck_token:
        # Otherwise the token would be interpolated as the literal "None" and
        # only fail later, at authentication time, with a less obvious error.
        raise RuntimeError('motherduck_token environment variable is not set')

    entities = ['tasks', 'lists', 'folders']
    # Single '?' introduces the connection option (the previous '??' was a typo).
    connection_string = f'md:ticktick_gtd?motherduck_token={motherduck_token}'

    # One connection for all entities instead of reconnecting per entity.
    # `with` closes it on every exit path, including when connect() itself
    # raises (the old `finally` then referenced an unbound `con`).
    with duckdb.connect(connection_string) as con:
        for entity in entities:
            entity_path = os.path.join(raw_path, f'{entity}.json')
            entity_df = pd.read_json(entity_path, dtype=str)
            context.log.info(f'loading {entity} to motherduck...')
            # Register the DataFrame explicitly rather than relying on DuckDB's
            # replacement scan finding the local variable by name.
            con.register('entity_df', entity_df)
            con.execute(f"CREATE OR REPLACE TABLE {entity}_raw as SELECT * FROM entity_df")
            con.unregister('entity_df')
            context.log.info(f'loaded {len(entity_df)} rows to {entity}_raw table.')
    return None