Merge pull request #5 from tejtw/v1.1.1
V1.1.1
Han860207 authored Feb 15, 2024
2 parents f98c2cb + c5c373e commit ae7367b
Showing 12 changed files with 159 additions and 640 deletions.
6 changes: 0 additions & 6 deletions TejToolAPI/Error.py

This file was deleted.

10 changes: 5 additions & 5 deletions TejToolAPI/Map_Dask_API.py
@@ -389,8 +389,8 @@ def getMRAnnd_np(data):
# The last row represents the data with the greatest mdate
# data = data.drop_duplicates(subset=['coid', 'key3','annd'], keep='last')
data['ver'] = data['mdate'].astype(str) + '-' + data['no']
data = data.groupby(['coid', 'key3','annd'], as_index=False).max()
data = data.groupby(['coid','key3']).apply(lambda x: np.fmax.accumulate(x, axis=0))
data = data.groupby(['coid', 'key3','annd'], as_index=False, group_keys = False).max()
data = data.groupby(['coid','key3'], group_keys = False).apply(lambda x: np.fmax.accumulate(x, axis=0))
data = parallelize_ver_process(data)
data = data.drop(columns = 'ver')

@@ -415,8 +415,8 @@ def get_announce_date(tickers, **kwargs):
# The last row represents the data with the greatest mdate
# data = data.drop_duplicates(subset=['coid', 'key3','annd'], keep='last')
data['ver'] = data['mdate'].astype(str) + '-' + data['no']
data = data.groupby(['coid', 'key3','annd'], as_index=False).max()
data = data.groupby(['coid','key3']).apply(lambda x: np.fmax.accumulate(x, axis=0))
data = data.groupby(['coid', 'key3','annd'], as_index=False, group_keys = False).max()
data = data.groupby(['coid','key3'], group_keys = False).apply(lambda x: np.fmax.accumulate(x, axis=0))
data = parallelize_ver_process(data)
data.drop(columns = 'ver')

@@ -432,7 +432,7 @@ def parallize_annd_process(data, annd = 'annd'):
# print(uni_dates)

# Pass in the ExchangeCalendar object
result = vectorized_annd_adjusted(para.exc, uni_dates)
result = vectorized_annd_adjusted(para.exc, uni_dates, False)

# Create a mapping dictionary
dict_map = {uni_dates[i]:result[i] for i in range(len(result))}
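For context on the `group_keys` changes above: in recent pandas, `groupby(...).apply(...)` prepends the group labels to the result index unless `group_keys=False` is passed, which can break downstream steps that expect the original row index. A minimal sketch with toy data (not from the repository):

```python
import pandas as pd

# Toy frame standing in for the grouped announcement data (values invented).
df = pd.DataFrame({"coid": ["2330", "2330", "2317"], "val": [1.0, 3.0, 2.0]})

# Default (group_keys=True): recent pandas prepends the group label to the index.
with_keys = df.groupby("coid").apply(lambda x: x[["val"]].cummax())

# group_keys=False: the result keeps the original row index, so later
# row-aligned operations still line up with the source frame.
without_keys = df.groupby("coid", group_keys=False).apply(lambda x: x[["val"]].cummax())

print(with_keys.index)     # MultiIndex with 'coid' as the outer level (pandas >= 2.0)
print(without_keys.index)  # original RangeIndex
```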
92 changes: 76 additions & 16 deletions TejToolAPI/TejToolAPI.py
@@ -4,6 +4,7 @@
import gc
from . import parameters as para
from . import Map_Dask_API as dask_api
from .utils import get_api_key_info
import dask
from .meta_types import Meta_Types

@@ -23,6 +24,7 @@ def get_history_data(ticker:list, columns:list = [], fin_type:list = ['A','Q','T
transfer_to_chinese = kwargs.get('transfer_to_chinese', False)
npartitions = kwargs.get('npartitions', para.npartitions_local)
require_annd = kwargs.get('require_annd', False)
show_progress = kwargs.get('show_progress', True)

# Shift start date 180 days backward.
start_dt = pd.to_datetime(start)
@@ -33,6 +35,7 @@ def get_history_data(ticker:list, columns:list = [], fin_type:list = ['A','Q','T
# Triggers
all_tables = triggers(ticker = ticker, columns= columns, start= shift_start, end= end, fin_type= fin_type, include_self_acc= include_self_acc, npartitions = npartitions)


# Combine fin_self_acc and fin_auditor
try:
# Concatenate fin_self_acc with fin_auditor
@@ -63,6 +66,8 @@ def get_history_data(ticker:list, columns:list = [], fin_type:list = ['A','Q','T
# Consecutive merge.
history_data = consecutive_merge(all_tables, trigger_tables)



# Drop redundant columns of the merged table.
if require_annd:
history_data = history_data.drop(columns=[i for i in history_data.columns if i in para.drop_keys])
@@ -82,6 +87,9 @@ def get_history_data(ticker:list, columns:list = [], fin_type:list = ['A','Q','T

# Forward-fill NaN values with the preceding value.
history_data = history_data.groupby('coid', group_keys = False).apply(dask_api.fillna_multicolumns)
# Drop suspended trading days
history_data = dd.merge(all_tables['coid_calendar'], history_data, on= ['coid','mdate'], how = 'left')
history_data = history_data.compute(meta = Meta_Types.all_meta)

# Truncate the result by the user-set start date.
history_data = history_data.loc[history_data.mdate >= org_start,:]
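The `coid_calendar` merge added above restricts the forward-filled history to (coid, mdate) pairs on which each stock actually has a price record, so days when a stock was suspended drop out. A small pandas sketch of the idea (the diff does the same with `dd.merge`; all values invented):

```python
import pandas as pd

# Per-stock calendar: dates on which each coid actually traded (invented).
coid_calendar = pd.DataFrame({
    "coid":  ["2330", "2330", "8888"],
    "mdate": pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-02"]),
})

# Forward-filled history on the full market calendar; 8888 is suspended on 2024-01-03.
history = pd.DataFrame({
    "coid":  ["2330", "2330", "8888", "8888"],
    "mdate": pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-02", "2024-01-03"]),
    "close": [593.0, 595.0, 10.0, 10.0],
})

# Left-merging from the per-stock calendar keeps only days the stock really traded.
trimmed = coid_calendar.merge(history, on=["coid", "mdate"], how="left")
print(trimmed)  # the suspended 8888 / 2024-01-03 row is gone
```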
@@ -91,7 +99,8 @@ def get_history_data(ticker:list, columns:list = [], fin_type:list = ['A','Q','T
lang_map = transfer_language_columns(history_data.columns, isChinese=transfer_to_chinese)
history_data = history_data.rename(columns= lang_map)
history_data = history_data.reset_index(drop=True)

get_api_key_info(show_progress)

return history_data

def process_fin_data(all_tables, variable, tickers, start, end):
@@ -181,6 +190,11 @@ def triggers(ticker:list, columns:list = [], fin_type:list = ['A','Q','TTM'], i
# Qualify the table triggered by the given `columns`
trigger_tables = search_table(columns)

if 'stk_price' in trigger_tables:
coid_calendar = all_tables['stk_price'][['coid','mdate']]
else:
coid_calendar = get_stock_calendar(ticker, start = start, end = end, npartitions = npartitions)

# Get trading calendar of all given tickers
# if 'stk_price' not in trigger_tables['TABLE_NAMES'].unique():
trading_calendar = get_trading_calendar(ticker, start = start, end = end, npartitions = npartitions)
@@ -218,18 +232,13 @@ def consecutive_merge(local_var, loop_array):
data = local_var['trading_calendar']

for i in range(len(loop_array)):
if loop_array[i] == 'APISTOCK':
# get coid attribution
data = dd.merge(data, local_var[loop_array[i]], on = 'coid', how = 'left', suffixes = ('','_surfeit'))

else:
right_keys = table_keys.loc[table_keys['TABLE_NAMES']==loop_array[i], 'KEYS'].tolist()
# Merge tables by dask merge.
data = dd.merge(data, local_var[loop_array[i]], left_on = ['coid', 'mdate'], right_on = right_keys, how = 'left', suffixes = ('','_surfeit'))
right_keys = table_keys.loc[table_keys['TABLE_NAMES']==loop_array[i], 'KEYS'].tolist()
# Merge tables by dask merge.
data = dd.merge(data, local_var[loop_array[i]], left_on = ['coid', 'mdate'], right_on = right_keys, how = 'left', suffixes = ('','_surfeit'))

# Clear the right table to release memory.
del local_var[loop_array[i]]
gc.collect()
# del local_var[loop_array[i]]
# gc.collect()

# Drop surfeit columns.
data = data.loc[:,~data.columns.str.contains('_surfeit')]
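The simplified loop now treats every trigger table the same way: left-join it onto the running frame on that table's own keys, let colliding columns pick up a `_surfeit` suffix, and strip those columns afterwards. A plain-pandas illustration of that idiom (table names, keys, and values are invented; the diff uses `dd.merge` with keys looked up from `table_keys`):

```python
import pandas as pd

calendar = pd.DataFrame({
    "coid": ["2330", "2330"],
    "mdate": pd.to_datetime(["2024-01-02", "2024-01-03"]),
})

# Hypothetical trigger tables, each with its own key columns.
prices = pd.DataFrame({
    "coid": ["2330", "2330"],
    "mdate": pd.to_datetime(["2024-01-02", "2024-01-03"]),
    "close": [593.0, 595.0],
})
announcements = pd.DataFrame({
    "coid": ["2330"],
    "annd": pd.to_datetime(["2024-01-02"]),  # announcement date plays the role of mdate
    "close": [0.0],                          # deliberately colliding column name
})

tables = {"prices": prices, "announcements": announcements}
keys = {"prices": ["coid", "mdate"], "announcements": ["coid", "annd"]}

data = calendar
for name in tables:
    data = pd.merge(data, tables[name],
                    left_on=["coid", "mdate"], right_on=keys[name],
                    how="left", suffixes=("", "_surfeit"))

# Columns that collided on a later merge carry the _surfeit suffix; drop them.
data = data.loc[:, ~data.columns.str.contains("_surfeit")]
print(data.columns.tolist())
```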
@@ -251,17 +260,68 @@ def get_trading_calendar(tickers, **kwargs):
end = kwargs.get('end', para.default_end)
npartitions = kwargs.get('npartitions', para.npartitions_local)

def get_index_trading_date(tickers):
index = tejapi.fastget('TWN/APIPRCD',
coid = 'IX0001', # Taiwan Weighted Index (TAIEX)
paginate = True,
chinese_column_name=False,
mdate = {'gte':start,'lte':end},
opts = {'columns':['mdate'], 'sort':{'coid.asc', 'mdate.asc'}})

mdate = index['mdate'].tolist()

data = pd.DataFrame({
'coid':[tick for tick in tickers for i in mdate],
'mdate':mdate*len(tickers)
})
if len(data)<1:
return pd.DataFrame({'coid': pd.Series(dtype='object'), 'mdate': pd.Series(dtype='datetime64[ns]')})

return data

# def get_data(tickers):
# # trading calendar
# data = get_index_trading_date(tickers)

# if len(data)<1:
# return pd.DataFrame({'coid': pd.Series(dtype='object'), 'mdate': pd.Series(dtype='datetime64[ns]')})

# return data


# Define the meta of the dataframe
# meta = pd.DataFrame({'coid': pd.Series(dtype='object'), 'mdate': pd.Series(dtype='datetime64[ns]')})

# Calculate the number of tickers in each partition.
ticker_partitions = dask_api.get_partition_group(tickers = tickers, npartitions= npartitions)

# Submit jobs to the parallel cores
trading_calendar = dd.from_delayed([dask.delayed(get_index_trading_date)(tickers[(i-1)*npartitions:i*npartitions]) for i in range(1, ticker_partitions)])

# If there are fewer partitions than the default, repartition to the default count
if trading_calendar.npartitions < npartitions:
trading_calendar = trading_calendar.repartition(npartitions=npartitions)

return trading_calendar

def get_stock_calendar(tickers, **kwargs):
# Setting default value of the corresponding parameters.
start = kwargs.get('start', para.default_start)
end = kwargs.get('end', para.default_end)
npartitions = kwargs.get('npartitions', para.npartitions_local)

def get_data(tickers):
# trading calendar
data = tejapi.fastget('TWN/APIPRCD',
coid = tickers,
coid = tickers,
paginate = True,
chinese_column_name=False,
mdate = {'gte':start,'lte':end},
opts = {'columns':['coid','mdate'], 'sort':{'coid.asc', 'mdate.asc'}})
opts = {'columns':['coid', 'mdate'], 'sort':{'coid.asc', 'mdate.asc'}})


if len(data)<1:
return pd.DataFrame({'coid': pd.Series(dtype='object'), 'mdate': pd.Series(dtype='datetime64[ns]')})

return data


@@ -275,7 +335,7 @@ def get_data(tickers):
trading_calendar = dd.from_delayed([dask.delayed(get_data)(tickers[(i-1)*npartitions:i*npartitions]) for i in range(1, ticker_partitions)])

# If there are fewer partitions than the default, repartition to the default count
if trading_calendar.npartitions < 12:
if trading_calendar.npartitions < npartitions:
trading_calendar = trading_calendar.repartition(npartitions=npartitions)

return trading_calendar
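Both calendar helpers follow the same construction: split the ticker list into chunks, wrap each API request in `dask.delayed`, assemble the pieces with `dd.from_delayed`, and repartition if too few chunks came back. A self-contained sketch of that pattern with a fake fetch function so it runs without a TEJ API key (chunk size, meta, and the target partition count are illustrative, not the library's defaults):

```python
import dask
import dask.dataframe as dd
import pandas as pd

def fetch_chunk(tickers):
    # Stand-in for tejapi.fastget('TWN/APIPRCD', coid=tickers, ...); fabricates rows.
    dates = list(pd.to_datetime(["2024-01-02", "2024-01-03"]))
    return pd.DataFrame({
        "coid": [t for t in tickers for _ in dates],
        "mdate": dates * len(tickers),
    })

tickers = ["2330", "2317", "2454", "2412", "1101"]
chunk_size = 2  # stands in for the npartitions-based grouping in the diff
chunks = [tickers[i:i + chunk_size] for i in range(0, len(tickers), chunk_size)]

meta = pd.DataFrame({"coid": pd.Series(dtype="object"),
                     "mdate": pd.Series(dtype="datetime64[ns]")})
calendar = dd.from_delayed([dask.delayed(fetch_chunk)(c) for c in chunks], meta=meta)

# Mirror the repartition step: widen to a default partition count if needed.
if calendar.npartitions < 4:
    calendar = calendar.repartition(npartitions=4)

print(calendar.compute().head())
```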
6 changes: 5 additions & 1 deletion TejToolAPI/__init__.py
@@ -15,11 +15,15 @@
get_internal_code,
get_trading_calendar,
transfer_language_columns,
triggers
triggers,
get_stock_calendar
)

from .meta_types import Meta_Types


from . import Map_Dask_API

from .utils import get_api_key_info


63 changes: 48 additions & 15 deletions TejToolAPI/exchange_calendar.py
@@ -1,31 +1,64 @@
# import os
import os
import tejapi
# tejapi.ApiConfig.ignoretz = True
# tejapi.ApiConfig.page_limit=10000
# tejapi.ApiConfig.api_base = os.environ.get('TEJAPI_BASE')
# tejapi.ApiConfig.api_key = os.environ.get('TEJAPI_KEY')

import datetime
import pandas as pd
import numpy as np

module_dir = os.path.dirname(os.path.abspath(__file__))
tmp_path = os.path.join(module_dir,'temp')

# If the temp folder does not exist, create it.
if not os.path.exists(tmp_path):
os.makedirs(tmp_path)

# Path to store exchange_calendar
calendar_file_path = os.path.join(tmp_path, 'exchange_calendar.csv')

# Check whether the file exists.
if os.path.isfile(calendar_file_path):
catch = False
else:
catch = True

if 'exchange_calendar.csv' in os.listdir(tmp_path):
catch = False
else:
catch = True

class ExchangeCalendar:
def __init__(self) -> None:
self.calendar = self.get_trading_calendar()
self.calendar = self.get_trading_calendar(catch)
self.calendar_list = self.calendar['zdate'].tolist()
self.date_int = self.calendar['zdate'].values.astype(np.int64)

def get_trading_calendar(self):
def get_trading_calendar(self, catch):
"""
Retrieve the calendar from tejapi TWN/TRADEDAY_TWSE, retaining all trading dates.
"""
calendar = tejapi.fastget('TWN/TRADEDAY_TWSE',
paginate = True,
mkt = 'TWSE',
date_rmk = '',
opts = {'columns':['zdate']}
)

if catch:
calendar = tejapi.fastget('TWN/TRADEDAY_TWSE',
paginate = True,
mkt = 'TWSE',
date_rmk = '',
opts = {'columns':['zdate']}
)
calendar.to_csv(calendar_file_path, index=False)

else:
calendar = pd.read_csv(calendar_file_path, parse_dates=['zdate'])
most_recent_date = max(calendar['zdate'])
if most_recent_date < datetime.datetime.now():
update = tejapi.fastget('TWN/TRADEDAY_TWSE',
paginate = True,
mkt = 'TWSE',
date_rmk = '',
zdate= {'gt':most_recent_date},
opts = {'columns':['zdate']}
)

calendar = pd.concat([calendar, update]).drop_duplicates().reset_index(drop = True)
calendar.to_csv(calendar_file_path, index=False)

return calendar

def is_session(self, date):
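In outline, the constructor now caches the full TWSE trading-day list to `temp/exchange_calendar.csv` on first use; on later imports it reads the CSV and only requests days newer than the cached maximum (the `catch` flag above selects between the two paths). A standalone sketch of that cache-then-top-up pattern, with a fake fetch standing in for `tejapi.fastget('TWN/TRADEDAY_TWSE', ...)`; the path and helper names are illustrative:

```python
import datetime
import os

import pandas as pd

# Hypothetical cache location; the diff stores it under <module>/temp/exchange_calendar.csv.
CACHE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp", "exchange_calendar.csv")

def fetch_calendar(after=None):
    """Stand-in for the TEJ API call; returns fake business days, optionally only those after a date."""
    days = pd.Series(pd.date_range("2024-01-01", "2024-02-15", freq="B"))
    if after is not None:
        days = days[days > after]
    return pd.DataFrame({"zdate": days.reset_index(drop=True)})

def load_calendar():
    os.makedirs(os.path.dirname(CACHE), exist_ok=True)
    if not os.path.isfile(CACHE):
        calendar = fetch_calendar()                      # cold start: full download
    else:
        calendar = pd.read_csv(CACHE, parse_dates=["zdate"])
        newest = calendar["zdate"].max()
        if newest < datetime.datetime.now():             # stale cache: fetch only the tail
            update = fetch_calendar(after=newest)
            calendar = (pd.concat([calendar, update])
                          .drop_duplicates()
                          .reset_index(drop=True))
    calendar.to_csv(CACHE, index=False)
    return calendar
```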
6 changes: 1 addition & 5 deletions TejToolAPI/parameters.py
@@ -5,11 +5,7 @@
from .exchange_calendar import ExchangeCalendar

# Initialize exchange calendar
try:
exc = ExchangeCalendar()
except:
raise ValueError('請設定 TEJAPI_KEY : os.environ["TEJAPI_KEY"] = "your_key"')
# exc = ExchangeCalendar()
exc = ExchangeCalendar()

# current directory
module_dir = os.path.dirname(os.path.abspath(__file__))
