From 734ba953d10f2ced55a197d1bd756f60ee5aaa75 Mon Sep 17 00:00:00 2001 From: Velo <84917738+Velocities@users.noreply.github.com> Date: Sun, 17 Oct 2021 15:53:23 -0400 Subject: [PATCH] Shorten and simplify Note: Some of the changes I have made to this file use the walrus operator (:=). If you do not intend to run this code on Python 3.8 or above, you can remove the walrus operator changes that I have suggested and just go ahead with the rest of the changes. This will do the same thing that your code already does, just in a shorter and easier-to-understand fashion. --- stockstats.py | 168 ++++++++++++++++++++++++-------------------------- 1 file changed, 80 insertions(+), 88 deletions(-) diff --git a/stockstats.py b/stockstats.py index 8aa07c2..707b847 100644 --- a/stockstats.py +++ b/stockstats.py @@ -61,17 +61,13 @@ class StockDataFrame(pd.DataFrame): ADX_EMA = 6 ADXR_EMA = 6 - CR_MA1 = 5 - CR_MA2 = 10 - CR_MA3 = 20 + CR_MA1, CR_MA2, CR_MA3 = 5, 10, 20 TRIX_EMA_WINDOW = 12 TEMA_EMA_WINDOW = 5 - ATR_SMMA = 14 - - MFI = 14 + ATR_SMMA, MFI = 14, 14 # End of options @@ -102,7 +98,7 @@ def _get_p(df, column, shifts): :param shifts: the range to consider :return: """ - column_name = '{}_{}_p'.format(column, shifts) + column_name = f'{column}_{shifts}_p' # initialize the column if not df.get(column) shifts = StockDataFrame.to_ints(shifts)[::-1] @@ -129,20 +125,17 @@ def to_ints(cls, shifts): @classmethod def to_int(cls, shifts): - numbers = cls.to_ints(shifts) - if len(numbers) != 1: + if len(numbers := cls.to_ints(shifts)) != 1: raise IndexError("only accept 1 number.") return numbers[0] @staticmethod def to_floats(shifts): - floats = map(float, shifts.split(',')) - return sorted(list(set(floats))) + return sorted(list(set(floats := map(float, shifts.split(','))))) @classmethod def to_float(cls, shifts): - floats = cls.to_floats(shifts) - if len(floats) != 1: + if len(floats := cls.to_floats(shifts)) != 1: raise IndexError('only 
accept 1 float.') return floats[0] @@ -185,7 +178,7 @@ def _get_r(cls, df, column, shifts): :return: None """ shift = cls.to_int(shifts) - rate_key = '{}_{}_r'.format(column, shift) + rate_key = f'{column}_{shift}_r' df[rate_key] = df[column].pct_change(periods=-shift) * 100 @classmethod @@ -198,7 +191,7 @@ def _get_s(cls, df, column, shifts): :return: None """ shift = cls.to_int(shifts) - shifted_key = "{}_{}_s".format(column, shift) + shifted_key = f"{column}_{shift}_s" df[shifted_key] = df[column].shift(-shift) cp = df[shifted_key].copy() StockDataFrame.set_nan(cp, shift) @@ -218,7 +211,7 @@ def _get_c(cls, df, column, shifts): :param shifts: range to count, only to previous :return: result series """ - column_name = '{}_{}_c'.format(column, shifts) + column_name = f'{column}_{shifts}_c' shifts = cls.get_only_one_positive_int(shifts) df[column_name] = df[column].rolling( center=False, @@ -236,7 +229,7 @@ def _get_fc(cls, df, column, shifts): :param shifts: range to count, only to future :return: result series """ - column_name = '{}_{}_fc'.format(column, shifts) + column_name = f'{column}_{shifts}_fc' shift = cls.get_only_one_positive_int(shifts) reversed_series = df[column][::-1] reversed_counts = reversed_series.rolling( @@ -249,7 +242,7 @@ def _get_fc(cls, df, column, shifts): @classmethod def _get_op(cls, df, column, threshold, op): - column_name = '{}_{}_{}'.format(column, threshold, op) + column_name = f'{column}_{threshold}_{op}' threshold = cls.to_float(threshold) f = getattr(operator, op) df[column_name] = f(df[column], threshold) @@ -271,20 +264,19 @@ def _init_shifted_columns(cls, column, df, shifts): # initialize the column if not df.get(column) shifts = cls.to_ints(shifts) - shift_column_names = ['{}_{}_s'.format(column, shift) for shift in + return (shift_column_names := [f'{column}_{shift}_s' for shift in shifts] - [df.get(name) for name in shift_column_names] - return shift_column_names + [df.get(name) for name in shift_column_names]) 
@classmethod def _get_max(cls, df, column, shifts): - column_name = '{}_{}_max'.format(column, shifts) + column_name = f'{column}_{shifts}_max' shift_column_names = cls._init_shifted_columns(column, df, shifts) df[column_name] = np.max(df[shift_column_names], axis=1) @classmethod def _get_min(cls, df, column, shifts): - column_name = '{}_{}_min'.format(column, shifts) + column_name = f'{column}_{shifts}_min' shift_column_names = cls._init_shifted_columns(column, df, shifts) df[column_name] = np.min(df[shift_column_names], axis=1) @@ -299,7 +291,7 @@ def _get_rsv(df, n_days): :return: None """ n_days = int(n_days) - column_name = 'rsv_{}'.format(n_days) + column_name = f'rsv_{n_days}' low_min = df['low'].rolling( min_periods=1, window=n_days, center=False).min() high_max = df['high'].rolling( @@ -341,13 +333,13 @@ def _get_rsi(cls, df, n_days): df['closepm'] = (d + d.abs()) / 2 df['closenm'] = (-d + d.abs()) / 2 - closepm_smma_column = 'closepm_{}_smma'.format(n_days) - closenm_smma_column = 'closenm_{}_smma'.format(n_days) + closepm_smma_column = f'closepm_{n_days}_smma' + closenm_smma_column = f'closenm_{n_days}_smma' p_ema = df[closepm_smma_column] n_ema = df[closenm_smma_column] - rs_column_name = 'rs_{}'.format(n_days) - rsi_column_name = 'rsi_{}'.format(n_days) + rs_column_name = f'rs_{n_days}' + rsi_column_name = f'rsi_{n_days}' df[rs_column_name] = rs = p_ema / n_ema df[rsi_column_name] = 100 - 100 / (1.0 + rs) @@ -377,7 +369,7 @@ def _get_smma(cls, df, column, windows): :return: result series """ window = cls.get_only_one_positive_int(windows) - column_name = '{}_{}_smma'.format(column, window) + column_name = f'{column}_{window}_smma' smma = df[column].ewm( ignore_na=False, alpha=1.0 / window, min_periods=0, adjust=True).mean() @@ -389,7 +381,7 @@ def _get_trix(cls, df, column=None, windows=None): if column is None and windows is None: column_name = 'trix' else: - column_name = '{}_{}_trix'.format(column, windows) + column_name = f'{column}_{windows}_trix' 
if column is None: column = 'close' @@ -397,10 +389,10 @@ def _get_trix(cls, df, column=None, windows=None): windows = cls.TRIX_EMA_WINDOW window = cls.get_only_one_positive_int(windows) - single = '{c}_{w}_ema'.format(c=column, w=window) - double = '{c}_{w}_ema_{w}_ema'.format(c=column, w=window) - triple = '{c}_{w}_ema_{w}_ema_{w}_ema'.format(c=column, w=window) - prev_triple = '{}_-1_s'.format(triple) + single = f'{column}_{window}_ema' + double = f'{column}_{window}_ema_{window}_ema' + triple = f'{column}_{window}_ema_{window}_ema_{window}_ema' + prev_triple = f'{triple}_-1_s' df[column_name] = ((df[triple] - df[prev_triple]) * 100 / df[prev_triple]) @@ -421,7 +413,7 @@ def _get_tema(cls, df, column=None, windows=None): if column is None and windows is None: column_name = 'tema' else: - column_name = '{}_{}_tema'.format(column, windows) + column_name = f'{column}_{windows}_tema' if column is None: column = 'close' @@ -429,9 +421,9 @@ def _get_tema(cls, df, column=None, windows=None): windows = cls.TEMA_EMA_WINDOW window = cls.get_only_one_positive_int(windows) - single = '{c}_{w}_ema'.format(c=column, w=window) - double = '{c}_{w}_ema_{w}_ema'.format(c=column, w=window) - triple = '{c}_{w}_ema_{w}_ema_{w}_ema'.format(c=column, w=window) + single = f'{column}_{window}_ema' + double = f'{column}_{window}_ema_{window}_ema' + triple = f'{column}_{window}_ema_{window}_ema_{window}_ema' df[column_name] = 3 * df[single] - 3 * df[double] + df[triple] cls._drop_columns(df, [single, double, triple]) @@ -456,7 +448,7 @@ def _get_wr(cls, df, n_days): hn = df['high'].rolling(min_periods=1, window=n_days, center=False).max() - column_name = 'wr_{}'.format(n_days) + column_name = f'wr_{n_days}' df[column_name] = (hn - df['close']) / (hn - ln) * 100 @classmethod @@ -476,10 +468,10 @@ def _get_cci(cls, df, n_days=None): column_name = 'cci' else: n_days = int(n_days) - column_name = 'cci_{}'.format(n_days) + column_name = f'cci_{n_days}' tp = df['middle'] - tp_sma = 
df['middle_{}_sma'.format(n_days)] + tp_sma = df[f'middle_{n_days}_sma'] md = df['middle'].rolling( min_periods=1, center=False, window=n_days).apply( lambda x: np.fabs(x - x.mean()).mean()) @@ -518,8 +510,8 @@ def _get_atr(cls, df, window=None): column_name = 'atr' else: window = int(window) - column_name = 'atr_{}'.format(window) - tr_smma_column = 'tr_{}_smma'.format(window) + column_name = f'atr_{window}' + tr_smma_column = f'tr_{window}_smma' df[column_name] = df[tr_smma_column] cls._drop_columns(df, [tr_smma_column]) @@ -549,8 +541,8 @@ def _get_dmi(cls, df): df['pdi'] = cls._get_pdi(df, cls.PDI_SMMA) df['mdi'] = cls._get_mdi(df, cls.MDI_SMMA) df['dx'] = cls._get_dx(df, cls.DX_SMMA) - df['adx'] = df['dx_{}_ema'.format(cls.ADX_EMA)] - df['adxr'] = df['adx_{}_ema'.format(cls.ADXR_EMA)] + df['adx'] = df[f'dx_{cls.ADX_EMA}_ema'] + df['adxr'] = df[f'adx_{cls.ADXR_EMA}_ema'] @classmethod def _get_um_dm(cls, df): @@ -574,11 +566,11 @@ def _get_pdm(cls, df, windows): :return: """ window = cls.get_only_one_positive_int(windows) - column_name = 'pdm_{}'.format(window) + column_name = f'pdm_{window}' um, dm = df['um'], df['dm'] df['pdm'] = np.where(um > dm, um, 0) if window > 1: - pdm = df['pdm_{}_ema'.format(window)] + pdm = df[f'pdm_{window}_ema'] else: pdm = df['pdm'] df[column_name] = pdm @@ -590,7 +582,7 @@ def _get_vr(cls, df, windows=None): column_name = 'vr' else: window = cls.get_only_one_positive_int(windows) - column_name = 'vr_{}'.format(window) + column_name = f'vr_{window}' df['av'] = np.where(df['change'] > 0, df['volume'], 0) avs = df['av'].rolling( @@ -617,11 +609,11 @@ def _get_mdm(cls, df, windows): :return: """ window = cls.get_only_one_positive_int(windows) - column_name = 'mdm_{}'.format(window) + column_name = f'mdm_{window}' um, dm = df['um'], df['dm'] df['mdm'] = np.where(dm > um, dm, 0) if window > 1: - mdm = df['mdm_{}_ema'.format(window)] + mdm = df[f'mdm_{window}_ema'] else: mdm = df['mdm'] df[column_name] = mdm @@ -635,27 +627,27 @@ def 
_get_pdi(cls, df, windows): :return: """ window = cls.get_only_one_positive_int(windows) - pdm_column = 'pdm_{}'.format(window) - tr_column = 'atr_{}'.format(window) - pdi_column = 'pdi_{}'.format(window) + pdm_column = f'pdm_{window}' + tr_column = f'atr_{window}' + pdi_column = f'pdi_{window}' df[pdi_column] = df[pdm_column] / df[tr_column] * 100 return df[pdi_column] @classmethod def _get_mdi(cls, df, windows): window = cls.get_only_one_positive_int(windows) - mdm_column = 'mdm_{}'.format(window) - tr_column = 'atr_{}'.format(window) - mdi_column = 'mdi_{}'.format(window) + mdm_column = f'mdm_{window}' + tr_column = f'atr_{window}' + mdi_column = f'mdi_{window}' df[mdi_column] = df[mdm_column] / df[tr_column] * 100 return df[mdi_column] @classmethod def _get_dx(cls, df, windows): window = cls.get_only_one_positive_int(windows) - dx_column = 'dx_{}'.format(window) - mdi_column = 'mdi_{}'.format(window) - pdi_column = 'pdi_{}'.format(window) + dx_column = f'dx_{window}' + mdi_column = f'mdi_{window}' + pdi_column = f'pdi_{window}' mdi, pdi = df[mdi_column], df[pdi_column] df[dx_column] = abs(pdi - mdi) / (pdi + mdi) * 100 return df[dx_column] @@ -667,9 +659,9 @@ def _get_kdj_default(cls, df): :param df: k line data frame :return: None """ - df['kdjk'] = df['kdjk_{}'.format(cls.KDJ_WINDOW)] - df['kdjd'] = df['kdjd_{}'.format(cls.KDJ_WINDOW)] - df['kdjj'] = df['kdjj_{}'.format(cls.KDJ_WINDOW)] + df['kdjk'] = df[f'kdjk_{cls.KDJ_WINDOW}'] + df['kdjd'] = df[f'kdjd_{cls.KDJ_WINDOW}'] + df['kdjj'] = df[f'kdjj_{cls.KDJ_WINDOW}'] @classmethod def _get_cr(cls, df, window=26): @@ -692,14 +684,14 @@ def _shifted_cr_sma(cls, df, window): name = cls._temp_name() df[name] = df['cr'].rolling(min_periods=1, window=window, center=False).mean() - to_shift = '{}_-{}_s'.format(name, int(window / 2.5 + 1)) + to_shift = f'{name}_-{int(window / 2.5 + 1)}_s' ret = df[to_shift] del df[name], df[to_shift] return ret @classmethod def _temp_name(cls): - return 'sdf{}'.format(random.randint(0, 
10e8)) + return f'sdf{random.randint(0, 10e8)}' @classmethod def _get_middle(cls, df): @@ -724,8 +716,8 @@ def _get_kdjk(cls, df, n_days): :param n_days: calculation range :return: None """ - rsv_column = 'rsv_{}'.format(n_days) - k_column = 'kdjk_{}'.format(n_days) + rsv_column = f'rsv_{n_days}' + k_column = f'kdjk_{n_days}' df[k_column] = list(cls._calc_kd(df.get(rsv_column))) @classmethod @@ -738,8 +730,8 @@ def _get_kdjd(cls, df, n_days): :param n_days: calculation range :return: None """ - k_column = 'kdjk_{}'.format(n_days) - d_column = 'kdjd_{}'.format(n_days) + k_column = f'kdjk_{n_days}' + d_column = f'kdjd_{n_days}' df[d_column] = list(cls._calc_kd(df.get(k_column))) @staticmethod @@ -751,9 +743,9 @@ def _get_kdjj(df, n_days): :param n_days: calculation range :return: None """ - k_column = 'kdjk_{}'.format(n_days) - d_column = 'kdjd_{}'.format(n_days) - j_column = 'kdjj_{}'.format(n_days) + k_column = f'kdjk_{n_days}' + d_column = f'kdjd_{n_days}' + j_column = f'kdjj_{n_days}' df[j_column] = 3 * df[k_column] - 2 * df[d_column] @staticmethod @@ -763,8 +755,8 @@ def remove_random_nan(pd_obj): @staticmethod def _get_d(df, column, shifts): shift = StockDataFrame.to_int(shifts) - shift_column = '{}_{}_s'.format(column, shift) - column_name = '{}_{}_d'.format(column, shift) + shift_column = f'{column}_{shift}_s' + column_name = f'{column}_{shift}_d' df[column_name] = df[column] - df[shift_column] cp = df[column_name].copy() StockDataFrame.set_nan(cp, shift) @@ -780,7 +772,7 @@ def _get_sma(cls, df, column, windows): :return: None """ window = cls.get_only_one_positive_int(windows) - column_name = '{}_{}_sma'.format(column, window) + column_name = f'{column}_{window}_sma' df[column_name] = df[column].rolling(min_periods=1, window=window, center=False).mean() @@ -794,7 +786,7 @@ def _get_ema(cls, df, column, windows): :return: None """ window = cls.get_only_one_positive_int(windows) - column_name = '{}_{}_ema'.format(column, window) + column_name = 
f'{column}_{window}_ema' if len(df[column]) > 0: df[column_name] = df[column].ewm( ignore_na=False, span=window, @@ -815,8 +807,8 @@ def _get_boll(cls, df): :param df: data :return: None """ - moving_avg = df['close_{}_sma'.format(cls.BOLL_PERIOD)] - moving_std = df['close_{}_mstd'.format(cls.BOLL_PERIOD)] + moving_avg = df[f'close_{cls.BOLL_PERIOD}_sma'] + moving_std = df[f'close_{cls.BOLL_PERIOD}_mstd'] df['boll'] = moving_avg moving_avg = list(map(np.float64, moving_avg)) moving_std = list(map(np.float64, moving_std)) @@ -840,9 +832,9 @@ def _get_macd(cls, df): :param df: data :return: None """ - ema_short = 'close_{}_ema'.format(cls.MACD_EMA_SHORT) - ema_long = 'close_{}_ema'.format(cls.MACD_EMA_LONG) - ema_signal = 'macd_{}_ema'.format(cls.MACD_EMA_SIGNAL) + ema_short = f'close_{cls.MACD_EMA_SHORT}_ema' + ema_long = f'close_{cls.MACD_EMA_LONG}_ema' + ema_signal = f'macd_{cls.MACD_EMA_SIGNAL}_ema' fast = df[ema_short] slow = df[ema_long] df['macd'] = fast - slow @@ -870,7 +862,7 @@ def _get_mstd(cls, df, column, windows): :return: None """ window = cls.get_only_one_positive_int(windows) - column_name = '{}_{}_mstd'.format(column, window) + column_name = f'{column}_{window}_mstd' df[column_name] = df[column].rolling(min_periods=1, window=window, center=False).std() @@ -884,7 +876,7 @@ def _get_mvar(cls, df, column, windows): :return: None """ window = cls.get_only_one_positive_int(windows) - column_name = '{}_{}_mvar'.format(column, window) + column_name = f'{column}_{window}_mvar' df[column_name] = df[column].rolling( min_periods=1, window=window, center=False).var() @@ -900,10 +892,10 @@ def _get_mfi(cls, df, n_days=None): n_days = cls.MFI column_name = 'mfi' else: - column_name = 'mfi_{}'.format(n_days) + column_name = f'mfi_{n_days}' n = int(n_days) - assert n > 0, "n_days '{}' could not be parsed " \ - "to a positive integer".format(n_days) + assert n > 0, f"n_days '{n_days}' could not be parsed " \ + "to a positive integer" df[column_name] = 0.5 if len(df) 
> n and "volume" in df.columns and (df["volume"] > 0).any(): typical_price = df[["low", "high", "close"]].sum(axis=1) / 3.0 @@ -1037,10 +1029,10 @@ def __init_not_exist_column(cls, df, key): # support all kinds of compare operators cls._get_op(df, c, r, t) else: - func_name = '_get_{}'.format(t) + func_name = f'_get_{t}' getattr(cls, func_name)(df, c, r) else: - func_name = '_get_{}'.format(c) + func_name = f'_get_{c}' getattr(cls, func_name)(df, r) @staticmethod @@ -1059,7 +1051,7 @@ def __getitem__(self, item): try: self.init_columns(self, item) except AttributeError: - log.exception('{} not found.'.format(item)) + log.exception(f'{item} not found.') result = self.retype( super(StockDataFrame, self).__getitem__(item)) return result