diff --git a/.gitignore b/.gitignore index 29ea1cd5e3..608a2bf519 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ __pycache__/ _build build/ dist/ +tests/test_pit_data/ *.pkl *.hd5 diff --git a/qlib/data/base.py b/qlib/data/base.py index 496ae38ee2..f7d3df682a 100644 --- a/qlib/data/base.py +++ b/qlib/data/base.py @@ -267,10 +267,10 @@ class PFeature(Feature): def __str__(self): return "$$" + self._name - def _load_internal(self, instrument, start_index, end_index, cur_time, period=None): + def _load_internal(self, instrument, start_index, end_index, cur_time, period=None, start_time=None): from .data import PITD # pylint: disable=C0415 - return PITD.period_feature(instrument, str(self), start_index, end_index, cur_time, period) + return PITD.period_feature(instrument, str(self), start_index, end_index, cur_time, period, start_time) class ExpressionOps(Expression): diff --git a/qlib/data/cache.py b/qlib/data/cache.py index 3264dcd020..f1e6bd3764 100644 --- a/qlib/data/cache.py +++ b/qlib/data/cache.py @@ -160,6 +160,7 @@ def __init__(self, mem_cache_size_limit=None, limit_type="length"): self.__calendar_mem_cache = klass(size_limit) self.__instrument_mem_cache = klass(size_limit) self.__feature_mem_cache = klass(size_limit) + self.__pit_mem_cache = klass(size_limit) def __getitem__(self, key): if key == "c": @@ -168,6 +169,8 @@ def __getitem__(self, key): return self.__instrument_mem_cache elif key == "f": return self.__feature_mem_cache + elif key == "p": + return self.__pit_mem_cache else: raise KeyError("Unknown memcache unit") @@ -175,6 +178,7 @@ def clear(self): self.__calendar_mem_cache.clear() self.__instrument_mem_cache.clear() self.__feature_mem_cache.clear() + self.__pit_mem_cache.clear() class MemCacheExpire: diff --git a/qlib/data/data.py b/qlib/data/data.py index aba75c0b1a..2cb6dcfd28 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -33,8 +33,7 @@ normalize_cache_fields, code_to_fname, time_to_slc_point, - read_period_data, - get_period_list, + get_period_list_by_offset, ) from ..utils.paral import ParallelExt from .ops import Operators # pylint: disable=W0611 # noqa: F401 @@ -48,7 +47,10 @@ class ProviderBackendMixin: def get_default_backend(self): backend = {} - provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] + if hasattr(self, "provider_name"): + provider_name = getattr(self, "provider_name") + else: + provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] # set default storage class backend.setdefault("class", f"File{provider_name}Storage") # set default storage module @@ -336,6 +338,10 @@ def feature(self, instrument, field, start_time, end_time, freq): class PITProvider(abc.ABC): + @property + def provider_name(self): + return "PIT" + @abc.abstractmethod def period_feature( self, @@ -741,29 +747,39 @@ def feature(self, instrument, field, start_index, end_index, freq): return self.backend_obj(instrument=instrument, field=field, freq=freq)[start_index : end_index + 1] -class LocalPITProvider(PITProvider): +class LocalPITProvider(PITProvider, ProviderBackendMixin): # TODO: Add PIT backend file storage # NOTE: This class is not multi-threading-safe!!!! 
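The `provider_name` property added on `PITProvider` above exists because the regex fallback in `get_default_backend` cannot recover the `PIT` acronym from the class name. A minimal sketch (class names illustrative, not part of the diff) of the naming rule:

```python
import re

# The fallback rule splits the class name on capital letters and takes the
# second-to-last token, so "LocalFeatureProvider" -> "Feature" -> "FileFeatureStorage".
tokens = re.findall("[A-Z][^A-Z]*", "LocalFeatureProvider")
assert tokens == ["Local", "Feature", "Provider"] and tokens[-2] == "Feature"

# For "LocalPITProvider" the acronym defeats the rule (it yields "T", which would
# map to a nonexistent "FileTStorage"), hence the explicit provider_name = "PIT".
tokens = re.findall("[A-Z][^A-Z]*", "LocalPITProvider")
assert tokens == ["Local", "P", "I", "T", "Provider"] and tokens[-2] == "T"
```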
- def period_feature(self, instrument, field, start_index, end_index, cur_time, period=None): + def __init__(self, remote=False, backend={}): + super().__init__() + self.remote = remote + self.backend = backend + + def period_feature(self, instrument, field, start_offset, end_offset, cur_time, period=None, start_time=None): + """get raw data from PIT + we have 3 modes to query data from PIT; all of them need the current datetime + + 1. given period, return value observed at current datetime + return series with index as datetime + 2. given start_time, return value **observed by each day** from start_time to current datetime + return series with index as datetime + 3. given start_offset and end_offset, return period data between [start_offset, end_offset] (relative to the latest period) observed at current datetime + return series with index as period + + """ if not isinstance(cur_time, pd.Timestamp): raise ValueError( f"Expected pd.Timestamp for `cur_time`, got '{cur_time}'. Advice: you can't query PIT data directly (e.g. '$$roewa_q'); you must use the `P` operator to convert data to each day (e.g. 'P($$roewa_q)')" ) - assert end_index <= 0 # PIT don't support querying future data - - DATA_RECORDS = [ - ("date", C.pit_record_type["date"]), - ("period", C.pit_record_type["period"]), - ("value", C.pit_record_type["value"]), - ("_next", C.pit_record_type["index"]), - ] - VALUE_DTYPE = C.pit_record_type["value"] + assert end_offset <= 0 # PIT doesn't support querying future data field = str(field).lower()[2:] instrument = code_to_fname(instrument) + backend_obj = self.backend_obj(instrument=instrument, field=field) + # {For acceleration # start_index, end_index, cur_index = kwargs["info"] # if cur_index == start_index: # if not hasattr(self, "all_fields"): # self.all_fields = [] # self.all_fields.append(field) # if not hasattr(self, "period_index"): # self.period_index = {} # if field not in self.period_index: # self.period_index[field] = {} # For acceleration} - if not field.endswith("_q") and not field.endswith("_a"): - raise ValueError("period field must ends with '_q' or '_a'") + key = (instrument, field) quarterly = field.endswith("_q") - index_path = C.dpm.get_data_uri() / "financial" / instrument.lower() / f"{field}.index" - data_path = C.dpm.get_data_uri() / "financial" / instrument.lower() / f"{field}.data" - if not (index_path.exists() and data_path.exists()): - raise FileNotFoundError("No file is found.") - # NOTE: The most significant performance loss is here. - # Does the acceleration that makes the program complicated really matters? - # - It makes parameters of the interface complicate - # - It does not performance in the optimal way (places all the pieces together, we may achieve higher performance) - # - If we design it carefully, we can go through for only once to get the historical evolution of the data. - # So I decide to deprecated previous implementation and keep the logic of the program simple - # Instead, I'll add a cache for the index file.
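For reference, a usage sketch of the three query modes documented in the docstring above (instrument and field are illustrative; assumes `qlib.init()` has pointed `provider_uri` at data containing the `financial` PIT directory):

```python
import pandas as pd
from qlib.data.data import LocalPITProvider

pit = LocalPITProvider()
cur = pd.Timestamp("2019-07-19")

# Mode 1: a fixed report period, as observed at `cur` -> series indexed by observation date
s1 = pit.period_feature("sh600519", "$$roewa_q", 0, 0, cur, period=201901)

# Mode 2: the value observed day by day from start_time up to `cur` -> series indexed by date
s2 = pit.period_feature("sh600519", "$$roewa_q", 0, 0, cur, start_time=pd.Timestamp("2019-01-01"))

# Mode 3: the last four report periods as observed at `cur` -> series indexed by period
s3 = pit.period_feature("sh600519", "$$roewa_q", -3, 0, cur)
```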
- data = np.fromfile(data_path, dtype=DATA_RECORDS) - - # find all revision periods before `cur_time` - cur_time_int = int(cur_time.year) * 10000 + int(cur_time.month) * 100 + int(cur_time.day) - loc = np.searchsorted(data["date"], cur_time_int, side="right") - if loc <= 0: - return pd.Series(dtype=C.pit_record_type["value"]) - last_period = data["period"][:loc].max() # return the latest quarter - first_period = data["period"][:loc].min() - period_list = get_period_list(first_period, last_period, quarterly) + if key in H["p"]: + df = H["p"][key] + else: + if not field.endswith("_q") and not field.endswith("_a"): + raise ValueError("period field must end with '_q' or '_a'") + # index_path = C.dpm.get_data_uri() / "financial" / instrument.lower() / f"{field}.index" + data_path = C.dpm.get_data_uri() / "financial" / instrument.lower() / f"{field}.data" + if not data_path.exists(): + raise FileNotFoundError("No file is found.") + ## get first period offset + ## NOTE: the current index file returns the offset for a given period, not a date, + ## so we cannot find out the offset for a given date; + ## stop using the index in this version + # start_point = get_pitdata_offset(index_path, period, ) + data = backend_obj.np_data() + df = pd.DataFrame(data) + df.sort_values(by=["date", "period"], inplace=True) + df["date"] = pd.to_datetime(df["date"].astype(str)) + H["p"][key] = df + + # return df if period is not None: - # NOTE: `period` has higher priority than `start_index` & `end_index` - if period not in period_list: - return pd.Series(dtype=C.pit_record_type["value"]) - else: - period_list = [period] + res = df[df["period"] == period].set_index("date")["value"] + elif start_time is not None: + # df is sorted by date; select only the rows whose period is monotonically non-decreasing + s_sign = pd.Series(False, index=df.index) + max_p = df["period"].iloc[0] + for i in range(0, len(s_sign)): + if df["period"].iloc[i] >= max_p: + s_sign.iloc[i] = True + max_p = df["period"].iloc[i] + df_sim = df[s_sign].drop_duplicates(subset=["date"], keep="last") + s_part = df_sim.set_index("date")[start_time:]["value"] + if s_part.empty: + return pd.Series(dtype="float64") + if start_time != s_part.index[0] and start_time >= df["date"].iloc[0]: + # add the previous value to the result to avoid nan in the first period + pre_value = pd.Series(df[df["date"] < start_time]["value"].iloc[-1], index=[start_time]) + s_part = pd.concat([pre_value, s_part]) + return s_part else: - period_list = period_list[max(0, len(period_list) + start_index - 1) : len(period_list) + end_index] - value = np.full((len(period_list),), np.nan, dtype=VALUE_DTYPE) - for i, p in enumerate(period_list): - # last_period_index = self.period_index[field].get(period) # For acceleration - value[i], now_period_index = read_period_data( - index_path, data_path, p, cur_time_int, quarterly # , last_period_index # For acceleration - ) - # self.period_index[field].update({period: now_period_index}) # For acceleration - # NOTE: the index is period_list; So it may result in unexpected values(e.g.
nan) - # when calculation between different features and only part of its financial indicator is published - series = pd.Series(value, index=period_list, dtype=VALUE_DTYPE) - - # {For acceleration - # if cur_index == end_index: - # self.all_fields.remove(field) - # if not len(self.all_fields): - # del self.all_fields - # del self.period_index - # For acceleration} - - return series + df_remain = df[(df["date"] <= cur_time)] + if df_remain.empty: + return pd.Series(dtype="float64") + last_observe_date = df_remain["date"].iloc[-1] + # keep only the latest period value + df_remain = df_remain.sort_values(by=["period"]).drop_duplicates(subset=["period"], keep="last") + df_remain = df_remain.set_index("period") + + cache_key = ( + instrument, + field, + last_observe_date, + start_offset, + end_offset, + quarterly, + ) # f"{instrument}.{field}.{last_observe_date}.{start_offset}.{end_offset}.{quarterly}" + if cache_key in H["p"]: + retur = H["p"][cache_key] + else: + last_period = df_remain.index[-1] + period_list = get_period_list_by_offset(last_period, start_offset, end_offset, quarterly) + retur = df_remain["value"].reindex(period_list, fill_value=np.nan) + H["p"][cache_key] = retur + return retur class LocalExpressionProvider(ExpressionProvider): diff --git a/qlib/data/pit.py b/qlib/data/pit.py index 33d5e0c5cc..97a6dff938 100644 --- a/qlib/data/pit.py +++ b/qlib/data/pit.py @@ -24,31 +24,45 @@ class P(ElemOperator): def _load_internal(self, instrument, start_index, end_index, freq): _calendar = Cal.calendar(freq=freq) resample_data = np.empty(end_index - start_index + 1, dtype="float32") - - for cur_index in range(start_index, end_index + 1): - cur_time = _calendar[cur_index] - # To load expression accurately, more historical data are required - start_ws, end_ws = self.feature.get_extended_window_size() - if end_ws > 0: - raise ValueError( - "PIT database does not support referring to future period (e.g. expressions like `Ref('$$roewa_q', -1)` are not supported" - ) - - # The calculated value will always the last element, so the end_offset is zero. + # To load expression accurately, more historical data are required + start_ws, end_ws = self.feature.get_extended_window_size() + # if start_ws = 0, means expression use only current data, so pit history data is not required + if start_ws == 0 and end_ws == 0: try: - s = self._load_feature(instrument, -start_ws, 0, cur_time) - resample_data[cur_index - start_index] = s.iloc[-1] if len(s) > 0 else np.nan + # get start and end date + s = self._load_feature(instrument, 0, 0, _calendar[end_index], None, _calendar[start_index]) + if len(s) == 0: + return pd.Series(dtype="float32", name=str(self)) + # index in s may not in calendar, so we need to reindex it to continue date first + s = s.reindex(pd.date_range(start=s.index[0], end=_calendar[end_index])).fillna(method="ffill") + resample_data = s.reindex(_calendar[start_index : end_index + 1]).fillna(method="ffill").values except FileNotFoundError: get_module_logger("base").warning(f"WARN: period data not found for {str(self)}") return pd.Series(dtype="float32", name=str(self)) + else: + for cur_index in range(start_index, end_index + 1): + cur_time = _calendar[cur_index] + + if end_ws > 0: + raise ValueError( + "PIT database does not support referring to future period (e.g. expressions like `Ref('$$roewa_q', -1)` are not supported" + ) + + # The calculated value will always the last element, so the end_offset is zero. 
+ try: + s = self._load_feature(instrument, -start_ws, 0, cur_time) + resample_data[cur_index - start_index] = s.iloc[-1] if len(s) > 0 else np.nan + except FileNotFoundError: + get_module_logger("base").warning(f"WARN: period data not found for {str(self)}") + return pd.Series(dtype="float32", name=str(self)) resample_series = pd.Series( resample_data, index=pd.RangeIndex(start_index, end_index + 1), dtype="float32", name=str(self) ) return resample_series - def _load_feature(self, instrument, start_index, end_index, cur_time): - return self.feature.load(instrument, start_index, end_index, cur_time) + def _load_feature(self, instrument, start_index, end_index, cur_time, period=None, start_time=None): + return self.feature.load(instrument, start_index, end_index, cur_time, period, start_time) def get_longest_back_rolling(self): # The period data will collapse as a normal feature. So no extending and looking back @@ -67,5 +81,5 @@ def __init__(self, feature, period): def __str__(self): return f"{super().__str__()}[{self.period}]" - def _load_feature(self, instrument, start_index, end_index, cur_time): + def _load_feature(self, instrument, start_index, end_index, cur_time, period=None, start_time=None): return self.feature.load(instrument, start_index, end_index, cur_time, self.period) diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index 8a100a2d19..125cd35a2d 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -7,13 +7,21 @@ import numpy as np import pandas as pd +from qlib.data.storage.storage import PITStorage from qlib.utils.time import Freq from qlib.utils.resam import resam_calendar from qlib.config import C from qlib.data.cache import H from qlib.log import get_module_logger -from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT +from qlib.data.storage import ( + CalendarStorage, + InstrumentStorage, + FeatureStorage, + CalVT, + InstKT, + InstVT, +) logger = get_module_logger("file_storage") @@ -48,7 +56,10 @@ def support_freq(self) -> List[str]: if len(self.provider_uri) == 1 and C.DEFAULT_FREQ in self.provider_uri: freq_l = filter( lambda _freq: not _freq.endswith("_future"), - map(lambda x: x.stem, self.dpm.get_data_uri(C.DEFAULT_FREQ).joinpath("calendars").glob("*.txt")), + map( + lambda x: x.stem, + self.dpm.get_data_uri(C.DEFAULT_FREQ).joinpath("calendars").glob("*.txt"), + ), ) else: freq_l = self.provider_uri.keys() @@ -140,7 +151,10 @@ def data(self) -> List[CalVT]: _calendar = self._read_calendar() if Freq(self._freq_file) != Freq(self.freq): _calendar = resam_calendar( - np.array(list(map(pd.Timestamp, _calendar))), self._freq_file, self.freq, self.region + np.array(list(map(pd.Timestamp, _calendar))), + self._freq_file, + self.freq, + self.region, ) return _calendar @@ -287,6 +301,7 @@ def __init__(self, instrument: str, field: str, freq: str, provider_uri: dict = super(FileFeatureStorage, self).__init__(instrument, field, freq, **kwargs) self._provider_uri = None if provider_uri is None else C.DataPathManager.format_provider_uri(provider_uri) self.file_name = f"{instrument.lower()}/{field.lower()}.{freq.lower()}.bin" + self._start_index = None def clear(self): with self.uri.open("wb") as _: @@ -303,6 +318,7 @@ def write(self, data_array: Union[List, np.ndarray], index: int = None) -> None: "if you need to clear the FeatureStorage, please execute: FeatureStorage.clear" ) return + self._start_index = None if not self.uri.exists(): # write index = 0 if 
index is None else index @@ -320,7 +336,9 @@ def write(self, data_array: Union[List, np.ndarray], index: int = None) -> None: _old_data = np.fromfile(fp, dtype="<f") def start_index(self) -> Union[int, None]: if not self.uri.exists(): return None - with self.uri.open("rb") as fp: - index = int(np.frombuffer(fp.read(4), dtype="<f")[0]) - return index + if self._start_index is None: + with self.uri.open("rb") as fp: + self._start_index = int(np.frombuffer(fp.read(4), dtype="<f")[0]) + return self._start_index def end_index(self) -> Union[int, None]: @@ -377,3 +396,194 @@ def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Serie def __len__(self) -> int: self.check() return self.uri.stat().st_size // 4 - 1 + + +class FilePITStorage(FileStorageMixin, PITStorage): + """PIT data is a special case of Feature data; it looks like + + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + + It is sorted by [date, period]. + + The `_next` field is currently unused; it is kept for forward compatibility. + """ + + # NOTE: + # PIT data should have two files, one is the index file, the other is the data file. + + # pseudo code: + # date_index = calendar.index(date) + # data_start_index, data_end_index = index_file[date_index] + # data = data_file[data_start_index:data_end_index] + + # the index file is like a feature's data file, but given a start index in the index file, it returns the first and the last observation indexes in the data file. + # the data file has three columns: the first column is the observation date, the second column is the financial period, and the third column is the value. + + # so given start and end dates, we can get the start_index and end_index from the calendar. + # use them to read two lines from the index file; then we can get the start and end index of the data file. + + # but with this implementation, we would create an index file with about 50 times as many lines as the data file. Is that a good idea? + # if we instead create an index file with the same number of lines as the data file, we have to read the whole index file for any time-slice search, so why not read the whole data file?
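A small round-trip sketch of the flat record layout this class reads and writes. The format characters mirror the `C.pit_record_type` convention; the exact codes used here (`I`/`I`/`d`/`I`) are an assumption, and the authoritative definition lives in `qlib.config`:

```python
import numpy as np

# Assumed record layout: uint32 date, uint32 period, float64 value, uint32 _next.
raw_dtype = [("date", "I"), ("period", "I"), ("value", "d"), ("_next", "I")]

records = np.array(
    [
        (20070428, 200701, 0.090219, 4294967295),  # 4294967295 == uint32 "no next revision"
        (20070817, 200702, 0.139330, 4294967295),
    ],
    dtype=raw_dtype,
)
records.tofile("roewa_q.data")  # same flat binary layout FilePITStorage scans

back = np.fromfile("roewa_q.data", dtype=raw_dtype)
assert back["period"].tolist() == [200701, 200702]
```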
+ + def __init__(self, instrument: str, field: str, freq: str = "day", provider_uri: dict = None, **kwargs): + super(FilePITStorage, self).__init__(instrument, field, freq, **kwargs) + + if not field.endswith("_q") and not field.endswith("_a"): + raise ValueError("period field must end with '_q' or '_a'") + self.quarterly = field.endswith("_q") + + self._provider_uri = None if provider_uri is None else C.DataPathManager.format_provider_uri(provider_uri) + self.file_name = f"{instrument.lower()}/{field.lower()}.data" + self.uri.parent.mkdir(parents=True, exist_ok=True) + self.raw_dtype = [ + ("date", C.pit_record_type["date"]), + ("period", C.pit_record_type["period"]), + ("value", C.pit_record_type["value"]), + ("_next", C.pit_record_type["index"]), # not used in current implementation + ] + self.dtypes = np.dtype(self.raw_dtype) + self.itemsize = self.dtypes.itemsize + self.dtype_string = "".join([i[1] for i in self.raw_dtype]) + self.columns = [i[0] for i in self.raw_dtype] + + @property + def uri(self) -> Path: + if self.freq not in self.support_freq: + raise ValueError(f"{self.storage_name}: {self.provider_uri} does not contain data for {self.freq}") + return self.dpm.get_data_uri(self.freq).joinpath(f"{self.storage_name}", self.file_name) + + def clear(self): + with self.uri.open("wb") as _: + pass + + @property + def data(self) -> pd.DataFrame: + return self[:] + + def update(self, data_array: np.ndarray) -> None: + """update data in storage, replacing current data from start_date to end_date with the given data_array + + Args: + data_array: structured array containing date, period, value and _next fields, with the same dtype as self.raw_dtype + """ + if not self.uri.exists() or len(self) == 0: + # write + index = 0 + self.write(data_array, index) + else: + # sort it + data_array = np.sort(data_array, order=["date", "period"]) + # get index + update_start_date = data_array[0][0] + update_end_date = data_array[-1][0] + current_data = self.np_data() + index = (current_data["date"] >= update_start_date).argmax() + end_index = (current_data["date"] > update_end_date).argmax() + new_data = np.concatenate([data_array, current_data[end_index:]]) + self.write(new_data, index) + + def write(self, data_array: np.ndarray, index: int = None) -> None: + """write data to storage at a specific index + + Args: + data_array: structured array containing date, period, value and _next fields + index: target index to start writing. Defaults to None.
+ """ + + if len(data_array) == 0: + logger.info( + "len(data_array) == 0, nothing to write; " + "if you need to clear the FilePITStorage, please execute: FilePITStorage.clear" + ) + return + # check data_array dtype + if data_array.dtype != self.dtypes: + raise ValueError(f"data_array.dtype = {data_array.dtype}, self.dtypes = {self.dtypes}") + + # sort data_array by the first two columns + data_array = np.sort(data_array, order=["date", "period"]) + + if not self.uri.exists(): + # write + index = 0 if index is None else index + with self.uri.open("wb") as fp: + data_array.tofile(fp) + else: + if index is None or index > self.end_index: + index = self.end_index + 1 + with self.uri.open("rb+") as fp: + fp.seek(index * self.itemsize) + data_array.tofile(fp) + + @property + def start_index(self) -> Union[int, None]: + return 0 + + @property + def end_index(self) -> Union[int, None]: + if not self.uri.exists(): + return None + # The next data appending index point will be `end_index + 1` + return self.start_index + len(self) - 1 + + def np_data(self, i: Union[int, slice] = None) -> np.ndarray: + """return numpy structured array + + Args: + i: index or slice. Defaults to None. + + Returns: + np.ndarray + """ + if not self.uri.exists(): + if isinstance(i, int): + return None, None + elif isinstance(i, slice): + return np.empty(0, dtype=self.dtypes) + else: + raise TypeError(f"type(i) = {type(i)}") + + if i is None: + i = slice(None, None) + storage_start_index = self.start_index + storage_end_index = self.end_index + with self.uri.open("rb") as fp: + if isinstance(i, int): + if storage_start_index > i: + raise IndexError(f"{i}: start index is {storage_start_index}") + fp.seek(i * self.itemsize) + return np.array([struct.unpack(self.dtype_string, fp.read(self.itemsize))], dtype=self.dtypes) + elif isinstance(i, slice): + start_index = storage_start_index if i.start is None else i.start + end_index = storage_end_index if i.stop is None else i.stop - 1 + si = max(start_index, storage_start_index) + if si > end_index: + return np.empty(0, dtype=self.dtypes) + fp.seek(si * self.itemsize) + # read `count` records from the resolved start position + count = end_index - si + 1 + data = np.frombuffer(fp.read(self.itemsize * count), dtype=self.dtypes) + return data + else: + raise TypeError(f"type(i) = {type(i)}") + + def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.DataFrame]: + if isinstance(i, int): + return pd.Series(list(self.np_data(i)[0]), index=self.columns, name=i) + elif isinstance(i, slice): + data = self.np_data(i) + si = self.start_index if i.start is None else i.start + if si < 0: + si = len(self) + si + return pd.DataFrame(data, index=pd.RangeIndex(si, si + len(data)), columns=self.columns) + else: + raise TypeError(f"type(i) = {type(i)}") + + def __len__(self) -> int: + self.check() + return self.uri.stat().st_size // self.itemsize diff --git a/qlib/data/storage/storage.py b/qlib/data/storage/storage.py index 2eb7da1de6..d5151c7d3d 100644 --- a/qlib/data/storage/storage.py +++ b/qlib/data/storage/storage.py @@ -492,3 +492,165 @@ def __len__(self) -> int: """ raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") + + +class PITStorage(FeatureStorage): + """PIT data is a special case of Feature data; it looks like + + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + + It is sorted by [date, period].
+ + The `_next` field is currently unused; it is kept for forward compatibility. + """ + + @property + def storage_name(self) -> str: + return "financial" # for compatibility + + def np_data(self, i: Union[int, slice] = None) -> np.ndarray: + """return numpy structured array + + Args: + i: index or slice. Defaults to None. + + Returns: + np.ndarray + """ + + raise NotImplementedError("Subclass of PITStorage must implement `np_data` method") + + @property + def data(self) -> pd.DataFrame: + """get all data + + dataframe index is date, columns are report_period and value + + Notes + ------ + if data(storage) does not exist, return empty pd.DataFrame: `return pd.DataFrame(dtype=np.float32)` + """ + raise NotImplementedError("Subclass of PITStorage must implement `data` method") + + def write(self, data_array: np.ndarray, index: int = None): + """Write data_array to PITStorage starting from index. + + Notes + ------ + If index is None, append data_array to feature. + + If len(data_array) == 0, return + + If (index - self.end_index) >= 1, self[end_index+1: index] will be filled with np.nan + + Examples + --------- + .. code-block:: + + pit data: + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + + + >>> s.write(np.array([(20070917, 200703, 0.239330, 0)], dtype=s.raw_dtype), 1) + + feature: + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070917 200703 0.239330 0 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + + """ + raise NotImplementedError("Subclass of PITStorage must implement `write` method") + + def rewrite(self, data: Union[List, np.ndarray, Tuple]): + """overwrite all data in PITStorage with data + + Parameters + ---------- + data: Union[List, np.ndarray, Tuple] + data + """ + self.clear() + self.write(data, 0) + + def update(self, data_array: np.ndarray) -> None: + """update data in storage, replacing current data from start_date to end_date with the given data_array + + Args: + data_array: structured array containing date, period, value and _next fields, with the same dtype as self.raw_dtype + + Examples + --------- + ..
code-block:: + + pit data: + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + + >>> s.update(np.array([(20070917, 200703, 0.111111, 0), (20100314, 200703, 0.111111, 0)], dtype=s.raw_dtype)) + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20070917 200703 0.111111 0 + 3 20100314 200703 0.111111 0 + + """ + raise NotImplementedError("Subclass of PITStorage must implement `update` method") + + @overload + def __getitem__(self, s: slice) -> pd.Series: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] + + Returns + ------- + pd.Series(values, index=pd.RangeIndex(start, len(values))) + """ + + @overload + def __getitem__(self, i: int) -> Tuple[int, float]: + """x.__getitem__(y) <==> x[y]""" + + def __getitem__(self, i) -> Union[Tuple[int, float], pd.Series]: + """x.__getitem__(y) <==> x[y] + + Notes + ------- + if data(storage) does not exist: + if isinstance(i, int): + return (None, None) + if isinstance(i, slice): + # return empty pd.Series + return pd.Series(dtype=np.float32) + """ + raise NotImplementedError( + "Subclass of PITStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" + ) + + def __len__(self) -> int: + """ + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + + """ + raise NotImplementedError("Subclass of PITStorage must implement `__len__` method") diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index 732638b236..3c46a803dc 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -98,79 +98,45 @@ def get_period_list(first: int, last: int, quarterly: bool) -> List[int]: return res -def get_period_offset(first_year, period, quarterly): - if quarterly: - offset = (period // 100 - first_year) * 4 + period % 100 - 1 - else: - offset = period - first_year - return offset - - -def read_period_data( - index_path, - data_path, - period, - cur_date_int: int, - quarterly, - last_period_index: int = None, -): +def get_period_list_by_offset(last: int, start_offset: int, end_offset: int, quarterly: bool) -> List[int]: """ - At `cur_date`(e.g. 20190102), read the information at `period`(e.g. 201803). - Only the updating info before cur_date or at cur_date will be used. + This method will be used in the PIT database. + It returns all the possible periods between `last + start_offset` and `last + end_offset` (both ends included) Parameters ---------- - period: int - date period represented by interger, e.g. 201901 corresponds to the first quarter in 2019 - cur_date_int: int - date which represented by interger, e.g. 20190102 - last_period_index: int - it is a optional parameter; it is designed to avoid repeatedly access the .index data of PIT database when - sequentially observing the data (Because the latest index of a specific period of data certainly appear in after the one in last observation). + last: int + the latest period, e.g. 201904 for the fourth quarter of 2019 + start_offset: int + non-positive offset in quarters or years from `last` for the first period + end_offset: int + non-positive offset in quarters or years from `last` for the last period + quarterly : bool + whether to return quarterly periods or yearly ones.
Returns ------- - the query value and byte index the index value - """ - DATA_DTYPE = "".join( - [ - C.pit_record_type["date"], - C.pit_record_type["period"], - C.pit_record_type["value"], - C.pit_record_type["index"], - ] - ) - - PERIOD_DTYPE = C.pit_record_type["period"] - INDEX_DTYPE = C.pit_record_type["index"] + List[int] + all the possible periods between `last + start_offset` and `last + end_offset` + """ + assert end_offset <= 0 + if not quarterly: + assert all(1900 <= x <= 2099 for x in (last,)), "invalid arguments" + return list(range(last + start_offset, last + 1 + end_offset)) + else: + assert all(190000 <= x <= 209904 for x in (last,)), "invalid arguments" + res = [] + # last minus offset quarters + for year in range(int(last // 100 + start_offset // 4 - 1), int(last // 100 + 1) + end_offset): + for q in range(1, 5): + period = year * 100 + q + if period <= last: + res.append(period) + return res[len(res) + start_offset - 1 : len(res) + end_offset + 1] - NAN_VALUE = C.pit_record_nan["value"] - NAN_INDEX = C.pit_record_nan["index"] - # find the first index of linked revisions - if last_period_index is None: - with open(index_path, "rb") as fi: - (first_year,) = struct.unpack(PERIOD_DTYPE, fi.read(struct.calcsize(PERIOD_DTYPE))) - all_periods = np.fromfile(fi, dtype=INDEX_DTYPE) - offset = get_period_offset(first_year, period, quarterly) - _next = all_periods[offset] +def get_period_offset(first_year, period, quarterly): + if quarterly: + offset = (period // 100 - first_year) * 4 + period % 100 - 1 else: - _next = last_period_index - - # load data following the `_next` link - prev_value = NAN_VALUE - prev_next = _next - - with open(data_path, "rb") as fd: - while _next != NAN_INDEX: - fd.seek(_next) - date, period, value, new_next = struct.unpack(DATA_DTYPE, fd.read(struct.calcsize(DATA_DTYPE))) - if date > cur_date_int: - break - prev_next = _next - _next = new_next - prev_value = value - return prev_value, prev_next + offset = period - first_year + return offset def np_ffill(arr: np.array): diff --git a/tests/test_pit.py b/tests/test_pit.py index 8320e1d361..359be618dd 100644 --- a/tests/test_pit.py +++ b/tests/test_pit.py @@ -3,6 +3,8 @@ import sys + +import numpy as np import qlib import shutil import unittest @@ -12,6 +14,7 @@ from pathlib import Path from qlib.data import D +from qlib.data.storage.file_storage import FilePITStorage from qlib.tests.data import GetData sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) @@ -70,11 +73,84 @@ def setUp(self): qlib.init(provider_uri=provider_uri) def to_str(self, obj): - return "".join(str(obj).split()) + return "\n".join(str(obj).split()) def check_same(self, a, b): self.assertEqual(self.to_str(a), self.to_str(b)) + def test_storage_read(self): + s = FilePITStorage("sh600519", "roewa_q") + np_data = s.np_data(1) + self.assertEqual(np_data.shape, (1,)) + data = s.data + self.check_same( + data.head(), + """ + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + """, + ) + + def test_storage_write(self): + base = FilePITStorage("sh600519", "roewa_q") + s = FilePITStorage("sh600519", "roewa2_q") + + shutil.copy(base.uri, s.uri) + s.write( + np.array([(20070917, 200703, 0.239330, 0)], dtype=s.raw_dtype), + 1, + ) + data = s.data + self.check_same( + data.head(), + """ + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070917 200703 0.239330 0 + 2
20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + 4 20080313 200704 0.395989 4294967295 + """, + ) + + def test_storage_slice(self): + s = FilePITStorage("sh600519", "roewa_q") + data = s[1:4] + self.check_same( + data, + """ + date period value _next + 1 20070817 200702 0.139330 4294967295 + 2 20071023 200703 0.245863 4294967295 + 3 20080301 200704 0.347900 80 + """, + ) + + def test_storage_update(self): + base = FilePITStorage("sh600519", "roewa_q") + s = FilePITStorage("sh600519", "roewa3_q") + + shutil.copy(base.uri, s.uri) + s.update( + np.array([(20070917, 200703, 0.111111, 0), (20100314, 200703, 0.111111, 0)], dtype=s.raw_dtype), + ) + data = s.data + self.check_same( + data.head(), + """ + date period value _next + 0 20070428 200701 0.090219 4294967295 + 1 20070817 200702 0.139330 4294967295 + 2 20070917 200703 0.111111 0 + 3 20100314 200703 0.111111 0 + 4 20100402 200904 0.335461 4294967295 + """, + ) + def test_query(self): instruments = ["sh600519"] fields = ["P($$roewa_q)", "P($$yoyni_q)"] @@ -107,7 +183,13 @@ def test_query(self): def test_no_exist_data(self): fields = ["P($$roewa_q)", "P($$yoyni_q)", "$close"] - data = D.features(["sh600519", "sh601988"], fields, start_time="2019-01-01", end_time="2019-07-19", freq="day") + data = D.features( + ["sh600519", "sh601988"], + fields, + start_time="2019-01-01", + end_time="2019-07-19", + freq="day", + ) data["$close"] = 1 # in case of different dataset gives different values expect = """ P($$roewa_q) P($$yoyni_q) $close
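Taken together, the storage and provider changes keep the user-facing query path unchanged. The snippet below mirrors the `test_query`-style usage from the test file above (the provider_uri path is illustrative):

```python
import qlib
from qlib.data import D

qlib.init(provider_uri="~/.qlib/qlib_data/cn_data")  # illustrative path containing a `financial` dir

df = D.features(
    ["sh600519"],
    ["P($$roewa_q)", "P($$yoyni_q)"],  # `P` collapses PIT data onto the daily calendar
    start_time="2019-01-01",
    end_time="2019-07-19",
    freq="day",
)
print(df.head())
```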