Skip to content

Instantly share code, notes, and snippets.

@denis-bz
Created July 16, 2025 15:49
Show Gist options
  • Save denis-bz/ebfd9e6cc4bd661ee4e533212cc57e75 to your computer and use it in GitHub Desktop.
Save denis-bz/ebfd9e6cc4bd661ee4e533212cc57e75 to your computer and use it in GitHub Desktop.
Callback to track and stop scipy solve_ivp 16 Jul 2025

Callback to track and stop scipy solve_ivp

Keywords: scipy solve_ivp callback ODE ordinary-differential-equation IVP initial-value-problem python

Purpose

add a callback function to the scipy ODE solver solve_ivp, so that you can

  • print progress at each timestep
  • tell the solver to stop if h is too small or KeyboardInterrupt or ..., and return the sol so far.

See solve_ivp and Runge-Kutta .
This implementation os minimal, for RK only, a bird in the hand.

Description

Solve_ivp has one new parameter:

callback : callable, or default None. Called after each Runge-Kutta step with one argument, the locals() of _step_impl; this is a dict containing the current t y h ... If the callback returns a string such as "stop from callback ...", then solve_ivp stops, and returns sol with sol.t sol.y ... up to that point.

Why callbacks ?

Many old programs, including solve_ivp , run blind: you start it running, wait until the end, then look at sol to see what happened. Then you can vary parameters y0 atol rtol etc., run again ... pinball. Print statements inside your derivative function F(t, Y) may help (but RK calls this at intermediate steps too). And you can hit ^C to kill a runaway, but then it's dead.

Callbacks are a minimal way of seeing what a solver is doing, with good old print statements, and ^C from the keyboard to return the sol so far.

If debugging is the process of removing software bugs, then programming must be the process of putting them in.

The files in this gist

rk-callback.py : scipy rk.py + 10-line patch for callback=, see "installing" below
callback0.py : a simple example, solve_ivp( ... callback=this )
eey-callback.py : a testcase, e^(e^y) to drive h -> 0

3 utility functions, useful for callbacks --
callevery.py : filter calls to e.g. once a second and every 100 th call
kbint.py : keyboardinterrupt()
timer.py : seconds since the previous call

Installing rk-callback.py

First install a copy of scipy in a virtual environment, see venv . Then overwrite the file rk.py there, as follows:

  1. find rk.py on your computer:
from scipy.integrate import _ivp
print( _ivp.__path__ )
  1. In that directory, make a backup of scipy's rk.py, then copy in this one with callback:
cd .../lib/python3.12/site-packages/scipy/integrate/_ivp  # from _ivp.__path__
mv rk.py rk-nocallback.py
cp -p .../rk-callback.py rk.py
	# diff rk.py rk-nocallback.py -- 10 lines
  1. run a little testcase with sol = solve_ivp( ... callback=mycallback ), such as eey-callback.py .

Notes

This patch is only for RK, not for the stiff solvers Radau etc.; tracking / displaying Nd Jacobians is for people with experience and good stiff test cases, not me.

GUIs with realtime plots of what a solver is doing ? See e.g. Why is GUI programming so hard .

Pilots who have the right stuff enjoy flying blind.

See also

scipy/scipy#21741 from January 2025
extensisq : a dozen scipy-compatible solvers (but no callbacks)
500 scicomp.stack questions/tagged/ode \

Comments welcome

cheers
— denis-bz-py t-online.de 2025-07-16 July

#!/usr/bin/env python3
""" example callback0.py: solve_ivp( callback=callback0 or Callbackevery( callback0 )) """
import numpy as np
from kbint import keyboardinterrupt
def callback0( _locals ) -> "stop msg" or None:
""" example callback:
$scipy/integrate/_ivp/rk.py _step_impl with callback=
callback( locals() )
-> here -> "stop msg" or None
"""
t = _locals["t"]
y = _locals["y"]
h = _locals["h"]
hnew = _locals["h_abs"]
stepok = _locals["step_accepted"] # too big or too small: try again
# rkself = _locals["self"]
# nfev = rkself.nfev
print( "t %-8.5g %d h %-6.3g %-6.3g y %s " % (
t, stepok, h, hnew, _astr( y )))
if hnew < 1e-8:
return "stop from RK callback: hnew %.3g is too small at t %g" % (hnew, t)
maxnorm = np.linalg.norm( y, ord=np.inf )
if maxnorm > 1e10 or np.isnan( maxnorm ):
return "stop from RK callback: |y| %.3g is too big at t %g" % (maxnorm, t)
if keyboardinterrupt():
return "stop, KeyboardInterrupt at t %g" % t
return None
def _astr( x: "ndarray" ) -> "%g / %g %g / min av max":
x = np.asarray( x ).squeeze()
if x.size <= 1:
return "%.5g" % x.item()
elif x.shape == (2,):
return "%10.5g %10.5g" % (x[0], x[1])
else:
return _minavmax( x )
def _minavmax( x ):
x = np.asarray( x )
return "_-¯ %10.4g %10.4g %10.4g" % ( # _-¯ a glyph
x.min(), x.mean(), x.max() )
#!/usr/bin/env python3
""" wrap a func to call it say once a second and every 100 th call """
import numpy as np
from timer import Timer # eg once a second
#...............................................................................
class Callevery( object ):
""" wrap a func to call it say once a second and every 100 th call """
def __init__( s, func, seconds=1, every=100 ):
s.func = func
s.seconds = seconds # may be 0
s.every = int( every )
s.ncall = 0
s.timer = Timer( s.seconds ) # timer.t0 = time.time() wallclock
def __call__( s, **kw ) -> "stop msg" or None:
s.ncall += 1
sec = s.timer() # time.time() - t0
if (sec > 0
or (s.every > 0) and (s.ncall % s.every == 0)):
return s.func( **kw ) # kw args only, else use partial
else:
return None
#!/usr/bin/env python3
""" eey-callback.py: eey( t, y ) sleep, e^(e^y) so h -> 0 """
import sys
import numpy as np
from scipy.integrate import solve_ivp # ( func( t, y ), t_span, y0 ... )
from matplotlib import pyplot as pl
import time
from extensisq import BS5, CFMR7osc, Pr9
from callback0 import callback0 # -> "stop mag" or None
from callevery import Callevery # filter
# from runivp import runivp # solve_ivp + a bit verbose
np.set_printoptions( edgeitems=10, threshold=10, linewidth=120, formatter=dict(
float = lambda x: "%.2g" % x ))
print( 80 * "▄" )
# print( "▄ bp" ); breakpoint()
#...............................................................................
def eey( t, y ):
time.sleep( 0.1 ) # test keyboardinterrupt
return np.exp( np.exp( y ))
# _ivp/rk.py:108: RuntimeWarning: invalid value encountered in dot return np.dot(K.T, self.E) * h
func = eey
y0 = [1] # [1., 0]
tmax = 1
dt = 0
tol = 1e-3
method = "RK45"
sec = 0.1
every = 1
plot = 0
# to change these params, run this.py a=1 b=2,None 'c = expr' ... in sh or ipython --
for arg in sys.argv[1:]:
exec( arg )
atol = tol / 1000
rtol = tol
# callbackevery = Callevery( callback0, seconds=sec, every=every )
methodname = getattr( method, "__name__", method ) # extensisq
params = f"{func.__name__} method={methodname} {y0=} {tmax=:g} {dt=:g} {atol=} {rtol=} "
print( params, "\n" )
#...............................................................................
sol = solve_ivp( func, [0, tmax], y0=y0, t_eval=None, callback=callback0,
method=method, atol=tol / 1000, rtol=tol )
T = sol.t
Y = sol.y
Nx, Nt = Y.shape
nfev, njev = sol.nfev, sol.njev
dT = np.diff( T )
params += " Nt %d nfev %d" % ( Nt, nfev )
print( params, "\n" )
if not sol.success:
print( "sol.message:", sol.message )
if plot:
fig, (yax, tax) = pl.subplots( nrows=2 )
fig.suptitle( params )
if len(Y) != 2:
yax.plot( T, Y.T, labels=np.arange( len(Y) ))
else:
y, yp = Y
ypp = np.gradient( yp, T )
Y3 = np.column_stack(( Y.T, ypp ))
yax.plot( T, Y3, label="y y' y'' ".split() )
yax.legend()
tax.plot( T[1:-1], dT[1:] )
tax.set( ylabel="ΔT" )
#!/usr/bin/env python3
""" keyboardinterrupt(): ^C, signal.SIGINT """
import signal
_kbint = None # global, None -> False -> True
def keyboardinterrupt() -> "True after ^C == signal.SIGINT":
global _kbint
if _kbint is None:
_kbint = False
signal.signal( signal.SIGINT, handle_sigint ) # man 3 signal
return _kbint
def handle_sigint( signo, frame ) -> "global _kbint":
global _kbint
_kbint = True
# traceback.print_stack( limit=5 )
# pdb.set_trace()
if __name__ == "__main__":
import time
for t in range( 10 ):
time.sleep( 1 )
print( t )
if keyboardinterrupt():
print( "keyboardinterrupt" )
break
# rk.py: class RungeKutta + callback 16jul #bz
import numpy as np
from .base import OdeSolver, DenseOutput
from .common import (validate_max_step, validate_tol, select_initial_step,
norm, warn_extraneous, validate_first_step)
from . import dop853_coefficients
# Multiply steps computed from asymptotic behaviour of errors by this.
SAFETY = 0.9
MIN_FACTOR = 0.2 # Minimum allowed decrease in a step size.
MAX_FACTOR = 10 # Maximum allowed increase in a step size.
def rk_step(fun, t, y, f, h, A, B, C, K):
"""Perform a single Runge-Kutta step.
This function computes a prediction of an explicit Runge-Kutta method and
also estimates the error of a less accurate method.
Notation for Butcher tableau is as in [1]_.
Parameters
----------
fun : callable
Right-hand side of the system.
t : float
Current time.
y : ndarray, shape (n,)
Current state.
f : ndarray, shape (n,)
Current value of the derivative, i.e., ``fun(x, y)``.
h : float
Step to use.
A : ndarray, shape (n_stages, n_stages)
Coefficients for combining previous RK stages to compute the next
stage. For explicit methods the coefficients at and above the main
diagonal are zeros.
B : ndarray, shape (n_stages,)
Coefficients for combining RK stages for computing the final
prediction.
C : ndarray, shape (n_stages,)
Coefficients for incrementing time for consecutive RK stages.
The value for the first stage is always zero.
K : ndarray, shape (n_stages + 1, n)
Storage array for putting RK stages here. Stages are stored in rows.
The last row is a linear combination of the previous rows with
coefficients
Returns
-------
y_new : ndarray, shape (n,)
Solution at t + h computed with a higher accuracy.
f_new : ndarray, shape (n,)
Derivative ``fun(t + h, y_new)``.
References
----------
.. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
Equations I: Nonstiff Problems", Sec. II.4.
"""
K[0] = f
for s, (a, c) in enumerate(zip(A[1:], C[1:]), start=1):
dy = np.dot(K[:s].T, a[:s]) * h
K[s] = fun(t + c * h, y + dy)
y_new = y + h * np.dot(K[:-1].T, B)
f_new = fun(t + h, y_new)
K[-1] = f_new
return y_new, f_new
class RungeKutta(OdeSolver):
"""Base class for explicit Runge-Kutta methods."""
C: np.ndarray = NotImplemented
A: np.ndarray = NotImplemented
B: np.ndarray = NotImplemented
E: np.ndarray = NotImplemented
P: np.ndarray = NotImplemented
order: int = NotImplemented
error_estimator_order: int = NotImplemented
n_stages: int = NotImplemented
def __init__(self, fun, t0, y0, t_bound, max_step=np.inf,
rtol=1e-3, atol=1e-6, vectorized=False,
first_step=None, callback=None, **extraneous): #bz
warn_extraneous(extraneous)
super().__init__(fun, t0, y0, t_bound, vectorized,
support_complex=True)
self.y_old = None
self.max_step = validate_max_step(max_step)
self.rtol, self.atol = validate_tol(rtol, atol, self.n)
self.f = self.fun(self.t, self.y)
if first_step is None:
self.h_abs = select_initial_step(
self.fun, self.t, self.y, t_bound, max_step, self.f, self.direction,
self.error_estimator_order, self.rtol, self.atol)
else:
self.h_abs = validate_first_step(first_step, t0, t_bound)
self.K = np.empty((self.n_stages + 1, self.n), dtype=self.y.dtype)
self.error_exponent = -1 / (self.error_estimator_order + 1)
self.h_previous = None
self.callback = callback #bz
def _estimate_error(self, K, h):
return np.dot(K.T, self.E) * h
def _estimate_error_norm(self, K, h, scale):
return norm(self._estimate_error(K, h) / scale)
def _step_impl(self):
t = self.t
y = self.y
max_step = self.max_step
rtol = self.rtol
atol = self.atol
min_step = 10 * np.abs(np.nextafter(t, self.direction * np.inf) - t)
if self.h_abs > max_step:
h_abs = max_step
elif self.h_abs < min_step:
h_abs = min_step
else:
h_abs = self.h_abs
step_accepted = False
step_rejected = False
while not step_accepted:
if h_abs < min_step:
return False, self.TOO_SMALL_STEP
h = h_abs * self.direction
t_new = t + h
if self.direction * (t_new - self.t_bound) > 0:
t_new = self.t_bound
h = t_new - t
h_abs = np.abs(h)
y_new, f_new = rk_step(self.fun, t, y, self.f, h, self.A,
self.B, self.C, self.K)
scale = atol + np.maximum(np.abs(y), np.abs(y_new)) * rtol
error_norm = self._estimate_error_norm(self.K, h, scale)
if error_norm < 1:
if error_norm == 0:
factor = MAX_FACTOR
else:
factor = min(MAX_FACTOR,
SAFETY * error_norm ** self.error_exponent)
if step_rejected:
factor = min(1, factor)
h_abs *= factor
step_accepted = True
else:
h_abs *= max(MIN_FACTOR,
SAFETY * error_norm ** self.error_exponent)
step_rejected = True
#...............................................................................
if self.callback is not None: #bz
stopmsg = self.callback( locals() ) # the lot, t y ... #bz
if stopmsg: #bz
if not isinstance( stopmsg, str ): #bz
stopmsg = "stopped by user callback from RungeKutta" #bz
return False, stopmsg # end solve_ivp, return sol up to now #bz
self.h_previous = h
self.y_old = y
self.t = t_new
self.y = y_new
self.h_abs = h_abs
self.f = f_new
return True, None
def _dense_output_impl(self):
Q = self.K.T.dot(self.P)
return RkDenseOutput(self.t_old, self.t, self.y_old, Q)
class RK23(RungeKutta):
"""Explicit Runge-Kutta method of order 3(2).
This uses the Bogacki-Shampine pair of formulas [1]_. The error is controlled
assuming accuracy of the second-order method, but steps are taken using the
third-order accurate formula (local extrapolation is done). A cubic Hermite
polynomial is used for the dense output.
Can be applied in the complex domain.
Parameters
----------
fun : callable
Right-hand side of the system: the time derivative of the state ``y``
at time ``t``. The calling signature is ``fun(t, y)``, where ``t`` is a
scalar and ``y`` is an ndarray with ``len(y) = len(y0)``. ``fun`` must
return an array of the same shape as ``y``. See `vectorized` for more
information.
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e., the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. Here `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
vectorized : bool, optional
Whether `fun` may be called in a vectorized fashion. False (default)
is recommended for this solver.
If ``vectorized`` is False, `fun` will always be called with ``y`` of
shape ``(n,)``, where ``n = len(y0)``.
If ``vectorized`` is True, `fun` may be called with ``y`` of shape
``(n, k)``, where ``k`` is an integer. In this case, `fun` must behave
such that ``fun(t, y)[:, i] == fun(t, y[:, i])`` (i.e. each column of
the returned array is the time derivative of the state corresponding
with a column of ``y``).
Setting ``vectorized=True`` allows for faster finite difference
approximation of the Jacobian by methods 'Radau' and 'BDF', but
will result in slower execution for this solver.
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number evaluations of the system's right-hand side.
njev : int
Number of evaluations of the Jacobian.
Is always 0 for this solver as it does not use the Jacobian.
nlu : int
Number of LU decompositions. Is always 0 for this solver.
References
----------
.. [1] P. Bogacki, L.F. Shampine, "A 3(2) Pair of Runge-Kutta Formulas",
Appl. Math. Lett. Vol. 2, No. 4. pp. 321-325, 1989.
"""
order = 3
error_estimator_order = 2
n_stages = 3
C = np.array([0, 1/2, 3/4])
A = np.array([
[0, 0, 0],
[1/2, 0, 0],
[0, 3/4, 0]
])
B = np.array([2/9, 1/3, 4/9])
E = np.array([5/72, -1/12, -1/9, 1/8])
P = np.array([[1, -4 / 3, 5 / 9],
[0, 1, -2/3],
[0, 4/3, -8/9],
[0, -1, 1]])
class RK45(RungeKutta):
"""Explicit Runge-Kutta method of order 5(4).
This uses the Dormand-Prince pair of formulas [1]_. The error is controlled
assuming accuracy of the fourth-order method accuracy, but steps are taken
using the fifth-order accurate formula (local extrapolation is done).
A quartic interpolation polynomial is used for the dense output [2]_.
Can be applied in the complex domain.
Parameters
----------
fun : callable
Right-hand side of the system. The calling signature is ``fun(t, y)``.
Here ``t`` is a scalar, and there are two options for the ndarray ``y``:
It can either have shape (n,); then ``fun`` must return array_like with
shape (n,). Alternatively it can have shape (n, k); then ``fun``
must return an array_like with shape (n, k), i.e., each column
corresponds to a single column in ``y``. The choice between the two
options is determined by `vectorized` argument (see below).
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e., the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. Here `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
vectorized : bool, optional
Whether `fun` is implemented in a vectorized fashion. Default is False.
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number evaluations of the system's right-hand side.
njev : int
Number of evaluations of the Jacobian.
Is always 0 for this solver as it does not use the Jacobian.
nlu : int
Number of LU decompositions. Is always 0 for this solver.
References
----------
.. [1] J. R. Dormand, P. J. Prince, "A family of embedded Runge-Kutta
formulae", Journal of Computational and Applied Mathematics, Vol. 6,
No. 1, pp. 19-26, 1980.
.. [2] L. W. Shampine, "Some Practical Runge-Kutta Formulas", Mathematics
of Computation,, Vol. 46, No. 173, pp. 135-150, 1986.
"""
order = 5
error_estimator_order = 4
n_stages = 6
C = np.array([0, 1/5, 3/10, 4/5, 8/9, 1])
A = np.array([
[0, 0, 0, 0, 0],
[1/5, 0, 0, 0, 0],
[3/40, 9/40, 0, 0, 0],
[44/45, -56/15, 32/9, 0, 0],
[19372/6561, -25360/2187, 64448/6561, -212/729, 0],
[9017/3168, -355/33, 46732/5247, 49/176, -5103/18656]
])
B = np.array([35/384, 0, 500/1113, 125/192, -2187/6784, 11/84])
E = np.array([-71/57600, 0, 71/16695, -71/1920, 17253/339200, -22/525,
1/40])
# Corresponds to the optimum value of c_6 from [2]_.
P = np.array([
[1, -8048581381/2820520608, 8663915743/2820520608,
-12715105075/11282082432],
[0, 0, 0, 0],
[0, 131558114200/32700410799, -68118460800/10900136933,
87487479700/32700410799],
[0, -1754552775/470086768, 14199869525/1410260304,
-10690763975/1880347072],
[0, 127303824393/49829197408, -318862633887/49829197408,
701980252875 / 199316789632],
[0, -282668133/205662961, 2019193451/616988883, -1453857185/822651844],
[0, 40617522/29380423, -110615467/29380423, 69997945/29380423]])
class DOP853(RungeKutta):
"""Explicit Runge-Kutta method of order 8.
This is a Python implementation of "DOP853" algorithm originally written
in Fortran [1]_, [2]_. Note that this is not a literal translation, but
the algorithmic core and coefficients are the same.
Can be applied in the complex domain.
Parameters
----------
fun : callable
Right-hand side of the system. The calling signature is ``fun(t, y)``.
Here, ``t`` is a scalar, and there are two options for the ndarray ``y``:
It can either have shape (n,); then ``fun`` must return array_like with
shape (n,). Alternatively it can have shape (n, k); then ``fun``
must return an array_like with shape (n, k), i.e. each column
corresponds to a single column in ``y``. The choice between the two
options is determined by `vectorized` argument (see below).
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e. the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. Here `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
vectorized : bool, optional
Whether `fun` is implemented in a vectorized fashion. Default is False.
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number evaluations of the system's right-hand side.
njev : int
Number of evaluations of the Jacobian. Is always 0 for this solver
as it does not use the Jacobian.
nlu : int
Number of LU decompositions. Is always 0 for this solver.
References
----------
.. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
Equations I: Nonstiff Problems", Sec. II.
.. [2] `Page with original Fortran code of DOP853
<http://www.unige.ch/~hairer/software.html>`_.
"""
n_stages = dop853_coefficients.N_STAGES
order = 8
error_estimator_order = 7
A = dop853_coefficients.A[:n_stages, :n_stages]
B = dop853_coefficients.B
C = dop853_coefficients.C[:n_stages]
E3 = dop853_coefficients.E3
E5 = dop853_coefficients.E5
D = dop853_coefficients.D
A_EXTRA = dop853_coefficients.A[n_stages + 1:]
C_EXTRA = dop853_coefficients.C[n_stages + 1:]
def __init__(self, fun, t0, y0, t_bound, max_step=np.inf,
rtol=1e-3, atol=1e-6, vectorized=False,
first_step=None, **extraneous):
super().__init__(fun, t0, y0, t_bound, max_step, rtol, atol,
vectorized, first_step, **extraneous)
self.K_extended = np.empty((dop853_coefficients.N_STAGES_EXTENDED,
self.n), dtype=self.y.dtype)
self.K = self.K_extended[:self.n_stages + 1]
def _estimate_error(self, K, h): # Left for testing purposes.
err5 = np.dot(K.T, self.E5)
err3 = np.dot(K.T, self.E3)
denom = np.hypot(np.abs(err5), 0.1 * np.abs(err3))
correction_factor = np.ones_like(err5)
mask = denom > 0
correction_factor[mask] = np.abs(err5[mask]) / denom[mask]
return h * err5 * correction_factor
def _estimate_error_norm(self, K, h, scale):
err5 = np.dot(K.T, self.E5) / scale
err3 = np.dot(K.T, self.E3) / scale
err5_norm_2 = np.linalg.norm(err5)**2
err3_norm_2 = np.linalg.norm(err3)**2
if err5_norm_2 == 0 and err3_norm_2 == 0:
return 0.0
denom = err5_norm_2 + 0.01 * err3_norm_2
return np.abs(h) * err5_norm_2 / np.sqrt(denom * len(scale))
def _dense_output_impl(self):
K = self.K_extended
h = self.h_previous
for s, (a, c) in enumerate(zip(self.A_EXTRA, self.C_EXTRA),
start=self.n_stages + 1):
dy = np.dot(K[:s].T, a[:s]) * h
K[s] = self.fun(self.t_old + c * h, self.y_old + dy)
F = np.empty((dop853_coefficients.INTERPOLATOR_POWER, self.n),
dtype=self.y_old.dtype)
f_old = K[0]
delta_y = self.y - self.y_old
F[0] = delta_y
F[1] = h * f_old - delta_y
F[2] = 2 * delta_y - h * (self.f + f_old)
F[3:] = h * np.dot(self.D, K)
return Dop853DenseOutput(self.t_old, self.t, self.y_old, F)
class RkDenseOutput(DenseOutput):
def __init__(self, t_old, t, y_old, Q):
super().__init__(t_old, t)
self.h = t - t_old
self.Q = Q
self.order = Q.shape[1] - 1
self.y_old = y_old
def _call_impl(self, t):
x = (t - self.t_old) / self.h
if t.ndim == 0:
p = np.tile(x, self.order + 1)
p = np.cumprod(p)
else:
p = np.tile(x, (self.order + 1, 1))
p = np.cumprod(p, axis=0)
y = self.h * np.dot(self.Q, p)
if y.ndim == 2:
y += self.y_old[:, None]
else:
y += self.y_old
return y
class Dop853DenseOutput(DenseOutput):
def __init__(self, t_old, t, y_old, F):
super().__init__(t_old, t)
self.h = t - t_old
self.F = F
self.y_old = y_old
def _call_impl(self, t):
x = (t - self.t_old) / self.h
if t.ndim == 0:
y = np.zeros_like(self.y_old)
else:
x = x[:, None]
y = np.zeros((len(x), len(self.y_old)), dtype=self.y_old.dtype)
for i, f in enumerate(reversed(self.F)):
y += f
if i % 2 == 0:
y *= x
else:
y *= 1 - x
y += self.y_old
return y.T
#!/usr/bin/env python3
""" Timer: print info every say 60 seconds in a loop
Usage:
timer = Timer( seconds=60 ) # initialize `t0 = time.time()`
while True: # a long-running loop
...
timer( info ... ) # prints once every 60 seconds
# sec 10: info ...
# sec 61: info
# sec 234: info
If we're in the same minute as the previous call `timer()`,
return 0, do nothing
else print sec, info ... and
return time.time() - t0 seconds
sec = timer() # no args: just return (time.time() - t0 ) or 0
if sec > timelimit:
status = "Timeout: %.1f seconds > timelimit %.1f" % (sec, timelimit)
print( status )
break # return sol or opt as usual
timer( userfunc ... )
calls userfunc( timer.totalsec, ... ) once every __ seconds:
print in color, or plot, or do logging.
Notes:
print( info ... ) prints ndarrays with the caller's `np.set_printoptions()`.
`time.time()` is wallclock time, not time.process_time() = the sum of all cores.
See also: ivmon.py, wraps `func( t, y )` for scipy solve_ivp.
"""
# keywords: python time timer monitor once-a-minute
# see also: callbacks, ivmon.py, monitor.py
import numpy as np
import time
#...............................................................................
class Timer( object ):
def __init__( self, seconds=60 ):
self.seconds = seconds
self.t0 = time.time()
self.oldminute = -1
self.fmt = "sec %d:" # msec %.3f
def __call__( self, *args, **kw ):
if self.seconds <= 0:
return 0
self.totalsec = time.time() - self.t0 # wallclock since t0
minute = int( self.totalsec / self.seconds ) # or second or hour ...
if minute == self.oldminute:
return 0
self.oldminute = minute
if args:
arg0 = args[0]
if callable( arg0 ): # timer( userfunc ): call it
return arg0( self.totalsec, *args[1:], **kw )
else:
print( self.fmt % self.totalsec, *args, **kw )
return self.totalsec
def sec_windowtitle( sec, *args, **kw ) -> "print sec __: args in window title":
title = "\033]0;sec %d:" % sec # centred, wobbles
print( title, *args, end="\007", **kw ) # with np printoptions
__version__ = "2025-04-14 apr" # denis-bz-py t-online.de
#...............................................................................
if __name__ == "__main__":
import sys
print( 80 * "▄" )
np.set_printoptions( edgeitems=5, threshold=10, linewidth=120, formatter=dict(
float = lambda x: "%.2g" % x ))
sec = 1
nrand = 10
# to change these params, run this.py a=1 b=None 'c = expr' ... in sh or ipython --
for arg in sys.argv[1:]:
exec( arg )
rng = np.random.default_rng( seed=0 )
sleep = rng.exponential( size=nrand )
print( "sleep times:", sleep )
print( sleep.cumsum().astype(int) ) # != cumsum( ints )
timer = Timer( sec )
def psec( sec, *args ):
print( "%.3g" % sec, *args )
for s in sleep:
time.sleep( s ) # flaky on old mac ?
timer( 1, 2, 3 )
# timer( sec_windowtitle, 1, 2, 3 )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment