Skip to content

Commit

Permalink
Optimize RSI indicator with Numba JIT compilation
Browse files Browse the repository at this point in the history
- Implement Numba-accelerated RSI calculation function
- Replace vectorized implementation with loop-based Numba implementation
- Improve computational efficiency for Relative Strength Index calculation
- Maintain consistent function interface and return types
  • Loading branch information
saleh-mir committed Feb 13, 2025
1 parent f1e9191 commit 6285398
Showing 1 changed file with 53 additions and 67 deletions.
120 changes: 53 additions & 67 deletions jesse/indicators/rsi.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,61 @@
import numpy as np
from typing import Union
from numba import njit

from jesse.helpers import get_candle_source, slice_candles


@njit
def _rsi(p: np.ndarray, period: int) -> np.ndarray:
"""
Compute the Relative Strength Index using a loop and Wilder's smoothing.
"""
n = len(p)
rsi_arr = np.full(n, np.nan)
if n < period + 1:
return rsi_arr
# Calculate differences between consecutive prices.
diff = np.empty(n - 1)
for i in range(n - 1):
diff[i] = p[i+1] - p[i]

# Compute initial average gain and loss over the first 'period' differences.
sum_gain = 0.0
sum_loss = 0.0
for i in range(period):
change = diff[i]
if change > 0:
sum_gain += change
else:
sum_loss += -change
avg_gain = sum_gain / period
avg_loss = sum_loss / period

# Compute first RSI value at index 'period'
if avg_loss == 0:
rsi_arr[period] = 100.0
else:
rs = avg_gain / avg_loss
rsi_arr[period] = 100 - (100 / (1 + rs))

# Recursively update average gain and loss and compute subsequent RSI values.
for i in range(period, n - 1):
change = diff[i]
gain = change if change > 0 else 0.0
loss = -change if change < 0 else 0.0
avg_gain = (avg_gain * (period - 1) + gain) / period
avg_loss = (avg_loss * (period - 1) + loss) / period
if avg_loss == 0:
rsi_arr[i+1] = 100.0
else:
rs = avg_gain / avg_loss
rsi_arr[i+1] = 100 - (100 / (1 + rs))
return rsi_arr


def rsi(candles: np.ndarray, period: int = 14, source_type: str = "close", sequential: bool = False) -> Union[float, np.ndarray]:
"""
RSI - Relative Strength Index
RSI - Relative Strength Index using Numba for optimization
:param candles: np.ndarray
:param period: int - default: 14
Expand All @@ -20,70 +69,7 @@ def rsi(candles: np.ndarray, period: int = 14, source_type: str = "close", seque
else:
candles = slice_candles(candles, sequential)
source = get_candle_source(candles, source_type=source_type)

# Convert source to a numpy array of floats

p = np.asarray(source, dtype=float)
n = len(p)

# Not enough data to compute RSI if we don't have at least period+1 price points
if n < period + 1:
return np.nan if not sequential else np.full(n, np.nan)

# Compute price differences
delta = np.diff(p)
gains = np.where(delta > 0, delta, 0)
losses = np.where(delta < 0, -delta, 0)

# Initialize average gains and losses arrays (length equal to len(gains))
avg_gain = np.zeros_like(gains)
avg_loss = np.zeros_like(losses)

# Vectorized computation of average gains and losses using matrix operations to mimic Wilder's smoothing method
alpha = 1/period
# Compute the initial simple averages
A0_gain = np.mean(gains[:period])
A0_loss = np.mean(losses[:period])
# The number of smoothed values to compute (for gains and losses) equals the length of gains from index (period-1) to end
# Since gains has length (n-1), we want L = (n-1) - (period-1) = n - period
L = len(gains) - (period - 1)

# Vectorized smoothing for avg_gain:
# x_gain: gains from index 'period' onward, length = (n-1) - period = L - 1
x_gain = gains[period:]
if L > 1:
t_values = np.arange(1, L) # t=1,...,L-1
# Build lower triangular matrix of shape (L-1, L-1) with elements (1-alpha)^(t-1-k)
M_gain = np.tril((1 - alpha) ** (np.subtract.outer(np.arange(L-1), np.arange(L-1))))
# weighted sum: for each t from 1 to L-1, sum_{k=0}^{t-1} (1-alpha)^(t-1-k)*x_gain[k]
weighted_sum_gain = M_gain.dot(x_gain)
A_gain = np.empty(L)
A_gain[0] = A0_gain
A_gain[1:] = A0_gain * ((1 - alpha) ** t_values) + alpha * weighted_sum_gain
else:
A_gain = np.array([A0_gain])
avg_gain[period - 1:] = A_gain

# Vectorized smoothing for avg_loss:
x_loss = losses[period:]
if L > 1:
t_values = np.arange(1, L)
M_loss = np.tril((1 - alpha) ** (np.subtract.outer(np.arange(L-1), np.arange(L-1))))
weighted_sum_loss = M_loss.dot(x_loss)
A_loss = np.empty(L)
A_loss[0] = A0_loss
A_loss[1:] = A0_loss * ((1 - alpha) ** t_values) + alpha * weighted_sum_loss
else:
A_loss = np.array([A0_loss])
avg_loss[period - 1:] = A_loss

# Prepare RSI result array of same length as price array, fill initial values with NaN
rsi_values = np.full(n, np.nan)

# Vectorized computation of RSI for indices period to end
with np.errstate(divide='ignore', invalid='ignore'):
rs = avg_gain[period - 1:] / avg_loss[period - 1:]
rsi_comp = 100 - 100 / (1 + rs)
rsi_comp = np.where(avg_loss[period - 1:] == 0, 100.0, rsi_comp)
rsi_values[period:] = rsi_comp

return rsi_values if sequential else rsi_values[-1]
result = _rsi(p, period)
return result if sequential else result[-1]

0 comments on commit 6285398

Please sign in to comment.