diff --git a/cvxportfolio/data.py b/cvxportfolio/data.py deleted file mode 100644 index e0ef1c23d..000000000 --- a/cvxportfolio/data.py +++ /dev/null @@ -1,1398 +0,0 @@ -# Copyright 2023 Enzo Busseti -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""This module include classes that download, store, and serve market data. - -The two main abstractions are :class:`SymbolData` and :class:`MarketData`. -Neither are exposed outside this module. Their derived classes instead are. - -If you want to interface cvxportfolio with financial data source other -than the ones we provide, you should derive from either of those two classes. -""" - -import datetime -import logging -import sqlite3 -import sys -import warnings -from pathlib import Path -from urllib.error import URLError - -import numpy as np -import pandas as pd -import requests -import requests.exceptions - -from .errors import DataError -from .utils import (hash_, make_numeric, periods_per_year_from_datetime_index, - resample_returns) - -__all__ = ["YahooFinance", "Fred", - "UserProvidedMarketData", "DownloadedMarketData"] - -logger = logging.getLogger(__name__) - -BASE_LOCATION = Path.home() / "cvxportfolio_data" - -def now_timezoned(): - """Return current timestamp with local timezone. - - :returns: Current timestamp with local timezone. - :rtype: pandas.Timestamp - """ - return pd.Timestamp( - datetime.datetime.now(datetime.timezone.utc).astimezone()) - -class SymbolData: - """Base class for a single symbol time series data. - - The data is either in the form of a Pandas Series or DataFrame - and has datetime index. - - This class needs to be derived. At a minimum, - one should redefine the ``_download`` method, which - implements the downloading of the symbol's time series - from an external source. The method takes the current (already - downloaded and stored) data and is supposed to **only append** to it. - In this way we only store new data and don't modify already downloaded - data. - - Additionally one can redefine the ``_preload`` method, which prepares - data to serve to the user (so the data is stored in a different format - than what the user sees.) We found that this separation can be useful. - - This class interacts with module-level functions named ``_loader_BACKEND`` - and ``_storer_BACKEND``, where ``BACKEND`` is the name of the storage - system used. We define ``pickle``, ``csv``, and ``sqlite`` backends. - These may have limitations. See their docstrings for more information. - - - :param symbol: The symbol that we downloaded. - :type symbol: str - :param storage_backend: The storage backend, implemented ones are - ``'pickle'``, ``'csv'``, and ``'sqlite'``. By default ``'pickle'``. - :type storage_backend: str - :param base_location: The location of the storage. We store in a - subdirectory named after the class which derives from this. By default - it's a directory named ``cvxportfolio_data`` in your home folder. 
- :type base_location: pathlib.Path - :param grace_period: If the most recent observation in the data is less - old than this we do not download new data. By default it's one day. - :type grace_period: pandas.Timedelta - - :attribute data: The downloaded data for the symbol. - """ - - def __init__(self, symbol, - storage_backend='pickle', - base_location=BASE_LOCATION, - grace_period=pd.Timedelta('1d')): - self._symbol = symbol - self._storage_backend = storage_backend - self._base_location = base_location - self.update(grace_period) - self._data = self.load() - - @property - def storage_location(self): - """Storage location. Directory is created if not existent. - - :rtype: pathlib.Path - """ - loc = self._base_location / f"{self.__class__.__name__}" - loc.mkdir(parents=True, exist_ok=True) - return loc - - @property - def symbol(self): - """The symbol whose data this instance contains. - - :rtype: str - """ - return self._symbol - - @property - def data(self): - """Time series data, updated to the most recent observation. - - :rtype: pandas.Series or pandas.DataFrame - """ - return self._data - - def _load_raw(self): - """Load raw data from database.""" - # we could implement multiprocess safety here - loader = globals()['_loader_' + self._storage_backend] - try: - logger.info( - f"{self.__class__.__name__} is trying to load {self.symbol}" - + f" with {self._storage_backend} backend" - + f" from {self.storage_location}") - return loader(self.symbol, self.storage_location) - except FileNotFoundError: - return None - - def load(self): - """Load data from database using `self.preload` function to process. - - :returns: Loaded time-series data for the symbol. - :rtype: pandas.Series or pandas.DataFrame - """ - return self._preload(self._load_raw()) - - def _store(self, data): - """Store data in database. - - :param data: Time-series data to store. - :type data: pandas.Series or pandas.DataFrame - """ - # we could implement multiprocess safety here - storer = globals()['_storer_' + self._storage_backend] - logger.info( - f"{self.__class__.__name__} is storing {self.symbol}" - + f" with {self._storage_backend} backend" - + f" in {self.storage_location}") - storer(self.symbol, data, self.storage_location) - - def _print_difference(self, current, new): - """Helper method to print difference if update is not append-only. - - This is temporary and will be re-factored. - """ - print("TEMPORARY: Diff between overlap of downloaded and stored") - print((new - current).dropna(how='all').tail(5)) - - def update(self, grace_period): - """Update current stored data for symbol. - - :param grace_period: If the time between now and the last value stored - is less than this, we don't update the data already stored. - :type grace_period: pandas.Timedelta - """ - current = self._load_raw() - logger.info( - f"Downloading {self.symbol}" - + f" from {self.__class__.__name__}") - updated = self._download( - self.symbol, current, grace_period=grace_period) - - if np.any(updated.iloc[:-1].isnull()): - logger.warning( - " cvxportfolio.%s('%s').data contains NaNs." - + " You may want to inspect it. 
If you want, you can delete the" - + " data file in %s to force re-download from the start.", - self.__class__.__name__, self.symbol, self.storage_location) - - try: - if current is not None: - if not np.all( - # we use numpy.isclose because returns may be computed - # via logreturns and numerical errors can sift through - np.isclose(updated.loc[current.index[:-1]], - current.iloc[:-1], equal_nan=True, - rtol=1e-08, atol=1e-08)): - logger.error(f"{self.__class__.__name__} update" - + f" of {self.symbol} is not append-only!") - self._print_difference(current, updated) - if hasattr(current, 'columns'): - # the first column is open price - if not current.iloc[-1, 0] == updated.loc[ - current.index[-1]].iloc[0]: - logger.error( - f"{self.__class__.__name__} update " - + f" of {self.symbol} changed last open price!") - self._print_difference(current, updated) - else: - if not current.iloc[-1] == updated.loc[current.index[-1]]: - logger.error( - f"{self.__class__.__name__} update" - + f" of {self.symbol} changed last value!") - self._print_difference(current, updated) - except KeyError: - logger.error("%s update of %s could not be checked for" - + " append-only edits. Was there a DST change?", - self.__class__.__name__, self.symbol) - self._store(updated) - - def _download(self, symbol, current, grace_period, **kwargs): - """Download data from external source given already downloaded data. - - This method must be redefined by derived classes. - - :param symbol: The symbol we download. - :type symbol: str - :param current: The data already downloaded. We are supposed to - **only append** to it. If None, no data is present. - :type current: pandas.Series or pandas.DataFrame or None - :rtype: pandas.Series or pandas.DataFrame - """ - raise NotImplementedError #pragma: no cover - - def _preload(self, data): - """Prepare data to serve to the user. - - This method can be redefined by derived classes. - - :param data: The data returned by the storage backend. - :type data: pandas.Series or pandas.DataFrame - :rtype: pandas.Series or pandas.DataFrame - """ - return data - - -# -# Yahoo Finance. -# - -def _timestamp_convert(unix_seconds_ts): - """Convert a UNIX timestamp in seconds to a pandas.Timestamp.""" - return pd.Timestamp(unix_seconds_ts*1E9, tz='UTC') - - -class YahooFinance(SymbolData): - """Yahoo Finance symbol data. - - :param symbol: The symbol that we downloaded. - :type symbol: str - :param storage_backend: The storage backend, implemented ones are - ``'pickle'``, ``'csv'``, and ``'sqlite'``. - :type storage_backend: str - :param base_storage_location: The location of the storage. We store in a - subdirectory named after the class which derives from this. - :type base_storage_location: pathlib.Path - :param grace_period: If the most recent observation in the data is less - old than this we do not download new data. - :type grace_period: pandas.Timedelta - - :attribute data: The downloaded, and cleaned, data for the symbol. 
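# --- Editor's sketch (not part of the patch): deriving a custom data source.
# The SymbolData class documented above only requires ``_download`` (which
# must append to already stored data) and, optionally, ``_preload``. The CSV
# endpoint, class name, and import path below are hypothetical assumptions,
# used only to illustrate the contract.
import pandas as pd

from cvxportfolio.data.symbol_data import SymbolData  # assumed import path

class MyVendorData(SymbolData):
    """Toy single-symbol data source reading a hypothetical CSV endpoint."""

    def _download(self, symbol, current, grace_period, **kwargs):
        new = pd.read_csv(
            f"https://example.com/{symbol}.csv",  # hypothetical URL
            index_col=0, parse_dates=[0]).iloc[:, 0]
        if current is None:
            return new
        if (pd.Timestamp.now(tz=current.index.tz)
                - current.index[-1]) < pd.Timedelta(grace_period):
            return current  # stored data is recent enough, skip download
        # append-only: keep observations strictly after the last stored one
        return pd.concat([current, new.loc[new.index > current.index[-1]]])

    def _preload(self, data):
        # e.g., forward-fill missing observations before serving to the user
        return data.ffill()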
- :type data: pandas.DataFrame - """ - - # is open-high-low-close-volume-(total)return - IS_OHLCVR = True - - @staticmethod - def _clean(data): - """Clean Yahoo Finance open-close-high-low-volume-adjclose data.""" - - # print(data) - # print(data.isnull().sum()) - - # nan-out nonpositive prices - data.loc[data["open"] <= 0, 'open'] = np.nan - data.loc[data["close"] <= 0, "close"] = np.nan - data.loc[data["high"] <= 0, "high"] = np.nan - data.loc[data["low"] <= 0, "low"] = np.nan - data.loc[data["adjclose"] <= 0, "adjclose"] = np.nan - - # nan-out negative volumes - data.loc[data["volume"] < 0, 'volume'] = np.nan - - # all infinity values are nans - data.iloc[:, :] = np.nan_to_num( - data.values, copy=True, nan=np.nan, posinf=np.nan, neginf=np.nan) - - # print(data) - # print(data.isnull().sum()) - - # if low is not the lowest, set it to nan - data.loc[data['low'] > data[['open', 'high', 'close']].min(1), - 'low'] = np.nan - - # if high is not the highest, set it to nan - data.loc[data['high'] < data[['open', 'high', 'close']].max(1), - 'high'] = np.nan - - # print(data) - # print(data.isnull().sum()) - - # - # fills - # - - # fill volumes with zeros (safest choice) - data['volume'] = data['volume'].fillna(0.) - - # fill close price with open price - data['close'] = data['close'].fillna(data['open']) - - # fill open price with close from day(s) before - # repeat as long as it helps (up to 1 year) - for shifter in range(252): - orig_missing_opens = data['open'].isnull().sum() - data['open'] = data['open'].fillna(data['close'].shift( - shifter+1)) - new_missing_opens = data['open'].isnull().sum() - if orig_missing_opens == new_missing_opens: - break - logger.info( - "Filled missing open prices with close from %s periods before", - shifter+1) - - # fill close price with same day's open - data['close'] = data['close'].fillna(data['open']) - - # fill high price with max - data['high'] = data['high'].fillna(data[['open', 'close']].max(1)) - - # fill low price with max - data['low'] = data['low'].fillna(data[['open', 'close']].min(1)) - - # print(data) - # print(data.isnull().sum()) - - # - # Compute returns - # - - # compute log of ratio between adjclose and close - log_adjustment_ratio = np.log(data['adjclose'] / data['close']) - - # forward fill adjustment ratio - log_adjustment_ratio = log_adjustment_ratio.ffill() - - # non-market log returns (dividends, splits) - non_market_lr = log_adjustment_ratio.diff().shift(-1) - - # full open-to-open returns - open_to_open = np.log(data["open"]).diff().shift(-1) - data['return'] = np.exp(open_to_open + non_market_lr) - 1 - - # print(data) - # print(data.isnull().sum()) - - # intraday_logreturn = np.log(data["close"]) - np.log(data["open"]) - # close_to_close_logreturn = np.log(data["adjclose"]).diff().shift(-1) - # open_to_open_logreturn = ( - # close_to_close_logreturn + intraday_logreturn - - # intraday_logreturn.shift(-1) - # ) - # data["return"] = np.exp(open_to_open_logreturn) - 1 - del data["adjclose"] - - # eliminate last period's intraday data - data.loc[data.index[-1], - ["high", "low", "close", "return", "volume"]] = np.nan - - # print(data) - # print(data.isnull().sum()) - - return data - - @staticmethod - def _get_data_yahoo(ticker, start='1900-01-01', end='2100-01-01'): - """Get 1 day OHLC from Yahoo finance. - - Result is timestamped with the open time (time-zoned) of the - instrument. 
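# --- Editor's sketch (not part of the patch): the total-return logic of
# ``_clean`` above, on toy numbers. Open-to-open log-returns are combined
# with the dividend/split adjustment implied by adjclose/close; the values
# below are made up and only show the mechanics.
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    'open':     [100.0, 101.0, 102.0],
    'close':    [101.0, 102.0, 103.0],
    'adjclose': [100.0, 101.5, 103.0],
}, index=pd.date_range('2023-01-02', periods=3, tz='UTC'))

log_adjustment_ratio = np.log(toy['adjclose'] / toy['close']).ffill()
non_market_lr = log_adjustment_ratio.diff().shift(-1)    # dividends, splits
open_to_open = np.log(toy['open']).diff().shift(-1)      # pure price move
total_return = np.exp(open_to_open + non_market_lr) - 1
print(total_return)  # return at t accrues from open at t to open at t+1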
- """ - - base_url = 'https://query2.finance.yahoo.com' - - headers = { - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1)' - ' AppleWebKit/537.36 (KHTML, like Gecko)' - ' Chrome/39.0.2171.95 Safari/537.36'} - - # print(HEADERS) - start = int(pd.Timestamp(start).timestamp()) - end = int(pd.Timestamp(end).timestamp()) - - try: - res = requests.get( - url=f"{base_url}/v8/finance/chart/{ticker}", - params={'interval': '1d', - "period1": start, - "period2": end}, - headers=headers, - timeout=10) # seconds - except requests.ConnectionError as exc: - raise DataError( - f"Download of {ticker} from YahooFinance failed." - + " Are you connected to the Internet?") from exc - - # print(res) - - if res.status_code == 404: - raise DataError( - f'Data for symbol {ticker} is not available.' - + 'Json output:', str(res.json())) - - if res.status_code != 200: - raise DataError(f'Yahoo finance download of {ticker} failed. Json:', - str(res.json())) # pragma: no cover - - data = res.json()['chart']['result'][0] - - try: - index = pd.DatetimeIndex( - [_timestamp_convert(el) for el in data['timestamp']]) - - df_result = pd.DataFrame( - data['indicators']['quote'][0], index=index) - df_result['adjclose'] = data[ - 'indicators']['adjclose'][0]['adjclose'] - except KeyError: - raise DataError(f'Yahoo finance download of {ticker} failed.' - + ' Json:', str(res.json())) # pragma: no cover - - # last timestamp is probably broken (not timed to market open) - # we set its time to same as the day before, but this is wrong - # on days of DST switch. It's fine though because that line will be - # overwritten next update - if df_result.index[-1].time() != df_result.index[-2].time(): - tm1 = df_result.index[-2].time() - newlast = df_result.index[-1].replace( - hour=tm1.hour, minute=tm1.minute, second=tm1.second) - df_result.index = pd.DatetimeIndex( - list(df_result.index[:-1]) + [newlast]) - - return df_result[ - ['open', 'low', 'high', 'close', 'adjclose', 'volume']] - - def _download(self, symbol, current=None, - overlap=5, grace_period='5d', **kwargs): - """Download single stock from Yahoo Finance. - - If data was already downloaded we only download - the most recent missing portion. - - Args: - - symbol (str): yahoo name of the instrument - current (pandas.DataFrame or None): current data present locally - overlap (int): how many lines of current data will be overwritten - by newly downloaded data - kwargs (dict): extra arguments passed to yfinance.download - - Returns: - updated (pandas.DataFrame): updated DataFrame for the symbol - """ - if overlap < 2: - raise SyntaxError( - f'{self.__class__.__name__} with overlap smaller than 2' - + ' could have issues with DST.') - if (current is None) or (len(current) < overlap): - updated = self._get_data_yahoo(symbol, **kwargs) - logger.info('Downloading from the start.') - result = self._clean(updated) - # we remove first row if it contains NaNs - if np.any(result.iloc[0].isnull()): - result = result.iloc[1:] - return result - if (now_timezoned() - current.index[-1] - ) < pd.Timedelta(grace_period): - logger.info( - 'Skipping download because stored data is recent enough.') - return current - new = self._get_data_yahoo(symbol, start=current.index[-overlap]) - new = self._clean(new) - return pd.concat([current.iloc[:-overlap], new]) - - def _preload(self, data): - """Prepare data for use by Cvxportfolio. 
- - We drop the `volume` column expressed in number of stocks and - replace it with `valuevolume` which is an estimate of the (e.g., - US dollar) value of the volume exchanged on the day. - """ - data["valuevolume"] = data["volume"] * data["open"] - del data["volume"] - - return data - -# -# Fred. -# - -class Fred(SymbolData): - """Fred single-symbol data. - - :param symbol: The symbol that we downloaded. - :type symbol: str - :param storage_backend: The storage backend, implemented ones are - ``'pickle'``, ``'csv'``, and ``'sqlite'``. By default ``'pickle'``. - :type storage_backend: str - :param base_storage_location: The location of the storage. We store in a - subdirectory named after the class which derives from this. By default - it's a directory named ``cvxportfolio_data`` in your home folder. - :type base_storage_location: pathlib.Path - :param grace_period: If the most recent observation in the data is less - old than this we do not download new data. By default it's one day. - :type grace_period: pandas.Timedelta - - :attribute data: The downloaded data for the symbol. - """ - - URL = "https://fred.stlouisfed.org/graph/fredgraph.csv" - - # TODO: implement Fred point-in-time - # example: - # https://alfred.stlouisfed.org/graph/alfredgraph.csv?id=CES0500000003&vintage_date=2023-07-06 - # hourly wages time series **as it appeared** on 2023-07-06 - # store using pd.Series() of diff'ed values only. - - def _internal_download(self, symbol): - try: - return pd.read_csv( - self.URL + f'?id={symbol}', - index_col=0, parse_dates=[0])[symbol] - except URLError as exc: - raise DataError(f"Download of {symbol}" - + f" from {self.__class__.__name__} failed." - + " Are you connected to the Internet?") from exc - - def _download( - self, symbol="DFF", current=None, grace_period='5d', **kwargs): - """Download or update pandas Series from Fred. - - If already downloaded don't change data stored locally and only - add new entries at the end. - - Additionally, we allow for a `grace period`, if the data already - downloaded has a last entry not older than the grace period, we - don't download new data. - """ - if current is None: - return self._internal_download(symbol) - if (pd.Timestamp.today() - current.index[-1] - ) < pd.Timedelta(grace_period): - logger.info( - 'Skipping download because stored data is recent enough.') - return current - - new = self._internal_download(symbol) - new = new.loc[new.index > current.index[-1]] - - if new.empty: - logger.info('New downloaded data is empty!') - return current - - assert new.index[0] > current.index[-1] - return pd.concat([current, new]) - - def _preload(self, data): - """Add UTC timezone.""" - data.index = data.index.tz_localize('UTC') - return data - -# -# Sqlite storage backend. -# - -def _open_sqlite(storage_location): - return sqlite3.connect(storage_location/"db.sqlite") - -def _close_sqlite(connection): - connection.close() - -def _loader_sqlite(symbol, storage_location): - """Load data in sqlite format. - - We separately store dtypes for data consistency and safety. - - .. note:: If your pandas object's index has a name it will be lost, - the index is renamed 'index'. If you pass timestamp data (including - the index) it must have explicit timezone. 
- """ - try: - connection = _open_sqlite(storage_location) - dtypes = pd.read_sql_query( - f"SELECT * FROM {symbol}___dtypes", - connection, index_col="index", - dtype={"index": "str", "0": "str"}) - - parse_dates = 'index' - my_dtypes = dict(dtypes["0"]) - - tmp = pd.read_sql_query( - f"SELECT * FROM {symbol}", connection, - index_col="index", parse_dates=parse_dates, dtype=my_dtypes) - - _close_sqlite(connection) - multiindex = [] - for col in tmp.columns: - if col[:8] == "___level": - multiindex.append(col) - else: - break - if len(multiindex) > 0: - multiindex = [tmp.index.name] + multiindex - tmp = tmp.reset_index().set_index(multiindex) - return tmp.iloc[:, 0] if tmp.shape[1] == 1 else tmp - except pd.errors.DatabaseError: - return None - -def _storer_sqlite(symbol, data, storage_location): - """Store data in sqlite format. - - We separately store dtypes for data consistency and safety. - - .. note:: If your pandas object's index has a name it will be lost, - the index is renamed 'index'. If you pass timestamp data (including - the index) it must have explicit timezone. - """ - connection = _open_sqlite(storage_location) - exists = pd.read_sql_query( - f"SELECT name FROM sqlite_master WHERE type='table' AND name='{symbol}'", - connection) - - if len(exists): - _ = connection.cursor().execute(f"DROP TABLE '{symbol}'") - _ = connection.cursor().execute(f"DROP TABLE '{symbol}___dtypes'") - connection.commit() - - if hasattr(data.index, "levels"): - data.index = data.index.set_names( - ["index"] + - [f"___level{i}" for i in range(1, len(data.index.levels))] - ) - data = data.reset_index().set_index("index") - else: - data.index.name = "index" - - if data.index[0].tzinfo is None: - warnings.warn('Index has not timezone, setting to UTC') - data.index = data.index.tz_localize('UTC') - - data.to_sql(f"{symbol}", connection) - pd.DataFrame(data).dtypes.astype("string").to_sql( - f"{symbol}___dtypes", connection) - _close_sqlite(connection) - - -# -# Pickle storage backend. -# - -def _loader_pickle(symbol, storage_location): - """Load data in pickle format.""" - return pd.read_pickle(storage_location / f"{symbol}.pickle") - -def _storer_pickle(symbol, data, storage_location): - """Store data in pickle format.""" - data.to_pickle(storage_location / f"{symbol}.pickle") - -# -# Csv storage backend. 
-# - -def _loader_csv(symbol, storage_location): - """Load data in csv format.""" - - index_dtypes = pd.read_csv( - storage_location / f"{symbol}___index_dtypes.csv", - index_col=0)["0"] - - dtypes = pd.read_csv( - storage_location / f"{symbol}___dtypes.csv", index_col=0, - dtype={"index": "str", "0": "str"}) - dtypes = dict(dtypes["0"]) - new_dtypes = {} - parse_dates = [] - for i, level in enumerate(index_dtypes): - if "datetime64[ns" in level: # includes all timezones - parse_dates.append(i) - for i, el in enumerate(dtypes): - if "datetime64[ns" in dtypes[el]: # includes all timezones - parse_dates += [i + len(index_dtypes)] - else: - new_dtypes[el] = dtypes[el] - - tmp = pd.read_csv(storage_location / f"{symbol}.csv", - index_col=list(range(len(index_dtypes))), - parse_dates=parse_dates, dtype=new_dtypes) - - return tmp.iloc[:, 0] if tmp.shape[1] == 1 else tmp - - -def _storer_csv(symbol, data, storage_location): - """Store data in csv format.""" - pd.DataFrame(data.index.dtypes if hasattr(data.index, 'levels') - else [data.index.dtype]).astype("string").to_csv( - storage_location / f"{symbol}___index_dtypes.csv") - pd.DataFrame(data).dtypes.astype("string").to_csv( - storage_location / f"{symbol}___dtypes.csv") - data.to_csv(storage_location / f"{symbol}.csv") - -# -# Market Data -# - -class MarketData: - """Prepare, hold, and serve market data. - - :method serve: Serve data for policy and simulator at time :math:`t`. - """ - - def serve(self, t): - """Serve data for policy and simulator at time :math:`t`. - - :param t: Trading time. It must be included in the timestamps returned - by :meth:`trading_calendar`. - :type t: pandas.Timestamp - - :returns: past_returns, current_returns, past_volumes, current_volumes, - current_prices - :rtype: (pandas.DataFrame, pandas.Series, pandas.DataFrame, - pandas.Series, pandas.Series) - """ - raise NotImplementedError # pragma: no cover - - # pylint: disable=redundant-returns-doc - def trading_calendar( - self, start_time=None, end_time=None, include_end=True): - """Get trading calendar between times. - - :param start_time: Initial time of the trading calendar. Always - inclusive if present. If None, use the first available time. - :type start_time: pandas.Timestamp - :param end_time: Final time of the trading calendar. If None, - use the last available time. - :type end_time: pandas.Timestamp - :param include_end: Include end time. - :type include_end: bool - - :returns: Trading calendar. - :rtype: pandas.DatetimeIndex - """ - raise NotImplementedError # pragma: no cover - - @property - def periods_per_year(self): - """Average trading periods per year. - - :rtype: int - """ - raise NotImplementedError # pragma: no cover - - @property - def full_universe(self): # pylint: disable=redundant-returns-doc - """Full universe, which might not be available for trading. - - :returns: Full universe. - :rtype: pandas.Index - """ - raise NotImplementedError # pragma: no cover - - # pylint: disable=unused-argument, redundant-returns-doc - def partial_universe_signature(self, partial_universe): - """Unique signature of this instance with a partial universe. - - A partial universe is a subset of the full universe that is - available at some time for trading. - - This is used in cvxportfolio.cache to sign back-test caches that - are saved on disk. If not redefined it returns None which disables - on-disk caching. - - :param partial_universe: A subset of the full universe. - :type partial_universe: pandas.Index - - :returns: Signature. 
- :rtype: str - """ - return None - -# compiled based on Interactive Brokers benchmark rates choices -# (see https://www.ibkrguides.com/kb/article-2949.htm) -# and their FRED codes -RATES = { - 'USDOLLAR': 'DFF', # Federal funds effective rate - 'EURO': 'ECBESTRVOLWGTTRMDMNRT', # BCE short term rate - 'GBPOUND': 'IUDSOIA', # SONIA - 'JPYEN': 'IRSTCB01JPM156N', # updated monthly - } - -class MarketDataInMemory(MarketData): - """Market data that is stored in memory when initialized.""" - - # this is overwritten in the derived classes' initializers - returns = None - - def __init__( - self, trading_frequency, base_location, cash_key, min_history, - online_usage = False): - """This must be called by the derived classes.""" - if (self.returns.index[-1] - self.returns.index[0]) < min_history: - raise DataError( - "The provided returns have less history " - + f"than the min_history {min_history}") - if trading_frequency: - self._downsample(trading_frequency) - self.trading_frequency = trading_frequency - - self._set_read_only() - self._check_sizes() - self._mask = None - self._masked_returns = None - self._masked_volumes = None - self._masked_prices = None - self.base_location = Path(base_location) - self.cash_key = cash_key - self._min_history_timedelta = min_history - self.online_usage = online_usage - - def _mask_dataframes(self, mask): - """Mask internal dataframes if necessary.""" - if (self._mask is None) or not np.all(self._mask == mask): - logger.info("Masking internal %s dataframes.", - self.__class__.__name__) - colmask = self.returns.columns[mask] - # self._masked_returns = self._df_or_ser_set_read_only( - # pd.DataFrame(self.returns.iloc[:, mask], copy=True)) - self._masked_returns = self._df_or_ser_set_read_only( - pd.DataFrame(self.returns.loc[:, colmask], copy=True)) - # self._masked_returns = self._df_or_ser_set_read_only( - # pd.DataFrame(np.array(self.returns.values[:, mask]), - # index=self.returns.index, columns=colmask)) - if not self.volumes is None: - # self._masked_volumes = self._df_or_ser_set_read_only( - # pd.DataFrame(self.volumes.iloc[:, mask[:-1]], copy=True)) - self._masked_volumes = self._df_or_ser_set_read_only( - pd.DataFrame(self.volumes.loc[:, colmask[:-1]], copy=True)) - # self._masked_volumes = self._df_or_ser_set_read_only( - # pd.DataFrame(np.array(self.volumes.values[:, mask[:-1]]), - # index=self.volumes.index, columns=colmask[:-1])) - if not self.prices is None: - # self._masked_prices = self._df_or_ser_set_read_only( - # pd.DataFrame(self.prices.iloc[:, mask[:-1]], copy=True)) - self._masked_prices = self._df_or_ser_set_read_only( - pd.DataFrame(self.prices.loc[:, colmask[:-1]], copy=True)) - self._mask = mask - - @property - def full_universe(self): - """Full universe, which might not be available for trading. - - :returns: Full universe. - :rtype: pandas.Index - """ - return self.returns.columns - - def serve(self, t): - """Serve data for policy and simulator at time :math:`t`. - - :param t: Time of execution, *e.g.*, stock market open of a given day. 
- :type t: pandas.Timestamp - - :returns: (past_returns, current_returns, past_volumes, - current_volumes, current_prices) - :rtype: (pandas.DataFrame, pandas.Series, pandas.DataFrame or None, - pandas.Series or None, pandas.Series or None) - """ - - mask = self._universe_mask_at_time(t).values - self._mask_dataframes(mask) - - tidx = self.returns.index.get_loc(t) - past_returns = self._df_or_ser_set_read_only( - pd.DataFrame(self._masked_returns.iloc[:tidx])) - current_returns = self._df_or_ser_set_read_only( - pd.Series(self._masked_returns.iloc[tidx])) - - if not self.volumes is None: - tidx = self.volumes.index.get_loc(t) - past_volumes = self._df_or_ser_set_read_only( - pd.DataFrame(self._masked_volumes.iloc[:tidx])) - current_volumes = self._df_or_ser_set_read_only( - pd.Series(self._masked_volumes.iloc[tidx])) - else: - past_volumes = None - current_volumes = None - - if not self.prices is None: - tidx = self.prices.index.get_loc(t) - current_prices = self._df_or_ser_set_read_only( - pd.Series(self._masked_prices.iloc[tidx])) - else: - current_prices = None - - return (past_returns, current_returns, past_volumes, current_volumes, - current_prices) - - def _add_cash_column(self, cash_key, grace_period): - """Add the cash column to an already formed returns dataframe. - - This assumes that the trading periods are about equally spaced. - If, say, you have trading periods with very different lengths you - should redefine this method **and** replace the :class:`CashReturn` - objective term. - """ - - if not cash_key in RATES: - raise NotImplementedError( - 'Currently the only data pipelines built are for cash_key' - f' in {list(RATES)}') - - if self.returns.index.tz is None: - raise DataError( - 'Your provided dataframes are not timezone aware.' - + " This is not recommended, and doesn't allow to add the cash" - + " returns' column internally." - + " You can fix this by adding a timezone manually " - + "using pandas.DataFrame.tz_localize to the dataframes before" - + " you pass them, or you can provide" - + " the cash returns' column as the last column of the returns" - + " dataframe (so it has one more column than volumes and" - + " prices, if provided), and set the cash_key parameter to" - + " its name.") - - data = Fred( - RATES[cash_key], base_location=self.base_location, - grace_period=grace_period) - - cash_returns_per_period = resample_returns( - data.data/100, periods=self.periods_per_year) - - # we merge instead of assigning column because indexes might - # be misaligned (e.g., with tz-aware timestamps) - cash_returns_per_period.name = self.cash_key - original_returns_index = self.returns.index - tmp = pd.concat( - [self.returns, cash_returns_per_period], sort=True, axis=1) - tmp[cash_key] = tmp[cash_key].ffill() - self.returns = tmp.loc[original_returns_index] - - def trading_calendar( - self, start_time=None, end_time=None, include_end=True): - """Get trading calendar from market data. - - :param start_time: Initial time of the trading calendar. Always - inclusive if present. If None, use the first available time. - :type start_time: pandas.Timestamp - :param end_time: Final time of the trading calendar. If None, - use the last available time. - :type end_time: pandas.Timestamp - :param include_end: Include end time. - :type include_end: bool - - :returns: Trading calendar. 
- :rtype: pandas.DatetimeIndex - """ - result = self.returns.index - result = result[result >= self._earliest_backtest_start] - if start_time: - result = result[result >= start_time] - if end_time: - result = result[(result <= end_time)] - if not include_end: - result = result[:-1] - return result - - def _universe_mask_at_time(self, t): - """Return the valid universe mask at time t.""" - past_returns = self.returns.loc[self.returns.index < t] - if self.online_usage: - valid_universe_mask = past_returns.count() >= self.min_history - else: - valid_universe_mask = ((past_returns.count() >= self.min_history) & - (~self.returns.loc[t].isnull())) - if sum(valid_universe_mask) <= 1: - raise DataError( - f'The trading universe at time {t} has size less or equal' - + ' than one, i.e., only the cash account. There are probably ' - + ' issues with missing data in the provided market returns.') - return valid_universe_mask - - @staticmethod - def _df_or_ser_set_read_only(df_or_ser): - """Set numpy array contained in dataframe to read only. - - This is done on data store internally before it is served to the - policy or the simulator to ensure data consistency in case some - element of the pipeline accidentally corrupts the data. - - This is enough to prevent direct assignement to the resulting - dataframe. However it could still be accidentally corrupted by - assigning to columns or indices that are not present in the - original. We avoid that case as well by returning a wrapped - dataframe (which doesn't copy data on creation) in - serve_data_policy and serve_data_simulator. - """ - data = df_or_ser.values - data.flags.writeable = False - if hasattr(df_or_ser, 'columns'): - return pd.DataFrame(data, index=df_or_ser.index, - columns=df_or_ser.columns) - return pd.Series(data, index=df_or_ser.index, name=df_or_ser.name) - - def _set_read_only(self): - """Set internal dataframes to read-only.""" - - self.returns = self._df_or_ser_set_read_only(self.returns) - - if not self.prices is None: - self.prices = self._df_or_ser_set_read_only(self.prices) - - if not self.volumes is None: - self.volumes = self._df_or_ser_set_read_only(self.volumes) - - @property - def _earliest_backtest_start(self): - """Earliest date at which we can start a backtest.""" - return self.returns.iloc[:, :-1].dropna(how='all').index[ - self.min_history] - - sampling_intervals = { - 'weekly': 'W-MON', 'monthly': 'MS', 'quarterly': 'QS', 'annual': 'AS'} - - # @staticmethod - # def _is_first_interval_small(datetimeindex): - # """Check if post-resampling the first interval is small. - # - # We have no way of knowing exactly if the first interval - # needs to be dropped. We drop it if its length is smaller - # than the average of all others, minus 2 standard deviation. 
- # """ - # first_interval = (datetimeindex[1] - datetimeindex[0]) - # all_others = (datetimeindex[2:] - datetimeindex[1:-1]) - # return first_interval < (all_others.mean() - 2 * all_others.std()) - - def _downsample(self, interval): - """_downsample market data.""" - if not interval in self.sampling_intervals: - raise SyntaxError( - 'Unsopported trading interval for down-sampling.') - interval = self.sampling_intervals[interval] - new_returns_index = pd.Series(self.returns.index, self.returns.index - ).resample(interval, closed='left', - label='left').first().values - # print(new_returns_index) - self.returns = np.exp(np.log( - 1+self.returns).resample(interval, closed='left', label='left' - ).sum(min_count=1))-1 - self.returns.index = new_returns_index - - # last row is always unknown - self.returns.iloc[-1] = np.nan - - # # we drop the first row if its interval is small - # if self._is_first_interval_small(self.returns.index): - # self.returns = self.returns.iloc[1:] - - # we nan-out the first non-nan element of every col - for col in self.returns.columns[:-1]: - self.returns.loc[ - (~(self.returns[col].isnull())).idxmax(), col] = np.nan - - # and we drop the first row, which is mostly NaNs anyway - self.returns = self.returns.iloc[1:] - - if self.volumes is not None: - new_volumes_index = pd.Series( - self.volumes.index, self.volumes.index - ).resample(interval, closed='left', - label='left').first().values - self.volumes = self.volumes.resample( - interval, closed='left', label='left').sum(min_count=1) - self.volumes.index = new_volumes_index - - # last row is always unknown - self.volumes.iloc[-1] = np.nan - - # # we drop the first row if its interval is small - # if self._is_first_interval_small(self.volumes.index): - # self.volumes = self.volumes.iloc[1:] - - # we nan-out the first non-nan element of every col - for col in self.volumes.columns: - self.volumes.loc[ - (~(self.volumes[col].isnull())).idxmax(), col] = np.nan - - # and we drop the first row, which is mostly NaNs anyway - self.volumes = self.volumes.iloc[1:] - - if self.prices is not None: - new_prices_index = pd.Series( - self.prices.index, self.prices.index - ).resample( - interval, closed='left', label='left').first().values - self.prices = self.prices.resample( - interval, closed='left', label='left').first() - self.prices.index = new_prices_index - - # # we drop the first row if its interval is small - # if self._is_first_interval_small(self.prices.index): - # self.prices = self.prices.iloc[1:] - - # we nan-out the first non-nan element of every col - for col in self.prices.columns: - self.prices.loc[ - (~(self.prices[col].isnull())).idxmax(), col] = np.nan - - # and we drop the first row, which is mostly NaNs anyway - self.prices = self.prices.iloc[1:] - - def _check_sizes(self): - """Check sizes of user-provided dataframes.""" - - if (not self.volumes is None) and ( - not (self.volumes.shape[1] == self.returns.shape[1] - 1) - or not all(self.volumes.columns == self.returns.columns[:-1])): - raise SyntaxError( - 'Volumes should have same columns as returns, minus cash_key.') - - if (not self.prices is None) and ( - not (self.prices.shape[1] == self.returns.shape[1] - 1) - or not all(self.prices.columns == self.returns.columns[:-1])): - raise SyntaxError( - 'Prices should have same columns as returns, minus cash_key.') - - @property - def periods_per_year(self): - """Average trading periods per year inferred from the data. - - :returns: Average periods per year. 
- :rtype: int - """ - return periods_per_year_from_datetime_index(self.returns.index) - - @property - def min_history(self): - """Min history expressed in periods. - - :returns: How many non-null elements of the past returns for a given - name are required to include it. - :rtype: int - """ - return int(np.round(self.periods_per_year * ( - self._min_history_timedelta / pd.Timedelta('365.24d')))) - - -class UserProvidedMarketData(MarketDataInMemory): - """User-provided market data. - - :param returns: Historical open-to-open returns. The return - at time :math:`t` is :math:`r_t = p_{t+1}/p_t -1` where - :math:`p_t` is the (open) price at time :math:`t`. Must - have datetime index. You can also include cash - returns as its last column, and set ``cash_key`` below to the last - column's name. - :type returns: pandas.DataFrame - :param volumes: Historical market volumes, expressed in units - of value (*e.g.*, US dollars). - :type volumes: pandas.DataFrame or None - :param prices: Historical open prices (*e.g.*, used for rounding - trades in the :class:`MarketSimulator`). - :type prices: pandas.DataFrame or None - :param trading_frequency: Instead of using frequency implied by - the index of the returns, down-sample all dataframes. - We implement ``'weekly'``, ``'monthly'``, ``'quarterly'`` and - ``'annual'``. By default (None) don't down-sample. - :type trading_frequency: str or None - :param min_history: Minimum amount of time for which the returns - are not ``np.nan`` before each assets enters in a back-test. - :type min_history: pandas.Timedelta - :param base_location: The location of the storage, only used - in case it downloads the cash returns. By default - it's a directory named ``cvxportfolio_data`` in your home folder. - :type base_location: pathlib.Path - :param cash_key: Name of the cash account. If not the last column - of the provided returns, it will be downloaded. In that case you should - make sure your provided dataframes have a timezone aware datetime - index. Its returns are the risk-free rate. - :type cash_key: str - :param online_usage: Disable removal of assets that have ``np.nan`` returns - for the given time. Default False. - :type online_usage: bool - """ - - # pylint: disable=too-many-arguments - def __init__(self, returns, volumes=None, prices=None, - copy_dataframes=True, trading_frequency=None, - min_history=pd.Timedelta('365.24d'), - base_location=BASE_LOCATION, - grace_period=pd.Timedelta('1d'), - cash_key='USDOLLAR', - online_usage=False): - - if returns is None: - raise SyntaxError( - "If you don't specify a universe you should pass `returns`.") - - self.base_location = Path(base_location) - self.cash_key = cash_key - - self.returns = pd.DataFrame( - make_numeric(returns), copy=copy_dataframes) - self.volumes = volumes if volumes is None else\ - pd.DataFrame(make_numeric(volumes), copy=copy_dataframes) - self.prices = prices if prices is None else\ - pd.DataFrame(make_numeric(prices), copy=copy_dataframes) - - if cash_key != returns.columns[-1]: - self._add_cash_column(cash_key, grace_period=grace_period) - - # this is mandatory - super().__init__( - trading_frequency=trading_frequency, - base_location=base_location, - cash_key=cash_key, - min_history=min_history, - online_usage=online_usage) - - -class DownloadedMarketData(MarketDataInMemory): - """Market data that is downloaded. - - :param universe: List of names as understood by the data source - used, *e.g.*, ``['AAPL', 'GOOG']`` if using the default - Yahoo Finance data source. 
- :type universe: list - :param datasource: The data source used. - :type datasource: str or :class:`SymbolData` class - :param cash_key: Name of the cash account, its rates will be downloaded - and added as last columns of the returns. Its returns are the - risk-free rate. - :type cash_key: str - :param base_location: The location of the storage. By default - it's a directory named ``cvxportfolio_data`` in your home folder. - :type base_location: pathlib.Path - :param storage_backend: The storage backend, implemented ones are - ``'pickle'``, ``'csv'``, and ``'sqlite'``. By default ``'pickle'``. - :type storage_backend: str - :param min_history: Minimum amount of time for which the returns - are not ``np.nan`` before each assets enters in a back-test. - :type min_history: pandas.Timedelta - :param grace_period: If the most recent observation of each symbol's - data is less old than this we do not download new data. - By default it's one day. - :type grace_period: pandas.Timedelta - :param trading_frequency: Instead of using frequency implied by - the index of the returns, down-sample all dataframes. - We implement ``'weekly'``, ``'monthly'``, ``'quarterly'`` and - ``'annual'``. By default (None) don't down-sample. - :type trading_frequency: str or None - :param online_usage: Disable removal of assets that have ``np.nan`` returns - for the given time. Default False. - :type online_usage: bool - """ - - # pylint: disable=too-many-arguments - def __init__(self, - universe=(), - datasource='YahooFinance', - cash_key='USDOLLAR', - base_location=BASE_LOCATION, - storage_backend='pickle', - min_history=pd.Timedelta('365.24d'), - grace_period=pd.Timedelta('1d'), - trading_frequency=None, - online_usage=False): - """Initializer.""" - - # drop duplicates and ensure ordering - universe = sorted(set(universe)) - - self.base_location = Path(base_location) - self.cash_key = cash_key - if isinstance(datasource, type): - self.datasource = datasource - else: # try to load in current module - self.datasource = globals()[datasource] - self._get_market_data( - universe, grace_period=grace_period, - storage_backend=storage_backend) - self._add_cash_column(self.cash_key, grace_period=grace_period) - self._remove_missing_recent() - - # this is mandatory - super().__init__( - trading_frequency=trading_frequency, - base_location=base_location, - cash_key=cash_key, - min_history=min_history, - online_usage=online_usage) - - def _get_market_data(self, universe, grace_period, storage_backend): - """Download market data.""" - database_accesses = {} - print('Updating data', end='') - sys.stdout.flush() - - for stock in universe: - logger.info( - 'Updating %s with %s.', stock, self.datasource.__name__) - print('.', end='') - sys.stdout.flush() - database_accesses[stock] = self.datasource( - stock, base_location=self.base_location, - grace_period=grace_period, storage_backend=storage_backend) - print() - - if hasattr(self.datasource, 'IS_OHLCVR') and self.datasource.IS_OHLCVR: - self.returns = pd.DataFrame( - {stock: database_accesses[stock].data['return'] - for stock in universe}) - self.volumes = pd.DataFrame( - {stock: database_accesses[stock].data['valuevolume'] - for stock in universe}) - self.prices = pd.DataFrame( - {stock: database_accesses[stock].data['open'] - for stock in universe}) - else: # for now only Fred for indexes, we assume prices! 
- assert isinstance(database_accesses[universe[0]].data, pd.Series) - self.prices = pd.DataFrame( - # open prices - {stock: database_accesses[stock].data for stock in universe}) - self.returns = 1 - self.prices / self.prices.shift(-1) - self.volumes = None - - def _remove_missing_recent(self): - """Clean recent data. - - Yahoo Finance may has issues with most recent data; we remove - recent days if there are NaNs. - """ - - if self.prices.iloc[-5:].isnull().any().any(): - logger.debug( - 'Removing some recent lines because there are missing values.') - drop_at = self.prices.iloc[-5:].isnull().any(axis=1).idxmax() - logger.debug('Dropping at index %s', drop_at) - self.returns = self.returns.loc[self.returns.index < drop_at] - if self.prices is not None: - self.prices = self.prices.loc[self.prices.index < drop_at] - if self.volumes is not None: - self.volumes = self.volumes.loc[self.volumes.index < drop_at] - - # for consistency we must also nan-out the last row - # of returns and volumes - self.returns.iloc[-1] = np.nan - if self.volumes is not None: - self.volumes.iloc[-1] = np.nan - - def partial_universe_signature(self, partial_universe): - """Unique signature of this instance with a partial universe. - - A partial universe is a subset of the full universe that is - available at some time for trading. - - This is used in cvxportfolio.cache to sign back-test caches that - are saved on disk. See its implementation below for details. If - not redefined it returns None which disables on-disk caching. - - :param partial_universe: A subset of the full universe. - :type partial_universe: pandas.Index - - :returns: Signature. - :rtype: str - """ - assert isinstance(partial_universe, pd.Index) - assert np.all(partial_universe.isin(self.full_universe)) - result = f'{self.__class__.__name__}(' - result += f'datasource={self.datasource.__name__}, ' - result += f'partial_universe_hash={hash_(np.array(partial_universe))},' - result += f' trading_frequency={self.trading_frequency})' - return result diff --git a/cvxportfolio/data/__init__.py b/cvxportfolio/data/__init__.py new file mode 100644 index 000000000..0c2bc403a --- /dev/null +++ b/cvxportfolio/data/__init__.py @@ -0,0 +1,26 @@ +# Copyright 2023 Enzo Busseti +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module include classes that download, store, and serve market data. + +The two main abstractions are :class:`SymbolData` and :class:`MarketData`. +Neither are exposed outside this module. Their derived classes instead are. +If you want to interface cvxportfolio with financial data source other +than the ones we provide, you should derive from either of those two classes. 
+""" + +from .market_data import * +from .symbol_data import * + +__all__ = [ + "YahooFinance", "Fred", "UserProvidedMarketData", "DownloadedMarketData"] diff --git a/cvxportfolio/data/market_data.py b/cvxportfolio/data/market_data.py new file mode 100644 index 000000000..245873948 --- /dev/null +++ b/cvxportfolio/data/market_data.py @@ -0,0 +1,677 @@ +# Copyright 2023 Enzo Busseti +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module defines the :class:`MarketData` abstraction and derived classes.""" + +import logging +import sys +from pathlib import Path + +import numpy as np +import pandas as pd + +from ..errors import DataError +from ..utils import (hash_, make_numeric, periods_per_year_from_datetime_index, + resample_returns, set_pd_read_only) +from .symbol_data import * +from .symbol_data import OLHCV + +logger = logging.getLogger(__name__) + +__all__ = ['DownloadedMarketData', 'MarketData', 'UserProvidedMarketData'] + + +class MarketData: + """Prepare, hold, and serve market data. + + :method serve: Serve data for policy and simulator at time :math:`t`. + """ + + def serve(self, t): + """Serve data for policy and simulator at time :math:`t`. + + :param t: Trading time. It must be included in the timestamps returned + by :meth:`trading_calendar`. + :type t: pandas.Timestamp + + :returns: past_returns, current_returns, past_volumes, current_volumes, + current_prices + :rtype: (pandas.DataFrame, pandas.Series, pandas.DataFrame, + pandas.Series, pandas.Series) + """ + raise NotImplementedError # pragma: no cover + + def trading_calendar( + self, start_time=None, end_time=None, include_end=True): + """Get trading calendar between times. + + :param start_time: Initial time of the trading calendar. Always + inclusive if present. If None, use the first available time. + :type start_time: pandas.Timestamp + :param end_time: Final time of the trading calendar. If None, + use the last available time. + :type end_time: pandas.Timestamp + :param include_end: Include end time. + :type include_end: bool + + :returns: Trading calendar. + :rtype: pandas.DatetimeIndex + """ + raise NotImplementedError # pragma: no cover + + @property + def periods_per_year(self): + """Average trading periods per year. + + :rtype: int + """ + raise NotImplementedError # pragma: no cover + + @property + def full_universe(self): + """Full universe, which might not be available for trading. + + :returns: Full universe. + :rtype: pandas.Index + """ + raise NotImplementedError # pragma: no cover + + # pylint: disable=unused-argument, redundant-returns-doc + def partial_universe_signature(self, partial_universe): + """Unique signature of this instance with a partial universe. + + A partial universe is a subset of the full universe that is + available at some time for trading. + + This is used in cvxportfolio.cache to sign back-test caches that + are saved on disk. If not redefined it returns None which disables + on-disk caching. + + :param partial_universe: A subset of the full universe. 
+ :type partial_universe: pandas.Index + + :returns: Signature. + :rtype: str + """ + return None + +# compiled based on Interactive Brokers benchmark rates choices +# (see https://www.ibkrguides.com/kb/article-2949.htm) +# and their FRED codes +RATES = { + 'USDOLLAR': 'DFF', # Federal funds effective rate + 'EURO': 'ECBESTRVOLWGTTRMDMNRT', # BCE short term rate + 'GBPOUND': 'IUDSOIA', # SONIA + 'JPYEN': 'IRSTCB01JPM156N', # updated monthly + } + +class MarketDataInMemory(MarketData): + """Market data that is stored in memory when initialized.""" + + # this is overwritten in the derived classes' initializers + returns = None + + def __init__( + self, trading_frequency, base_location, cash_key, min_history, + online_usage = False): + """This must be called by the derived classes.""" + if (self.returns.index[-1] - self.returns.index[0]) < min_history: + raise DataError( + "The provided returns have less history " + + f"than the min_history {min_history}") + if trading_frequency: + self._downsample(trading_frequency) + self.trading_frequency = trading_frequency + + self._set_read_only() + self._check_sizes() + self._mask = None + self._masked_returns = None + self._masked_volumes = None + self._masked_prices = None + self.base_location = Path(base_location) + self.cash_key = cash_key + self._min_history_timedelta = min_history + self.online_usage = online_usage + + def _mask_dataframes(self, mask): + """Mask internal dataframes if necessary.""" + if (self._mask is None) or not np.all(self._mask == mask): + logger.info("Masking internal %s dataframes.", + self.__class__.__name__) + colmask = self.returns.columns[mask] + # self._masked_returns = set_pd_read_only( + # pd.DataFrame(self.returns.iloc[:, mask], copy=True)) + self._masked_returns = set_pd_read_only( + pd.DataFrame(self.returns.loc[:, colmask], copy=True)) + # self._masked_returns = set_pd_read_only( + # pd.DataFrame(np.array(self.returns.values[:, mask]), + # index=self.returns.index, columns=colmask)) + if not self.volumes is None: + # self._masked_volumes = set_pd_read_only( + # pd.DataFrame(self.volumes.iloc[:, mask[:-1]], copy=True)) + self._masked_volumes = set_pd_read_only( + pd.DataFrame(self.volumes.loc[:, colmask[:-1]], copy=True)) + # self._masked_volumes = set_pd_read_only( + # pd.DataFrame(np.array(self.volumes.values[:, mask[:-1]]), + # index=self.volumes.index, columns=colmask[:-1])) + if not self.prices is None: + # self._masked_prices = set_pd_read_only( + # pd.DataFrame(self.prices.iloc[:, mask[:-1]], copy=True)) + self._masked_prices = set_pd_read_only( + pd.DataFrame(self.prices.loc[:, colmask[:-1]], copy=True)) + self._mask = mask + + @property + def full_universe(self): + """Full universe, which might not be available for trading. + + :returns: Full universe. + :rtype: pandas.Index + """ + return self.returns.columns + + def serve(self, t): + """Serve data for policy and simulator at time :math:`t`. + + :param t: Time of execution, *e.g.*, stock market open of a given day. 
+ :type t: pandas.Timestamp + + :returns: (past_returns, current_returns, past_volumes, + current_volumes, current_prices) + :rtype: (pandas.DataFrame, pandas.Series, pandas.DataFrame or None, + pandas.Series or None, pandas.Series or None) + """ + + mask = self._universe_mask_at_time(t).values + self._mask_dataframes(mask) + + tidx = self.returns.index.get_loc(t) + past_returns = set_pd_read_only( + pd.DataFrame(self._masked_returns.iloc[:tidx])) + current_returns = set_pd_read_only( + pd.Series(self._masked_returns.iloc[tidx])) + + if not self.volumes is None: + tidx = self.volumes.index.get_loc(t) + past_volumes = set_pd_read_only( + pd.DataFrame(self._masked_volumes.iloc[:tidx])) + current_volumes = set_pd_read_only( + pd.Series(self._masked_volumes.iloc[tidx])) + else: + past_volumes = None + current_volumes = None + + if not self.prices is None: + tidx = self.prices.index.get_loc(t) + current_prices = set_pd_read_only( + pd.Series(self._masked_prices.iloc[tidx])) + else: + current_prices = None + + return (past_returns, current_returns, past_volumes, current_volumes, + current_prices) + + def _add_cash_column(self, cash_key, grace_period): + """Add the cash column to an already formed returns dataframe. + + This assumes that the trading periods are about equally spaced. + If, say, you have trading periods with very different lengths you + should redefine this method **and** replace the :class:`CashReturn` + objective term. + """ + + if not cash_key in RATES: + raise NotImplementedError( + 'Currently the only data pipelines built are for cash_key' + f' in {list(RATES)}') + + if self.returns.index.tz is None: + raise DataError( + 'Your provided dataframes are not timezone aware.' + + " This is not recommended, and doesn't allow to add the cash" + + " returns' column internally." + + " You can fix this by adding a timezone manually " + + "using pandas.DataFrame.tz_localize to the dataframes before" + + " you pass them, or you can provide" + + " the cash returns' column as the last column of the returns" + + " dataframe (so it has one more column than volumes and" + + " prices, if provided), and set the cash_key parameter to" + + " its name.") + + data = Fred( + RATES[cash_key], base_location=self.base_location, + grace_period=grace_period) + + cash_returns_per_period = resample_returns( + data.data/100, periods=self.periods_per_year) + + # we merge instead of assigning column because indexes might + # be misaligned (e.g., with tz-aware timestamps) + cash_returns_per_period.name = self.cash_key + original_returns_index = self.returns.index + tmp = pd.concat( + [self.returns, cash_returns_per_period], sort=True, axis=1) + tmp[cash_key] = tmp[cash_key].ffill() + self.returns = tmp.loc[original_returns_index] + + def trading_calendar( + self, start_time=None, end_time=None, include_end=True): + """Get trading calendar from market data. + + :param start_time: Initial time of the trading calendar. Always + inclusive if present. If None, use the first available time. + :type start_time: pandas.Timestamp + :param end_time: Final time of the trading calendar. If None, + use the last available time. + :type end_time: pandas.Timestamp + :param include_end: Include end time. + :type include_end: bool + + :returns: Trading calendar. 
+ :rtype: pandas.DatetimeIndex + """ + result = self.returns.index + result = result[result >= self._earliest_backtest_start] + if start_time: + result = result[result >= start_time] + if end_time: + result = result[(result <= end_time)] + if not include_end: + result = result[:-1] + return result + + def _universe_mask_at_time(self, t): + """Return the valid universe mask at time t.""" + past_returns = self.returns.loc[self.returns.index < t] + if self.online_usage: + valid_universe_mask = past_returns.count() >= self.min_history + else: + valid_universe_mask = ((past_returns.count() >= self.min_history) & + (~self.returns.loc[t].isnull())) + if sum(valid_universe_mask) <= 1: + raise DataError( + f'The trading universe at time {t} has size less than or equal' + + ' to one, i.e., only the cash account. There are probably' + + ' issues with missing data in the provided market returns.') + return valid_universe_mask + + def _set_read_only(self): + """Set internal dataframes to read-only.""" + + self.returns = set_pd_read_only(self.returns) + + if not self.prices is None: + self.prices = set_pd_read_only(self.prices) + + if not self.volumes is None: + self.volumes = set_pd_read_only(self.volumes) + + @property + def _earliest_backtest_start(self): + """Earliest date at which we can start a backtest.""" + return self.returns.iloc[:, :-1].dropna(how='all').index[ + self.min_history] + + sampling_intervals = { + 'weekly': 'W-MON', 'monthly': 'MS', 'quarterly': 'QS', 'annual': 'AS'} + + # @staticmethod + # def _is_first_interval_small(datetimeindex): + # """Check if post-resampling the first interval is small. + # + # We have no way of knowing exactly if the first interval + # needs to be dropped. We drop it if its length is smaller + # than the average of all others, minus 2 standard deviations.
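+ # For example (illustrative), daily data down-sampled to 'monthly' with + # a history starting mid-month would produce such a short first interval.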
+ # """ + # first_interval = (datetimeindex[1] - datetimeindex[0]) + # all_others = (datetimeindex[2:] - datetimeindex[1:-1]) + # return first_interval < (all_others.mean() - 2 * all_others.std()) + + def _downsample(self, interval): + """Down-sample market data.""" + if not interval in self.sampling_intervals: + raise SyntaxError( + 'Unsupported trading interval for down-sampling.') + interval = self.sampling_intervals[interval] + new_returns_index = pd.Series(self.returns.index, self.returns.index + ).resample(interval, closed='left', + label='left').first().values + # print(new_returns_index) + self.returns = np.exp(np.log( + 1+self.returns).resample(interval, closed='left', label='left' + ).sum(min_count=1))-1 + self.returns.index = new_returns_index + + # last row is always unknown + self.returns.iloc[-1] = np.nan + + # # we drop the first row if its interval is small + # if self._is_first_interval_small(self.returns.index): + # self.returns = self.returns.iloc[1:] + + # we nan-out the first non-nan element of every col + for col in self.returns.columns[:-1]: + self.returns.loc[ + (~(self.returns[col].isnull())).idxmax(), col] = np.nan + + # and we drop the first row, which is mostly NaNs anyway + self.returns = self.returns.iloc[1:] + + if self.volumes is not None: + new_volumes_index = pd.Series( + self.volumes.index, self.volumes.index + ).resample(interval, closed='left', + label='left').first().values + self.volumes = self.volumes.resample( + interval, closed='left', label='left').sum(min_count=1) + self.volumes.index = new_volumes_index + + # last row is always unknown + self.volumes.iloc[-1] = np.nan + + # # we drop the first row if its interval is small + # if self._is_first_interval_small(self.volumes.index): + # self.volumes = self.volumes.iloc[1:] + + # we nan-out the first non-nan element of every col + for col in self.volumes.columns: + self.volumes.loc[ + (~(self.volumes[col].isnull())).idxmax(), col] = np.nan + + # and we drop the first row, which is mostly NaNs anyway + self.volumes = self.volumes.iloc[1:] + + if self.prices is not None: + new_prices_index = pd.Series( + self.prices.index, self.prices.index + ).resample( + interval, closed='left', label='left').first().values + self.prices = self.prices.resample( + interval, closed='left', label='left').first() + self.prices.index = new_prices_index + + # # we drop the first row if its interval is small + # if self._is_first_interval_small(self.prices.index): + # self.prices = self.prices.iloc[1:] + + # we nan-out the first non-nan element of every col + for col in self.prices.columns: + self.prices.loc[ + (~(self.prices[col].isnull())).idxmax(), col] = np.nan + + # and we drop the first row, which is mostly NaNs anyway + self.prices = self.prices.iloc[1:] + + def _check_sizes(self): + """Check sizes of user-provided dataframes.""" + + if (not self.volumes is None) and ( + not (self.volumes.shape[1] == self.returns.shape[1] - 1) + or not all(self.volumes.columns == self.returns.columns[:-1])): + raise SyntaxError( + 'Volumes should have same columns as returns, minus cash_key.') + + if (not self.prices is None) and ( + not (self.prices.shape[1] == self.returns.shape[1] - 1) + or not all(self.prices.columns == self.returns.columns[:-1])): + raise SyntaxError( + 'Prices should have same columns as returns, minus cash_key.') + + @property + def periods_per_year(self): + """Average trading periods per year inferred from the data. + + :returns: Average periods per year.
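+ (roughly 252, for example, when the index consists of daily trading days)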
+ :rtype: int + """ + return periods_per_year_from_datetime_index(self.returns.index) + + @property + def min_history(self): + """Min history expressed in periods. + + :returns: How many non-null elements of the past returns for a given + name are required to include it. + :rtype: int + """ + return int(np.round(self.periods_per_year * ( + self._min_history_timedelta / pd.Timedelta('365.24d')))) + + +class UserProvidedMarketData(MarketDataInMemory): + """User-provided market data. + + :param returns: Historical open-to-open returns. The return + at time :math:`t` is :math:`r_t = p_{t+1}/p_t -1` where + :math:`p_t` is the (open) price at time :math:`t`. Must + have datetime index. You can also include cash + returns as its last column, and set ``cash_key`` below to the last + column's name. + :type returns: pandas.DataFrame + :param volumes: Historical market volumes, expressed in units + of value (*e.g.*, US dollars). + :type volumes: pandas.DataFrame or None + :param prices: Historical open prices (*e.g.*, used for rounding + trades in the :class:`MarketSimulator`). + :type prices: pandas.DataFrame or None + :param trading_frequency: Instead of using frequency implied by + the index of the returns, down-sample all dataframes. + We implement ``'weekly'``, ``'monthly'``, ``'quarterly'`` and + ``'annual'``. By default (None) don't down-sample. + :type trading_frequency: str or None + :param min_history: Minimum amount of time for which the returns + are not ``np.nan`` before each assets enters in a back-test. + :type min_history: pandas.Timedelta + :param base_location: The location of the storage, only used + in case it downloads the cash returns. By default + it's a directory named ``cvxportfolio_data`` in your home folder. + :type base_location: pathlib.Path + :param cash_key: Name of the cash account. If not the last column + of the provided returns, it will be downloaded. In that case you should + make sure your provided dataframes have a timezone aware datetime + index. Its returns are the risk-free rate. + :type cash_key: str + :param online_usage: Disable removal of assets that have ``np.nan`` returns + for the given time. Default False. + :type online_usage: bool + """ + + # pylint: disable=too-many-arguments + def __init__(self, returns, volumes=None, prices=None, + copy_dataframes=True, trading_frequency=None, + min_history=pd.Timedelta('365.24d'), + base_location=BASE_LOCATION, + grace_period=pd.Timedelta('1d'), + cash_key='USDOLLAR', + online_usage=False): + + if returns is None: + raise SyntaxError( + "If you don't specify a universe you should pass `returns`.") + + self.base_location = Path(base_location) + self.cash_key = cash_key + + self.returns = pd.DataFrame( + make_numeric(returns), copy=copy_dataframes) + self.volumes = volumes if volumes is None else\ + pd.DataFrame(make_numeric(volumes), copy=copy_dataframes) + self.prices = prices if prices is None else\ + pd.DataFrame(make_numeric(prices), copy=copy_dataframes) + + if cash_key != returns.columns[-1]: + self._add_cash_column(cash_key, grace_period=grace_period) + + # this is mandatory + super().__init__( + trading_frequency=trading_frequency, + base_location=base_location, + cash_key=cash_key, + min_history=min_history, + online_usage=online_usage) + + +class DownloadedMarketData(MarketDataInMemory): + """Market data that is downloaded. + + :param universe: List of names as understood by the data source + used, *e.g.*, ``['AAPL', 'GOOG']`` if using the default + Yahoo Finance data source. 
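+ Duplicate names are removed and the universe is sorted internally.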
+ :type universe: list + :param datasource: The data source used. + :type datasource: str or :class:`SymbolData` class + :param cash_key: Name of the cash account, its rates will be downloaded + and added as last columns of the returns. Its returns are the + risk-free rate. + :type cash_key: str + :param base_location: The location of the storage. By default + it's a directory named ``cvxportfolio_data`` in your home folder. + :type base_location: pathlib.Path + :param storage_backend: The storage backend, implemented ones are + ``'pickle'``, ``'csv'``, and ``'sqlite'``. By default ``'pickle'``. + :type storage_backend: str + :param min_history: Minimum amount of time for which the returns + are not ``np.nan`` before each assets enters in a back-test. + :type min_history: pandas.Timedelta + :param grace_period: If the most recent observation of each symbol's + data is less old than this we do not download new data. + By default it's one day. + :type grace_period: pandas.Timedelta + :param trading_frequency: Instead of using frequency implied by + the index of the returns, down-sample all dataframes. + We implement ``'weekly'``, ``'monthly'``, ``'quarterly'`` and + ``'annual'``. By default (None) don't down-sample. + :type trading_frequency: str or None + :param online_usage: Disable removal of assets that have ``np.nan`` returns + for the given time. Default False. + :type online_usage: bool + """ + + # pylint: disable=too-many-arguments + def __init__(self, + universe=(), + datasource='YahooFinance', + cash_key='USDOLLAR', + base_location=BASE_LOCATION, + storage_backend='pickle', + min_history=pd.Timedelta('365.24d'), + grace_period=pd.Timedelta('1d'), + trading_frequency=None, + online_usage=False): + """Initializer.""" + + # drop duplicates and ensure ordering + universe = sorted(set(universe)) + + self.base_location = Path(base_location) + self.cash_key = cash_key + if isinstance(datasource, type): + self.datasource = datasource + else: # try to load in current module + self.datasource = globals()[datasource] + self._get_market_data( + universe, grace_period=grace_period, + storage_backend=storage_backend) + self._add_cash_column(self.cash_key, grace_period=grace_period) + self._remove_missing_recent() + + # this is mandatory + super().__init__( + trading_frequency=trading_frequency, + base_location=base_location, + cash_key=cash_key, + min_history=min_history, + online_usage=online_usage) + + def _get_market_data(self, universe, grace_period, storage_backend): + """Download market data.""" + database_accesses = {} + print('Updating data', end='') + sys.stdout.flush() + + for stock in universe: + logger.info( + 'Updating %s with %s.', stock, self.datasource.__name__) + print('.', end='') + sys.stdout.flush() + database_accesses[stock] = self.datasource( + stock, base_location=self.base_location, + grace_period=grace_period, storage_backend=storage_backend) + print() + + if issubclass(self.datasource, OLHCV): + self.returns = pd.DataFrame( + {stock: database_accesses[stock].data['return'] + for stock in universe}) + self.volumes = pd.DataFrame( + {stock: database_accesses[stock].data['valuevolume'] + for stock in universe}) + self.prices = pd.DataFrame( + {stock: database_accesses[stock].data['open'] + for stock in universe}) + else: # for now only Fred for indexes, we assume prices! 
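+ # (each symbol's data is then treated as a price series: returns are + # derived from these prices just below and volumes are set to None)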
+ assert isinstance(database_accesses[universe[0]].data, pd.Series) + self.prices = pd.DataFrame( + # open prices + {stock: database_accesses[stock].data for stock in universe}) + self.returns = 1 - self.prices / self.prices.shift(-1) + self.volumes = None + + def _remove_missing_recent(self): + """Clean recent data. + + Yahoo Finance may has issues with most recent data; we remove + recent days if there are NaNs. + """ + + if self.prices.iloc[-5:].isnull().any().any(): + logger.warning( + 'Removing some recent lines because there are missing values.') + drop_at = self.prices.iloc[-5:].isnull().any(axis=1).idxmax() + logger.warning('Dropping at index %s', drop_at) + self.returns = self.returns.loc[self.returns.index < drop_at] + if self.prices is not None: + self.prices = self.prices.loc[self.prices.index < drop_at] + if self.volumes is not None: + self.volumes = self.volumes.loc[self.volumes.index < drop_at] + + # for consistency we must also nan-out the last row + # of returns and volumes + self.returns.iloc[-1] = np.nan + if self.volumes is not None: + self.volumes.iloc[-1] = np.nan + + def partial_universe_signature(self, partial_universe): + """Unique signature of this instance with a partial universe. + + A partial universe is a subset of the full universe that is + available at some time for trading. + + This is used in cvxportfolio.cache to sign back-test caches that + are saved on disk. See its implementation below for details. If + not redefined it returns None which disables on-disk caching. + + :param partial_universe: A subset of the full universe. + :type partial_universe: pandas.Index + + :returns: Signature. + :rtype: str + """ + assert isinstance(partial_universe, pd.Index) + assert np.all(partial_universe.isin(self.full_universe)) + result = f'{self.__class__.__name__}(' + result += f'datasource={self.datasource.__name__}, ' + result += f'partial_universe_hash={hash_(np.array(partial_universe))},' + result += f' trading_frequency={self.trading_frequency})' + return result diff --git a/cvxportfolio/data/symbol_data.py b/cvxportfolio/data/symbol_data.py new file mode 100644 index 000000000..119c1221a --- /dev/null +++ b/cvxportfolio/data/symbol_data.py @@ -0,0 +1,1182 @@ +# Copyright 2023 Enzo Busseti +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module defines :class:`SymbolData` and derived classes.""" + +import datetime +import logging +import sqlite3 +import warnings +from pathlib import Path +from urllib.error import URLError + +import numpy as np +import pandas as pd +import requests +import requests.exceptions + +from ..errors import DataError +from ..utils import set_pd_read_only + +logger = logging.getLogger(__name__) + +BASE_LOCATION = Path.home() / "cvxportfolio_data" + +__all__ = [ + '_loader_csv', '_loader_pickle', '_loader_sqlite', + '_storer_csv', '_storer_pickle', '_storer_sqlite', + 'Fred', 'SymbolData', 'YahooFinance', 'BASE_LOCATION'] + +def now_timezoned(): + """Return current timestamp with local timezone. 
+ + :returns: Current timestamp with local timezone. + :rtype: pandas.Timestamp + """ + return pd.Timestamp( + datetime.datetime.now(datetime.timezone.utc).astimezone()) + +class SymbolData: + """Base class for a single symbol time series data. + + The data is either in the form of a Pandas Series or DataFrame + and has datetime index. + + This class needs to be derived. At a minimum, + one should redefine the ``_download`` method, which + implements the downloading of the symbol's time series + from an external source. The method takes the current (already + downloaded and stored) data and is supposed to **only append** to it. + In this way we only store new data and don't modify already downloaded + data. + + Additionally one can redefine the ``_preload`` method, which prepares + data to serve to the user (so the data is stored in a different format + than what the user sees.) We found that this separation can be useful. + + This class interacts with module-level functions named ``_loader_BACKEND`` + and ``_storer_BACKEND``, where ``BACKEND`` is the name of the storage + system used. We define ``pickle``, ``csv``, and ``sqlite`` backends. + These may have limitations. See their docstrings for more information. + + + :param symbol: The symbol that we downloaded. + :type symbol: str + :param storage_backend: The storage backend, implemented ones are + ``'pickle'``, ``'csv'``, and ``'sqlite'``. By default ``'pickle'``. + :type storage_backend: str + :param base_location: The location of the storage. We store in a + subdirectory named after the class which derives from this. By default + it's a directory named ``cvxportfolio_data`` in your home folder. + :type base_location: pathlib.Path + :param grace_period: If the most recent observation in the data is less + old than this we do not download new data. By default it's one day. + :type grace_period: pandas.Timedelta + + :attribute data: The downloaded data for the symbol. + """ + + def __init__(self, symbol, + storage_backend='pickle', + base_location=BASE_LOCATION, + grace_period=pd.Timedelta('1d')): + self._symbol = symbol + self._storage_backend = storage_backend + self._base_location = base_location + self.update(grace_period) + self._data = self.load() + + @property + def storage_location(self): + """Storage location. Directory is created if not existent. + + :rtype: pathlib.Path + """ + loc = self._base_location / f"{self.__class__.__name__}" + loc.mkdir(parents=True, exist_ok=True) + return loc + + @property + def symbol(self): + """The symbol whose data this instance contains. + + :rtype: str + """ + return self._symbol + + @property + def data(self): + """Time series data, updated to the most recent observation. + + :rtype: pandas.Series or pandas.DataFrame + """ + return self._data + + def _load_raw(self): + """Load raw data from database.""" + # we could implement multiprocess safety here + loader = globals()['_loader_' + self._storage_backend] + try: + logger.info( + f"{self.__class__.__name__} is trying to load {self.symbol}" + + f" with {self._storage_backend} backend" + + f" from {self.storage_location}") + return loader(self.symbol, self.storage_location) + except FileNotFoundError: + return None + + def load(self): + """Load data from database using `self.preload` function to process. + + :returns: Loaded time-series data for the symbol. + :rtype: pandas.Series or pandas.DataFrame + """ + return set_pd_read_only(self._preload(self._load_raw())) + + def _store(self, data): + """Store data in database. 
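+ + Dispatches to the module-level ``_storer_BACKEND`` function for the + chosen backend, *e.g.*, ``_storer_pickle`` when + ``storage_backend='pickle'``.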
+ + :param data: Time-series data to store. + :type data: pandas.Series or pandas.DataFrame + """ + # we could implement multiprocess safety here + storer = globals()['_storer_' + self._storage_backend] + logger.info( + f"{self.__class__.__name__} is storing {self.symbol}" + + f" with {self._storage_backend} backend" + + f" in {self.storage_location}") + storer(self.symbol, data, self.storage_location) + + def _print_difference(self, current, new): + """Helper method to print difference if update is not append-only. + + This is temporary and will be re-factored. + """ + print("TEMPORARY: Diff between overlap of downloaded and stored") + print((new - current).dropna(how='all').tail(5)) + + def update(self, grace_period): + """Update current stored data for symbol. + + Checks (which raise warnings): + + #. Elements of data are NaN (skipping last row) + #. Update is not append-only. For dataframes check all elements other + than last row of the data which was there before, and for that last + row, only the open price. For Series that doesn't matter, check that + last element is the same. + + :param grace_period: If the time between now and the last value stored + is less than this, we don't update the data already stored. + :type grace_period: pandas.Timedelta + """ + current = self._load_raw() + logger.info( + f"Downloading {self.symbol}" + + f" from {self.__class__.__name__}") + updated = self._download( + self.symbol, current, grace_period=grace_period) + + if np.any(updated.iloc[:-1].isnull()): + logger.warning( + " cvxportfolio.%s('%s').data contains NaNs." + + " You may want to inspect it. If you want, you can delete the" + + " data file in %s to force re-download from the start.", + self.__class__.__name__, self.symbol, self.storage_location) + + try: + if current is not None: + if not np.all( + # we use numpy.isclose because returns may be computed + # via logreturns and numerical errors can sift through + np.isclose(updated.loc[current.index[:-1]], + current.iloc[:-1], equal_nan=True)): + logger.error(f"{self.__class__.__name__} update" + + f" of {self.symbol} is not append-only!") + self._print_difference(current, updated) + if hasattr(current, 'columns'): + # the first column is open price + if not current.iloc[-1, 0] == updated.loc[ + current.index[-1]].iloc[0]: + logger.error( + f"{self.__class__.__name__} update " + + f" of {self.symbol} changed last open price!") + self._print_difference(current, updated) + else: + if not current.iloc[-1] == updated.loc[current.index[-1]]: + logger.error( + f"{self.__class__.__name__} update" + + f" of {self.symbol} changed last value!") + self._print_difference(current, updated) + # this should have become superflous + except KeyError: # pragma: no cover + logger.error("%s update of %s could not be checked for" + + " append-only edits. Was there a DST change?", + self.__class__.__name__, self.symbol) # pragma: no cover + self._store(updated) + + def _download(self, symbol, current, grace_period, **kwargs): + """Download data from external source given already downloaded data. + + This method must be redefined by derived classes. + + :param symbol: The symbol we download. + :type symbol: str + :param current: The data already downloaded. We are supposed to + **only append** to it. If None, no data is present. + :type current: pandas.Series or pandas.DataFrame or None + :rtype: pandas.Series or pandas.DataFrame + """ + raise NotImplementedError # pragma: no cover + + def _preload(self, data): + """Prepare data to serve to the user. 
+ + This method can be redefined by derived classes. + + :param data: The data returned by the storage backend. + :type data: pandas.Series or pandas.DataFrame + :rtype: pandas.Series or pandas.DataFrame + """ + return data # pragma: no cover + + +# +# Yahoo Finance. +# + +def _timestamp_convert(unix_seconds_ts): + """Convert a UNIX timestamp in seconds to a pandas.Timestamp.""" + return pd.Timestamp(unix_seconds_ts*1E9, tz='UTC') + +# Anomalous, extreme, dubious logreturns filtering. + +def _median_scale_around(lrets, window): + """Median absolute logreturn in a window around each timestamp.""" + return np.abs(lrets).rolling(window, center=True, min_periods=1).median() + +# def _mean_scale_around(lrets, window): +# """Root mean squared logreturn in a window around each timestamp. + +# We need a few operations because we skip the observation itself +# """ +# sum = (lrets**2).rolling(window, center=True, min_periods=2).sum() +# count = lrets.rolling(window, center=True, min_periods=2).count() +# return np.sqrt((sum - lrets**2) / (count - 1)) + +def _unlikeliness_score( + test_logreturns, reference_logreturns, scaler, windows): + """Find problematic indexes for test logreturns compared w/ reference.""" + scaled = [ + np.abs(test_logreturns) / scaler(reference_logreturns, window) + for window in windows] + scaled = pd.DataFrame(scaled).T + return scaled.min(axis=1, skipna=True) + + +class OLHCV(SymbolData): # pylint: disable=abstract-method + """Base class for Open-Low-High-Close-Volume symbol data. + + This operates on a dataframe with columns + + .. code-block:: + + ['open', 'low', 'high', 'close', 'volume'] + + or + + .. code-block:: + + ['open', 'low', 'high', 'close', 'volume', 'return'] + + in which case the ``'return'`` column is not processed. It only matters in + the :meth:`_preload`, method: if open-to-open returns are not present, + we compute them there. Otherwise these may be total returns (which include + dividends, ...) and they're dealt with in derived classes. + """ + + FILTERING_WINDOWS = (10, 20, 50, 100, 200) + + # remove open prices when open to close abs logreturn is larger than + # this time the median absolute ones in FILTERING_WINDOWS around it + THRESHOLD_OPEN_TO_CLOSE = 15 + + # remove low/high prices when low/high to close abs logreturn larger than + # this time the median absolute ones in FILTERING_WINDOWS centered on it + THRESHOLD_LOWHIGH_TO_CLOSE = 20 + + # log warning on _preload for abs logreturns (of 4 types) larger than this + # this time the median absolute ones in FILTERING_WINDOWS centered on it + THRESHOLD_WARN_EXTREME_LOGRETS = 50 + + def _process(self, new_data, saved_data=None): + """Base method for processing (cleaning) data. + + It operates on the ``new_data`` dataframe, which is the newly + downloaded data. The ``saved_data`` dataframe is provided as well + (None if there is none). It has the same columns, older timestamps + (possibly overlapping with new_data at the end), and is **read only**: + it is used as reference to help with the cleaning, it has already + been cleaned. + """ + + ## Preliminaries + ## Eliminate non-positive prices, infinity values. + + # NaN nonpositive prices + for column in ["open", "close", "high", "low"]: + self._nan_nonpositive_prices(new_data, column) + + # all infinity values to NaN + self._set_infty_to_nan(new_data, level='info') + + ## Close price. + ## We believe them (for now). We forward fill them if unavailable. 
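+ # (illustrative: the helper call below is roughly equivalent to + # new_data['close'] = new_data['close'].ffill(), plus a log message + # listing which timestamps were filled)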
+ + # forward-fill close + self._fillna_and_message( + new_data, 'close', 'last available', filler='ffill') + + ## Volumes. + ## We set negative to NaN, and fill with zeros. + + # NaN negative volumes + self._nan_negative_volumes(new_data) + + # fill with zeros + self._fillna_and_message( + new_data, 'volume', 'zeros', filler='fillna', filler_arg=0.) + + ## Open price. + ## We remove if lower than low, higher than high, or open to close + ## logreturn is anomalous. Then we fill with close from day before. + + # NaN open if lower than low + self._nan_open_lower_low(new_data) + + # NaN open if higher than high + self._nan_open_higher_high(new_data) + + # NaN anomalous open prices + self._nan_anomalous_prices( + new_data, 'open', threshold=self.THRESHOLD_OPEN_TO_CLOSE, + saved_data=saved_data, level='info') + + # fill open with close from day before + self._fillna_and_message( + new_data, 'open', 'close from period before', filler='fillna', + filler_arg=new_data['close'].shift(1), level='info') + + ## Low price. + ## We remove if higher than close or anomalous low to close logreturn. + ## We fill them with min of open and close. + + # NaN low if higher than close + self._nan_low_higher_close(new_data) + + # NaN low if higher than open (cleaned) + self._nan_low_higher_open(new_data) + + # NaN anomalous low prices + self._nan_anomalous_prices( + new_data, 'low', threshold=self.THRESHOLD_LOWHIGH_TO_CLOSE, + saved_data=saved_data, level='info') + + # fill low with min of open and close + self._fillna_and_message( + new_data, 'low', 'min of open and close', filler='fillna', + filler_arg=new_data[['open', 'close']].min(axis=1), level='info') + + ## High price. + ## We remove if lower than close or anomalous low to close logreturn. + ## We fill them with max of open and close. + + # NaN high if lower than close + self._nan_high_lower_close(new_data) + + # NaN high if lower than open (cleaned) + self._nan_high_lower_open(new_data) + + # NaN anomalous high prices + self._nan_anomalous_prices( + new_data, 'high', threshold=self.THRESHOLD_LOWHIGH_TO_CLOSE, + saved_data=saved_data, level='info') + + # fill high with max of open and close + self._fillna_and_message( + new_data, 'high', 'max of open and close', filler='fillna', + filler_arg=new_data[['open', 'close']].max(axis=1), level='info') + + ## Some asserts + assert new_data.iloc[1:].isnull().sum().sum() == 0 + assert np.all( + new_data['low'].fillna(0.) 
<= new_data[ + ['open', 'high', 'close']].min(1)) + assert np.all( + new_data['high'].fillna(np.inf) >= new_data[ + ['open', 'low', 'close']].max(1)) + + return new_data + + def _fillna_and_message( + self, data, col_name, message, filler='fillna', filler_arg=None, + level='warning'): + """Fill NaNs in column with chosen method and arg.""" + bad_indexes = data.index[data[col_name].isnull()] + if len(bad_indexes) > 0: + getattr(logger, level)( + '%s("%s").data["%s"] has NaNs on timestamps: %s,' + + ' filling them with %s.', self.__class__.__name__, + self.symbol, col_name, bad_indexes, message) + if filler == 'ffill': + data[col_name] = data[col_name].ffill() + else: + data[col_name] = getattr(data[col_name], filler)(filler_arg) + + def _nan_anomalous_prices( + self, new_data, price_name, threshold, saved_data=None, + level='warning'): + """Set to NaN given price name on its anomalous logrets to close.""" + new_lr_to_close =\ + np.log(new_data['close']) - np.log(new_data[price_name]) + + # if there is saved data, we use it to compute the logrets + # also on the past, but we only NaN (if necessary) elements of + # new data, so the scores computed on the past are not used + if saved_data is None: + all_lr_to_close = new_lr_to_close + else: + old_lr_to_close =\ + np.log(saved_data['close']) - np.log(saved_data[price_name]) + all_lr_to_close = pd.concat( + [old_lr_to_close.loc[ + old_lr_to_close.index < new_lr_to_close.index[0]], + new_lr_to_close]) + # drop old data which we don't need + all_lr_to_close = all_lr_to_close.iloc[ + -len(new_data) - max(self.FILTERING_WINDOWS):] + + # with this we skip over exact zeros (which come from some upstream + # cleaning) and would throw the median off + all_lr_to_close.loc[all_lr_to_close == 0] = np.nan + score = _unlikeliness_score( + all_lr_to_close, all_lr_to_close, scaler=_median_scale_around, + windows=self.FILTERING_WINDOWS) + self._nan_values( + new_data, condition = score.loc[new_data.index] > threshold, + columns_to_nan=price_name, message=f'anomalous {price_name} price', + level=level) + + def _nan_values( + self, data, condition, columns_to_nan, message, level='warning'): + """Set to NaN in-place for indexing condition and chosen columns.""" + + bad_indexes = data.index[condition] + if len(bad_indexes) > 0: + getattr(logger, level)( + '%s("%s") has %s on timestamps: %s,' + + ' setting to nan', + self.__class__.__name__, self.symbol, message, bad_indexes) + data.loc[bad_indexes, columns_to_nan] = np.nan + + def _nan_nonpositive_prices(self, data, prices_name): + """Set non-positive prices (chosen price name) to NaN, in-place.""" + self._nan_values( + data=data, condition = data[prices_name] <= 0, + columns_to_nan = prices_name, + message = f'non-positive {prices_name} prices', level='info') + + def _nan_negative_volumes(self, data): + """Set negative volumes to NaN, in-place.""" + self._nan_values( + data=data, condition = data["volume"] < 0, + columns_to_nan = "volume", message = 'negative volumes', + level='info') + + def _nan_open_lower_low(self, data): + """Set open price to NaN if lower than low, in-place.""" + self._nan_values( + data=data, condition = data['open'] < data['low'], + columns_to_nan = "open", + message = 'open price lower than low price', level='info') + + def _nan_open_higher_high(self, data): + """Set open price to NaN if higher than high, in-place.""" + self._nan_values( + data=data, condition = data['open'] > data['high'], + columns_to_nan = "open", + message = 'open price higher than high price', level='info') + + # def 
_nan_incompatible_low_high(self, data): + # """Set low and high to NaN if low is higher, in-place.""" + # self._nan_values( + # data=data, condition = data['low'] > data['high'], + # columns_to_nan = ["low", "high"], + # message = 'low price higher than high price') + + def _nan_high_lower_close(self, data): + """Set high price to NaN if lower than close, in-place.""" + self._nan_values( + data=data, condition = data['high'] < data['close'], + columns_to_nan = "high", + message = 'high price lower than close price', level='info') + + def _nan_high_lower_open(self, data): + """Set high price to NaN if lower than open, in-place.""" + self._nan_values( + data=data, condition = data['high'] < data['open'], + columns_to_nan = "high", + message = 'high price lower than open price', level='info') + + def _nan_low_higher_close(self, data): + """Set low price to NaN if higher than close, in-place.""" + self._nan_values( + data=data, condition = data['low'] > data['close'], + columns_to_nan = "low", + message = 'low price higher than close price', level='info') + + def _nan_low_higher_open(self, data): + """Set low price to NaN if higher than open, in-place.""" + self._nan_values( + data=data, condition = data['low'] > data['open'], + columns_to_nan = "low", + message = 'low price higher than open price', level='info') + + def _set_infty_to_nan(self, data, level='warning'): + """Set all +/- infty elements of data to NaN, in-place.""" + + if np.isinf(data).sum().sum() > 0: + getattr(logger, level)( + '%s("%s") has +/- infinity values, setting those to nan', + self.__class__.__name__, self.symbol) + data.iloc[:, :] = np.nan_to_num( + data.values, copy=True, nan=np.nan, posinf=np.nan, + neginf=np.nan) + + def _warn_on_extreme_logreturns( + self, logreturns, threshold, what, level='warning'): + """Log warning if logreturns are extreme.""" + # with this we skip over exact zeros (which we assume come from some + # cleaning) and would bias the scale down + logreturns.loc[logreturns == 0] = np.nan + score = _unlikeliness_score( + logreturns, logreturns, scaler=_median_scale_around, + windows=self.FILTERING_WINDOWS) + dubious_indexes = logreturns.index[score > threshold] + if len(dubious_indexes) > 0: + getattr(logger, level)( + '%s("%s") has dubious %s for timestamps: %s', + self.__class__.__name__, self.symbol, what, dubious_indexes) + + def _quality_check(self, data): + """Log issues with the quality of data given to the user.""" + + # zero volume + zerovol_idx = data.index[data.volume == 0] + if len(zerovol_idx) > 0: + logger.info( + '%s("%s") has volume equal to zero for timestamps: %s', + self.__class__.__name__, self.symbol, zerovol_idx) + + # warn on extreme logreturns + self._warn_on_extreme_logreturns( + np.log(1 + data['return']), self.THRESHOLD_WARN_EXTREME_LOGRETS, + 'total open-to-open returns', level='warning') + + # extreme open2close + self._warn_on_extreme_logreturns( + np.log(data['close']) - np.log(data['open']), + self.THRESHOLD_WARN_EXTREME_LOGRETS, 'open to close returns', + level='info') + + # extreme open2high + self._warn_on_extreme_logreturns( + np.log(data['high']) - np.log(data['open']), + self.THRESHOLD_WARN_EXTREME_LOGRETS, 'open to high returns', + level='info') + + # extreme open2low + self._warn_on_extreme_logreturns( + np.log(data['low']) - np.log(data['open']), + self.THRESHOLD_WARN_EXTREME_LOGRETS, 'open to low returns', + level='info') + + def _preload(self, data): + """Prepare data for use by Cvxportfolio. 
+ + We drop the `volume` column expressed in number of shares and + replace it with `valuevolume` which is an estimate of the (e.g., + US dollar) value of the volume exchanged on the day. + """ + + # this is not used currently, but if we implement an interface to a + # pure OLHCV data source there is no need to store the open-to-open + # returns, they can be computed here + if not 'return' in data.columns: + data['return'] = data[ + 'open'].pct_change().shift(-1) # pragma: no cover + + self._quality_check(data) + + # NaN intraday data + data.loc[data.index[-1], + ["high", "low", "close", "return", "volume"]] = np.nan + + # compute volume in cash units + data["valuevolume"] = data["volume"] * data["open"] + del data["volume"] + + return data + +# TODO: plan +# ffill adj closes & compute adj close logreturns +# use code above to get indexes of wrong ones, raise warnings, set to 0 +# +# check close vs adj close, there should be only dividends (with y finance) +# +# throw out opens that are not in [low, high] +# +# apply similar logic (perhaps using total lrets for the stddev) for +# open-close , close-high , close-low, throw out open/low/close not OK +# +# fill +# +# compute open-open total returns, then check with same logic for errors +# +# when doing append, make past data adhere to same format: recompute adj +# close +# could use volumes as well, if there are jumps in price due to +# splits not recorded, then price * volume should be more stable +# +# + + +class YahooFinance(OLHCV): + """Yahoo Finance symbol data. + + .. versionadded:: 1.2.0 + + The data cleaning logic has been significantly improved, see the + ``data_cleaning.py`` example to view what's done on any given + name (or enable ``'INFO'`` logging messages). It is recommended to + delete the ``~/cvxportfolio_data`` folder with data files downloaded + by previous Cvxportfolio versions. + + :param symbol: The symbol that we downloaded. + :type symbol: str + :param storage_backend: The storage backend, implemented ones are + ``'pickle'``, ``'csv'``, and ``'sqlite'``. + :type storage_backend: str + :param base_storage_location: The location of the storage. We store in a + subdirectory named after the class which derives from this. + :type base_storage_location: pathlib.Path + :param grace_period: If the most recent observation in the data is less + old than this we do not download new data. + :type grace_period: pandas.Timedelta + + :attribute data: The downloaded, and cleaned, data for the symbol. + :type data: pandas.DataFrame + """ + + # Maximum number of contiguous days on which an adjclose price can be + # invalid (e.g., negative); if any such period is found, all data before + # and including it is removed + MAX_CONTIGUOUS_MISSING_ADJCLOSES = 20 + + # remove all data (also one day before and after) when logrets implied by + # adjcloses are anomalous: abs value larger than median abs value time this + # in many windows around it. + # this is redone iteratively up to the MAX_CONTIGUOUS_MISSING_ADJCLOSES, + # so unless the bad adjcloses are only for few days all data up to the + # anomalous event will be deleted + THRESHOLD_BAD_ADJCLOSE = 50 + + # assume any adjclose-to-adjclose log10-return larger than this in absolute + # value (1. 
is 10x) is false and eliminate both adjcloses around it + # this only applies before ASSUME_FALSE_BEFORE + THRESHOLD_FALSE_LOG10RETS = .5 + + # assume logreturns larger in abs value than threshold above are false + # ONLY before this date, otherwise don't filter them + ASSUME_FALSE_BEFORE = pd.Timestamp('2000-01-01', tz='UTC') + + def _throw_out_all_data_before_many_bad_adjcloses( + self, new_data, level='warning'): + """Throw out all data before many NaN on adjclose column.""" + invalid_indexes = new_data.index[ + new_data.adjclose.isnull().rolling( + self.MAX_CONTIGUOUS_MISSING_ADJCLOSES + ).sum() == self.MAX_CONTIGUOUS_MISSING_ADJCLOSES] + if len(invalid_indexes) > 0: + last_invalid_index = invalid_indexes[-1] + getattr(logger, level)( + '%s("%s").data has invalid adjclose prices for more than' + + ' %s contiguous days until %s; removing all data until then', + self.__class__.__name__, self.symbol, + self.MAX_CONTIGUOUS_MISSING_ADJCLOSES, last_invalid_index) + new_data = pd.DataFrame( + new_data.loc[new_data.index > last_invalid_index], copy=True) + return new_data + + def _remove_data_on_bad_adjcloses(self, new_data, level='warning'): + """Remove adjcloses if implied logreturns are highly anomalous.""" + # worst case (if it goes to end of for loop) + # we throw out all data before the event + for _ in range(self.MAX_CONTIGUOUS_MISSING_ADJCLOSES + 1): + logrets = np.log10(new_data.adjclose.ffill()).diff() + + # with this we skip over exact zeros (which we assume come from + # some cleaning) and would bias the scale down + logrets.loc[logrets == 0.] = np.nan + + score = _unlikeliness_score( + logrets, logrets, scaler=_median_scale_around, + windows=self.FILTERING_WINDOWS) + bad_score = score > self.THRESHOLD_BAD_ADJCLOSE + + too_large_logreturns = np.abs( + logrets) > self.THRESHOLD_FALSE_LOG10RETS + too_large_logreturns &= logrets.index < self.ASSUME_FALSE_BEFORE + + # we eliminate data 1 day before and after any anomalous event + # could be made less aggressive, but better to be safe + bad_indexes = logrets.index[ + bad_score | bad_score.shift(-1) | too_large_logreturns + | too_large_logreturns.shift(-1)] + + if len(bad_indexes) == 0: + break + new_data.loc[bad_indexes] = np.nan + getattr(logger, level)( + '%s("%s").data has anomalous adjclose prices on timestamps' + + ' (including one day before and after) %s; removing all' + + ' data (not just adjcloses) on those timestamps.', + self.__class__.__name__, self.symbol, bad_indexes) + + def _process(self, new_data, saved_data=None): + """Process Yahoo Finance-specific data, then call the parent's method. + + Here we deal with the adjclose column, call the OLHCV._process method, + and compute total open-to-open returns. + """ + + ## Treat adjclose. We believe them (unless impossible).
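+ # (adjclose is the dividend- and split-adjusted close reported by Yahoo + # Finance; below it is used only to derive the total open-to-open returns + # and is then dropped from the cleaned dataframe)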
+ + # all infinity values to NaN (repeat, but for adjclose) + self._set_infty_to_nan(new_data, level='info') + + # NaN non-positive adj close + self._nan_nonpositive_prices(new_data, "adjclose") + + # Throw out all data before many NaN on adjclose + new_data = self._throw_out_all_data_before_many_bad_adjcloses( + new_data, level='info') + + # Remove all data when highly anomalous adjclose prices are detected + self._remove_data_on_bad_adjcloses(new_data, level='info') + + # Repeat throw out all data before many NaN on adjclose + new_data = self._throw_out_all_data_before_many_bad_adjcloses( + new_data, level='info') + + # forward-fill adj close + self._fillna_and_message( + new_data, 'adjclose', 'last available', filler='ffill', + level='info') + + # eliminate (initial) rows where adjclose is NaN + nan_adjcloses = new_data.adjclose.isnull() + if np.any(nan_adjcloses): + logger.info( + '%s("%s") is eliminating data on %s because the adjclose ' + + 'price is missing.', + self.__class__.__name__, self.symbol, + new_data.index[nan_adjcloses]) + new_data = pd.DataFrame(new_data.loc[~nan_adjcloses], copy=True) + + ## OLHCV._process treats all columns other than adjclose + new_data = super()._process(new_data, saved_data=saved_data) + + ## Compute total open-to-open returns + + # intraday logreturn + intraday_logreturn = np.log( + new_data["close"]) - np.log(new_data["open"]) + + # close to close total logreturn + close_to_close_total_logreturn = np.log( + new_data["adjclose"]).diff().shift(-1) + + # open to open total logreturn + open_to_open_total_logreturn = \ + close_to_close_total_logreturn + intraday_logreturn \ + - intraday_logreturn.shift(-1) + + # open to open total return + new_data['return'] = np.exp(open_to_open_total_logreturn) - 1 + + # eliminate adjclose column + del new_data["adjclose"] + + return new_data + + @staticmethod + def _get_data_yahoo(ticker, start='1900-01-01', end='2100-01-01'): + """Get 1-day OLHC-AC-V from Yahoo finance. + + This is roughly equivalent to + + .. code-block:: + + import yfinance as yf + yf.download(ticker) + + But it does no caching of any sort; only a single request call, + error checking (which result in exceptions going all the way to the + user, in the current design), json parsing, and a minimal effort to + restore the last timestamp. All processing and cleaning is done + elsewhere. + + Result is timestamped with the open time (time-zoned) of the + instrument. + """ + + base_url = 'https://query2.finance.yahoo.com' + + headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1)' + ' AppleWebKit/537.36 (KHTML, like Gecko)' + ' Chrome/39.0.2171.95 Safari/537.36'} + + # print(HEADERS) + start = int(pd.Timestamp(start).timestamp()) + end = int(pd.Timestamp(end).timestamp()) + + try: + res = requests.get( + url=f"{base_url}/v8/finance/chart/{ticker}", + params={'interval': '1d', + "period1": start, + "period2": end}, + headers=headers, + timeout=10) # seconds + except requests.ConnectionError as exc: + raise DataError( + f"Download of {ticker} from YahooFinance failed." + + " Are you connected to the Internet?") from exc + + # print(res) + + if res.status_code == 404: + raise DataError( + f'Data for symbol {ticker} is not available.' + + 'Json output:', str(res.json())) + + if res.status_code != 200: + raise DataError( + f'Yahoo finance download of {ticker} failed. 
Json:', + str(res.json())) # pragma: no cover + + data = res.json()['chart']['result'][0] + + try: + index = pd.DatetimeIndex( + [_timestamp_convert(el) for el in data['timestamp']]) + + df_result = pd.DataFrame( + data['indicators']['quote'][0], index=index) + df_result['adjclose'] = data[ + 'indicators']['adjclose'][0]['adjclose'] + except KeyError as exc: # pragma: no cover + raise DataError(f'Yahoo finance download of {ticker} failed.' + + ' Json:', str(res.json())) from exc # pragma: no cover + + # last timestamp could be not timed to market open + this_periods_open_time = _timestamp_convert( + data['meta']['currentTradingPeriod']['regular']['start']) + + # this should be enough, but be careful + if df_result.index[-1] > this_periods_open_time: + index = df_result.index.to_numpy() + index[-1] = this_periods_open_time + df_result.index = pd.DatetimeIndex(index) + + # # last timestamp is probably broken (not timed to market open) + # # we set its time to same as the day before, but this is wrong + # # on days of DST switch. It's fine though because that line will be + # # overwritten next update + # if df_result.index[-1].time() != df_result.index[-2].time(): + # tm1 = df_result.index[-2].time() + # newlast = df_result.index[-1].replace( + # hour=tm1.hour, minute=tm1.minute, second=tm1.second) + # df_result.index = pd.DatetimeIndex( + # list(df_result.index[:-1]) + [newlast]) + + # these are all the columns, we simply re-order them + return df_result[ + ['open', 'low', 'high', 'close', 'adjclose', 'volume']] + + def _download(self, symbol, current=None, + overlap=5, grace_period='5d', **kwargs): + """Download single stock from Yahoo Finance. + + If data was already downloaded we only download + the most recent missing portion. + + Args: + + symbol (str): yahoo name of the instrument + current (pandas.DataFrame or None): current data present locally + overlap (int): how many lines of current data will be overwritten + by newly downloaded data + kwargs (dict): extra arguments passed to yfinance.download + + Returns: + updated (pandas.DataFrame): updated DataFrame for the symbol + """ + # TODO this could be put at a lower class hierarchy + if overlap < 2: + raise SyntaxError( + f'{self.__class__.__name__} with overlap smaller than 2' + + ' could have issues with DST.') + if (current is None) or (len(current) < overlap): + updated = self._get_data_yahoo(symbol, **kwargs) + logger.info('Downloading from the start.') + result = self._process(updated) + # we remove first row if it contains NaNs + if np.any(result.iloc[0].isnull()): + result = result.iloc[1:] + return result + if (now_timezoned() - current.index[-1] + ) < pd.Timedelta(grace_period): + logger.info( + 'Skipping download because stored data is recent enough.') + return current + new = self._get_data_yahoo(symbol, start=current.index[-overlap]) + new = self._process(new) + return pd.concat([current.iloc[:-overlap], new]) + + +# +# Fred. +# + +class Fred(SymbolData): + """Fred single-symbol data. + + :param symbol: The symbol that we downloaded. + :type symbol: str + :param storage_backend: The storage backend, implemented ones are + ``'pickle'``, ``'csv'``, and ``'sqlite'``. By default ``'pickle'``. + :type storage_backend: str + :param base_storage_location: The location of the storage. We store in a + subdirectory named after the class which derives from this. By default + it's a directory named ``cvxportfolio_data`` in your home folder. 
+ :type base_storage_location: pathlib.Path + :param grace_period: If the most recent observation in the data is less + old than this we do not download new data. By default it's one day. + :type grace_period: pandas.Timedelta + + :attribute data: The downloaded data for the symbol. + """ + + URL = "https://fred.stlouisfed.org/graph/fredgraph.csv" + + # TODO: implement Fred point-in-time + # example: + # https://alfred.stlouisfed.org/graph/alfredgraph.csv?id=CES0500000003&vintage_date=2023-07-06 + # hourly wages time series **as it appeared** on 2023-07-06 + # store using pd.Series() of diff'ed values only. + + def _internal_download(self, symbol): + try: + return pd.to_numeric(pd.read_csv( + self.URL + f'?id={symbol}', + index_col=0, parse_dates=[0])[symbol], errors='coerce') + except URLError as exc: + raise DataError(f"Download of {symbol}" + + f" from {self.__class__.__name__} failed." + + " Are you connected to the Internet?") from exc + + def _download( + self, symbol="DFF", current=None, grace_period='5d', **kwargs): + """Download or update pandas Series from Fred. + + If already downloaded don't change data stored locally and only + add new entries at the end. + + Additionally, we allow for a `grace period`, if the data already + downloaded has a last entry not older than the grace period, we + don't download new data. + """ + if current is None: + return self._internal_download(symbol) + if (pd.Timestamp.today() - current.index[-1] + ) < pd.Timedelta(grace_period): + logger.info( + 'Skipping download because stored data is recent enough.') + return current + + new = self._internal_download(symbol) + new = new.loc[new.index > current.index[-1]] + + if new.empty: + logger.info('New downloaded data is empty!') + return current + + assert new.index[0] > current.index[-1] + return pd.concat([current, new]) + + def _preload(self, data): + """Add UTC timezone.""" + data.index = data.index.tz_localize('UTC') + return data + +# +# Sqlite storage backend. +# + +def _open_sqlite(storage_location): + return sqlite3.connect(storage_location/"db.sqlite") + +def _close_sqlite(connection): + connection.close() + +def _loader_sqlite(symbol, storage_location): + """Load data in sqlite format. + + We separately store dtypes for data consistency and safety. + + .. note:: If your pandas object's index has a name it will be lost, + the index is renamed 'index'. If you pass timestamp data (including + the index) it must have explicit timezone. + """ + try: + connection = _open_sqlite(storage_location) + dtypes = pd.read_sql_query( + f"SELECT * FROM {symbol}___dtypes", + connection, index_col="index", + dtype={"index": "str", "0": "str"}) + + parse_dates = 'index' + my_dtypes = dict(dtypes["0"]) + + tmp = pd.read_sql_query( + f"SELECT * FROM {symbol}", connection, + index_col="index", parse_dates=parse_dates, dtype=my_dtypes) + + _close_sqlite(connection) + multiindex = [] + for col in tmp.columns: + if col[:8] == "___level": + multiindex.append(col) + else: + break + if len(multiindex) > 0: + multiindex = [tmp.index.name] + multiindex + tmp = tmp.reset_index().set_index(multiindex) + return tmp.iloc[:, 0] if tmp.shape[1] == 1 else tmp + except pd.errors.DatabaseError: + return None + +def _storer_sqlite(symbol, data, storage_location): + """Store data in sqlite format. + + We separately store dtypes for data consistency and safety. + + .. note:: If your pandas object's index has a name it will be lost, + the index is renamed 'index'. 
If you pass timestamp data (including + the index) it must have explicit timezone. + """ + connection = _open_sqlite(storage_location) + exists = pd.read_sql_query( + f"SELECT name FROM sqlite_master WHERE type='table' AND name='{symbol}'", + connection) + + if len(exists): + _ = connection.cursor().execute(f"DROP TABLE '{symbol}'") + _ = connection.cursor().execute(f"DROP TABLE '{symbol}___dtypes'") + connection.commit() + + if hasattr(data.index, "levels"): + data.index = data.index.set_names( + ["index"] + + [f"___level{i}" for i in range(1, len(data.index.levels))] + ) + data = data.reset_index().set_index("index") + else: + data.index.name = "index" + + if data.index[0].tzinfo is None: + warnings.warn('Index has not timezone, setting to UTC') + data.index = data.index.tz_localize('UTC') + + data.to_sql(f"{symbol}", connection) + pd.DataFrame(data).dtypes.astype("string").to_sql( + f"{symbol}___dtypes", connection) + _close_sqlite(connection) + + +# +# Pickle storage backend. +# + +def _loader_pickle(symbol, storage_location): + """Load data in pickle format.""" + return pd.read_pickle(storage_location / f"{symbol}.pickle") + +def _storer_pickle(symbol, data, storage_location): + """Store data in pickle format.""" + data.to_pickle(storage_location / f"{symbol}.pickle") + +# +# Csv storage backend. +# + +def _loader_csv(symbol, storage_location): + """Load data in csv format.""" + + index_dtypes = pd.read_csv( + storage_location / f"{symbol}___index_dtypes.csv", + index_col=0)["0"] + + dtypes = pd.read_csv( + storage_location / f"{symbol}___dtypes.csv", index_col=0, + dtype={"index": "str", "0": "str"}) + dtypes = dict(dtypes["0"]) + new_dtypes = {} + parse_dates = [] + for i, level in enumerate(index_dtypes): + if "datetime64[ns" in level: # includes all timezones + parse_dates.append(i) + for i, el in enumerate(dtypes): + if "datetime64[ns" in dtypes[el]: # includes all timezones + parse_dates += [i + len(index_dtypes)] + else: + new_dtypes[el] = dtypes[el] + + tmp = pd.read_csv(storage_location / f"{symbol}.csv", + index_col=list(range(len(index_dtypes))), + parse_dates=parse_dates, dtype=new_dtypes) + + return tmp.iloc[:, 0] if tmp.shape[1] == 1 else tmp + + +def _storer_csv(symbol, data, storage_location): + """Store data in csv format.""" + pd.DataFrame(data.index.dtypes if hasattr(data.index, 'levels') + else [data.index.dtype]).astype("string").to_csv( + storage_location / f"{symbol}___index_dtypes.csv") + pd.DataFrame(data).dtypes.astype("string").to_csv( + storage_location / f"{symbol}___dtypes.csv") + data.to_csv(storage_location / f"{symbol}.csv") diff --git a/cvxportfolio/tests/test_data.py b/cvxportfolio/tests/test_data.py index bb23f2b50..a9ea2eb6a 100644 --- a/cvxportfolio/tests/test_data.py +++ b/cvxportfolio/tests/test_data.py @@ -81,8 +81,8 @@ def test_yfinance_download(self): data.loc["2023-04-10 13:30:00+00:00", "return"], data.loc["2023-04-11 13:30:00+00:00", "open"] / data.loc["2023-04-10 13:30:00+00:00", "open"] - 1, + rtol=1e-04, atol=1e-07, )) - self.assertTrue(np.isnan(data.iloc[-1]["close"])) def test_fred(self): """Test basic Fred usage.""" @@ -124,6 +124,7 @@ def test_yahoo_finance(self): data.loc["2023-04-05 13:30:00+00:00", "return"], data.loc["2023-04-06 13:30:00+00:00", "open"] / data.loc["2023-04-05 13:30:00+00:00", "open"] - 1, + rtol=1e-04, atol=1e-07, )) store.update(grace_period=pd.Timedelta('1d')) @@ -155,7 +156,8 @@ def test_yahoo_finance_removefirstline(self): and sys.version_info.minor < 11, "Issues with timezoned timestamps.") def 
test_sqlite3_store_series(self): """Test storing and retrieving of a Series with datetime index.""" - self._base_test_series(_loader_sqlite, _storer_sqlite) + with self.assertWarns(UserWarning): + self._base_test_series(_loader_sqlite, _storer_sqlite) @unittest.skipIf(sys.version_info.major == 3 and sys.version_info.minor < 11, "Issues with timezoned timestamps.") @@ -195,6 +197,9 @@ def _base_test_series(self, loader, storer): """Test storing and retrieving of a Series with datetime index.""" for data in [ + pd.Series( + 0.0, pd.date_range("2020-01-01", "2020-01-10"), + name="test0"), pd.Series( 0.0, pd.date_range("2020-01-01", "2020-01-10", tz='UTC-05:00'), name="test1"), @@ -327,6 +332,373 @@ def _base_test_multiindex(self, loader, storer): self.assertTrue(all(data.index.dtypes == data1.index.dtypes)) self.assertTrue(all(data.dtypes == data1.dtypes)) + def test_download_errors(self): + """Test single-symbol download error.""" + + storer = YahooFinance( + 'AAPL', grace_period=self.data_grace_period, + base_location=self.datadir) + with self.assertRaises(SyntaxError): + # pylint: disable=protected-access + storer._download('AAPL', overlap=1) + + class YahooFinanceErroneous(YahooFinance): + """Modified YF that nans last open price.""" + def _download(self, symbol, current=None, + overlap=5, grace_period='5d', **kwargs): + """Modified download method.""" + res = super()._download(symbol, current, + grace_period=grace_period) + res.iloc[-1, 0 ] = np.nan + return res + + _ = YahooFinanceErroneous('AMZN', base_location=self.datadir) + with self.assertLogs(level='ERROR') as _: + _ = YahooFinanceErroneous( + 'AMZN', base_location=self.datadir) + + class YahooFinanceErroneous2(YahooFinance): + """Modified YF that nans some line.""" + def _download(self, symbol, current=None, + overlap=5, grace_period='5d', **kwargs): + """Modified download method.""" + res = super()._download(symbol, current, + grace_period=grace_period) + res.iloc[-20] = np.nan + return res + with self.assertLogs(level='WARNING') as _: + _ = YahooFinanceErroneous2('GOOGL', + base_location=self.datadir) + with self.assertLogs(level='WARNING') as _: + _ = YahooFinanceErroneous2( + 'GOOGL', base_location=self.datadir) + + class FredErroneous(Fred): + """Modified FRED SymbolData that gives a NaN in the last entry.""" + + def _download(self, symbol, current, grace_period): + """Modified download method.""" + res = super()._download(symbol, current, + grace_period=grace_period) + res.iloc[-1] = np.nan + return res + + _ = FredErroneous('DFF', base_location=self.datadir) + with self.assertLogs(level='ERROR') as _: + _ = FredErroneous( + 'DFF', base_location=self.datadir) + + class YahooFinanceErroneous3(YahooFinance): + """Modified YF that is not append-only.""" + counter = 0 + def _download(self, symbol, current=None, + overlap=5, grace_period='5d', **kwargs): + """Modified download method.""" + res = super()._download(symbol, current, + grace_period=grace_period) + if self.counter > 0: + res.iloc[-2] = 0. 
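+ # (overwrites a row that was already stored, so the second + # download is no longer append-only and should log an error)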
+ self.counter += 1 + return res + storer = YahooFinanceErroneous3('GOOGL', base_location=self.datadir) + with self.assertLogs(level='ERROR') as _: + storer.update(pd.Timedelta('0d')) + + def test_no_internet(self): + """Test errors thrown when not connected to the internet.""" + + with NoInternet(): + with self.assertRaises(DataError): + cvx.YahooFinance('BABA', base_location=self.datadir) + + with NoInternet(): + with self.assertRaises(DataError): + cvx.Fred('CES0500000003', base_location=self.datadir) + + def test_yahoo_finance_errors(self): + """Test errors with Yahoo Finance.""" + + with self.assertRaises(DataError): + YahooFinance("DOESNTEXIST", base_location=self.datadir) + + def test_yahoo_finance_cleaning(self): + """Test our logic to clean Yahoo Finance data.""" + + # this stock was found to have NaN issues + data = YahooFinance("ENI.MI", base_location=self.datadir).data + self.assertTrue((data.valuevolume == 0).sum() > 0) + self.assertTrue(data.iloc[:-1].isnull().sum().sum() == 0) + + # this stock was found to have phony open/low/high prices + data = YahooFinance('NWG.L', base_location=self.datadir).data + self.assertGreater(data['return'].min(), -0.75) + self.assertLess(data['return'].max(), 0.75) + + # this stock had some extreme returns but they were legitimate + data = YahooFinance('GME', base_location=self.datadir).data + self.assertGreater(data['return'].min(), -0.75) + self.assertGreater(data['return'].max(), 3) + + def test_yahoo_finance_preload_warnings(self): + """Test warnings on _preload if data has issues.""" + + # pylint: disable=protected-access + + raw_data = YahooFinance._get_data_yahoo('ZM') + empty_instance = YahooFinance.__new__(YahooFinance) + empty_instance._symbol = 'ZM' # because the warnings use the symbol + cleaned = empty_instance._process(raw_data, None) + + def _test_warning( + data_transformation, part_of_message, level='WARNING'): + """Test that warning is raised w/ message containing some word.""" + data = pd.DataFrame(cleaned, copy=True) + exec(data_transformation) # pylint: disable=exec-used + # print(data) + with self.assertLogs(level=level) as _: + empty_instance._preload(data) + # print(_) + self.assertTrue(part_of_message in _.output[0]) + + # columns are: open low high close volume return + + # high unexpected return + _test_warning( + 'data.iloc[300,-1] = 4', + 'dubious total open-to-open returns') + + # low unexpected return + _test_warning( + 'data.iloc[300,-1] = -0.9', + 'dubious total open-to-open returns') + + # low unexpected open + _test_warning( + 'data.iloc[300,0] = data.iloc[300,0]*0.1', + 'dubious open to close returns', + level='INFO') + + # high unexpected open + _test_warning( + 'data.iloc[300,0] = data.iloc[300,0]*5', + 'dubious open to close returns', + level='INFO') + + # low unexpected low + _test_warning( + 'data.iloc[300,1] = data.iloc[300,1]*0.1', + 'dubious open to low returns', + level='INFO') + + # high unexpected high + _test_warning( + 'data.iloc[300,2] = data.iloc[300,2]*5', + 'dubious open to high returns', + level='INFO') + + def test_yahoo_finance_remove_on_many_bad_adjcloses(self): + """Test removal of old data when many adjcloses are invalid.""" + + # this stock was found to have bad (negative) adjcloses for many + # months at its start + with self.assertLogs(level='WARNING') as _: + YahooFinance('BATS.L', base_location=self.datadir) + self.assertTrue(np.any( + ['contiguous' in el for el in _.output])) + + if hasattr(self, 'assertNoLogs'): + with self.assertNoLogs(level='WARNING'): + YahooFinance('BATS.L',
base_location=self.datadir) + + def test_adjcloses_logrets_removal(self): + """Test method to remove adjcloses when its logrets are anomalous.""" + + # this stock had anomalous price changes in the 70s + with self.assertLogs(level='INFO') as _: + d = YahooFinance("SMT.L", base_location=self.datadir).data + self.assertTrue(np.any([ + 'anomalous adjclose prices' in el for el in _.output])) + self.assertTrue(d['return'].max() < 2) + + # this stock was found to have phony adjcloses + with self.assertLogs(level='INFO') as _: + YahooFinance('BA.L', base_location=self.datadir) + self.assertTrue(np.any([ + 'anomalous adjclose prices' in el for el in _.output])) + + with self.assertLogs(level='INFO') as _: + YahooFinance('BA.L', base_location=self.datadir) + self.assertFalse(np.any([ + 'anomalous adjclose prices' in el for el in _.output])) + + def test_yahoo_finance_cleaning_granular(self): + """Test each step of cleaning.""" + + # pylint: disable=protected-access + raw_data = YahooFinance._get_data_yahoo('ZM') + # print(raw_data) + empty_instance = YahooFinance.__new__(YahooFinance) + empty_instance._symbol = 'ZM' # because the warnings use the symbol + + def _test_warning( + data_transformation, part_of_message, level='WARNING'): + """Test that warning is raised w/ message containing some word.""" + data = pd.DataFrame(raw_data, copy=True) + exec(data_transformation) # pylint: disable=exec-used + with self.assertLogs(level=level) as _: + _cleaned = empty_instance._process(data, None) + self.assertTrue( + np.any([part_of_message in el for el in _.output])) + # check all NaNs have been filled + self.assertTrue(_cleaned.iloc[:-1].isnull().sum().sum() == 0) + + def _test_warning_update( + data_transformation, part_of_message, level='WARNING'): + """Test that warning is raised w/ message containing some word.""" + new_data = pd.DataFrame(raw_data.iloc[-20:], copy=True) + saved_data = pd.DataFrame(raw_data.iloc[:-15], copy=True) + exec(data_transformation) # pylint: disable=exec-used + with self.assertLogs(level=level) as _: + _cleaned = empty_instance._process(new_data, saved_data) + self.assertTrue( + np.any([part_of_message in el for el in _.output])) + # check all NaNs have been filled + self.assertTrue(_cleaned.iloc[:-1].isnull().sum().sum() == 0) + + # missing initial adjclose + _test_warning( + 'data.iloc[0,-2] = np.nan', + 'adjclose price is missing', level='INFO') + + # infty + _test_warning( + 'data.iloc[2,2] = np.inf', + 'infinity', level='INFO') + + # non-pos price + _test_warning( + 'data.iloc[2,0] = -1', + 'non-positive open', level='INFO') + _test_warning( + 'data.iloc[2,0] = 0', + 'non-positive open', level='INFO') + _test_warning( + 'data.iloc[4,2] = 0', + 'non-positive high', level='INFO') + + # neg volume + _test_warning( + 'data.iloc[2,-1] = -1', + 'negative volumes', level='INFO') + + # open lower low + _test_warning( + 'data.iloc[1,0] = data.iloc[1,1]*.9', + 'open price lower than low price', level='INFO') + + # open higher high + _test_warning( + 'data.iloc[1,0] = data.iloc[1,2]*1.1', + 'open price higher than high price', level='INFO') + + # low higher close + _test_warning( + 'data.iloc[3,1] = data.iloc[3].close * 1.1', + 'low price higher than close price', level='INFO') + + # high lower close + _test_warning( # had to fix it otherwise open cleaner kicks in + 'close = data.iloc[3].close;' + 'data.iloc[3,0] = close * .95;' # open + 'data.iloc[3,1] = close * .95;' # low + 'data.iloc[3,2] = close * .975', # high + 'high price lower than close price', level='INFO') + + # extreme 
low price + _test_warning( + 'data.iloc[3,1] = data.iloc[3,1] * .01', + 'anomalous low price', level='INFO') + _test_warning( + 'data.iloc[3,1] = data.iloc[3,1] * .02', + 'anomalous low price', level='INFO') + _test_warning( + 'data.iloc[3,1] = data.iloc[3,1] * .05', + 'anomalous low price', level='INFO') + _test_warning( + 'data.iloc[3,1] = data.iloc[3,1] * .1', + 'anomalous low price', level='INFO') + _test_warning( + 'data.iloc[3,1] = data.iloc[3,1] * .2', + 'anomalous low price', level='INFO') + _test_warning( # changed dtindex until found one that works + 'data.iloc[20,1] = data.iloc[20,1] * .5', + 'anomalous low price', level='INFO') + + # extreme high price + _test_warning( + 'data.iloc[3,2] = data.iloc[3,2] * 100', + 'anomalous high price', level='INFO') + _test_warning( + 'data.iloc[3,2] = data.iloc[3,2] * 50', + 'anomalous high price', level='INFO') + _test_warning( + 'data.iloc[3,2] = data.iloc[3,2] * 20', + 'anomalous high price', level='INFO') + _test_warning( + 'data.iloc[3,2] = data.iloc[3,2] * 10', + 'anomalous high price', level='INFO') + _test_warning( + 'data.iloc[3,2] = data.iloc[3,2] * 5', + 'anomalous high price', level='INFO') + _test_warning( + 'data.iloc[3,2] = data.iloc[3,2] * 2', + 'anomalous high price', level='INFO') + + # extreme open price + _test_warning( + 'data.iloc[3,0] = data.iloc[3,0] * 1.75;' + + 'data.iloc[3,2] = data.iloc[3,0]', + 'anomalous open price', level='INFO') + _test_warning( + 'data.iloc[20,0] = data.iloc[20,0] * 0.5;' + + 'data.iloc[20,1] = data.iloc[20,0]', + 'anomalous open price', level='INFO') + + # extreme open update + _test_warning_update( + 'new_data.iloc[-1,0] = new_data.iloc[-1,0] * 1.75;' + + 'new_data.iloc[-1,2] = new_data.iloc[-1,0]', + 'anomalous open price', level='INFO') + _test_warning_update( + 'new_data.iloc[-1,0] = new_data.iloc[-1,0] * 0.5;' + + 'new_data.iloc[-1,1] = new_data.iloc[-1,0]', + 'anomalous open price', level='INFO') + + # def test_yahoo_finance_wrong_last_time(self): + # """Test that we correct last time if intraday.""" + # + # class YahooFinanceErroneous4(YahooFinance): + # """Modified YF that sets last time wrong.""" + # counter = 0 + # + # @staticmethod + # def _get_data_yahoo( + # ticker, start='1900-01-01', end='2100-01-01'): + # """Modified download method.""" + # res = YahooFinance._get_data_yahoo( + # ticker, start=start, end=end) + # if self.counter > 0: + # res.index = list(res.index)[:-1] + [ + # res.index[-1] - pd.Timedelta('3h')] + # self.counter += 1 + # print(res) + # return res + # + # storer = YahooFinanceErroneous4('GOOGL', base_location=self.datadir) + # print(storer.data) + # #storer.update(pd.Timedelta('0d')) + # #print(storer.data) + class TestMarketData(CvxportfolioTest): """Test MarketData methods and interface.""" @@ -490,6 +862,25 @@ def test_user_provided_market_data(self): prices=self.prices, cash_key='cash', min_history=pd.Timedelta('0d')) + with self.assertRaises(NotImplementedError): + UserProvidedMarketData(returns=self.returns, volumes=used_volumes, + prices=self.prices, cash_key='NOTSUPPORTED', + min_history=pd.Timedelta('0d')) + + with self.assertRaises(ValueError): + UserProvidedMarketData(returns=self.returns, volumes=used_volumes, + prices=self.prices, cash_key='USDOLLAR', + min_history=pd.Timedelta('0d')) + + md = UserProvidedMarketData( + returns=self.returns, volumes=self.volumes, + prices=self.prices, cash_key='cash', + min_history=pd.Timedelta('60d')) + + # try to serve when there's not enough min_history + with self.assertRaises(ValueError): + 
md.serve(t=self.returns.index[20]) + def test_market_data_full(self): """Test serve method of DownloadedMarketData.""" @@ -536,128 +927,6 @@ def test_signature(self): print(md.partial_universe_signature(md.full_universe)) - def test_download_errors(self): - """Test single-symbol download error.""" - - storer = YahooFinance( - 'AAPL', grace_period=self.data_grace_period, - base_location=self.datadir) - with self.assertRaises(SyntaxError): - # pylint: disable=protected-access - storer._download('AAPL', overlap=1) - - class YahooFinanceErroneous(YahooFinance): - """Modified YF that nans last open price.""" - def _download(self, symbol, current=None, - overlap=5, grace_period='5d', **kwargs): - """Modified download method.""" - res = super()._download(symbol, current, - grace_period=grace_period) - res.iloc[-1, 0 ] = np.nan - return res - - _ = YahooFinanceErroneous('AMZN', base_location=self.datadir) - with self.assertLogs(level='ERROR') as _: - _ = YahooFinanceErroneous( - 'AMZN', base_location=self.datadir) - - class YahooFinanceErroneous2(YahooFinance): - """Modified YF that nans some line.""" - def _download(self, symbol, current=None, - overlap=5, grace_period='5d', **kwargs): - """Modified download method.""" - res = super()._download(symbol, current, - grace_period=grace_period) - res.iloc[-20] = np.nan - return res - with self.assertLogs(level='WARNING') as _: - _ = YahooFinanceErroneous2('GOOGL', - base_location=self.datadir) - with self.assertLogs(level='WARNING') as _: - _ = YahooFinanceErroneous2( - 'GOOGL', base_location=self.datadir) - - class FredErroneous(Fred): - """Modified FRED SymbolData that gives a NaN in the last entry.""" - - def _download(self, symbol, current, grace_period): - """Modified download method.""" - res = super()._download(symbol, current, - grace_period=grace_period) - res.iloc[-1] = np.nan - return res - - _ = FredErroneous('DFF', base_location=self.datadir) - with self.assertLogs(level='ERROR') as _: - _ = FredErroneous( - 'DFF', base_location=self.datadir) - - class YahooFinanceErroneous3(YahooFinance): - """Modified YF that is not append-only.""" - counter = 0 - def _download(self, symbol, current=None, - overlap=5, grace_period='5d', **kwargs): - """Modified download method.""" - res = super()._download(symbol, current, - grace_period=grace_period) - if self.counter > 0: - res.iloc[-2] = 0. 
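# A small usage sketch of the UserProvidedMarketData interface exercised by
# the checks above. It assumes, as those tests do, that `returns` already
# carries the cash returns in its last column when cash_key='cash', and that
# the timestamp passed to serve() leaves enough history to satisfy
# min_history; the exact construction details are simplified here.
import numpy as np
import pandas as pd

import cvxportfolio as cvx

index = pd.date_range("2020-01-01", periods=250, freq="B")
rng = np.random.default_rng(0)
returns = pd.DataFrame(
    rng.normal(0., 0.01, (250, 3)), index=index,
    columns=["AAA", "BBB", "cash"])
volumes = pd.DataFrame(1e6, index=index, columns=["AAA", "BBB"])
prices = pd.DataFrame(100., index=index, columns=["AAA", "BBB"])

md = cvx.UserProvidedMarketData(
    returns=returns, volumes=volumes, prices=prices, cash_key="cash",
    min_history=pd.Timedelta("60d"))

# serving too early raises ValueError (not enough min_history, as tested
# above); serving late enough returns the views used by the simulator
served = md.serve(t=index[-1])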
- self.counter += 1 - return res - storer = YahooFinanceErroneous3('GOOGL', base_location=self.datadir) - with self.assertLogs(level='ERROR') as _: - storer.update(pd.Timedelta('0d')) - - def test_no_internet(self): - """Test errors thrown when not connected to the internet.""" - - with NoInternet(): - with self.assertRaises(DataError): - cvx.YahooFinance('BABA', base_location=self.datadir) - - with NoInternet(): - with self.assertRaises(DataError): - cvx.Fred('CES0500000003', base_location=self.datadir) - - def test_yahoo_finance_errors(self): - """Test errors with Yahoo Finance.""" - - with self.assertRaises(DataError): - YahooFinance("DOESNTEXIST", base_location=self.datadir) - - def test_yahoo_finance_cleaning(self): - """Test our logic to clean Yahoo Finance data.""" - - # this stock was found to have NaN issues - data = YahooFinance("ENI.MI", base_location=self.datadir).data - self.assertTrue((data.valuevolume == 0).sum() > 0) - self.assertTrue(data.iloc[:-1].isnull().sum().sum() == 0) - - # def test_yahoo_finance_wrong_last_time(self): - # """Test that we correct last time if intraday.""" - # - # class YahooFinanceErroneous4(YahooFinance): - # """Modified YF that sets last time wrong.""" - # counter = 0 - # - # @staticmethod - # def _get_data_yahoo( - # ticker, start='1900-01-01', end='2100-01-01'): - # """Modified download method.""" - # res = YahooFinance._get_data_yahoo( - # ticker, start=start, end=end) - # if self.counter > 0: - # res.index = list(res.index)[:-1] + [ - # res.index[-1] - pd.Timedelta('3h')] - # self.counter += 1 - # print(res) - # return res - # - # storer = YahooFinanceErroneous4('GOOGL', base_location=self.datadir) - # print(storer.data) - # #storer.update(pd.Timedelta('0d')) - # #print(storer.data) - if __name__ == '__main__': unittest.main(warnings='error') # pragma: no cover diff --git a/cvxportfolio/tests/test_utils.py b/cvxportfolio/tests/test_utils.py index efdb7ba01..f4d3d3f4c 100644 --- a/cvxportfolio/tests/test_utils.py +++ b/cvxportfolio/tests/test_utils.py @@ -80,7 +80,7 @@ def test_make_numeric(self): np.array(['1', 2], dtype=object), pd.Series([1, '2', 3], dtype=object), pd.DataFrame([[1, '2.', 3], [4, '5.', 6]], dtype=object)]: - make_numeric(data) + self.assertTrue(np.all(data.astype(float) == make_numeric(data))) for data in [ np.array(['1a', 2], dtype=object), diff --git a/cvxportfolio/utils.py b/cvxportfolio/utils.py index 0f5d3a25d..662ba336a 100644 --- a/cvxportfolio/utils.py +++ b/cvxportfolio/utils.py @@ -29,6 +29,35 @@ 'average_periods_per_year'] + +def set_pd_read_only(df_or_ser): + """Set numpy array contained in dataframe or series to read only. + + This is done on data store internally before it is served to the + policy or the simulator to ensure data consistency in case some + element of the pipeline accidentally corrupts the data. + + This is enough to prevent direct assignment + to the resulting + dataframe. However it could still be accidentally corrupted by + assigning to columns or indices that are not present in the + original. We avoid that case as well by returning a wrapped + dataframe (which doesn't copy data on creation) in + serve_data_policy and serve_data_simulator. + + :param df_or_ser: Series or DataFrame, only numeric (better if + homogeneous) dtype. + :type df_or_ser: pd.Series or pd.DataFrame + + :returns: Pandas object set to read only.
+ :rtype: pd.Series or pd.DataFrame + """ + data = df_or_ser.values + data.flags.writeable = False + if hasattr(df_or_ser, 'columns'): + return pd.DataFrame(data, index=df_or_ser.index, + columns=df_or_ser.columns) + return pd.Series(data, index=df_or_ser.index, name=df_or_ser.name) + + def average_periods_per_year(num_periods, first_time, last_time): """Average periods per year of a datetime index (unpacked), rounded to int. diff --git a/docs/examples.rst b/docs/examples.rst index d43bd17b5..102471231 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -12,6 +12,7 @@ We show some of them, along with their results, in the following pages: examples/dow30 examples/timing examples/universes + examples/data_cleaning examples/etfs examples/user_provided_forecasters examples/risk_models diff --git a/docs/examples/data_cleaning.rst b/docs/examples/data_cleaning.rst new file mode 100644 index 000000000..1128baf63 --- /dev/null +++ b/docs/examples/data_cleaning.rst @@ -0,0 +1,9 @@ +Data cleaning +=================== + +This example script is +`available in the repository `_. + +.. literalinclude:: ../../examples/data_cleaning.py + :language: python + :lines: 14- diff --git a/examples/data_cleaning.py b/examples/data_cleaning.py new file mode 100644 index 000000000..23e67d000 --- /dev/null +++ b/examples/data_cleaning.py @@ -0,0 +1,111 @@ +# Copyright 2023 Enzo Busseti +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This script is used to show the data cleaning applied to Yahoo Finance data. + +It is not really an example, and some of the methods shown here are not public, +so not covered by the semantic versioning agreement (they could change +without notice).
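# A short illustration of the set_pd_read_only helper added above: the
# returned object is built around a numpy array whose writeable flag has been
# cleared, so in-place writes are rejected, which is the accidental-corruption
# case the docstring describes. The import path is an assumption (the function
# is added to cvxportfolio/utils.py but not to its __all__).
import pandas as pd

from cvxportfolio.utils import set_pd_read_only

df = pd.DataFrame({"a": [1., 2.], "b": [3., 4.]})
read_only = set_pd_read_only(df)
try:
    read_only.iloc[0, 0] = 0.  # direct assignment is refused
except ValueError as exc:
    print(exc)  # typically "assignment destination is read-only"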
+""" + +import logging +import shutil +import tempfile +from pathlib import Path +from time import sleep + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + +import cvxportfolio as cvx + +# If you change this to logging.INFO you get more logging output from the +# cleaning procedure +logging.getLogger().setLevel(logging.WARNING) + +# Here put any number of stocks for which you wish to analyze the cleaning +TEST_UNIVERSE = ['AAPL', 'GOOG', 'TSLA'] + +# Some names with known issues: +# TEST_UNIVERSE = ['SMT.L', 'NVR', 'HUBB', 'NWG.L'] + +ALL_DROPPED_ROWS_PCT = pd.Series(dtype=float) +ALL_MIN_LR = pd.Series(dtype=float) +ALL_MAX_LR = pd.Series(dtype=float) + +PLOT = True +SLEEP = 1 + +for stock in TEST_UNIVERSE: + sleep(SLEEP) + print(f'\n\t{stock}:') + + # This method is not public: + raw_yfinance = cvx.YahooFinance._get_data_yahoo(stock) + print(f'{stock}: YAHOO FINANCE RAW') + print(raw_yfinance) + + tmpdir = Path(tempfile.mkdtemp()) + cvx_cleaned = cvx.YahooFinance(stock, base_location=tmpdir).data + shutil.rmtree(tmpdir) + print(f'{stock}: CVXPORTFOLIO CLEANED') + print(cvx_cleaned) + + yf_log10r = np.log10(raw_yfinance.adjclose).diff().shift(-1) + cvx_log10r = np.log10(1 + cvx_cleaned['return']) + + if PLOT: + fig, axes = plt.subplots( + 3, figsize=(10/1.62, 10), layout='constrained') + + raw_yfinance.iloc[:, :5].plot(ax=axes[0]) + axes[0].set_yscale('log') + axes[0].set_title(f'{stock}: RAW YAHOO FINANCE') + + cvx_cleaned.iloc[:, :4].plot(ax=axes[1]) + axes[1].set_title(f'{stock}: CVXPORTFOLIO CLEANED DATA') + axes[1].set_yscale('log') + + (yf_log10r.cumsum() - yf_log10r.sum()).plot( + label='Yahoo Finance total close-to-close', ax=axes[2]) + (cvx_log10r.cumsum() - cvx_log10r.sum()).plot( + label='Cvxportfolio total open-to-open', ax=axes[2]) + axes[2].set_title(f'{stock}: CUMULATIVE LOG10 RETURNS (SCALED)') + axes[2].legend() + + plt.show() + + assert cvx_cleaned.index[-1] == raw_yfinance.index[-1] + + print() + dropped_rows = len(raw_yfinance) - len(cvx_cleaned) + dropped_rows_pct = dropped_rows / len(raw_yfinance) + ALL_DROPPED_ROWS_PCT.loc[stock] = dropped_rows_pct*100 + print(f'Cvxportfolio dropped {int(dropped_rows_pct*100)}% of rows') + + ALL_MIN_LR.loc[stock] = np.log(1+cvx_cleaned['return']).min() + ALL_MAX_LR.loc[stock] = np.log(1+cvx_cleaned['return']).max() + + print('Max Cvxportfolio logreturn:', ALL_MAX_LR.loc[stock]) + print('Min Cvxportfolio logreturn:', ALL_MIN_LR.loc[stock] ) + print('How many zero volumes:', (cvx_cleaned['valuevolume'] == 0.).mean()) + +print('\nCvxportfolio dropped rows %:') +print(ALL_DROPPED_ROWS_PCT.sort_values().tail()) + +print('\nCvxportfolio min logreturns:') +print(ALL_MIN_LR.sort_values().head()) + +print('\nCvxportfolio max logreturns:') +print(ALL_MAX_LR.sort_values().tail()) diff --git a/examples/strategies/ftse100_daily.py b/examples/strategies/ftse100_daily.py new file mode 100644 index 000000000..07336929c --- /dev/null +++ b/examples/strategies/ftse100_daily.py @@ -0,0 +1,118 @@ +# Copyright 2023 Enzo Busseti +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""This is a simple example strategy which we run every day. + +It is a long-only, unit leverage, allocation on the FTSE 100 universe. + +We will see how it performs online. + +You run it from the root of the repository in the development environment by: + +.. code:: bash + + python -m examples.strategies.ftse100_daily +""" + +import cvxportfolio as cvx + +from ..universes import FTSE100 + +HYPERPAR_OPTIMIZE_START = '2012-01-01' + +OBJECTIVE = 'sharpe_ratio' + + +def policy(gamma_risk, gamma_trade): + """Create fresh policy object, also return handles to hyper-parameters. + + :param gamma_risk: Risk aversion multiplier. + :type gamma_risk: float + :param gamma_trade: Transaction cost aversion multiplier. + :type gamma_trade: float, optional + + :return: Policy object and dictionary mapping hyper-parameter names (which + must match the arguments of this function) to their respective objects. + :rtype: tuple + """ + gamma_risk_hp = cvx.Gamma(initial_value=gamma_risk) + gamma_trade_hp = cvx.Gamma(initial_value=gamma_trade) + return cvx.SinglePeriodOptimization( + cvx.ReturnsForecast() + - gamma_risk_hp * cvx.FullCovariance() + - gamma_trade_hp * cvx.StocksTransactionCost(), + [cvx.LongOnly(), cvx.LeverageLimit(1)], + benchmark=cvx.MarketBenchmark(), + ), {'gamma_risk': gamma_risk_hp, 'gamma_trade': gamma_trade_hp} + + +if __name__ == '__main__': + + RESEARCH = True + + if not RESEARCH: + from .strategy_executor import main + main(policy=policy, hyperparameter_opt_start=HYPERPAR_OPTIMIZE_START, + objective=OBJECTIVE, universe=FTSE100, cash_key='GBPOUND') + + else: + import matplotlib.pyplot as plt + import pandas as pd + + #INDEX_ETF = 'DIA' + + md = cvx.DownloadedMarketData( + FTSE100, cash_key='GBPOUND', grace_period=pd.Timedelta('5d')) + research_sim = cvx.StockMarketSimulator(market_data = md) + + research_policy, _ = policy(1., 1.) + + result_unif = research_sim.backtest( + cvx.Uniform(), start_time=HYPERPAR_OPTIMIZE_START) + print('uniform') + print(result_unif) + + result_market = research_sim.backtest( + cvx.MarketBenchmark(), start_time=HYPERPAR_OPTIMIZE_START) + print('market') + print(result_market) + + exit(0) + + # result_etf = cvx.StockMarketSimulator([INDEX_ETF]).backtest( + # cvx.Uniform(), start_time=HYPERPAR_OPTIMIZE_START) + # print(INDEX_ETF) + # print(result_etf) + + research_sim.optimize_hyperparameters( + research_policy, start_time=HYPERPAR_OPTIMIZE_START, + objective='sharpe_ratio') + + result_opt = research_sim.backtest( + research_policy, start_time=HYPERPAR_OPTIMIZE_START) + print('optimized') + print(result_opt) + + result_unif.plot() + result_opt.plot() + result_market.plot() + #result_etf.plot() + + plt.figure() + result_opt.growth_rates.iloc[-252*4:].cumsum().plot(label='optimized') + result_unif.growth_rates.iloc[-252*4:].cumsum().plot(label='uniform') + result_market.growth_rates.iloc[-252*4:].cumsum().plot(label='market') + #result_etf.growth_rates.iloc[-252*4:].cumsum().plot(label='market etf') + plt.legend() + + plt.show() diff --git a/examples/universes.py b/examples/universes.py index a585fa7d5..139c1b65e 100644 --- a/examples/universes.py +++ b/examples/universes.py @@ -20,7 +20,7 @@ We could also save each universe in a ``json`` file. 
""" -# This was generated on 2023-12-27 06:55:30.344592+00:00 +# This was generated on 2024-02-14 07:15:36.308012+00:00 SP500 = \ ['A', 'AAL', 'AAPL', 'ABBV', 'ABNB', 'ABT', 'ACGL', 'ACN', 'ADBE', 'ADI', @@ -31,11 +31,11 @@ 'BALL', 'BAX', 'BBWI', 'BBY', 'BDX', 'BEN', 'BF-B', 'BG', 'BIIB', 'BIO', 'BK', 'BKNG', 'BKR', 'BLDR', 'BLK', 'BMY', 'BR', 'BRK-B', 'BRO', 'BSX', 'BWA', 'BX', 'BXP', 'C', 'CAG', 'CAH', 'CARR', 'CAT', 'CB', 'CBOE', 'CBRE', 'CCI', 'CCL', - 'DAY', 'CDNS', 'CDW', 'CE', 'CEG', 'CF', 'CFG', 'CHD', 'CHRW', 'CHTR', 'CI', - 'CINF', 'CL', 'CLX', 'CMA', 'CMCSA', 'CME', 'CMG', 'CMI', 'CMS', 'CNC', 'CNP', - 'COF', 'COO', 'COP', 'COR', 'COST', 'CPB', 'CPRT', 'CPT', 'CRL', 'CRM', - 'CSCO', 'CSGP', 'CSX', 'CTAS', 'CTLT', 'CTRA', 'CTSH', 'CTVA', 'CVS', 'CVX', - 'CZR', 'D', 'DAL', 'DD', 'DE', 'DFS', 'DG', 'DGX', 'DHI', 'DHR', 'DIS', 'DLR', + 'CDNS', 'CDW', 'CE', 'CEG', 'CF', 'CFG', 'CHD', 'CHRW', 'CHTR', 'CI', 'CINF', + 'CL', 'CLX', 'CMA', 'CMCSA', 'CME', 'CMG', 'CMI', 'CMS', 'CNC', 'CNP', 'COF', + 'COO', 'COP', 'COR', 'COST', 'CPB', 'CPRT', 'CPT', 'CRL', 'CRM', 'CSCO', + 'CSGP', 'CSX', 'CTAS', 'CTLT', 'CTRA', 'CTSH', 'CTVA', 'CVS', 'CVX', 'CZR', + 'D', 'DAL', 'DAY', 'DD', 'DE', 'DFS', 'DG', 'DGX', 'DHI', 'DHR', 'DIS', 'DLR', 'DLTR', 'DOV', 'DOW', 'DPZ', 'DRI', 'DTE', 'DUK', 'DVA', 'DVN', 'DXCM', 'EA', 'EBAY', 'ECL', 'ED', 'EFX', 'EG', 'EIX', 'EL', 'ELV', 'EMN', 'EMR', 'ENPH', 'EOG', 'EPAM', 'EQIX', 'EQR', 'EQT', 'ES', 'ESS', 'ETN', 'ETR', 'ETSY', @@ -89,6 +89,21 @@ 'HD', 'HON', 'IBM', 'INTC', 'JNJ', 'JPM', 'KO', 'MCD', 'MMM', 'MRK', 'MSFT', 'NKE', 'PG', 'TRV', 'UNH', 'V', 'VZ', 'WBA', 'WMT'] +FTSE100 = \ +['AAF.L', 'AAL.L', 'ABF.L', 'ADM.L', 'AHT.L', 'ANTO.L', 'AUTO.L', 'AV.L', + 'AZN.L', 'BA.L', 'BARC.L', 'BATS.L', 'BDEV.L', 'BEZ.L', 'BKG.L', 'BME.L', + 'BNZL.L', 'BP.L', 'BRBY.L', 'BT-A.L', 'CCH.L', 'CNA.L', 'CPG.L', 'CRDA.L', + 'CTEC.L', 'DCC.L', 'DGE.L', 'DPLM.L', 'EDV.L', 'ENT.L', 'EXPN.L', 'FCIT.L', + 'FLTR.L', 'FRAS.L', 'FRES.L', 'GLEN.L', 'GSK.L', 'HIK.L', 'HLMA.L', 'HLN.L', + 'HSBA.L', 'HWDN.L', 'IAG.L', 'ICP.L', 'IHG.L', 'III.L', 'IMB.L', 'IMI.L', + 'INF.L', 'ITRK.L', 'JD.L', 'KGF.L', 'LAND.L', 'LGEN.L', 'LLOY.L', 'LSEG.L', + 'MKS.L', 'MNDI.L', 'MNG.L', 'MRO.L', 'NG.L', 'NWG.L', 'NXT.L', 'OCDO.L', + 'PHNX.L', 'PRU.L', 'PSH.L', 'PSN.L', 'PSON.L', 'REL.L', 'RIO.L', 'RKT.L', + 'RMV.L', 'RR.L', 'RS1.L', 'RTO.L', 'SBRY.L', 'SDR.L', 'SGE.L', 'SGRO.L', + 'SHEL.L', 'SKG.L', 'SMDS.L', 'SMIN.L', 'SMT.L', 'SN.L', 'SPX.L', 'SSE.L', + 'STAN.L', 'STJ.L', 'SVT.L', 'TSCO.L', 'TW.L', 'ULVR.L', 'UTG.L', 'UU.L', + 'VOD.L', 'WEIR.L', 'WPP.L', 'WTB.L'] + if __name__ == '__main__': # import json @@ -113,10 +128,16 @@ 'page': "https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average", 'table_number': 0, 'column_number': 1, + }, + 'ftse100': { + 'page': 'https://en.wikipedia.org/wiki/FTSE_100_Index', + 'table_number': -1, + 'column_number': 1, + 'suffix': '.L', } } - def get_column_wikipedia_page(page, table_number, column_number): + def get_column_wikipedia_page(page, table_number, column_number, **kwargs): """Get a column as list of strings from a table on wikipedia. This is adapted from: @@ -129,6 +150,8 @@ def get_column_wikipedia_page(page, table_number, column_number): :type table_number: int :param column_number: Which column to extract. :type column_number: int + :param kwargs: Unused arguments. + :type kwargs: dict :returns: Sorted strings of the column. 
:rtype: list @@ -143,17 +166,21 @@ def get_column_wikipedia_page(page, table_number, column_number): column.append(element.strip()) return sorted(column) - def adapt_for_yahoo_finance(tickers_list): + def adapt_for_yahoo_finance(tickers_list, suffix='', **kwargs): """Change tickers to match the spelling of Yahoo Finance. :param tickers_list: Tickers from Wikipedia. :type tickers_list: list + :param suffix: Suffix to add to each ticker, default empty string. + :type suffix: str + :param kwargs: Unused arguments. + :type kwargs: dict :returns: Adapted tickers. :rtype: list """ - return [el.replace('.', '-') for el in tickers_list] + return [el.replace('.', '-') + suffix for el in tickers_list] # re-write this file @@ -177,7 +204,7 @@ def adapt_for_yahoo_finance(tickers_list): for key, value in universes.items(): tickers = adapt_for_yahoo_finance( - get_column_wikipedia_page(**value)) + get_column_wikipedia_page(**value), **value) f.write(f'\n{key.upper()} = \\\n') pprint(tickers, compact=True, width=79, stream=f)
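# A quick check of the ticker adaptation above, including the new `suffix`
# argument used for the FTSE 100 universe; this is meant to run inside the
# same __main__ block, where adapt_for_yahoo_finance is defined.
print(adapt_for_yahoo_finance(['BRK.B', 'BF.B']))
# ['BRK-B', 'BF-B']
print(adapt_for_yahoo_finance(['BT.A', 'SMDS'], suffix='.L'))
# ['BT-A.L', 'SMDS.L']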