Dense Output Interpolator for nbsolve_ivp #61
Comments
Hello @jewaniuk! Thank you for your note; this is exactly the forum to bring up this issue, as dense outputs have not yet been implemented for the numba-safe functions in CyRK. The first thing I want to confirm is that you do indeed need the nbrk version of the solver and can't use the cysolver versions. If you need something that you call from inside an @njit-compiled function, like

```python
@njit
def run():
    solution = nbsolve_ivp( ... )
```

then the answer is yes. If the answer is no, then we should talk more about how to use CySolver for your specific case, as that is working today. Assuming that you do need a numba-safe version, it will need to be implemented. I'd love any help you could provide, as I don't have a ton of time to work on this, but of course no pressure! Just as a quick intro (I can fill in more details on how to do the following in additional messages), there are two options for how to implement this:
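For readers unfamiliar with dense output: the idea is to build a local interpolating polynomial from quantities the solver already has at each step, rather than re-running the integration on a finer grid. As a rough, standalone sketch of the concept (this is not CyRK's or SciPy's actual implementation, and the function name is my own), the lowest-order version is a cubic Hermite interpolant built from the state and derivative at both ends of a step:

```python
import numpy as np

def hermite_dense_output(t_old, t_new, y_old, y_new, f_old, f_new, t_eval):
    """Cubic Hermite interpolant over one step [t_old, t_new].

    Uses the state (y) and derivative (f) at both ends of the step,
    which an RK solver already has, so no extra ODE evaluations are needed.
    """
    h = t_new - t_old
    x = (np.asarray(t_eval) - t_old) / h  # normalized position in [0, 1]
    # standard cubic Hermite basis polynomials
    h00 = 2 * x**3 - 3 * x**2 + 1
    h10 = x**3 - 2 * x**2 + x
    h01 = -2 * x**3 + 3 * x**2
    h11 = x**3 - x**2
    return h00 * y_old + h10 * h * f_old + h01 * y_new + h11 * h * f_new
```

Because it matches values and slopes at both endpoints, this interpolant is exact for any cubic; higher-order solvers such as RK45 carry extra stage information so they can do better than this within a step.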
Hi @jrenaud90! Thanks for getting back to me so quickly. I'm going to respond to your message; however, I should first note that I was in a bit of a rush when I wrote the OP, so I ended up working on it until I came up with a band-aid solution. In the end, I copied nbsolve_ivp and modified it myself (full listing below). To answer your first question: no, I was not calling the solver from inside an @njit-compiled function. After reading the rest of your post, I completely understand why option 2 is preferable, and I agree it is likely the long-term solution. Unfortunately, I'm really not sure how much help I would be with option 2, again given my lack of experience. I think I could probably handle option 1 eventually, but if it will end up being replaced, then I'm not sure there is a point right now. That being said, if someone else is looking for this feature in the short term, I would be willing to help them get option 1 going. Thanks again for this great project! Sorry I can't be more helpful.

import numpy as np
from numba import njit, prange
from typing import Tuple, Optional
from collections import namedtuple
# placeholders
EMPTY_ARR = np.empty(0, dtype=np.float64)
EPS = np.finfo(np.float64).eps
EPS_10 = 10.0 * EPS
EPS_100 = 100.0 * EPS
INF = np.inf
# settings for the ode solver
SAFETY = 0.9 # multiply steps computed from asymptotic behaviour of errors by this
MIN_FACTOR = 0.2 # minimum allowed decrease in a step size
MAX_FACTOR = 10.0 # maximum allowed increase in a step size
ORDER = 5
ERROR_ESTIMATOR_ORDER = 4
ERROR_EXPO = 1.0 / (ERROR_ESTIMATOR_ORDER + 1.0)
N_STAGES = 6
N_STAGES_PLUS_1 = 7
RK45_C = np.array([0, 1 / 5, 3 / 10, 4 / 5, 8 / 9, 1], order='C', dtype=np.float64)
RK45_A = np.array([[0, 0, 0, 0, 0], [1 / 5, 0, 0, 0, 0], [3 / 40, 9 / 40, 0, 0, 0], [44 / 45, -56 / 15, 32 / 9, 0, 0], [19372 / 6561, -25360 / 2187, 64448 / 6561, -212 / 729, 0], [9017 / 3168, -355 / 33, 46732 / 5247, 49 / 176, -5103 / 18656]], order='C', dtype=np.float64)
RK45_B = np.array([35 / 384, 0, 500 / 1113, 125 / 192, -2187 / 6784, 11 / 84], order='C', dtype=np.float64)
RK45_E = np.array([-71 / 57600, 0, 71 / 16695, -71 / 1920, 17253 / 339200, -22 / 525, 1 / 40], order='C', dtype=np.float64)
RK45_P = np.array([[1, -8048581381 / 2820520608, 8663915743 / 2820520608, -12715105075 / 11282082432], [0, 0, 0, 0], [0, 131558114200 / 32700410799, -68118460800 / 10900136933, 87487479700 / 32700410799], [0, -1754552775 / 470086768, 14199869525 / 1410260304, -10690763975 / 1880347072], [0, 127303824393 / 49829197408, -318862633887 / 49829197408, 701980252875 / 199316789632], [0, -282668133 / 205662961, 2019193451 / 616988883, -1453857185 / 822651844], [0, 40617522 / 29380423, -110615467 / 29380423, 69997945 / 29380423]], order='C', dtype=np.float64)
LEN_C = len(RK45_C)
# container to store results
results_wrapper = namedtuple("results", ("success", "message", "status", "nfev", "t", "y", "t_events"))
@njit(parallel=True)
def dot(A, B, C):
    # Naive parallel matrix multiply: C = A @ B, with C preallocated.
    # Only the outermost loop is parallelized; the inner loops are plain
    # serial ranges (numba only parallelizes the outermost prange anyway).
    m, n = A.shape
    p = B.shape[1]
    for i in prange(m):
        for j in range(p):
            s = 0.0  # float accumulator so the reduction stays float64
            for k in range(n):
                s += A[i, k] * B[k, j]
            C[i, j] = s
    return C
@njit(cache=False, fastmath=False)
def nbsolve_ivp(
ode: callable,
t_span: Tuple[float, float],
y0: np.ndarray,
args: tuple = tuple(),
rtol: float = 1.0e-3,
atol: float = 1.0e-6,
max_step: float = np.inf,
    first_step: Optional[float] = None,
max_num_steps: int = 0,
t_eval: np.ndarray = EMPTY_ARR,
events: Optional[callable] = None,
):
""" A Numba-safe Runge-Kutta Integrator based on Scipy's solve_ivp RK integrator.
Parameters
----------
ode : callable
An njit-compiled function that defines the derivatives of the problem.
t_span : Tuple[float, float]
        A tuple of the beginning and end of the integration domain (the independent variable, t).
y0 : np.ndarray
1D array of the initial values of the problem at t_span[0]
args : tuple = tuple()
        Any additional arguments that are passed to the ode function.
rtol : float = 1.e-3
Integration relative tolerance used to determine optimal step size.
atol : float = 1.e-6
Integration absolute tolerance used to determine optimal step size.
max_step : float = np.inf
Maximum allowed step size.
first_step : float = None
Initial step size. If `None`, then the function will attempt to determine an appropriate initial step.
max_num_steps : int = 0
Maximum number of steps integrator is allowed to take.
If set to 0 (the default) then an infinite number of steps are allowed.
    t_eval : np.ndarray = EMPTY_ARR
If provided, then the function will interpolate the integration results to provide them at the
requested t-steps.
events : callable = None
If provided, then this njit-compiled function will be called at each time step to determine if an event
occurs. Currently, if the event (only 1 allowed) occurs, then the integration will be terminated.
"""
# interpret inputs, prepare for solving
t0 = t_span[0]
tf = t_span[1]
direction = np.sign(tf - t0) if tf != t0 else 1
y0 = np.asarray(y0)
N = y0.size
sqrtN = np.sqrt(N)
dtype = y0.dtype
Neval = t_eval.size
run_interpolation = Neval > 0
check_event = events is not None
t_event = EMPTY_ARR
if rtol < EPS_100:
rtol = EPS_100
rtol_array = np.empty(N, dtype=np.float64)
atol_array = np.empty(N, dtype=np.float64)
for i in range(N):
rtol_array[i] = rtol
atol_array[i] = atol
if max_num_steps == 0:
use_max_num_steps = False
elif max_num_steps < 0:
raise AttributeError("Negative number of max steps provided.")
else:
use_max_num_steps = True
if direction > 0:
t_eval_i = 0
else:
t_eval = t_eval[::-1] # make order of t_eval increasing for use with np.searchsorted
t_eval_i = Neval # upper bound for slices
# initialize containers to store results during integration
if run_interpolation:
t_array_ind = 0
t_array = np.empty(Neval, dtype=np.float64)
y_array = np.empty((N, Neval), dtype=dtype)
else:
t_list = [t0]
y_list = [np.copy(y0)]
# initialize integrator status codes
status = 0
message = "Integration is/was ongoing (perhaps it was interrupted?)."
# initialize RK-K variable
K = np.empty((N_STAGES_PLUS_1, N), dtype=dtype)
# recast some constants into the correct dtype, so they can be used with y0
C = RK45_C
A = np.asarray(RK45_A, dtype=dtype)
B = np.asarray(RK45_B, dtype=dtype)
E = np.asarray(RK45_E, dtype=dtype)
if run_interpolation:
P = np.asarray(RK45_P, dtype=dtype)
A_10 = A[1, 0]
# initialize variables for the start of integration
t_old = t0
t_new = t0
y_new = np.empty_like(y0)
y_old = np.empty_like(y0)
dydt_old = np.empty_like(y0)
dydt_new = np.empty_like(y0)
for i in range(N):
y0_i = y0[i]
y_new[i] = y0_i
y_old[i] = y0_i
output = np.asarray(ode(t_new, y_new, *args), dtype=dtype)
for i in range(N):
dydt_old[i] = output[i]
if check_event:
event_old = events(t_old, y_old, *args)
# determine first step size
first_step_found = False
if first_step is not None:
step_size = max_step
if first_step < 0.0:
status = -8
message = "Attribute error."
raise AttributeError("Error in user-provided step size, must be a positive number.")
elif first_step > np.abs(tf - t0):
status = -8
message = "Attribute error."
raise AttributeError("Error in user-provided step size, cannot exceed bounds.")
elif first_step != 0.0:
step_size = first_step
first_step_found = True
if not first_step_found:
# select an initial step size based on the differential equation
if N == 0:
step_size = INF
else:
# take the norm of d0 and d1
d0 = 0.0
d1 = 0.0
for i in range(N):
scale = atol_array[i] + np.abs(y_old[i]) * rtol_array[i]
d0_abs = np.abs(y_old[i] / scale)
d1_abs = np.abs(dydt_old[i] / scale)
d0 += (d0_abs * d0_abs)
d1 += (d1_abs * d1_abs)
d0 = np.sqrt(d0) / sqrtN
d1 = np.sqrt(d1) / sqrtN
if d0 < 1.0e-5 or d1 < 1.0e-5:
h0 = 1.0e-6
else:
h0 = 0.01 * d0 / d1
y1 = y_old + h0 * direction * dydt_old
t1 = t_old + h0 * direction
# use the differential equation to estimate the first step size
output = np.asarray(ode(t1, y1, *args), dtype=dtype)
d2 = 0.0
for i in range(N):
dydt_new[i] = output[i]
scale = atol_array[i] + np.abs(y_old[i]) * rtol_array[i]
d2_abs = np.abs((dydt_new[i] - dydt_old[i]) / scale)
d2 += (d2_abs * d2_abs)
d2 = np.sqrt(d2) / (h0 * sqrtN)
if d1 <= 1.0e-15 and d2 <= 1.0e-15:
h1 = max(1.e-6, h0 * 1.e-3)
else:
h1 = (0.01 / max(d1, d2)) ** ERROR_EXPO
next_after = 10.0 * abs(np.nextafter(t_old, direction * np.inf) - t_old)
step_size = max(next_after, min(100. * h0, h1))
# check if an invalid initial condition was given
if N == 0:
status = -6
message = "Integration never started, y-size is zero."
# iterate through time until the solution is complete
len_t = 1 # already one time step due to the initial conditions
while status == 0:
# exit if the end of the time domain has been reached
if t_new == tf:
t_old = tf
t_new = tf
status = 1
break
# exit if the maximum number of steps has been reached
if use_max_num_steps:
if len_t > max_num_steps:
status = -7
message = "Maximum number of steps (set by user) exceeded during integration."
break
        # run an RK integration step using the step size chosen by the previous loop;
        # find the minimum step size based on the value of t (floating point numbers
        # are spaced farther apart when t is large)
next_after = 10.0 * abs(np.nextafter(t_old, direction * np.inf) - t_old)
min_step = next_after
# look for over/undershoots in previous step size
if step_size > max_step:
step_size = max_step
elif step_size < min_step:
step_size = min_step
# determine new step size
step_accepted = False
step_rejected = False
step_error = False
while not step_accepted:
if step_size < min_step:
step_error = True
break
# move time forward for this particular step size
step = step_size * direction
t_new = t_old + step
# check that we are not at the end of integration with that move
if direction * (t_new - tf) > 0:
t_new = tf
# correct the step if we were at the end of integration
step = t_new - t_old
step_size = np.abs(step)
# calculate derivative using RK method
# dot product (K, A) * step
for s in range(1, LEN_C):
time_ = t_old + C[s] * step
if s == 1:
for i in range(N):
# set the first column of K
temp_double_numeric = dydt_old[i]
K[0, i] = temp_double_numeric
# calculate y_new for s == 1
y_new[i] = y_old[i] + (temp_double_numeric * A_10 * step)
else:
for j in range(s):
A_sj = A[s, j] * step
for i in range(N):
if j == 0:
# initialize
y_new[i] = y_old[i]
y_new[i] += K[j, i] * A_sj
# update K with a new result from the ode
output = np.asarray(ode(time_, y_new, *args), dtype=dtype)
for i in range(N):
K[s, i] = output[i]
# dot product (K, B) * step
for j in range(N_STAGES):
temp = B[j] * step
for i in range(N):
if j == 0:
# initialize
y_new[i] = y_old[i]
y_new[i] += K[j, i] * temp
# find final dydt for this timestep
output = np.asarray(ode(t_new, y_new, *args), dtype=dtype)
for i in range(N):
dydt_new[i] = output[i]
# estimate error to change the step size for next step
# dot product (K, E) * step / scale
error_norm = 0.0
for i in range(N):
# check how well this step performed
scale = atol_array[i] + max(np.abs(y_old[i]), np.abs(y_new[i])) * rtol_array[i]
# set last array of K equal to dydt
K[N_STAGES, i] = dydt_new[i]
for j in range(N_STAGES_PLUS_1):
if j == 0:
# initialize
error_dot_1 = 0.0
error_dot_1 += K[j, i] * E[j]
error_norm_abs = np.abs(error_dot_1)
error_norm_abs *= (step / scale)
error_norm += (error_norm_abs * error_norm_abs)
error_norm = np.sqrt(error_norm) / sqrtN
if error_norm < 1.0:
# the error is low, let's update this step for the next time loop
if error_norm == 0.0:
step_factor = MAX_FACTOR
else:
step_factor = min(MAX_FACTOR, SAFETY * error_norm ** -ERROR_EXPO)
if step_rejected:
                    # There were problems with this step size on the previous step loop. Make sure the factor does not exacerbate them.
step_factor = min(step_factor, 1.0)
step_size = step_size * step_factor
step_accepted = True
else:
step_size = step_size * max(MIN_FACTOR, SAFETY * error_norm ** -ERROR_EXPO)
step_rejected = True
# exit if there was an issue with step convergence
if step_error:
status = -1
message = "Error in step size calculation, required step size is less than spacing between numbers."
break
elif not step_accepted:
status = -7
message = "Error in step size calculation, error in step size acceptance."
break
# check event for this timestep, exit if event occurs
if check_event:
event_new = events(t_new, y_new, *args)
if event_old < 0 and event_new > 0:
t_event = np.empty(1, np.float64)
t_event[0] = t_new
status = 1
event_old = event_new
# save data from this step
if run_interpolation:
# if interpolation was requested, run it up to the end of this step
if direction > 0:
t_eval_i_new = np.searchsorted(t_eval, t_new, side="right")
Neval_step = t_eval_i_new - t_eval_i
t_eval_step = np.empty(Neval_step, dtype=np.float64)
for i in range(Neval_step):
t_eval_step[i] = t_eval[t_eval_i + i]
else:
t_eval_i_new = np.searchsorted(t_eval, t_new, side = "left")
Neval_step = t_eval_i - t_eval_i_new
t_eval_step = np.empty(Neval_step, dtype=np.float64)
for i in range(Neval_step):
t_eval_step[i] = t_eval[t_eval_i - i - 1]
if Neval_step > 0:
Q = np.empty((N, P.shape[1]), dtype=dtype)
Q = dot(K.T, P, Q)
Qdim = Q.shape[1]
x = (t_eval_step - t_old) / step
p = np.empty((Qdim, Neval_step), dtype=dtype)
expo = 0.0
for i in range(Qdim):
expo += 1.0
for j in range(Neval_step):
p[i,j] = x[j] ** expo
y_eval_step = np.empty((N, Neval_step), dtype=dtype)
y_eval_step = dot(Q, p, y_eval_step)
y_eval_step *= step
if y_eval_step.ndim == 2:
y_eval_step += y_old[:, None]
else:
y_eval_step += y_old
for j in range(Neval_step):
t_array[t_array_ind] = t_eval_step[j]
for i in range(N):
y_array[i, t_array_ind] = y_eval_step[i, j]
t_array_ind += 1
t_eval_i = t_eval_i_new
else:
t_list.append(t_new)
y_list.append(np.copy(y_new))
len_t += 1
# end of step loop, update the now variables
t_old = t_new
for i in range(N):
y_old[i] = y_new[i]
dydt_old[i] = dydt_new[i]
# concatenate final results if not already done
if not run_interpolation:
# to match the format that scipy follows, take the transpose of y
t_array = np.empty(len_t, dtype=np.float64)
y_array = np.empty((N, len_t), dtype=dtype)
for t_i in range(len_t):
t_array[t_i] = t_list[t_i]
y_array_t_i = y_list[t_i]
for y_i in range(N):
y_array[y_i, t_i] = y_array_t_i[y_i]
# determine whether the integration was successful
success = False
if status == 1:
success = True
message = "Integration completed without issue."
result = results_wrapper(success, message, status, len_t, t_array, y_array, t_event)
    return result
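For context on what the interpolation block in the step loop above is doing: each accepted step forms Q = K.T @ P and evaluates a polynomial in the normalized time x = (t - t_old) / step. A standalone, vectorized (non-numba) sketch of just that evaluation, with hypothetical inputs and a function name of my own choosing, might look like:

```python
import numpy as np

def dense_eval(t_old, step, y_old, K, P, t_eval_step):
    """Evaluate the RK45 dense-output polynomial over one accepted step.

    Mirrors the interpolation block in the solver above, but vectorized:
    y(t) ~= y_old + step * Q @ [x, x**2, ..., x**q], with Q = K.T @ P.
    K has shape (n_stages + 1, N); P has shape (n_stages + 1, q).
    """
    Q = K.T @ P                                   # (N, q) coefficients
    x = (np.asarray(t_eval_step) - t_old) / step  # normalized times in [0, 1]
    q = Q.shape[1]
    p = np.vstack([x ** (i + 1) for i in range(q)])  # powers x, x**2, ..., x**q
    return y_old[:, None] + step * (Q @ p)
```

Note that because every polynomial term carries at least one factor of x, the interpolant reduces to y_old exactly at the left edge of the step (x = 0), regardless of the contents of K and P.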
Hey @jewaniuk, that's great! This looks like a good solution until a more permanent one comes along. If you want to put the relevant updates into a pull request, I'd happily accept it, and then you'd show up as a contributor to CyRK. No worries if not, though!
Hello! First of all, I think this is an amazing project. It has been extremely helpful for me so far, and I look forward to using it more thoroughly in the future. I apologize, as I don't know the proper etiquette for this; however, I saw in your documentation for the relatively new Dense Output feature (cy and py only) that we should create a new issue if we need a feature like this for the numba implementation. This is where I've found myself.

I have been using the numba implementation extensively and have designed my scripts around it already. However, in the past I only really cared about the final result at the end of the time domain. Now I need to access results across the entire domain, and I often find that weird results come out of the basic numpy interpolator. I have looked at the scipy implementation; however, I am struggling to understand how I might make a replica that would fit with nbsolve_ivp (I'm a physicist, not a programmer). I would be interested in helping in any way that I can, but am clueless as to where to start. Please let me know if you'd like to see details or a working example. Thanks!
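To illustrate the kind of artifact described above (this is a generic demonstration, not the actual problem script): linearly interpolating a coarsely sampled solution can introduce errors far larger than the solver's tolerance, which is exactly what dense output is meant to avoid.

```python
import numpy as np

# Suppose a solver returned the solution y(t) = sin(t) at only 8 of its own
# (coarse) steps, and we linearly interpolate onto a fine grid afterwards.
t_coarse = np.linspace(0.0, 2.0 * np.pi, 8)
y_coarse = np.sin(t_coarse)

t_fine = np.linspace(0.0, 2.0 * np.pi, 1000)
y_interp = np.interp(t_fine, t_coarse, y_coarse)

# The linear-interpolation error dwarfs a typical rtol of 1e-3.
err = np.max(np.abs(y_interp - np.sin(t_fine)))
print(f"max interpolation error: {err:.3e}")
```

A dense-output interpolant instead evaluates a per-step polynomial that matches the order of the integrator, so the queried values stay consistent with the solver's own accuracy.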