Skip to content

Commit

Permalink
revision to PR #127, robustify update of Yahoo Finance data
Browse files Browse the repository at this point in the history
Few changes to the SymbolData and YahooFinance classes following PR #127 (improved data module); lots of new testcases. There was an incident yesterday with one of the larger example strategies, online update of a symbol failed to correctly ffill. This should have been fixed. Cleaning and filling on update of YahooFinance has been improved in various ways and tested much more. Note that this is specific to *updating* already downloaded data, not downloading from scratch (that was already tested with #127).
  • Loading branch information
enzbus committed Feb 17, 2024
1 parent fecd078 commit 6dad45a
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 62 deletions.
4 changes: 3 additions & 1 deletion cvxportfolio/data/market_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,9 @@ def _remove_missing_recent(self):

if self.prices.iloc[-5:].isnull().any().any():
logger.warning(
'Removing some recent lines because there are missing values.')
'Removing some recent lines because there are missing values,'
+ ' the issue is with symbol(s) %s',
self.prices.columns[self.prices.iloc[-5:].isnull().any()])
drop_at = self.prices.iloc[-5:].isnull().any(axis=1).idxmax()
logger.warning('Dropping at index %s', drop_at)
self.returns = self.returns.loc[self.returns.index < drop_at]
Expand Down
161 changes: 115 additions & 46 deletions cvxportfolio/data/symbol_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,13 @@ def _store(self, data):
storer(self.symbol, data, self.storage_location)

def _print_difference(self, current, new):
"""Helper method to print difference if update is not append-only.
This is temporary and will be re-factored.
"""
print("TEMPORARY: Diff between overlap of downloaded and stored")
print((new - current).dropna(how='all').tail(5))
"""Helper method to print difference if update is not append-only."""
diff = (new - current).dropna(how='all').tail(2)
# nan-out diff on intraday data
if hasattr(diff, 'columns'):
diff.iloc[-1, 1:] = np.nan
logger.warning(
"Difference between overlap of downloaded and stored: %s", diff)

def update(self, grace_period):
"""Update current stored data for symbol.
Expand Down Expand Up @@ -201,28 +202,35 @@ def update(self, grace_period):
# via logreturns and numerical errors can sift through
np.isclose(updated.loc[current.index[:-1]],
current.iloc[:-1], equal_nan=True)):
logger.error(f"{self.__class__.__name__} update"
logger.warning(f"{self.__class__.__name__} update"
+ f" of {self.symbol} is not append-only!")
self._print_difference(current, updated)
if hasattr(current, 'columns'):
# the first column is open price
if not current.iloc[-1, 0] == updated.loc[
current.index[-1]].iloc[0]:
logger.error(
f"{self.__class__.__name__} update "
+ f" of {self.symbol} changed last open price!")
self._print_difference(current, updated)
logger.warning(
"%s(%s) update changed last open price:"
+ " stored value was %s, new value is %s",
self.__class__.__name__, self.symbol,
current.iloc[-1, 0],
updated.loc[current.index[-1]].iloc[0])
else:
if not current.iloc[-1] == updated.loc[current.index[-1]]:
logger.error(
f"{self.__class__.__name__} update"
+ f" of {self.symbol} changed last value!")
logger.warning(
"%s(%s) update changed last value:"
+ " stored value was %s, new value is %s",
self.__class__.__name__, self.symbol,
current.iloc[-1],
updated.loc[current.index[-1]])
self._print_difference(current, updated)
# this should have become superflous
except KeyError: # pragma: no cover
except KeyError:
logger.error("%s update of %s could not be checked for"
+ " append-only edits. Was there a DST change?",
self.__class__.__name__, self.symbol) # pragma: no cover
+ " append-only edits. This is not recoverable,"
+ " re-downloading from the start",
self.__class__.__name__, self.symbol)
updated = self._download(
self.symbol, None, grace_period=grace_period)
self._store(updated)

def _download(self, symbol, current, grace_period, **kwargs):
Expand Down Expand Up @@ -344,8 +352,8 @@ def _process(self, new_data, saved_data=None):
## We believe them (for now). We forward fill them if unavailable.

# forward-fill close
self._fillna_and_message(
new_data, 'close', 'last available', filler='ffill', level='info')
self._ffill(data=new_data, col_name='close', message='last available',
saved_data=saved_data, level='info')

## Volumes.
## We set negative to NaN, and fill with zeros.
Expand All @@ -355,8 +363,7 @@ def _process(self, new_data, saved_data=None):

# fill with zeros
self._fillna_and_message(
new_data, 'volume', 'zeros', filler='fillna', filler_arg=0.,
level='info')
new_data, 'volume', 'zeros', filler_arg=0., level='info')

## Open price.
## We remove if lower than low, higher than high, or open to close
Expand All @@ -375,7 +382,7 @@ def _process(self, new_data, saved_data=None):

# fill open with close from day before
self._fillna_and_message(
new_data, 'open', 'close from period before', filler='fillna',
new_data, 'open', 'close from period before',
filler_arg=new_data['close'].shift(1), level='info')

## Low price.
Expand All @@ -395,7 +402,7 @@ def _process(self, new_data, saved_data=None):

# fill low with min of open and close
self._fillna_and_message(
new_data, 'low', 'min of open and close', filler='fillna',
new_data, 'low', 'min of open and close',
filler_arg=new_data[['open', 'close']].min(axis=1), level='info')

## High price.
Expand All @@ -415,7 +422,7 @@ def _process(self, new_data, saved_data=None):

# fill high with max of open and close
self._fillna_and_message(
new_data, 'high', 'max of open and close', filler='fillna',
new_data, 'high', 'max of open and close',
filler_arg=new_data[['open', 'close']].max(axis=1), level='info')

## Some asserts
Expand All @@ -430,19 +437,35 @@ def _process(self, new_data, saved_data=None):
return new_data

def _fillna_and_message(
self, data, col_name, message, filler='fillna', filler_arg=None,
level='warning'):
self, data, col_name, message, filler_arg=None, level='warning'):
"""Fill NaNs in column with chosen method and arg."""
bad_indexes = data.index[data[col_name].isnull()]
if len(bad_indexes) > 0:
getattr(logger, level)(
'%s("%s").data["%s"] has NaNs on timestamps: %s,'
+ ' filling them with %s.', self.__class__.__name__,
self.symbol, col_name, bad_indexes, message)
if filler == 'ffill':
data[col_name] = data[col_name].fillna(filler_arg)

def _ffill(self, data, col_name, message, saved_data=None,
level='warning'):
"""Forward-fill column also using saved data if present."""
bad_indexes = data.index[data[col_name].isnull()]
if len(bad_indexes) > 0:
getattr(logger, level)(
'%s("%s").data["%s"] has NaNs on timestamps: %s,'
+ ' filling them with %s.', self.__class__.__name__,
self.symbol, col_name, bad_indexes, message)
if saved_data is None:
data[col_name] = data[col_name].ffill()
else:
data[col_name] = getattr(data[col_name], filler)(filler_arg)
data.loc[data.index, col_name] = pd.concat(
[saved_data.loc[
# saved_data is already clean, we only need last row
# we make 2 for backward compatibility w/ data stored
# by Cvxportfolio < 1.2.0
saved_data.index < data.index[0], col_name].iloc[-2:],
data[col_name]]).ffill().loc[data.index]

def _nan_anomalous_prices(
self, new_data, price_name, threshold, saved_data=None,
Expand All @@ -457,8 +480,11 @@ def _nan_anomalous_prices(
if saved_data is None:
all_lr_to_close = new_lr_to_close
else:
max_past = max(self.FILTERING_WINDOWS) + len(new_data) # to be safe
old_lr_to_close =\
np.log(saved_data['close']) - np.log(saved_data[price_name])
np.log(
saved_data['close'].iloc[-max_past:]) - np.log(
saved_data[price_name].iloc[-max_past:])
all_lr_to_close = pd.concat(
[old_lr_to_close.loc[
old_lr_to_close.index < new_lr_to_close.index[0]],
Expand Down Expand Up @@ -689,6 +715,12 @@ class YahooFinance(OLHCV):
# ONLY before this date, otherwise don't filter them
ASSUME_FALSE_BEFORE = pd.Timestamp('2000-01-01', tz='UTC')

# when updating already saved data, download this many days before
# the last available one; only the last 2 rows will be overwritten
# longer overlap should make the cleaning of new data more robust,
# shorter overlap makes the update faster
UPDATE_OVERLAP = 5

def _throw_out_all_data_before_many_bad_adjcloses(
self, new_data, level='warning'):
"""Throw out all data before many NaN on adjclose column."""
Expand All @@ -707,21 +739,50 @@ def _throw_out_all_data_before_many_bad_adjcloses(
new_data.loc[new_data.index > last_invalid_index], copy=True)
return new_data

def _remove_data_on_bad_adjcloses(self, new_data, level='warning'):
def _remove_data_on_bad_adjcloses(
self, new_data, level='warning', saved_data=None):
"""Remove adjcloses if implied logreturns are highly anomalous."""
# worst case (if it goes to end of for loop)
# we throw out all data before the event
for _ in range(self.MAX_CONTIGUOUS_MISSING_ADJCLOSES + 1):
logrets = np.log10(new_data.adjclose.ffill()).diff()

if saved_data is not None:
# obtain total close to close
max_history = max(self.FILTERING_WINDOWS)
intraday_logreturn = np.log10(
saved_data["close"].iloc[-max_history:]) - np.log10(
saved_data["open"].iloc[-max_history:])
open_to_open_total_logreturn = np.log10(
1+saved_data['return'].iloc[-max_history:])
close_to_close_total_logreturn = (
open_to_open_total_logreturn - intraday_logreturn
+ intraday_logreturn.shift(-1)
)
saved_data_logrets = close_to_close_total_logreturn.shift(1)

all_lr = pd.concat(
[saved_data_logrets.loc[
saved_data_logrets.index < logrets.index[0]], logrets])

# print('logrets')
# print(logrets)
# print('saved_data_logrets')
# print(saved_data_logrets)
else:
all_lr = logrets

# with this we skip over exact zeros (which we assume come from
# some cleaning) and would bias the scale down
logrets.loc[logrets == 0.] = np.nan
all_lr.loc[all_lr == 0.] = np.nan

score = _unlikeliness_score(
logrets, logrets, scaler=_median_scale_around,
all_lr, all_lr, scaler=_median_scale_around,
windows=self.FILTERING_WINDOWS)
bad_score = score > self.THRESHOLD_BAD_ADJCLOSE
bad_score = bad_score.loc[logrets.index]

# print(score)

too_large_logreturns = np.abs(
logrets) > self.THRESHOLD_FALSE_LOG10RETS
Expand Down Expand Up @@ -762,15 +823,16 @@ def _process(self, new_data, saved_data=None):
new_data, level='info')

# Remove all data when highly anomalous adjclose prices are detected
self._remove_data_on_bad_adjcloses(new_data, level='info')
self._remove_data_on_bad_adjcloses(
new_data, level='info', saved_data=saved_data)

# Repeat throw out all data before many NaN on adjclose
new_data = self._throw_out_all_data_before_many_bad_adjcloses(
new_data, level='info')

# forward-fill adj close
self._fillna_and_message(
new_data, 'adjclose', 'last available', filler='ffill',
self._ffill( # we can't ffill using saved_data :(
new_data, col_name='adjclose', message='last available',
level='info')

# eliminate (initial) rows where adjclose is NaN
Expand Down Expand Up @@ -905,8 +967,7 @@ def _get_data_yahoo(ticker, start='1900-01-01', end='2100-01-01'):
return df_result[
['open', 'low', 'high', 'close', 'adjclose', 'volume']]

def _download(self, symbol, current=None,
overlap=5, grace_period='5d', **kwargs):
def _download(self, symbol, current=None, grace_period='5d', **kwargs):
"""Download single stock from Yahoo Finance.
If data was already downloaded we only download
Expand All @@ -923,12 +984,13 @@ def _download(self, symbol, current=None,
Returns:
updated (pandas.DataFrame): updated DataFrame for the symbol
"""
# this should have been solved:
# if overlap < 2:
# raise SyntaxError(
# f'{self.__class__.__name__} with overlap smaller than 2'
# + ' could have issues with DST.')
# TODO this could be put at a lower class hierarchy
if overlap < 2:
raise SyntaxError(
f'{self.__class__.__name__} with overlap smaller than 2'
+ ' could have issues with DST.')
if (current is None) or (len(current) < overlap):
if (current is None) or (len(current) < self.UPDATE_OVERLAP):
updated = self._get_data_yahoo(symbol, **kwargs)
logger.info('Downloading from the start.')
result = self._process(updated)
Expand All @@ -941,9 +1003,16 @@ def _download(self, symbol, current=None,
logger.info(
'Skipping download because stored data is recent enough.')
return current
new = self._get_data_yahoo(symbol, start=current.index[-overlap])
new = self._process(new)
return pd.concat([current.iloc[:-overlap], new])
new = self._get_data_yahoo(
symbol, start=current.index[-self.UPDATE_OVERLAP])
new = self._process(new, saved_data=set_pd_read_only(current))
# print('current')
# print(current)
# print('new')
# print(new)
used_current = current.iloc[:-2]
return pd.concat(
[used_current, new.loc[new.index > used_current.index[-1]]])


#
Expand Down
Loading

0 comments on commit 6dad45a

Please sign in to comment.