Skip to content

Commit

Permalink
Optimize KAMA indicator with Numba JIT compilation
Browse files Browse the repository at this point in the history
- Implement Numba-accelerated KAMA (Kaufman Adaptive Moving Average) calculation function
- Replace vectorized implementation with loop-based Numba implementation
- Improve computational efficiency for KAMA indicator
- Maintain consistent function interface and return types
  • Loading branch information
saleh-mir committed Feb 13, 2025
1 parent 19b5306 commit 6d44f2d
Showing 1 changed file with 43 additions and 46 deletions.
89 changes: 43 additions & 46 deletions jesse/indicators/kama.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,49 @@
from typing import Union

import numpy as np

from numba import njit
from jesse.helpers import get_candle_source, slice_candles


def kama(candles: np.ndarray, period: int = 14, fast_length: int = 2, slow_length: int = 30, source_type: str = "close", sequential: bool = False) -> Union[float, np.ndarray]:
@njit
def _calculate_kama(src: np.ndarray, period: int, fast_length: int, slow_length: int) -> np.ndarray:
"""
Core KAMA calculation using Numba
"""
KAMA - Kaufman Adaptive Moving Average
n = len(src)
result = np.empty(n, dtype=np.float64)
result[:period] = src[:period] # First 'period' values are same as source

if n <= period:
return result

# Calculate the efficiency ratio multiplier
fast_alpha = 2.0 / (fast_length + 1)
slow_alpha = 2.0 / (slow_length + 1)
alpha_diff = fast_alpha - slow_alpha

# Start the calculation after the initial period
for i in range(period, n):
# Calculate Efficiency Ratio
change = abs(src[i] - src[i - period])
volatility = 0.0
for j in range(i - period + 1, i + 1):
volatility += abs(src[j] - src[j - 1])

er = change / volatility if volatility != 0 else 0.0

# Calculate smoothing constant
sc = (er * alpha_diff + slow_alpha) ** 2

# Calculate KAMA
result[i] = result[i - 1] + sc * (src[i] - result[i - 1])

return result


def kama(candles: np.ndarray, period: int = 14, fast_length: int = 2, slow_length: int = 30,
source_type: str = "close", sequential: bool = False) -> Union[float, np.ndarray]:
"""
KAMA - Kaufman Adaptive Moving Average using Numba for optimization
:param candles: np.ndarray
:param period: int - default: 14, lookback period for the calculation
Expand All @@ -24,47 +60,8 @@ def kama(candles: np.ndarray, period: int = 14, fast_length: int = 2, slow_lengt
candles = slice_candles(candles, sequential)
src = get_candle_source(candles, source_type=source_type)

src = np.asarray(src)
n = len(src)
src = np.asarray(src, dtype=np.float64)

result = _calculate_kama(src, period, fast_length, slow_length)

# If not enough data, return the source
if n <= period:
return src if sequential else src[-1]

# m: number of points computed with the recursive formula
m = n - period

# Vectorized momentum and volatility
momentum = np.abs(src[period:] - src[:-period]) # shape (m,)
diff_array = np.abs(np.diff(src)) # shape (n-1,)
volatility = np.convolve(diff_array, np.ones(period, dtype=diff_array.dtype), mode='valid') # shape (m,)
er = np.where(volatility != 0, momentum / volatility, 0.0) # shape (m,)

fast_alpha = 2 / (fast_length + 1)
slow_alpha = 2 / (slow_length + 1)
A_vec = (er * (fast_alpha - slow_alpha) + slow_alpha) ** 2 # shape (m,)

# Compute the matrix-based solution of the recursion:
# For indices i from 0 to m-1 (corresponding to overall index period+i), we have:
# kama[period+i] = sum_{j=0}^{i} [ weight_matrix[i, j] * (A_vec[j] * src[period+j]) ] + base_term[i]
# where weight_matrix is a lower-triangular matrix computed using cumulative logs to represent the products of (1 - A_vec).
B = 1 - A_vec
eps = 1e-10
c = np.concatenate(([0.0], np.cumsum(np.log(B + eps)))) # shape (m+1,)
c_row = c[1:].reshape(-1, 1) # shape (m, 1)
c_col = c[1:].reshape(1, -1) # shape (1, m)
weight_matrix = np.exp(c_row - c_col) # shape (m, m)
weight_matrix = np.tril(weight_matrix) # keep only lower-triangular elements

X = A_vec * src[period:] # shape (m,)
weighted_sum = np.sum(weight_matrix * X.reshape(1, m), axis=1) # shape (m,)

base_term = np.exp(c[1:]) * src[period-1] # shape (m,)

kama_computed = weighted_sum + base_term # shape (m,)

result = np.empty(n, dtype=float)
result[:period] = src[:period]
result[period:] = kama_computed

return result if sequential else result[-1]

0 comments on commit 6d44f2d

Please sign in to comment.