Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvements for WMA, FWMA and Linreg #776

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions pandas_ta/overlap/fwma.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# -*- coding: utf-8 -*-
import numpy as np
from pandas import Series
from pandas_ta._typing import DictLike, Int
from pandas_ta.utils import (
fibonacci,
v_ascending,
v_offset,
v_pos_default,
v_series,
weights
v_series
)


def fwma(
close: Series, length: Int = None, asc: bool = None,
offset: Int = None, **kwargs: DictLike
Expand Down Expand Up @@ -47,21 +46,26 @@ def fwma(

# Calculate
fibs = fibonacci(n=length, weighted=True)
fwma = close.rolling(length, min_periods=length) \
.apply(weights(fibs), raw=True)
# Reverse the weights
fib_weights = fibs[::-1]
# Total weight for normalization
total_weight = fibs.sum()
fwma_values = np.convolve(close, fib_weights, 'valid') / total_weight
_fwma = np.concatenate((np.full(length-1, np.nan), fwma_values))
_fwma = Series(_fwma, index=close.index)

# Offset
if offset != 0:
fwma = fwma.shift(offset)
_fwma = _fwma.shift(offset)

# Fill
if "fillna" in kwargs:
fwma.fillna(kwargs["fillna"], inplace=True)
_fwma.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
fwma.fillna(method=kwargs["fill_method"], inplace=True)
_fwma.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
fwma.name = f"FWMA_{length}"
fwma.category = "overlap"
_fwma.name = f"FWMA_{length}"
_fwma.category = "overlap"

return fwma
return _fwma
121 changes: 52 additions & 69 deletions pandas_ta/overlap/linreg.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
# -*- coding: utf-8 -*-
from sys import float_info as sflt
from numpy import arctan, nan, pi, zeros_like
from numpy.version import version as np_version
import numpy as np
import pandas as pd
from pandas import Series
from pandas_ta._typing import DictLike, Int
from pandas_ta.maps import Imports
from pandas_ta.utils import (
strided_window,
v_offset,
v_pos_default,
v_series,
v_talib,
zero
v_talib
)


def linreg(
close: Series, length: Int = None, talib: bool = None,
offset: Int = None, **kwargs: DictLike
Expand Down Expand Up @@ -69,94 +65,81 @@ def linreg(
slope = kwargs.pop("slope", False)
tsf = kwargs.pop("tsf", False)

# Calculate
np_close = close.values

if Imports["talib"] and mode_tal and not r:
from talib import LINEARREG, LINEARREG_ANGLE, LINEARREG_INTERCEPT, LINEARREG_SLOPE, TSF
if tsf:
linreg = TSF(close, timeperiod=length)
_linreg = TSF(close, timeperiod=length)
elif slope:
linreg = LINEARREG_SLOPE(close, timeperiod=length)
_linreg = LINEARREG_SLOPE(close, timeperiod=length)
elif intercept:
linreg = LINEARREG_INTERCEPT(close, timeperiod=length)
_linreg = LINEARREG_INTERCEPT(close, timeperiod=length)
elif angle:
linreg = LINEARREG_ANGLE(close, timeperiod=length)
_linreg = LINEARREG_ANGLE(close, timeperiod=length)
else:
linreg = LINEARREG(close, timeperiod=length)
_linreg = LINEARREG(close, timeperiod=length)
else:
linreg_ = zeros_like(np_close)
# [1, 2, ..., n] from 1 to n keeps Sum(xy) low
x = range(1, length + 1)
np_close = close.to_numpy()
x = np.arange(1, length + 1)
x_sum = 0.5 * length * (length + 1)
x2_sum = x_sum * (2 * length + 1) / 3
divisor = length * x2_sum - x_sum * x_sum

# Needs to be reworked outside the method
def linear_regression(series):
y_sum = series.sum()
xy_sum = (x * series).sum()

m = (length * xy_sum - x_sum * y_sum) / divisor
if slope:
return m
b = (y_sum * x2_sum - x_sum * xy_sum) / divisor
if intercept:
return b

if angle:
theta = arctan(m)
if degrees:
theta *= 180 / pi
return theta

if r:
y2_sum = (series * series).sum()
rn = length * xy_sum - x_sum * y_sum
rd = (divisor * (length * y2_sum - y_sum * y_sum)) ** 0.5
if zero(rd) == 0:
rd = sflt.epsilon
return rn / rd

return m * length + b if not tsf else m * (length - 1) + b

if np_version >= "1.20.0":
divisor = length * x2_sum - x_sum ** 2

if np.__version__ >= "1.20.0":
from numpy.lib.stride_tricks import sliding_window_view
linreg_ = [
linear_regression(_) for _ in sliding_window_view(
np_close, length)
]
windows = sliding_window_view(np_close, window_shape=length)
else:
windows = np.array([np_close[i:i+length] for i in range(len(np_close)-length+1)])

y_sums = windows.sum(axis=1)
xy_sums = (x * windows).sum(axis=1)
m_values = (length * xy_sums - x_sum * y_sums) / divisor
b_values = (y_sums * x2_sum - x_sum * xy_sums) / divisor

if slope:
result = m_values
elif intercept:
result = b_values
elif angle:
theta = np.arctan(m_values)
if degrees:
theta *= 180 / np.pi
result = theta
elif r:
y2_sums = (windows ** 2).sum(axis=1)
rn = length * xy_sums - x_sum * y_sums
rd = np.sqrt(divisor * (length * y2_sums - y_sums ** 2))
rd[rd == 0] = np.finfo(float).eps # Prevent division by zero
result = rn / rd
else:
linreg_ = [
linear_regression(_) for _ in strided_window(
np_close, length)
]
result = m_values * length + b_values

# Match the length of the input series
_linreg = np.concatenate((np.full(length - 1, np.nan), result))

linreg = Series([nan] * (length - 1) + linreg_, index=close.index)
_linreg = pd.Series(_linreg, index=close.index)

# Offset
if offset != 0:
linreg = linreg.shift(offset)
_linreg = _linreg.shift(offset)

# Fill
if "fillna" in kwargs:
linreg.fillna(kwargs["fillna"], inplace=True)
_linreg.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
linreg.fillna(method=kwargs["fill_method"], inplace=True)
_linreg.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
linreg.name = f"LINREG"
_linreg.name = "LINREG"
if slope:
linreg.name += "m"
_linreg.name += "m"
if intercept:
linreg.name += "b"
_linreg.name += "b"
if angle:
linreg.name += "a"
_linreg.name += "a"
if r:
linreg.name += "r"
_linreg.name += "r"

linreg.name += f"_{length}"
linreg.category = "overlap"
_linreg.name += f"_{length}"
_linreg.category = "overlap"

return linreg
return _linreg
30 changes: 12 additions & 18 deletions pandas_ta/overlap/wma.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from numpy import arange, dot
import numpy as np
from pandas import Series
from pandas_ta._typing import DictLike, Int
from pandas_ta.maps import Imports
Expand Down Expand Up @@ -54,32 +54,26 @@ def wma(
# Calculate
if Imports["talib"] and mode_tal:
from talib import WMA
wma = WMA(close, length)
_wma = WMA(close, length)
else:
total_weight = 0.5 * length * (length + 1)
weights_ = Series(arange(1, length + 1))
weights = weights_ if asc else weights_[::-1]

def linear(w):
def _compute(x):
return dot(x, w) / total_weight
return _compute

close_ = close.rolling(length, min_periods=length)
wma = close_.apply(linear(weights), raw=True)
weights = np.arange(1, length + 1) if asc else np.arange(length, 0, -1)
_wma = np.convolve(close, weights[::-1], 'valid') / total_weight
_wma = np.concatenate((np.full(length-1, np.nan), _wma))
_wma = Series(_wma, index=close.index)

# Offset
if offset != 0:
wma = wma.shift(offset)
_wma = _wma.shift(offset)

# Fill
if "fillna" in kwargs:
wma.fillna(kwargs["fillna"], inplace=True)
_wma.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
wma.fillna(method=kwargs["fill_method"], inplace=True)
_wma.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
wma.name = f"WMA_{length}"
wma.category = "overlap"
_wma.name = f"WMA_{length}"
_wma.category = "overlap"

return wma
return _wma
Loading