2.1. How to implement IVP
interface in Python¶
Let’s have a look at how to implement the IVP
interface in Python.
We will use the classical Runge–Kutta 4th order method (RK4). In this method, given the solution \(y_n\) at time \(t_n\) and time step \(h\), the solution \(y_{n+1}\) at the next time step \(t_{n+1} = t_n + h\) is computed as
where
For simplicity, we implement the RK4 method only with fixed time step (so, it is user’s responsibility to choose the time step small enough).
The implementation should inherit from the abstract base class
IVPInterface
,
which has the following abstract methods:
class IVPInterface(abc.ABC):
@abc.abstractmethod
def set_initial_value(self, y0: np.ndarray, t0: float) -> Union[int, None]:
"""Set initial value y(t0) = y0."""
@abc.abstractmethod
def set_rhs_fn(self, rhs: Callable) -> Union[int, None]:
"""Specify right-hand side function f."""
@abc.abstractmethod
def set_tolerances(self, rtol: float, atol: float) -> Union[int, None]:
"""Specify relative and absolute tolerances, respectively."""
@abc.abstractmethod
def set_user_data(self, user_data: object) -> Union[int, None]:
"""Specify additional data that will be used for right-hand side function."""
@abc.abstractmethod
def set_integrator(
self, integrator_name: str, integrator_params: Dict
) -> Union[int, None]:
"""Set integrator, if the name is recognizable."""
@abc.abstractmethod
def integrate(self, t: float, y: np.ndarray) -> Union[int, None]:
"""Integrate to time `t` and write solution to `y`."""
@abc.abstractmethod
def print_stats(self):
"""Print integration statistics."""
We start by creating a new Python file named rk4.py
and importing the IVPInterface
class and NumPy:
import numpy as np
from openinterfaces._impl.interfaces.ivp import IVPInterface
from typing import Callable, Dict, Union
Note that we also import Callable
, Dict
, and Union
from the typing
module
to use them in type hints (which is completely optional, of course, in Python).
Then we declare our implementation class as a subclass of IVPInterface
:
and define the __init__
method to initialize the instance variables
(most of them are just type hints here, because we will initialize them later
in the respective methods, however, user_data
and n_rhs_evals
are
initialized here because they are not necessarily initialized later):
class RK4FixedStepIntegrator(IVPInterface):
def __init__(self):
self.y: np.ndarray
self.t: float
self.rhs: Callable
self.user_data = None
self.n_rhs_evals = 0 # Number of right-hand side evaluations
# Auxiliary arrays for RK4
self.k1: np.ndarray
self.k2: np.ndarray
self.k3: np.ndarray
self.k4: np.ndarray
Next, we need to store the initial value and time in the set_initial_value
method:
def set_initial_value(self, y0: np.ndarray, t0: float) -> Union[int, None]:
self.y = y0
self.t = t0
self.k1 = np.zeros_like(y0)
self.k2 = np.zeros_like(y0)
self.k3 = np.zeros_like(y0)
self.k4 = np.zeros_like(y0)
where we also allocate auxiliary arrays for the RK4 method.
Then we set the right-hand side function in the set_rhs_fn
method:
def set_rhs_fn(self, rhs: Callable) -> Union[int, None]:
self.rhs = rhs
and any user data that the user wants to pass to the right-hand side function
in the set_user_data
method:
def set_user_data(self, user_data: object) -> Union[int, None]:
self.user_data = user_data
Because we are using fixed time step and our implementation implements
only one algorithm (integrator), we can keep the set_tolerances
and
set_integrator
methods empty:
def set_tolerances(self, rtol: float, atol: float) -> Union[int, None]:
pass
def set_integrator(
self, integrator_name: str, integrator_params: Dict
) -> Union[int, None]:
pass
Otherwise, we would need to store the tolerances and integrator parameters as properties (as we do with the initial value, right-hand side function, and user data, and reinitialize the integrator accordingly if needed.
At last, we implement the integrate
method using the
Eq. 2.1.1 and Eq. 2.1.2:
def integrate(self, t: float, y: np.ndarray) -> Union[int, None]:
h = t - self.t # Time step
self.rhs(self.t, self.y, self.k1[:], self.user_data)
self.rhs(self.t + h / 2, self.y + h / 2 * self.k1, self.k2[:], self.user_data)
self.rhs(self.t + h / 2, self.y + h / 2 * self.k2, self.k3[:], self.user_data)
self.rhs(self.t + h, self.y + h * self.k3, self.k4[:], self.user_data)
self.n_rhs_evals += 4 # We evaluated the right-hand side four times
y[:] = self.y + h / 6 * (self.k1 + 2 * self.k2 + 2 * self.k3 + self.k4)
self.t = t
self.y = y # Write the solution to the output array
Note that we also update the number of right-hand side evaluations
in each call to the integrate
method.
Besides, we write into the output array y
the computed solution
and keep a link to it in self.y
for the next call to integrate
.
Additionally, we use internal (implementation-dependent) arrays k1
, k2
,
k3
, and k4
to store the auxiliary values defined by the logic
of the RK4 method.
Finally, we implement the print_stats
method to print the number of
right-hand side evaluations:
def print_stats(self):
print(f"Number of right-hand side evaluations: {self.n_rhs_evals}")
2.1.1. Make the implementation available¶
We want to make our implementation discoverable by Open Interfaces. To do that, we need to do two things:
Choose the name for the implementation (here, we choose
rk4_example
)Put the implementation in a folder hierarchy somewhere on disk: for example,
$HOME/oif_impl/ivp/our_rk4/RK4.py
,Create a file named
rk4_example.conf
(its basename is the same as the implementation name) in the$HOME/oif_impl/ivp/rk4_example/
with the following content:python ivp.rk4_example.rk4 RK4FixedStepIntegrator
In this file, the first line specifies the language of the implementation while the second line specifies the module to import and the class name that will be instantiated by Open Interfaces.
Set the environment variable
OIF_IMPL_PATH
to point to$HOME/oif_impl
:export OIF_IMPL_PATH=$HOME/oif_impl:$OIF_IMPL_PATH
Because we put our implementation in a non-standard location, we also need to set the
PYTHONPATH
environment variable to include$HOME
(so that Python can find our implementation):export PYTHONPATH=$HOME/oif_impl:$PYTHONPATH
so that
import ivp.rk4_example.rk4
works correctly (it will be done automatically by Open Interfaces for us).
Now we are ready to use our implementation.
2.1.2. Usage example¶
To test how our implementation works, we can solve a simple ODE \(y' = -y\), \(y(0) = 1\) with the exact solution \(y(t) = e^{-t}\).
We start with imports:
import numpy as np
from openinterfaces.interfaces.ivp import IVP
Then, we define the right-hand side function:
def rhs(t, y, dydt, user_data):
dydt[:] = -y
return 0
where return 0
indicates success.
Then we instantiate the Gateway component IVP
and pass the name of the desired implementation to it:
solver = IVP("rk4_example")
and set the initial value, right-hand side function, and user data (if any):
solver.set_initial_value(np.array([1.0]), 0.0)
solver.set_rhs_fn(rhs)
And then we can integrate to a desired time using relatively small time steps (recall that our implementation uses fixed time step) and print the numerical and exact solutions at each time point:
times = np.linspace(0, 1, 11)
for t in times[1:]:
y = np.zeros(1)
solver.integrate(t, y)
print(f"t = {t:.2f}, y = {y[0]:.4f}, exact = {np.exp(-t):.4f}")
Finally, we can print the statistics of the integration:
solver.print_stats()
The complete code of the usage example is as follows:
import numpy as np
from openinterfaces.interfaces.ivp import IVP
def rhs(t, y, dydt, user_data):
dydt[:] = -y
return 0
def main():
solver = IVP("rk4_example")
solver.set_initial_value(np.array([1.0]), 0.0)
solver.set_rhs_fn(rhs)
solver.set_user_data(None)
times = np.linspace(0, 1, 11)
for t in times[1:]:
solver.integrate(t)
print(f"t = {t:.2f}, y = {solver.y[0]:.4f}, exact = {np.exp(-t):.4f}")
solver.print_stats()
if __name__ == "__main__":
main()
with output:
[dispatch] Configuration file: /home/user/oif_impl/ivp/rk4_example/rk4_example.conf
[dispatch] Backend name: python
[dispatch] Implementation details: 'ivp.rk4_example.rk4fixedstep
RK4FixedStepIntegrator'
[bridge_python] Backend is already initialized
[bridge_python] libpython path: /home/user/sw/miniforge3/envs/um02-open-interfaces/lib
[bridge_python] Loading libpython3.13.so
[dispatch_python] /home/user/sw/miniforge3/envs/um02-open-interfaces/bin/python
[dispatch_python] 3.13.2 | packaged by conda-forge | (main, Feb 17 2025, 14:10:22) [GCC 13.3.0]
[dispatch_python] NumPy version: 2.2.4
[bridge_python] Provided module name: 'ivp.rk4_example.rk4fixedstep'
[bridge_python] Provided class name: 'RK4FixedStepIntegrator'
t = 0.10, y = 0.9048, exact = 0.9048
t = 0.20, y = 0.8187, exact = 0.8187
t = 0.30, y = 0.7408, exact = 0.7408
t = 0.40, y = 0.6703, exact = 0.6703
t = 0.50, y = 0.6065, exact = 0.6065
t = 0.60, y = 0.5488, exact = 0.5488
t = 0.70, y = 0.4966, exact = 0.4966
t = 0.80, y = 0.4493, exact = 0.4493
t = 0.90, y = 0.4066, exact = 0.4066
t = 1.00, y = 0.3679, exact = 0.3679
Number of right-hand side evaluations: 40