Coupling 2D FEM transient EM model with 0D circuit equation in DOLFINx

Dear @dokken and all,

I’m working with DOLFINx 0.9 to simulate the time evolution of eddy currents in a 2D axisymmetric electromagnetic model. The model is driven by a decaying toroidal current, and the goal is to evaluate how currents are induced in passive conductors (e.g., copper coils).

I have successfully implemented the transient formulation using:

  • CG1 for the poloidal magnetic flux \Psi(r, z)
  • DG0 for material properties \mu, \sigma

The governing equation is:

\nabla \cdot \left( \frac{1}{2\pi r} \mu^{-1} \nabla \Psi \right) + \frac{\sigma}{2\pi r} \frac{\partial \Psi}{\partial t} = J_\phi

In the attached example:

  • The source current J_\phi is applied over a small circular region (wire)
  • Two passive rectangular copper coils (passive1, passive2) experience induced currents via the \sigma \frac{\partial \Psi}{\partial t} term
  • I use Expression and assemble_scalar to compute the total induced current in each region at every time step

Goal

I would now like to upgrade the model by explicitly introducing electrical constraints among passive components, modeled as 0D circuits. Specifically, I want to couple each group of passive conductors (defined by physical groups) to a 0D Faraday’s law:

L \frac{di}{dt} + R i = V_0 - \frac{d \bar{\Psi}}{dt}

Where:

  • i(t): total current in the passive circuit
  • \bar{\Psi}: average poloidal flux over the materials of the circuit
  • L, R: known inductance and resistance values
  • V_0: external driving voltage (assumed 0 for now)

In the current example, I would like to connect the two passive coils in series so as to have the same Amps of eddy currents on both conductors.


Objectives

  • Define one or more circuits, each composed of a subset of physical groups
  • Couple the 2D FEM problem with ODEs for these circuits
  • Ensure the flux consistency between the average magnetic flux and the circuit equation
  • Obtain a numerically stable implementation, suitable for use inside nonlinear iterations (e.g., Picard iterations)

Questions

  1. How would you suggest implementing this FEM–circuit coupling in DOLFINx?
  2. Is it feasible to represent the 0D current(s) as additional unknown(s) in the same linear system? (e.g. using a mixed formulation or block system)
  3. Are there any recommended strategies to ensure numerical stability and scalability of this approach?

Any insights, references, or example structures would be extremely appreciated.
Thanks in advance for your help!


import numpy as np
import gmsh
from mpi4py import MPI
from dolfinx import default_scalar_type
from dolfinx.io.gmshio import model_to_mesh
from dolfinx.fem import (
    dirichletbc,
    Expression,
    Function,
    functionspace,
    locate_dofs_topological,
    form,
    assemble_scalar,
)
from dolfinx.fem.petsc import (
    apply_lifting,
    assemble_matrix,
    assemble_vector,
    create_vector,
    set_bc,
)
from dolfinx.mesh import locate_entities_boundary
from ufl import (
    TestFunction,
    TrialFunction,
    dot,
    dx,
    grad,
    SpatialCoordinate,
    Measure,
)
from petsc4py import PETSc
import matplotlib.pyplot as plt

model_rank = 0
mesh_comm = MPI.COMM_WORLD

# Geometry parameters
R = 10
r_wire = 0.005
rw, zw = 0.5, 0.0
r_cu1_offset = -0.1
z_cu1_offset = 0.5
r_cu2_offset = -0.05
z_cu2_offset = 0.25
h_cu = 0.2
w_cu = 0.2

lc_air = 1
lc_wire = 0.001
lc_cu = 0.25

# Tags
tag_air = 1
tag_wire = 2
tag_passive1 = 3
tag_passive2 = 4

rho_cu = 1.68e-8
sigma_cu = 1 / rho_cu

Iw0 = 1e6
deltat = 0.05
dt = 1e-3


def Iw(t):
    return Iw0 * (1 - t / deltat)


def integrate_f(f, mesh, ct=None, tag=None) -> float:
    dx_obj = dx if ct is None else Measure("dx", subdomain_data=ct, domain=mesh)
    return (
        assemble_scalar(form(f * dx_obj))
        if tag is None
        else assemble_scalar(form(f * dx_obj(tag)))
    )


Aw = r_wire * np.pi**2

gmsh.initialize()
gmsh.model.add("Transient with circuits")

# --- Air region (half disk + rectangle to close D shape) ---
p_center = gmsh.model.occ.addPoint(0.0, 0.0, 0, lc_air)
p_top = gmsh.model.occ.addPoint(0.0, R, 0, lc_air)
p_right = gmsh.model.occ.addPoint(R, 0.0, 0, lc_air)
p_bottom = gmsh.model.occ.addPoint(0.0, -R, 0, lc_air)
arc1 = gmsh.model.occ.addCircleArc(p_top, p_center, p_right)
arc2 = gmsh.model.occ.addCircleArc(p_right, p_center, p_bottom)
line1 = gmsh.model.occ.addLine(p_bottom, p_top)
loop_air = gmsh.model.occ.addCurveLoop([arc1, arc2, line1])
gmsh.model.occ.synchronize()

# --- Circular wire (full disk) ---
wire = gmsh.model.occ.addDisk(rw, zw, 0.0, r_wire, r_wire, tag=tag_wire)
gmsh.model.occ.synchronize()

# --- Passive coil 1 (rectangle) ---
z1_bot = zw + z_cu1_offset - h_cu / 2
r1_left = rw + r_cu1_offset - w_cu / 2
passive1 = gmsh.model.occ.addRectangle(
    r1_left, z1_bot, 0.0, w_cu, h_cu, tag=tag_passive1
)
gmsh.model.occ.synchronize()

# --- Passive coil 2 (rectangle) ---
z2_bot = zw - z_cu2_offset - h_cu / 2
r2_left = rw + r_cu2_offset - w_cu / 2
passive2 = gmsh.model.occ.addRectangle(
    r2_left, z2_bot, 0.0, w_cu, h_cu, tag=tag_passive2
)

air = gmsh.model.occ.addPlaneSurface([loop_air, wire, passive1, passive2], tag=tag_air)
gmsh.model.occ.synchronize()

# --- Physical surfaces ---
gmsh.model.addPhysicalGroup(2, [air], tag_air)
gmsh.model.setPhysicalName(2, tag_air, "Air")
gmsh.model.addPhysicalGroup(2, [wire], tag_wire)
gmsh.model.setPhysicalName(2, tag_wire, "Wire")
gmsh.model.addPhysicalGroup(2, [passive1], tag_passive1)
gmsh.model.setPhysicalName(2, tag_passive1, "Passive1")
gmsh.model.addPhysicalGroup(2, [passive2], tag_passive2)
gmsh.model.setPhysicalName(2, tag_passive2, "Passive2")

# --- Mesh options ---
# Mesh size control options

# Gather all refined surfaces
refined_surfaces = [(2, wire), (2, passive1), (2, passive2)]

# Get boundary edges of the refined surfaces
gmsh.model.occ.synchronize()
edges = gmsh.model.getBoundary(refined_surfaces, oriented=False)

# Field 1: distance from these edges
gmsh.model.mesh.field.add("Distance", 1)
gmsh.model.mesh.field.setNumbers(1, "EdgesList", [e[1] for e in edges])

# Field 2: threshold-based size depending on distance
gmsh.model.mesh.field.add("Threshold", 2)
gmsh.model.mesh.field.setNumber(2, "IField", 1)
gmsh.model.mesh.field.setNumber(2, "LcMin", 0.1)  # finest near coils
gmsh.model.mesh.field.setNumber(2, "LcMax", 1.0)  # coarser away
gmsh.model.mesh.field.setNumber(2, "DistMin", 0.01)  # radius within which it's LcMin
gmsh.model.mesh.field.setNumber(2, "DistMax", 0.2)  # after which it's LcMax

# Combine (you can add more fields if needed)
gmsh.model.mesh.field.add("Min", 5)
gmsh.model.mesh.field.setNumbers(5, "FieldsList", [2])
gmsh.model.mesh.field.setAsBackgroundMesh(5)

gmsh.option.setNumber(
    "Mesh.CharacteristicLengthFactor", 0.25
)  # Global scaling factor for all mesh sizes (refines everything)
gmsh.option.setNumber(
    "Mesh.MeshSizeFromPoints", 1
)  # Use lc values provided at each Point definition
gmsh.option.setNumber("Mesh.MeshSizeExtendFromBoundary", 2)  # Smoo
gmsh.model.mesh.generate(2)
gmsh.fltk.run()  # uncomment to preview mesh

# --- Convert to DolfinX
mesh, ct, _ = model_to_mesh(gmsh.model, MPI.COMM_WORLD, 0, gdim=2)
gmsh.finalize()

# %%

Q = functionspace(mesh, ("DG", 0))
mu = Function(Q)
J = Function(Q)
sigma = Function(Q)
J.x.array[:] = 0
mu.x.array[:] = 4 * np.pi * 1e-7

passive1_cells = ct.find(tag_passive1)
passive2_cells = ct.find(tag_passive2)
sigma.x.array[passive1_cells] = sigma_cu
sigma.x.array[passive2_cells] = sigma_cu
wire_cells = ct.find(tag_wire)

V = functionspace(mesh, ("Lagrange", 1))
tdim = mesh.topology.dim
facets = locate_entities_boundary(mesh, tdim - 1, lambda x: np.full(x.shape[1], True))
dofs = locate_dofs_topological(V, tdim - 1, facets)
bc = dirichletbc(default_scalar_type(0), dofs, V)

Psi = Function(V)
Psi_old = Function(V)

u = TrialFunction(V)
v = TestFunction(V)

x = SpatialCoordinate(mesh)
a = (1 / mu) * (1 / (2 * np.pi * x[0])) * dot(grad(u), grad(v)) * dx + (
    1 / (2 * np.pi * x[0])
) * (sigma / dt) * u * v * dx

L = J * v * dx + (1 / (2 * np.pi * x[0])) * (sigma / dt) * Psi_old * v * dx

print("Assembling A matrix....")
a_form = form(a)
A = assemble_matrix(a_form, bcs=[bc])
A.assemble()
print("A matrix assembled")

print("Creating solver....")
solver = PETSc.KSP().create(mesh.comm)
solver.setOperators(A)
solver.setType(PETSc.KSP.Type.CG)
solver.getPC().setType(PETSc.PC.Type.SOR)
print("Solver created")

L_form = form(L)

Je_expr = Expression(
    (-sigma / (2 * np.pi * x[0]) * (Psi - Psi_old) / dt),
    Q.element.interpolation_points(),
)
Je = Function(Q)

t = np.arange(0.0, deltat, dt)

Ie = np.zeros(shape=(len(t), 2), dtype=default_scalar_type)

for i, ti in enumerate(t):
    print(f"****** t = {ti:.4f} s ******")
    J.x.array[wire_cells] = np.full_like(
        wire_cells, Iw(ti) / Aw, dtype=default_scalar_type
    )

    print("Assembling b vector....")
    b = create_vector(L_form)
    assemble_vector(b, L_form)
    apply_lifting(b, [a_form], [[bc]])
    b.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
    set_bc(b, [bc])
    print("b vector assembled")

    print("Solving...")
    solver.solve(b, Psi.x.petsc_vec)
    Psi.x.petsc_vec.ghostUpdate(
        addv=PETSc.InsertMode.INSERT_VALUES, mode=PETSc.ScatterMode.FORWARD
    )
    print("Done")

    Je.interpolate(Je_expr)
    for j, tag in enumerate([passive1, passive2]):
        Ie[i, j] = integrate_f(Je, mesh, ct, tag)

    Psi_old.x.petsc_vec.copy(Psi.x.petsc_vec)

# %%

fig, ax1 = plt.subplots()

color1 = "tab:blue"
ax1.set_xlabel("Time [s]")
ax1.set_ylabel("Driving current $I_w(t)$ [A]", color=color1)
line1, = ax1.plot(t, Iw(t), color=color1, label="$I_w(t)$")
ax1.tick_params(axis="y", labelcolor=color1)

ax2 = ax1.twinx()
color2 = "tab:red"
line2, = ax2.plot(t, Ie[:, 0], color="tab:red", linestyle="--", label="$I_{e1}(t)$")
line3, = ax2.plot(t, Ie[:, 1], color="tab:green", linestyle=":", label="$I_{e2}(t)$")
ax2.set_ylabel("Induced currents $I_e(t)$ [A]", color=color2)
ax2.tick_params(axis="y", labelcolor=color2)

lines = [line1, line2, line3]
labels = [line.get_label() for line in lines]
ax1.legend(lines, labels, loc="upper right")

plt.title("Driving and Induced Currents Over Time")
plt.grid(True)
plt.tight_layout()
plt.show()

I did something similar a number of years ago, using a linear FEM that would certainly be implementable in FEniCSx. Have a look at this paper for the formulation and implementation details. Eddy currents, non-linear magnetization and moving parts are modelled in the time domain.

2 Likes

We had a master student that recently worked on a similar PDE-ODE coupled model (Monodomain model for cardiac electrophysiology) and used dolfinx-external-operator for this. His code is found here: masters_project/demos_monolithic/external_operator.py at main · aledha/masters_project · GitHub

Perhaps @a-latyshev has any additional tips for how to make it work.

Otherwise, I also know that a similar problem is solved in ambit for example when coupling a 3D mechanics model to a 0D model of the cardiac circulation system (see also Ambit – A FEniCS-based cardiovascular multi-physics solver — ambit 1.1.3 documentation)

3 Likes

Hi @BillS and @Henrik_Nicolay_Finsb,

Thank you both very much for your valuable insights and references — they’re exactly what I needed. The ambit example and the use of dolfinx-external-operator in monolithic coupling are very relevant to the type of PDE-ODE interaction I’m working on (eddy currents and circuit dynamics). I’ll be diving into those codes shortly.

Regarding the suggestion about @a-latyshev, would there be a recommended way to get in touch or follow up with him? I’d really appreciate hearing any thoughts he might have about making such coupling robust in practice.

Thanks again!

Hi @mnotazio :waving_hand:,

I am here :slight_smile:

I don’t have additional tips beyond what @Henrik_Nicolay_Finsb wrote. To me, the master project mentioned above has all the pieces you need. In particular, in this function f_impl, they solve ODE and then use it to evaluate the external operators that are used in the main variational form.

External operator is an interface that facilitates the work with the variational forms, where some parts are defined externally with respect to the main problem.

I’d suggest giving it a shot first and then coming back here if you get stuck at some point with a particular problem.

We have detailed tutorials on using external operators. Check it out to get a general idea: dolfinx-external-operator — External operators within FEniCSx/DOLFINx

1 Like

Hi @a-latyshev, @Henrik_Nicolay_Finsb and all,

Thanks for the guidance so far. As suggested by you, I’ve been following the structure from the masters_project you mentioned, especially the use of f_impl to evolve the ODE and return new state values to be used via FEMExternalOperator.

As mentioned before, I’m trying to couple a transient PDE (magneto-quasi-static axisymmetric problem for Ψ) with a 0D circuit equation:

L \frac{di}{dt} + Ri = - \frac{d\overline{\Psi}}{dt}

The current i(t) is updated using a forward Euler step inside a f_impl-like function, and the updated value is inserted into the variational form (as an induced current source in the passive regions).

Everything should be structured as in the tutorials:

  • I return np.array([i_new]) from the external operator
  • I use FEMExternalOperator(Psi, i_old, function_space=V0, external_function=f_external)
  • I do not use split(...)
  • I call replace_external_operators(L) before compiling the form

But when I try to compile the RHS with form(L_updated), I get:

AssertionError
File ".../dolfinx/fem/forms.py", line 249, in _form
    assert all([d is data[0] for d in data])

:magnifying_glass_tilted_left: Questions

  • What do you think the above error is due to?
  • Is the quadrature_element defined correctly for this type of problem? How do I have to choose the value_shape and quad_degree?
  • Am I handling the i and i_old functions correctly during the time loop? Are they initialized in the proper manner?

Thanks in advance for your time and sorry for probably silly questions.
Below the example code is attached.


:puzzle_piece: Example

import numpy as np
import gmsh
from mpi4py import MPI
from dolfinx import default_scalar_type
from dolfinx.io.gmshio import model_to_mesh
from dolfinx.fem import (
    dirichletbc,
    Function,
    functionspace,
    locate_dofs_topological,
    form,
    assemble_scalar,
)
from dolfinx.fem.petsc import assemble_vector, LinearProblem
from dolfinx.mesh import locate_entities_boundary
from ufl import (
    TestFunction,
    TrialFunction,
    dot,
    dx,
    grad,
    SpatialCoordinate,
    Measure,
    split,
)
from basix.ufl import quadrature_element
from petsc4py import PETSc
import matplotlib.pyplot as plt

from dolfinx_external_operator import (
    FEMExternalOperator,
    replace_external_operators,
    evaluate_operands,
    evaluate_external_operators,
)

model_rank = 0
mesh_comm = MPI.COMM_WORLD

# Geometry parameters
R = 10
r_wire = 0.005
rw, zw = 0.5, 0.0
r_cu1_offset = -0.1
z_cu1_offset = 0.5
r_cu2_offset = -0.05
z_cu2_offset = 0.25
h_cu = 0.2
w_cu = 0.2

lc_air = 1
lc_wire = 0.001
lc_cu = 0.25

# Tags
tag_air = 1
tag_wire = 2
tag_passive1 = 3
tag_passive2 = 4

rho_cu = 1.68e-8
sigma_cu = 1 / rho_cu

Iw0 = 1e6
deltat = 0.05
dt = 1e-3


def Iw(t):
    return Iw0 * (1 - t / deltat)


def integrate_f(f, mesh, ct=None, tag=None) -> float:
    dx_obj = dx if ct is None else Measure("dx", subdomain_data=ct, domain=mesh)
    return (
        assemble_scalar(form(f * dx_obj))
        if tag is None
        else assemble_scalar(form(f * dx_obj(tag)))
    )


Aw = r_wire * np.pi**2

gmsh.initialize()
gmsh.model.add("Transient with circuits")

# --- Air region (half disk + rectangle to close D shape) ---
p_center = gmsh.model.occ.addPoint(0.0, 0.0, 0, lc_air)
p_top = gmsh.model.occ.addPoint(0.0, R, 0, lc_air)
p_right = gmsh.model.occ.addPoint(R, 0.0, 0, lc_air)
p_bottom = gmsh.model.occ.addPoint(0.0, -R, 0, lc_air)
arc1 = gmsh.model.occ.addCircleArc(p_top, p_center, p_right)
arc2 = gmsh.model.occ.addCircleArc(p_right, p_center, p_bottom)
line1 = gmsh.model.occ.addLine(p_bottom, p_top)
loop_air = gmsh.model.occ.addCurveLoop([arc1, arc2, line1])
gmsh.model.occ.synchronize()

# --- Circular wire (full disk) ---
wire = gmsh.model.occ.addDisk(rw, zw, 0.0, r_wire, r_wire, tag=tag_wire)
gmsh.model.occ.synchronize()

# --- Passive coil 1 (rectangle) ---
z1_bot = zw + z_cu1_offset - h_cu / 2
r1_left = rw + r_cu1_offset - w_cu / 2
passive1 = gmsh.model.occ.addRectangle(
    r1_left, z1_bot, 0.0, w_cu, h_cu, tag=tag_passive1
)
gmsh.model.occ.synchronize()

# --- Passive coil 2 (rectangle) ---
z2_bot = zw - z_cu2_offset - h_cu / 2
r2_left = rw + r_cu2_offset - w_cu / 2
passive2 = gmsh.model.occ.addRectangle(
    r2_left, z2_bot, 0.0, w_cu, h_cu, tag=tag_passive2
)

air = gmsh.model.occ.addPlaneSurface([loop_air, wire, passive1, passive2], tag=tag_air)
gmsh.model.occ.synchronize()

# --- Physical surfaces ---
gmsh.model.addPhysicalGroup(2, [air], tag_air)
gmsh.model.setPhysicalName(2, tag_air, "Air")
gmsh.model.addPhysicalGroup(2, [wire], tag_wire)
gmsh.model.setPhysicalName(2, tag_wire, "Wire")
gmsh.model.addPhysicalGroup(2, [passive1], tag_passive1)
gmsh.model.setPhysicalName(2, tag_passive1, "Passive1")
gmsh.model.addPhysicalGroup(2, [passive2], tag_passive2)
gmsh.model.setPhysicalName(2, tag_passive2, "Passive2")

# --- Mesh options ---
# Mesh size control options

# Gather all refined surfaces
refined_surfaces = [(2, wire), (2, passive1), (2, passive2)]

# Get boundary edges of the refined surfaces
gmsh.model.occ.synchronize()
edges = gmsh.model.getBoundary(refined_surfaces, oriented=False)

# Field 1: distance from these edges
gmsh.model.mesh.field.add("Distance", 1)
gmsh.model.mesh.field.setNumbers(1, "EdgesList", [e[1] for e in edges])

# Field 2: threshold-based size depending on distance
gmsh.model.mesh.field.add("Threshold", 2)
gmsh.model.mesh.field.setNumber(2, "IField", 1)
gmsh.model.mesh.field.setNumber(2, "LcMin", 0.1)  # finest near coils
gmsh.model.mesh.field.setNumber(2, "LcMax", 1.0)  # coarser away
gmsh.model.mesh.field.setNumber(2, "DistMin", 0.01)  # radius within which it's LcMin
gmsh.model.mesh.field.setNumber(2, "DistMax", 0.2)  # after which it's LcMax

# Combine (you can add more fields if needed)
gmsh.model.mesh.field.add("Min", 5)
gmsh.model.mesh.field.setNumbers(5, "FieldsList", [2])
gmsh.model.mesh.field.setAsBackgroundMesh(5)

gmsh.option.setNumber(
    "Mesh.CharacteristicLengthFactor", 0.25
)  # Global scaling factor for all mesh sizes (refines everything)
gmsh.option.setNumber(
    "Mesh.MeshSizeFromPoints", 1
)  # Use lc values provided at each Point definition
gmsh.option.setNumber("Mesh.MeshSizeExtendFromBoundary", 2)  # Smoo
gmsh.model.mesh.generate(2)
# gmsh.fltk.run()  # uncomment to preview mesh

# --- Convert to DolfinX
mesh, ct, _ = model_to_mesh(gmsh.model, MPI.COMM_WORLD, 0, gdim=2)
gmsh.finalize()

# %%

Q = functionspace(mesh, ("DG", 0))
mu = Function(Q)
J_wire = Function(Q)
sigma = Function(Q)
J_wire.x.array[:] = 0
mu.x.array[:] = 4 * np.pi * 1e-7

passive1_cells = ct.find(tag_passive1)
passive2_cells = ct.find(tag_passive2)
sigma.x.array[passive1_cells] = sigma_cu
sigma.x.array[passive2_cells] = sigma_cu
wire_cells = ct.find(tag_wire)

V = functionspace(mesh, ("Lagrange", 1))
tdim = mesh.topology.dim
facets = locate_entities_boundary(mesh, tdim - 1, lambda x: np.full(x.shape[1], True))
dofs = locate_dofs_topological(V, tdim - 1, facets)
bc = dirichletbc(default_scalar_type(0), dofs, V)

Psi = Function(V)
Psi_old = Function(V)

mesh.topology.create_connectivity(tdim, tdim)
passive1_dofs = locate_dofs_topological(V, tdim, ct.find(tag_passive1))
passive2_dofs = locate_dofs_topological(V, tdim, ct.find(tag_passive2))

u = TrialFunction(V)
v = TestFunction(V)

# %%

quad_degree = 1  # is it correct for this type of problem?
value_shape = ()  # scalar
V0_el = quadrature_element(
    cell=mesh.ufl_cell().cellname(),
    degree=quad_degree,
    value_shape=value_shape,
    scheme="default",
)
V0 = functionspace(mesh, V0_el)

i_old = Function(V0)
i_old.x.array[:] = 0  # assume no eddy currents at t = 0

# Grover's formula as approximation
r = 1e-3
k = 0.5
L1 = (
    4
    * np.pi
    * 1e-7
    * (
        w_cu * np.log((2 * w_cu + np.sqrt(w_cu**2 + h_cu**2)) / r)
        + h_cu * np.log((2 * h_cu + np.sqrt(w_cu**2 + h_cu**2)) / r)
        - 2 * np.sqrt(w_cu**2 + h_cu**2)
        + k
    )
)
L2 = L1
L = L1 + L2

# Ohm's law
R1 = rho_cu * (2 * np.pi * (rw + r_cu1_offset)) / Aw
R2 = rho_cu * (2 * np.pi * (rw + r_cu2_offset)) / Aw
R = R1 + R2


def faraday_law():
    """
    Update the current in a passive circuit using Faraday's law.

    This function computes the updated current i(t+dt) in a passive conductor loop,
    based on the average time derivative of the poloidal flux function Psi across 
    two copper regions. It applies a forward Euler step to the circuit equation:

        L * di/dt + R * i = -dPsi/dt

    where the electromotive force (EMF) is approximated from the spatial mean of ∂Psi/∂t.

    Returns
    -------
    np.ndarray
        Array of shape (1,) containing the updated current value at the next time step.
    """
    dPsi_dt = (Psi.x.array[:] - Psi_old.x.array[:]) / dt
    mean1 = dPsi_dt[passive1_dofs].mean()
    mean2 = dPsi_dt[passive2_dofs].mean()
    mean_avg = 0.5 * (mean1 + mean2)
    i_old_val = i_old.x.array[0]
    di_dt = -(mean_avg + R * i_old) / L

    return np.array([i_old_val + dt * di_dt])


def f_external(derivatives):
    if derivatives == (0, 0):
        return faraday_law
    else:
        raise NotImplementedError


i_old_array = np.array([0.0])

circuit_ext = FEMExternalOperator(
    Psi, i_old, function_space=V0, external_function=f_external
)
# i = split(circuit_ext)[0]
i = i_old

dx = Measure("dx", domain=mesh, metadata={"quadrature_degree": quad_degree})
dx_passive1 = dx(subdomain_data=ct, subdomain_id=tag_passive1)
dx_passive2 = dx(subdomain_data=ct, subdomain_id=tag_passive2)
area1 = assemble_scalar(form(1.0 * dx_passive1))
area2 = assemble_scalar(form(1.0 * dx_passive2))
x = SpatialCoordinate(mesh)

a = (1 / mu) * (2 * np.pi * x[0]) * dot(grad(u), grad(v)) * dx + (2 * np.pi * x[0]) * (
    sigma / dt
) * u * v * dx
L = J_wire * v * dx + (1 / (2 * np.pi * x[0])) * (sigma / dt) * Psi_old * v * dx
# apply 0D currents on passive elements as additional J_phi
L += (2 * np.pi * x[0]) * (i / area1) * v * dx_passive1
L += (2 * np.pi * x[0]) * (i / area2) * v * dx_passive2  # series connection

L_updated, operators = replace_external_operators(L)
L_compiled = form(L_updated)

num_steps = int(deltat / dt)
t = 0.0

problem = LinearProblem(a, L_compiled, u=Psi, bcs=[bc])

for n in range(num_steps):
    operands = evaluate_operands(operators)
    i_old.x.petsc_vec[:] = evaluate_external_operators(operators, operands)

    Psi.x.array[:] = 0.0
    problem.A.assemble()

    J_wire.x.array[wire_cells] = Iw(t) / Aw
    with problem.b.localForm() as b_local:
        b_local.set(0)
    assemble_vector(problem.b, problem.L)
    problem.b.ghostUpdate(addv=PETSc.InsertMode.ADD, mode=PETSc.ScatterMode.REVERSE)
    
    problem.solver.solve(problem.b, Psi.x.petsc_vec)
    Psi.x.scatter_forward()

    Psi_old.x.array[:] = Psi.x.array[:]
    t += dt

Update

Dear @a-latyshev, @Henrik_Nicolay_Finsb, @dokken and all,

Seems I managed to solve the error

AssertionError
File ".../dolfinx/fem/forms.py", line 249, in _form
    assert all([d is data[0] for d in data])

by specifying subdomain_data for dx:

dx = Measure(
    "dx", domain=mesh, subdomain_data=ct, metadata={"quadrature_degree": quad_degree}
)

Now the code runs but seems to not evaluate the solution of the FEM problem because Psi function is always 0.0 at each DOF for each timestep, even though J_wire is being updated at each iteration,

Below the example code is attached. Thanks again for your time and support.


import numpy as np
import gmsh
from mpi4py import MPI
from dolfinx import default_scalar_type
from dolfinx.io.gmshio import model_to_mesh
from dolfinx.fem import (
    dirichletbc,
    Function,
    functionspace,
    locate_dofs_topological,
    form,
    assemble_scalar,
)
from dolfinx.fem.petsc import assemble_vector, LinearProblem
from dolfinx.mesh import locate_entities_boundary
from ufl import (
    TestFunction,
    TrialFunction,
    dot,
    dx,
    grad,
    SpatialCoordinate,
    Measure,
    split,
)
from basix.ufl import quadrature_element
from petsc4py import PETSc

import matplotlib.pyplot as plt
import pyvista as pv
from dolfinx.plot import vtk_mesh

from dolfinx_external_operator import (
    FEMExternalOperator,
    replace_external_operators,
    evaluate_operands,
    evaluate_external_operators,
)

model_rank = 0
mesh_comm = MPI.COMM_WORLD

# Geometry parameters
R = 10
r_wire = 0.005
rw, zw = 0.5, 0.0
r_cu1_offset = -0.1
z_cu1_offset = 0.5
r_cu2_offset = -0.05
z_cu2_offset = 0.25
h_cu = 0.2
w_cu = 0.2

lc_air = 1
lc_wire = 0.001
lc_cu = 0.25

# Tags
tag_air = 1
tag_wire = 2
tag_passive1 = 3
tag_passive2 = 4

rho_cu = 1.68e-8
sigma_cu = 1 / rho_cu

Iw0 = 1e6
deltat = 0.05
dt = 1e-3


def Iw(t):
    return Iw0 * (1 - t / deltat)


def integrate_f(f, mesh, ct=None, tag=None) -> float:
    dx_obj = dx if ct is None else Measure("dx", subdomain_data=ct, domain=mesh)
    return (
        assemble_scalar(form(f * dx_obj))
        if tag is None
        else assemble_scalar(form(f * dx_obj(tag)))
    )


Aw = r_wire * np.pi**2

gmsh.initialize()
gmsh.model.add("Transient with circuits")

# --- Air region (half disk + rectangle to close D shape) ---
p_center = gmsh.model.occ.addPoint(0.0, 0.0, 0, lc_air)
p_top = gmsh.model.occ.addPoint(0.0, R, 0, lc_air)
p_right = gmsh.model.occ.addPoint(R, 0.0, 0, lc_air)
p_bottom = gmsh.model.occ.addPoint(0.0, -R, 0, lc_air)
arc1 = gmsh.model.occ.addCircleArc(p_top, p_center, p_right)
arc2 = gmsh.model.occ.addCircleArc(p_right, p_center, p_bottom)
line1 = gmsh.model.occ.addLine(p_bottom, p_top)
loop_air = gmsh.model.occ.addCurveLoop([arc1, arc2, line1])
gmsh.model.occ.synchronize()

# --- Circular wire (full disk) ---
wire = gmsh.model.occ.addDisk(rw, zw, 0.0, r_wire, r_wire, tag=tag_wire)
gmsh.model.occ.synchronize()

# --- Passive coil 1 (rectangle) ---
z1_bot = zw + z_cu1_offset - h_cu / 2
r1_left = rw + r_cu1_offset - w_cu / 2
passive1 = gmsh.model.occ.addRectangle(
    r1_left, z1_bot, 0.0, w_cu, h_cu, tag=tag_passive1
)
gmsh.model.occ.synchronize()

# --- Passive coil 2 (rectangle) ---
z2_bot = zw - z_cu2_offset - h_cu / 2
r2_left = rw + r_cu2_offset - w_cu / 2
passive2 = gmsh.model.occ.addRectangle(
    r2_left, z2_bot, 0.0, w_cu, h_cu, tag=tag_passive2
)

air = gmsh.model.occ.addPlaneSurface([loop_air, wire, passive1, passive2], tag=tag_air)
gmsh.model.occ.synchronize()

# --- Physical surfaces ---
gmsh.model.addPhysicalGroup(2, [air], tag_air)
gmsh.model.setPhysicalName(2, tag_air, "Air")
gmsh.model.addPhysicalGroup(2, [wire], tag_wire)
gmsh.model.setPhysicalName(2, tag_wire, "Wire")
gmsh.model.addPhysicalGroup(2, [passive1], tag_passive1)
gmsh.model.setPhysicalName(2, tag_passive1, "Passive1")
gmsh.model.addPhysicalGroup(2, [passive2], tag_passive2)
gmsh.model.setPhysicalName(2, tag_passive2, "Passive2")

# --- Mesh options ---
# Mesh size control options

# Gather all refined surfaces
refined_surfaces = [(2, wire), (2, passive1), (2, passive2)]

# Get boundary edges of the refined surfaces
gmsh.model.occ.synchronize()
edges = gmsh.model.getBoundary(refined_surfaces, oriented=False)

# Field 1: distance from these edges
gmsh.model.mesh.field.add("Distance", 1)
gmsh.model.mesh.field.setNumbers(1, "EdgesList", [e[1] for e in edges])

# Field 2: threshold-based size depending on distance
gmsh.model.mesh.field.add("Threshold", 2)
gmsh.model.mesh.field.setNumber(2, "IField", 1)
gmsh.model.mesh.field.setNumber(2, "LcMin", 0.1)  # finest near coils
gmsh.model.mesh.field.setNumber(2, "LcMax", 1.0)  # coarser away
gmsh.model.mesh.field.setNumber(2, "DistMin", 0.01)  # radius within which it's LcMin
gmsh.model.mesh.field.setNumber(2, "DistMax", 0.2)  # after which it's LcMax

# Combine (you can add more fields if needed)
gmsh.model.mesh.field.add("Min", 5)
gmsh.model.mesh.field.setNumbers(5, "FieldsList", [2])
gmsh.model.mesh.field.setAsBackgroundMesh(5)

gmsh.option.setNumber(
    "Mesh.CharacteristicLengthFactor", 0.25
)  # Global scaling factor for all mesh sizes (refines everything)
gmsh.option.setNumber(
    "Mesh.MeshSizeFromPoints", 1
)  # Use lc values provided at each Point definition
gmsh.option.setNumber("Mesh.MeshSizeExtendFromBoundary", 2)  # Smoo
gmsh.model.mesh.generate(2)
# gmsh.fltk.run()  # uncomment to preview mesh

# --- Convert to DolfinX
mesh, ct, _ = model_to_mesh(gmsh.model, MPI.COMM_WORLD, 0, gdim=2)
gmsh.finalize()

# %%

Q = functionspace(mesh, ("DG", 0))
mu = Function(Q)
J_wire = Function(Q)
sigma = Function(Q)
J_wire.x.array[:] = 0
mu.x.array[:] = 4 * np.pi * 1e-7

passive1_cells = ct.find(tag_passive1)
passive2_cells = ct.find(tag_passive2)
sigma.x.array[passive1_cells] = sigma_cu
sigma.x.array[passive2_cells] = sigma_cu
wire_cells = ct.find(tag_wire)

V = functionspace(mesh, ("Lagrange", 1))
tdim = mesh.topology.dim
facets = locate_entities_boundary(mesh, tdim - 1, lambda x: np.full(x.shape[1], True))
dofs = locate_dofs_topological(V, tdim - 1, facets)
bc = dirichletbc(default_scalar_type(0), dofs, V)

Psi = Function(V)
Psi_old = Function(V)

mesh.topology.create_connectivity(tdim, tdim)
passive1_dofs = locate_dofs_topological(V, tdim, ct.find(tag_passive1))
passive2_dofs = locate_dofs_topological(V, tdim, ct.find(tag_passive2))

u = TrialFunction(V)
v = TestFunction(V)

# %%

quad_degree = 1  # is it correct for this type of problem?
value_shape = (1,)  # scalar
V0_el = quadrature_element(
    cell=mesh.ufl_cell().cellname(),
    degree=quad_degree,
    value_shape=value_shape,
    scheme="default",
)
V0 = functionspace(mesh, V0_el)

i_old = Function(V0)
i_old.x.array[:] = 0  # assume no eddy currents at t = 0

# Grover's formula as approximation
r = 1e-3
k = 0.5
L1 = (
    4
    * np.pi
    * 1e-1
    * (
        w_cu * np.log((2 * w_cu + np.sqrt(w_cu**2 + h_cu**2)) / r)
        + h_cu * np.log((2 * h_cu + np.sqrt(w_cu**2 + h_cu**2)) / r)
        - 2 * np.sqrt(w_cu**2 + h_cu**2)
        + k
    )
)
L2 = L1
Leq = L1 + L2

# Ohm's law
R1 = rho_cu * (2 * np.pi * (rw + r_cu1_offset)) / Aw
R2 = rho_cu * (2 * np.pi * (rw + r_cu2_offset)) / Aw
Req = R1 + R2


def faraday_law(*args, **kwargs):
    """
    Update the current in a passive circuit using Faraday's law.

    This function computes the updated current i(t+dt) in a passive conductor loop,
    based on the average time derivative of the poloidal flux function Psi across
    two copper regions. It applies a forward Euler step to the circuit equation:

        L * di/dt + R * i = -dPsi/dt

    where the electromotive force (EMF) is approximated from the spatial mean of ∂Psi/∂t.

    Returns
    -------
    np.ndarray
        Array of shape (1,) containing the updated current value at the next time step.
    """
    dPsi_dt = (Psi.x.array[:] - Psi_old.x.array[:]) / dt
    mean1 = dPsi_dt[passive1_dofs].mean()
    mean2 = dPsi_dt[passive2_dofs].mean()
    emf = 0.5 * (mean1 + mean2)
    i_old_val = float(i_old.x.array[0])
    di_dt = -(emf + Req * i_old_val) / Leq
    new_i = i_old_val + dt * di_dt
    return np.array([new_i], dtype=np.float64)


def f_external(derivatives):
    if derivatives == (0, 0):
        return faraday_law
    else:
        raise NotImplementedError


circuit_ext = FEMExternalOperator(
    Psi, i_old, function_space=V0, external_function=f_external
)
i = split(circuit_ext)[0]

dx = Measure(
    "dx", domain=mesh, subdomain_data=ct, metadata={"quadrature_degree": quad_degree}
)
dx_passive1 = dx(subdomain_data=ct, subdomain_id=tag_passive1)
dx_passive2 = dx(subdomain_data=ct, subdomain_id=tag_passive2)
area1 = assemble_scalar(form(1.0 * dx_passive1))
area2 = assemble_scalar(form(1.0 * dx_passive2))
x = SpatialCoordinate(mesh)

a = (1 / mu) * (2 * np.pi * x[0]) * dot(grad(u), grad(v)) * dx + (2 * np.pi * x[0]) * (
    sigma / dt
) * u * v * dx
L = J_wire * v * dx + (1 / (2 * np.pi * x[0])) * (sigma / dt) * Psi_old * v * dx
# apply 0D currents on passive elements as additional J_phi
L += (2 * np.pi * x[0]) * (i / area1) * v * dx_passive1
L += (2 * np.pi * x[0]) * (i / area2) * v * dx_passive2  # series connection

L_updated, operators = replace_external_operators(L)
L_compiled = form(L_updated)

num_steps = int(deltat / dt)
t = 0.0

problem = LinearProblem(a, L_compiled, u=Psi, bcs=[bc],
                        petsc_options={"ksp_type": "cg", "pc_type": "jacobi"})

# grid = pv.UnstructuredGrid(*vtk_mesh(mesh, tdim))

time_vals = []
current_vals = []
for n in range(num_steps):
    operands = evaluate_operands(operators)
    # i_old.x.petsc_vec[:] = evaluate_external_operators(operators, operands)
    new_val = evaluate_external_operators(operators, operands)[0]
    i_old.x.array[:] = new_val

    J_wire.x.array[wire_cells] = Iw(t) / Aw

    # -----------------------------
    # plotter = pv.Plotter()
    # num_local_cells = mesh.topology.index_map(tdim).size_local
    # grid.cell_data["J_wire"] = J_wire.x.array[:]
    # grid.set_active_scalars("J_wire")
    # actor = plotter.add_mesh(grid, show_edges=True)
    # plotter.view_xy()
    # plotter.show(title=f"Current density J_wire, t = {t:.4f} s")
    # -----------------------------

    Psi.x.array[:] = 0.0
    problem.A.assemble()

    with problem.b.localForm() as b_local:
        b_local.set(0)
    assemble_vector(problem.b, problem.L)
    problem.b.ghostUpdate(addv=PETSc.InsertMode.ADD, mode=PETSc.ScatterMode.REVERSE)

    problem.solver.solve(problem.b, Psi.x.petsc_vec)
    Psi.x.scatter_forward()

    assert np.any(Psi.x.array[:] != 0), "Ψ null"

    Psi_old.x.array[:] = Psi.x.array[:]
    t += dt

    time_vals.append(t)
    current_vals.append(i_old.x.array[0])


plt.plot(time_vals, current_vals)
plt.xlabel("Time [s]")
plt.ylabel("Induced current [A]")
plt.title("Current induced in passive circuit")
plt.grid()
plt.tight_layout()
plt.show()

Principal remark: It seems that you use i like a constant in your formulation, why not use just fem.Constant instead of the external operator? In this case, you could call your function that computes i in the loop and then update its value.

If you really need external operators, here are my remarks on your implementation:

You wrote that your function faraday_law returns “Array of shape (1,) containing the updated current value at the next time step”, but it is not correct. The external operator exists in the function space, so it has values at all quadrature points. In addition, it depends on the operands Psi and i_old that are computed at quadrature points as well so faraday_law should receive their NumPy values. I modified your code accordingly:

def faraday_law_(Psi, Psi_old, i_old):
    """
    Update the current in a passive circuit using Faraday's law.

    This function computes the updated current i(t+dt) in a passive conductor loop,
    based on the average time derivative of the poloidal flux function Psi across
    two copper regions. It applies a forward Euler step to the circuit equation:

        L * di/dt + R * i = -dPsi/dt

    where the electromotive force (EMF) is approximated from the spatial mean of ∂Psi/∂t.

    Returns
    -------
    np.ndarray
        Array of size size = num_cells * num_quadrature_points * math_shape
    """
    # Pis, Psi_old, i_old - NumPy arrays of the size =
    # num_cells * num_quadrature_points * shape
    dPsi_dt = (Psi - Psi_old) / dt
    mean1 = dPsi_dt[passive1_dofs].mean()
    mean2 = dPsi_dt[passive2_dofs].mean()
    emf = 0.5 * (mean1 + mean2)
    i_old_val = i_old # constant or global vector?
    di_dt = -(emf + Req * i_old_val) / Leq
    new_i = i_old_val + dt * di_dt
    return new_i


def faraday_law(Psi_, i_old_):
    Psi_old_ = Psi_old.x.array # NumPy array
    new_i_= faraday_law_(Psi_, Psi_old_, i_old_)
    return new_i_.reshape(-1)

Now new_i is a NumPy array that has values in all quadrature points.

1 Like