From af28ed2e18bf9a9f1b1cac3255e158e8b5d11669 Mon Sep 17 00:00:00 2001
From: luutuankiet
Date: Wed, 21 Feb 2024 12:54:56 +0700
Subject: [PATCH] offload ticktick-py API calls to a separate always-on runner

---
 app/ETL/EL.py          | 191 +------------------------------------
 app/ETL/definitions.py |   5 +-
 app/ETL/loader.py      | 208 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 210 insertions(+), 194 deletions(-)
 create mode 100644 app/ETL/loader.py

diff --git a/app/ETL/EL.py b/app/ETL/EL.py
index c069d0c..3289cc8 100644
--- a/app/ETL/EL.py
+++ b/app/ETL/EL.py
@@ -1,200 +1,11 @@
-from ticktick.oauth2 import OAuth2 # OAuth2 Manager
-from ticktick.api import TickTickClient # Main Interface
-from os import environ
-import json
 import os
-from datetime import datetime, timedelta,timezone
-import logging
 from dagster import asset,AssetExecutionContext
 import duckdb
 import pandas as pd
-from helper.source_env import dotenv_path,raw_path
+from helper.source_env import raw_path
 
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(levelname)s - %(message)s'
-)
-
-cache_path=os.path.join(dotenv_path,'.token-oauth')
-
-client_id=environ.get('client_id')
-client_secret=environ.get('client_secret')
-username=environ.get('username')
-password=environ.get('password')
-redirect_uri=environ.get('redirect_uri')
-
-tasks_file_path = os.path.join(raw_path,'tasks.json')
-lists_file_path = os.path.join(raw_path,'lists.json')
-folders_file_path = os.path.join(raw_path,'folders.json')
-
-default_start = datetime(2022, 7, 23,tzinfo=timezone.utc)
-date_format = '%Y-%m-%dT%H:%M:%S.%f%z'
-
-
-class TickTickClientWrapper:
-    """
-    wrapper so that the ticktick client class wont be triggered by arbitrary server events like dagster webserver checks.
-    goal : create a single client instance that all funcs subscribes to.
-    """
-    _instance = None
-
-
-    # this __new__ func is called whenever this wrapper class is called.
-    def __new__(cls,client_id,client_secret,username,password,redirect_uri,cache_path):
-        if not cls._instance:
-            cls._instance = super(TickTickClientWrapper, cls).__new__(cls)
-            cls._instance.client_id = client_id
-            cls._instance.client_secret = client_secret
-            cls._instance.username = username
-            cls._instance.password = password
-            cls._instance.redirect_uri = redirect_uri
-            cls._instance.cache_path = cache_path
-            cls._instance.client = None
-        return cls._instance
-    # this makes sure only a single instance of the class is used across the module whichever func calls this class.
-    # the _instance attribute now holds all the suceeding function's output.
-
-
-    def _initialize_client(self):
-        auth_client = OAuth2(client_id=self.client_id,
-                        client_secret=self.client_secret,
-                        redirect_uri=self.redirect_uri,
-                        cache_path=self.cache_path
-                        )
-        self.client = TickTickClient(self.username, self.password, auth_client)
-
-    def get_client(self):
-        if self.client is None:
-            self._initialize_client()
-        return self.client
-
-# one time call the client instance.
-wrapper = TickTickClientWrapper(client_id,client_secret,username,password,redirect_uri,cache_path) - - -def deduplicate(source) -> list: - """ - checks each item and remove duplicated - """ - unique_items={} - unique_list=[] - - for item in source: - item_id=item.get("id") - - if item_id not in unique_items: - unique_list.append(item) - unique_items[item_id]=True - return unique_list - - -def _get_completed_tasks(start=None, end=datetime.now(timezone.utc), full_load=True): - """_summary_ - returns a json string - internal func - uses tickpy to grab completed tasks from start > end. - `start` has a default value 2022/07/23 which is the start of my ticktick interactions - `end` defaults to the runtime date. - - Returns: - list: all the completed tasks in the interval. - """ - - completed_tasks=[] - logging.info('start loading tasks') - client=wrapper.get_client() - - if full_load: - current_date=default_start - elif not full_load: - current_date=start - while current_date <= end: - tasks=client.task.get_completed(current_date) - if tasks != []: - for task in tasks: - completed_tasks.append(task) - logging.info(f'loaded {len(tasks)} new tasks from {current_date}. next interation...') - - current_date += timedelta(days=1) - # completed_tasks=json.dumps(completed_tasks) - return completed_tasks - -def get_completed_task() -> list: - """ - returns a list the full completed tasks and utilize existing file as cache if available. - """ - try: - with open(tasks_file_path,'r') as f: - cached=json.load(f) - cached_completed=[item for item in cached if 'completedTime' in item] - last_cached_date=[item['completedTime'] for item in cached if 'completedTime' in item] - last_cached_date=max(last_cached_date) - last_cached_date=datetime.strptime(last_cached_date,date_format) - last_cached_date=last_cached_date - timedelta(days=1) - full_load=False - metadata=last_cached_date - except FileNotFoundError: - logging.info('no cache found. doing full load...') - cached_completed=[] - last_cached_date=None - full_load=True - metadata=None - - # checks existing and append to cached list - net_new=_get_completed_tasks(start=last_cached_date,full_load=full_load) - - # concatenate final completed list - all_completed_tasks=net_new+cached_completed - all_completed_tasks=deduplicate(all_completed_tasks) - return all_completed_tasks,metadata - - -def get_new_tasks() -> list: - client = wrapper.get_client() - new_tasks=client.state['tasks'] - return new_tasks @asset(compute_kind='Python') -def extract_json(context: AssetExecutionContext): - # task - new=get_new_tasks() - context.log.info(f'new tasks : {len(new)}') - completed,metadata=get_completed_task() - context.log.info(f'completed tasks : {len(completed)}. 
cached from : {metadata}')
-    all_tasks=new+completed
-    context.log.info(f'all tasks : {len(all_tasks)}')
-
-
-    client = wrapper.get_client()
-
-    # list
-    lists = client.state['projects']
-
-    # folders
-    folders = client.state['project_folders']
-
-
-    return all_tasks,folders,lists
-
-
-
-def _dump_to_file(source:list, target:str):
-    """
-    takes source then dumps to json raw file
-    """
-
-    with open(target,'w') as f:
-        json.dump(source,f,indent=4,)
-
-@asset(compute_kind='Python')
-def dump_to_file(extract_json):
-    all_tasks,folders,lists = extract_json
-    _dump_to_file(lists,lists_file_path)
-    _dump_to_file(folders,folders_file_path)
-    _dump_to_file(all_tasks,tasks_file_path)
-    return None
-
-
-@asset(deps=[dump_to_file],compute_kind='Python')
 def dump_to_motherduck(context: AssetExecutionContext):
 
     motherduck_token=os.environ.get('motherduck_token')

diff --git a/app/ETL/definitions.py b/app/ETL/definitions.py
index 4a25f1b..9c1d8de 100644
--- a/app/ETL/definitions.py
+++ b/app/ETL/definitions.py
@@ -1,14 +1,11 @@
 import os
-from dagster import Definitions, AssetSelection,ScheduleDefinition,define_asset_job,load_assets_from_modules
+from dagster import Definitions,ScheduleDefinition,define_asset_job,load_assets_from_modules
 from dagster_dbt import DbtCliResource
-# from .dbt_assets import ticktick_dbt_assets
 from .constants import DBT_PROJECT_DIR
-# from .EL import get_all_tasks,dump_to_file,get_lists,get_folders,dump_to_motherduck
 from. import EL,dbt_assets
 
 all_assets = load_assets_from_modules([EL,dbt_assets])
-# ETL_job = define_asset_job("ETL_job",selection=AssetSelection.all())
 ETL_job = define_asset_job("ETL_job",selection=all_assets)
 ETL_schedule = ScheduleDefinition(
     job=ETL_job,

diff --git a/app/ETL/loader.py b/app/ETL/loader.py
new file mode 100644
index 0000000..883dda3
--- /dev/null
+++ b/app/ETL/loader.py
@@ -0,0 +1,208 @@
+# a standalone job that is started once and stays on for the lifetime of the machine.
+# this keeps a single authenticated ticktick api client alive instead of re-authenticating on every run.
+
+from ticktick.oauth2 import OAuth2 # OAuth2 Manager
+from ticktick.api import TickTickClient # Main Interface
+from os import environ
+import json
+import os
+from datetime import datetime, timedelta, timezone
+import logging
+from helper.source_env import dotenv_path,raw_path
+import time
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+
+cache_path=os.path.join(dotenv_path,'.token-oauth')
+
+client_id=environ.get('client_id')
+client_secret=environ.get('client_secret')
+username=environ.get('username')
+password=environ.get('password')
+redirect_uri=environ.get('redirect_uri')
+
+tasks_file_path = os.path.join(raw_path,'tasks.json')
+lists_file_path = os.path.join(raw_path,'lists.json')
+folders_file_path = os.path.join(raw_path,'folders.json')
+
+default_start = datetime(2022, 7, 23,tzinfo=timezone.utc)
+date_format = '%Y-%m-%dT%H:%M:%S.%f%z'
+
+
+class TickTickClientWrapper:
+    """
+    wrapper so that the ticktick client won't be instantiated by arbitrary server events like dagster webserver checks.
+    goal : create a single client instance that all funcs subscribe to.
+    """
+    _instance = None
+
+
+    # __new__ runs on every instantiation of this wrapper class.
+    def __new__(cls,client_id,client_secret,username,password,redirect_uri,cache_path):
+        if not cls._instance:
+            cls._instance = super(TickTickClientWrapper, cls).__new__(cls)
+            cls._instance.client_id = client_id
+            cls._instance.client_secret = client_secret
+            cls._instance.username = username
+            cls._instance.password = password
+            cls._instance.redirect_uri = redirect_uri
+            cls._instance.cache_path = cache_path
+            cls._instance.client = None
+        return cls._instance
+    # this makes sure only a single instance of the class is shared across the module, whichever func calls it.
+    # the _instance attribute holds the state reused by every succeeding call.
+
+
+    def _initialize_client(self):
+        auth_client = OAuth2(client_id=self.client_id,
+                             client_secret=self.client_secret,
+                             redirect_uri=self.redirect_uri,
+                             cache_path=self.cache_path
+                             )
+        self.client = TickTickClient(self.username, self.password, auth_client)
+
+    def get_client(self):
+        if self.client is None:
+            self._initialize_client()
+        return self.client
+
+# instantiate the client wrapper once for the whole module.
+wrapper = TickTickClientWrapper(client_id,client_secret,username,password,redirect_uri,cache_path)
+
+
+def deduplicate(source) -> list:
+    """
+    removes duplicate items by id, keeping the first occurrence.
+    """
+    seen_ids=set()
+    unique_list=[]
+
+    for item in source:
+        item_id=item.get("id")
+
+        if item_id not in seen_ids:
+            unique_list.append(item)
+            seen_ids.add(item_id)
+    return unique_list
+
+
+def _get_completed_tasks(start=None, end=None, full_load=True):
+    """
+    internal func - uses ticktick-py to grab completed tasks from start to end, one day at a time.
+    `start` defaults to 2022/07/23, which is the start of my ticktick interactions.
+    `end` defaults to the runtime date; it is resolved at call time rather than in the
+    signature, because a default evaluated at import would stay frozen for the lifetime
+    of this always-on process.
+
+    Returns:
+        list: all the completed tasks in the interval.
+    """
+    if end is None:
+        end = datetime.now(timezone.utc)
+
+    completed_tasks=[]
+    logging.info('start loading tasks')
+    client=wrapper.get_client()
+
+    if full_load:
+        current_date=default_start
+    else:
+        current_date=start
+    while current_date <= end:
+        tasks=client.task.get_completed(current_date)
+        if tasks != []:
+            for task in tasks:
+                completed_tasks.append(task)
+            logging.info(f'loaded {len(tasks)} new tasks from {current_date}. next iteration...')
+
+        current_date += timedelta(days=1)
+    return completed_tasks
+
+def get_completed_task() -> tuple:
+    """
+    returns the full list of completed tasks, using the existing raw file as a cache
+    when available, along with the date the cache was resumed from (None on a full load).
+    """
+    try:
+        with open(tasks_file_path,'r') as f:
+            cached=json.load(f)
+        cached_completed=[item for item in cached if 'completedTime' in item]
+        last_cached_date=max(item['completedTime'] for item in cached_completed)
+        last_cached_date=datetime.strptime(last_cached_date,date_format)
+        # step back one day so tasks completed on the last cached day are re-fetched.
+        last_cached_date=last_cached_date - timedelta(days=1)
+        full_load=False
+        metadata=last_cached_date
+    except FileNotFoundError:
+        logging.info('no cache found. doing full load...')
+        cached_completed=[]
+        last_cached_date=None
+        full_load=True
+        metadata=None
+
+    # fetch everything newer than the cache
+    net_new=_get_completed_tasks(start=last_cached_date,full_load=full_load)
+
+    # concatenate the final completed list
+    all_completed_tasks=net_new+cached_completed
+    all_completed_tasks=deduplicate(all_completed_tasks)
+    return all_completed_tasks,metadata
+
+
+def get_new_tasks() -> list:
+    client = wrapper.get_client()
+    new_tasks=client.state['tasks']
+    return new_tasks
+
+
+def extract_json():
+    # tasks
+    new=get_new_tasks()
+    logging.info(f'new tasks : {len(new)}')
+    completed,metadata=get_completed_task()
+    logging.info(f'completed tasks : {len(completed)}. cached from : {metadata}')
+    all_tasks=new+completed
+    logging.info(f'all tasks : {len(all_tasks)}')
+
+    client = wrapper.get_client()
+
+    # lists
+    lists = client.state['projects']
+
+    # folders
+    folders = client.state['project_folders']
+
+    return all_tasks,folders,lists
+
+
+def _dump_to_file(source:list, target:str):
+    """
+    dumps the source list to a raw json file.
+    """
+    with open(target,'w') as f:
+        json.dump(source,f,indent=4)
+
+def dump_to_file(extracted):
+    all_tasks,folders,lists = extracted
+    _dump_to_file(lists,lists_file_path)
+    _dump_to_file(folders,folders_file_path)
+    _dump_to_file(all_tasks,tasks_file_path)
+    return None
+
+if __name__ == '__main__':
+    sleep_time = 60*30  # refresh every 30 minutes
+    while True:
+        logging.info('start loading...')
+        dump_to_file(extract_json())
+        logging.info(f'done loading. next iteration in {sleep_time} seconds...')
+        time.sleep(sleep_time)
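
Reviewer note, not part of the patch: as written, one unhandled exception inside
extract_json() (a dropped connection, a TickTick server error) kills the loop and
with it the "always on" guarantee. A minimal hardening sketch, reusing this patch's
extract_json / dump_to_file and logging setup; run_forever and the backoff constants
are hypothetical names and illustrative numbers, not part of the patch:

    import logging
    import time

    SLEEP_SECONDS = 60 * 30        # normal refresh interval, as in the patch
    BASE_BACKOFF_SECONDS = 60      # assumed initial wait after a failure
    MAX_BACKOFF_SECONDS = 60 * 15  # assumed cap on the retry wait

    def run_forever():
        # hypothetical replacement for the __main__ loop in loader.py
        backoff = BASE_BACKOFF_SECONDS
        while True:
            try:
                dump_to_file(extract_json())
                backoff = BASE_BACKOFF_SECONDS  # reset the backoff after a clean run
                time.sleep(SLEEP_SECONDS)
            except Exception:
                # log the traceback and keep the runner alive through transient errors
                logging.exception(f'load failed. retrying in {backoff} seconds...')
                time.sleep(backoff)
                backoff = min(backoff * 2, MAX_BACKOFF_SECONDS)

Exponential backoff with a cap keeps a flapping API from being hammered every few
seconds while still recovering quickly from one-off failures.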