Dear Community,
I am currently having difficulties implementing an HDG scheme with static condensation for a nonlinear Poisson equation. I am essentially trying to combine the HDG tutorial (HDG scheme for the Poisson equation — DOLFINx 0.9.0 documentation) with the static condensation tutorial (Static condensation of linear elasticity — DOLFINx 0.9.0 documentation), while also implementing a Newton solver. My ultimate goal is to apply the same scheme to a compressible Euler HDG formulation.
I suspect there is an issue with how my Numba kernels are defined for the static condensation, since my local block J[0][0] contains several integral types. To check that I could assemble the condensed matrix at all, I passed the static condensation tutorial example to my Newton solver (not to solve it, only through the __init__ method). When each block contains a single integration measure, this works fine. I then tried defining one kernel per integration measure and summing their contributions inside the Numba function, but without success: the code crashes with a cryptic PETSc error message (a minimal sketch of the summation I am aiming for is given after the code listings below). Would anyone have any advice on this matter? The code consists of three files:
- demo_HDG.py: a test case similar to the tutorial
from mpi4py import MPI
import numpy as np
from dolfinx.cpp.mesh import cell_num_entities
from ufl import (dot, grad, inner, Measure, CellDiameter,
                 FacetNormal, SpatialCoordinate, exp, TestFunction)
from dolfinx.fem import (locate_dofs_topological, Constant,
                         Function, functionspace, dirichletbc)
from dolfinx.mesh import locate_entities_boundary
from dolfinx.mesh import CellType, GhostMode, create_unit_square, create_submesh
from block import extract_rows, derivative_block
import sys
from petsc4py import PETSc
# sys.path.append('../')
from new_newton import StaticCondensationNewtonSolver
def compute_cell_boundary_facets(msh):
    """Compute facets for cell boundary integration"""
    tdim = msh.topology.dim
    fdim = tdim - 1
    n_f = cell_num_entities(msh.topology.cell_type, fdim)
    n_c = msh.topology.index_map(tdim).size_local
    return np.vstack((np.repeat(np.arange(n_c), n_f),
                      np.tile(np.arange(n_f), n_c))).T.flatten()
def kappa(u):
    return 1 + u**2
def set_form(u, ubar, v, vbar, f, dx, dx_f, dS):
    """HDG residual; n, gamma and facet_mesh are taken from module scope."""
    Res = inner(kappa(u) * grad(u), grad(v)) * dx
    Res += inner(dot(kappa(u) * grad(u), n), vbar - v) * dS
    Res += gamma * inner(ubar - u, vbar - v) * dS
    Res += inner(f, v) * dx
    Res += inner(Constant(facet_mesh, PETSc.ScalarType(0.0)), vbar) * dx_f
    return Res
# Initialize MPI and create mesh
comm = MPI.COMM_WORLD
N = 2
# Create mesh (2D square)
msh = create_unit_square(comm, N, N, CellType.quadrilateral,
                         ghost_mode=GhostMode.none)
# Create facet submesh
tdim = msh.topology.dim
fdim = tdim - 1
# Create all required topology connections
msh.topology.create_connectivity(fdim, tdim)
msh.topology.create_entities(fdim)
facet_imap = msh.topology.index_map(fdim)
num_facets = facet_imap.size_local + facet_imap.num_ghosts
facets = np.arange(num_facets, dtype=np.int32)
facet_mesh, facet_mesh_to_mesh = create_submesh(msh, fdim, facets)[:2]
# Create connectivity for facet_mesh
facet_mesh.topology.create_connectivity(fdim, fdim)
# Define function spaces
k = 2 # Polynomial order
V = functionspace(msh, ("Discontinuous Lagrange", k))
Vbar = functionspace(facet_mesh, ("Discontinuous Lagrange", k))
u, ubar = Function(V), Function(Vbar)
v, vbar = TestFunction(V), TestFunction(Vbar)
# Define measures
dx_c = Measure("dx", domain=msh)
cbf = compute_cell_boundary_facets(msh) #cell_boundary_facets
cb = 1 #cell_boundaries
ds_c = Measure("ds", subdomain_data=[(cb, cbf)], domain=msh)
dx_f = Measure("dx", domain=facet_mesh)
# Mesh to facet mesh mapping
mesh_to_facet_mesh = np.full(num_facets, -1)
mesh_to_facet_mesh[facet_mesh_to_mesh] = np.arange(len(facet_mesh_to_mesh))
entity_maps = {facet_mesh: mesh_to_facet_mesh}
# Define forms
h = CellDiameter(msh)
n = FacetNormal(msh)
gamma = 16.0 * k**2 / h
# Source term
x = SpatialCoordinate(msh)
f = 10.0 * exp(-((x[0]-0.5) * (x[0]-0.5) + (x[1]-0.5) * (x[1]-0.5)) / 0.02)
Res = set_form(u, ubar, v, vbar, f, dx_c, dx_f, ds_c(cb))
Fr = extract_rows(Res, [v, vbar])
J = derivative_block(Fr, [u, ubar])
# Boundary conditions
def locate_boundary(x, y_value):
    return np.isclose(x[1], y_value)
top = locate_entities_boundary(msh, fdim, lambda x: locate_boundary(x, 1.0))
bottom = locate_entities_boundary(msh, fdim, lambda x: locate_boundary(x, 0.0))
# Convert to facet mesh indices
facet_mesh_bdr_facets = mesh_to_facet_mesh[np.concatenate([top, bottom])]
dofs = locate_dofs_topological(Vbar, fdim, facet_mesh_bdr_facets)
# Apply sinusoidal boundary condition
def boundary_value(x):
    values = np.zeros(x.shape[1])
    values[:] = np.sin(5 * x[0])
    return values
# Create boundary condition
boundary_function = Function(Vbar)
boundary_function.interpolate(boundary_value)
bc = dirichletbc(boundary_function, dofs)
solver = StaticCondensationNewtonSolver(Fr, J, [bc], [], entity_maps, [V, Vbar], msh)
# This line makes DOLFINx crash
solver.assembly_test()
# Let's show what we need
print("Type of J:", type(J))
print("Type of J[0][0]:", type(J[0][0]))
print("\nFunction spaces:")
print("V (local space):")
print("- type:", type(V))
print("- dimension:", V.element.space_dimension)
print("\nVbar (global space):")
print("- type:", type(Vbar))
print("- dimension:", Vbar.element.space_dimension)
# Debug HDG forms
print("Analysis of HDG forms:")
# Verification of Fr
print("\nAnalysis of Fr:")
for i, f in enumerate(Fr):
    print(f"\nFr[{i}] :")
    try:
        args = f.arguments()
        print(f"- number of arguments: {len(args)}")
        for j, arg in enumerate(args):
            space = arg.ufl_function_space()
            print(f"- argument {j} space: {space}")
            print(f"- argument {j} element: {space.ufl_element()}")
    except Exception as e:
        print(f"- detailed error: {str(e)}")
# Verification of J with more details
print("\nAnalysis of J:")
for i in range(2):
    for j in range(2):
        print(f"\nJ[{i}][{j}] :")
        try:
            form = J[i][j]
            # Show the form itself
            print(f"- form: {form}")
            # Show the integrals
            integrals = form.integrals()
            print(f"- number of integrals: {len(integrals)}")
            for k, integral in enumerate(integrals):
                print(f"- integral {k} type: {integral.integral_type()}")
        except Exception as e:
            print(f"- detailed error: {str(e)}")
# For spaces, let's use attributes directly
print("\nSpaces used:")
print(f"V (local) dimension: {V.element.space_dimension}")
print(f"Vbar (global) dimension: {Vbar.element.space_dimension}")
#%%Debug
# Imports for linear elasticity example
from basix.ufl import element
from petsc4py import PETSc
import ufl
from dolfinx.mesh import locate_entities_boundary, meshtags
# Stress (Se) and displacement (Ue) elements
Se = element("DG", msh.basix_cell(), 1, shape=(2, 2), symmetry=True, dtype=PETSc.RealType) # type: ignore
Ue = element("Lagrange", msh.basix_cell(), 2, shape=(2,), dtype=PETSc.RealType) # type: ignore
S = functionspace(msh, Se)
U = functionspace(msh, Ue)
sigma, tau = ufl.TrialFunction(S), ufl.TestFunction(S)
u, v = ufl.TrialFunction(U), ufl.TestFunction(U)
# Locate all facets at the free end and assign them value 1. Sort the
# facet indices (requirement for constructing MeshTags)
free_end_facets = np.sort(locate_entities_boundary(msh, 1, lambda x: np.isclose(x[0], 48.0)))
mt = meshtags(msh, 1, free_end_facets, 1)
ds = ufl.Measure("ds", subdomain_data=mt)
# Homogeneous boundary condition in displacement
u_bc = Function(U)
u_bc.x.array[:] = 0
# Displacement BC is applied to the left side
left_facets = locate_entities_boundary(msh, 1, lambda x: np.isclose(x[0], 0.0))
bdofs = locate_dofs_topological(U, 1, left_facets)
bc = dirichletbc(u_bc, bdofs)
# Elastic stiffness tensor and Poisson ratio
E, nu = 1.0, 1.0 / 3.0
def sigma_u(u):
    """Constitutive relation for stress-strain. Assuming plane-stress in XY"""
    eps = 0.5 * (ufl.grad(u) + ufl.grad(u).T)
    sigma = E / (1.0 - nu**2) * ((1.0 - nu) * eps + nu * ufl.Identity(2) * ufl.tr(eps))
    return sigma
a00 = ufl.inner(sigma, tau) * ufl.dx
a10 = -ufl.inner(sigma, ufl.grad(v)) * ufl.dx
a01 = -ufl.inner(sigma_u(u), tau) * ufl.dx
# dummy form
a11 = -ufl.inner(ufl.grad(u), ufl.grad(v)) * ufl.dx
solver = StaticCondensationNewtonSolver(Fr, [[a00, a01], [a10, a11]], [bc], [], entity_maps, [S, U], msh)
solver.assembly_test()
- block.py: borrowed from the dolfin_dg module:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 8 18:07:35 2025
@author: bouteillerp
"""
import ufl
import dolfinx.fem
class SeparateSpaceFormSplitter(ufl.corealg.multifunction.MultiFunction):

    def split(self, form, v, u=None):
        self.vu = tuple((v, u))
        return ufl.algorithms.map_integrands.map_integrand_dags(self, form)

    def argument(self, obj):
        if obj not in self.vu:
            return ufl.constantvalue.Zero(shape=obj.ufl_shape)
        return obj

    expr = ufl.corealg.multifunction.MultiFunction.reuse_if_untouched
def extract_rows(F, v, module=ufl):
    """
    Parameters
    ----------
    F : UFL residual formulation
    v : Ordered list of test functions

    Returns
    -------
    List of extracted block residuals ordered corresponding to each test
    function
    """
    vn = len(v)
    L = [None for _ in range(vn)]
    fs = SeparateSpaceFormSplitter()
    for vi in range(vn):
        # Do the initial split replacing test functions with zero
        L[vi] = fs.split(F, v[vi])
        # Now remove the empty forms. Why don't FFC/UFL do this already?
        L_reconstruct = []
        for integral in L[vi].integrals():
            arguments = ufl.algorithms.analysis.extract_arguments(integral)
            if len(arguments) < 1:
                continue
            # Sanity checks: there should be only one test function, and it
            # should be the one we want to keep
            assert len(arguments) == 1
            assert arguments[0] is v[vi]
            L_reconstruct.append(integral)
        # Generate the new form with the zeroes removed
        L[vi] = ufl.Form(L_reconstruct)
    return L
def derivative_block(F, u, du=None, coefficient_derivatives=None):
    """
    Parameters
    ----------
    F : Block residual formulation
    u : Ordered solution functions
    du : Ordered trial functions
    coefficient_derivatives : Prescribed derivative map

    Returns
    -------
    Block matrix corresponding to the ordered components of the
    Gateaux/Frechet derivative.
    """
    if isinstance(F, ufl.Form):
        return ufl.derivative(F, u, du, coefficient_derivatives)
    if not isinstance(F, (list, tuple)):
        raise TypeError("Expecting F to be a list of Forms. Found: %s" % str(F))
    if not isinstance(u, (list, tuple)):
        raise TypeError("Expecting u to be a list of Coefficients. Found: %s" % str(u))
    if du is not None:
        if not isinstance(du, (list, tuple)):
            raise TypeError("Expecting du to be a list of Arguments. Found: %s" % str(du))

    import itertools
    from ufl.algorithms.apply_derivatives import apply_derivatives
    from ufl.algorithms.apply_algebra_lowering import apply_algebra_lowering

    m, n = len(u), len(F)
    if du is None:
        du = [None] * m
    J = [[None for _ in range(m)] for _ in range(n)]
    for (i, j) in itertools.product(range(n), range(m)):
        gateaux_derivative = ufl.derivative(F[i], u[j], du[j],
                                            coefficient_derivatives)
        gateaux_derivative = apply_derivatives(
            apply_algebra_lowering(gateaux_derivative))
        if gateaux_derivative.empty():
            gateaux_derivative = None
        J[i][j] = gateaux_derivative
    return J
- new_newton.py: intended as a dolfinx port of the Newton solver from dolfin_dg (whose original version required leopart):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 9 11:06:22 2025
@author: bouteillerp
"""
from mpi4py import MPI
import cffi
import numba
import numba.core.typing.cffi_utils as cffi_support
import numpy as np
from petsc4py import PETSc
from dolfinx.fem import (
    Form,
    IntegralType,
    form,
    form_cpp_class,
)
from dolfinx.fem.petsc import (apply_lifting, assemble_matrix, assemble_vector,
                               create_matrix, create_vector, set_bc)
from dolfinx.jit import ffcx_jit
from ffcx.codegeneration.utils import numba_ufcx_kernel_signature as ufcx_signature
from dolfinx.fem import Function
class StaticCondensationNewtonSolver:
    def __init__(self, F, J, bcs, bcs_homog, entity_maps, V_list, msh,
                 rtol=1e-12, atol=1e-10, maximum_iterations=20):
        """
        F : [F_local, F_global] - block residuals
        J : [[J_ll, J_lg], [J_gl, J_gg]] - block Jacobian
        """
        # JIT compilation of the kernels with ffcx_jit
        ufcx_ll, _, _ = ffcx_jit(MPI.COMM_WORLD, J[0][0],
                                 form_compiler_options={"scalar_type": PETSc.ScalarType})
        ufcx_lg, _, _ = ffcx_jit(MPI.COMM_WORLD, J[0][1],
                                 form_compiler_options={"scalar_type": PETSc.ScalarType})
        ufcx_gl, _, _ = ffcx_jit(MPI.COMM_WORLD, J[1][0],
                                 form_compiler_options={"scalar_type": PETSc.ScalarType})
        ufcx_gg, _, _ = ffcx_jit(MPI.COMM_WORLD, J[1][1],
                                 form_compiler_options={"scalar_type": PETSc.ScalarType})
        #%%####### Nice try but not working
        # kernels_ll = []
        # i = 0
        # while True:
        #     integral = ufcx_ll.form_integrals[i]
        #     if not integral:  # Stop when we reach NULL
        #         break
        #     kernel = getattr(integral, f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        #     kernels_ll.append(kernel)
        #     i += 1
        # kernels_lg = []
        # i = 0
        # while True:
        #     integral = ufcx_lg.form_integrals[i]
        #     if not integral:
        #         break
        #     kernel = getattr(integral, f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        #     kernels_lg.append(kernel)
        #     i += 1
        # kernels_gl = []
        # i = 0
        # while True:
        #     integral = ufcx_gl.form_integrals[i]
        #     if not integral:
        #         break
        #     kernel = getattr(integral, f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        #     kernels_gl.append(kernel)
        #     i += 1
        # kernels_gg = []
        # i = 0
        # while True:
        #     integral = ufcx_gg.form_integrals[i]
        #     if not integral:
        #         break
        #     kernel = getattr(integral, f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        #     kernels_gg.append(kernel)
        #     i += 1
        # print("Number of integrals found:")
        # print(f"- J_ll: {len(kernels_ll)}")
        # print(f"- J_lg: {len(kernels_lg)}")
        # print(f"- J_gl: {len(kernels_gl)}")
        # print(f"- J_gg: {len(kernels_gg)}")
        #%%
        # Extract the kernels
        kernel_ll = getattr(ufcx_ll.form_integrals[0],
                            f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        # # Additional kernels for the boundary integrals
        # kernel_ll_1 = getattr(ufcx_ll.form_integrals[1],
        #                       f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        # kernel_ll_2 = getattr(ufcx_ll.form_integrals[2],
        #                       f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        kernel_lg = getattr(ufcx_lg.form_integrals[0],
                            f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        kernel_gl = getattr(ufcx_gl.form_integrals[0],
                            f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")
        kernel_gg = getattr(ufcx_gg.form_integrals[0],
                            f"tabulate_tensor_{np.dtype(PETSc.ScalarType).name}")

        print("\nAnalysis of the integrals of every block:")
        print("\nJ[0][0] (A_ll):")
        i = 0
        while ufcx_ll.form_integrals[i]:
            print(f"- kernel {i}: {ufcx_ll.form_integrals[i]}")
            i += 1
        print("\nJ[0][1] (A_lg):")
        i = 0
        while ufcx_lg.form_integrals[i]:
            print(f"- kernel {i}: {ufcx_lg.form_integrals[i]}")
            i += 1
        print("\nJ[1][0] (A_gl):")
        i = 0
        while ufcx_gl.form_integrals[i]:
            print(f"- kernel {i}: {ufcx_gl.form_integrals[i]}")
            i += 1
        print("\nJ[1][1] (A_gg):")
        i = 0
        while ufcx_gg.form_integrals[i]:
            print(f"- kernel {i}: {ufcx_gg.form_integrals[i]}")
            i += 1
        ffi = cffi.FFI()
        cffi_support.register_type(ffi.typeof("double _Complex"),
                                   numba.types.complex128)
        V, Vbar = V_list[0], V_list[1]
        local_size = V.element.space_dimension
        global_size = Vbar.element.space_dimension
        # Static condensation kernel
        @numba.cfunc(ufcx_signature(PETSc.ScalarType, PETSc.RealType), nopython=True)  # type: ignore
        def tabulate_condensed(A_, w_, c_, coords_, entity_local_index, permutation=ffi.NULL):
            # Wrap the output (condensed, global-global) matrix
            A = numba.carray(A_, (global_size, global_size), dtype=PETSc.ScalarType)
            # Tabulate the sub-blocks
            A_ll = np.zeros((local_size, local_size), dtype=PETSc.ScalarType)
            kernel_ll(ffi.from_buffer(A_ll), w_, c_, coords_, entity_local_index, permutation)
            # A_ll_surf = np.zeros((local_size, local_size), dtype=PETSc.ScalarType)
            # kernel_ll_1(ffi.from_buffer(A_ll_surf), w_, c_, coords_, entity_local_index, permutation)
            # A_ll += A_ll_surf  # add the volume and surface contributions
            # A_ll_2 = np.zeros((local_size, local_size), dtype=PETSc.ScalarType)
            # kernel_ll_2(ffi.from_buffer(A_ll_2), w_, c_, coords_, entity_local_index, permutation)
            # A_ll += A_ll_2
            A_lg = np.zeros((local_size, global_size), dtype=PETSc.ScalarType)
            kernel_lg(ffi.from_buffer(A_lg), w_, c_, coords_, entity_local_index, permutation)
            A_gl = np.zeros((global_size, local_size), dtype=PETSc.ScalarType)
            kernel_gl(ffi.from_buffer(A_gl), w_, c_, coords_, entity_local_index, permutation)
            A_gg = np.zeros((global_size, global_size), dtype=PETSc.ScalarType)
            kernel_gg(ffi.from_buffer(A_gg), w_, c_, coords_, entity_local_index, permutation)
            # Static condensation: A_cond = A_gg - A_gl A_ll^{-1} A_lg
            A[:, :] = A_gg - A_gl @ np.linalg.solve(A_ll, A_lg)
        # Create the condensed form
        formtype = form_cpp_class(PETSc.ScalarType)
        cells = np.arange(msh.topology.index_map(msh.topology.dim).size_local)
        integrals = {IntegralType.cell: [(-1, tabulate_condensed.address, cells,
                                          np.array([], dtype=np.int8))]}
        self.a_cond = Form(formtype([Vbar._cpp_object, Vbar._cpp_object],
                                    integrals, [], [], False, {}, None))

        # Solver configuration
        self.solver = PETSc.KSP().create(MPI.COMM_WORLD)
        self.solver.setOperators(create_matrix(self.a_cond))
        self.solver.setType(PETSc.KSP.Type.PREONLY)
        pc = self.solver.getPC()
        pc.setType(PETSc.PC.Type.LU)
        pc.setFactorSolverType("mumps")

        # Linear solver configuration for the back-substitution
        self.local_solver = PETSc.KSP().create(MPI.COMM_WORLD)
        self.local_solver.setType(PETSc.KSP.Type.PREONLY)
        pc_local = self.local_solver.getPC()
        pc_local.setType(PETSc.PC.Type.LU)

        self.bcs = bcs
        self.bcs_homog = bcs_homog
        self.rtol = rtol
        self.atol = atol
        self.maximum_iterations = maximum_iterations
        # self._setup_forms(F, J, entity_maps)

    # def _setup_forms(self, F, J, entity_maps):
    #     """Set up the variational forms."""
    #     self.F_local = form(F[0])
    #     self.F_global = form(F[1], entity_maps=entity_maps)
    #     self.J_local = form(J[0][0])
    #     self.J_coupling = form(J[0][1], entity_maps=entity_maps)
    #     self.J_global = form(J[1][1], entity_maps=entity_maps)
    def assembly_test(self):
        A_cond = assemble_matrix(self.a_cond, bcs=self.bcs)
        print("Assembly")
    # def solve(self, u, ubar):
    #     """
    #     Newton loop, probably buggy, but it does not matter for now
    #     """
    #     # Apply the boundary conditions
    #     du = Function(u.function_space)
    #     dubar = Function(ubar.function_space)
    #     for bc in self.bcs:
    #         # set_bc(u.x.petsc_vec, bc)
    #         set_bc(ubar.x.petsc_vec, bc)
    #     # Initial assembly
    #     A_cond = assemble_matrix(self.a_cond, bcs=self.bcs)
    #     b = assemble_vector(self.F_global)
    #     apply_lifting(b, [self.a_cond], bcs=[self.bcs])
    #     for it in range(self.maximum_iterations):
    #         # Assemble the condensed system
    #         A_cond = assemble_matrix(self.a_cond, bcs=self.bcs)
    #         A_cond.assemble()
    #         self.solver.setOperators(A_cond)
    #         # Assemble the residual
    #         b = assemble_vector(self.F_global)
    #         apply_lifting(b, [self.a_cond], bcs=[self.bcs])
    #         b.ghostUpdate(addv=PETSc.InsertMode.ADD, mode=PETSc.ScatterMode.REVERSE)
    #         # Solve the condensed system
    #         b.scale(-1.0)  # -F(u_n)
    #         self.solver.solve(b, dubar.x.petsc_vec)
    #         dubar.x.scatter_forward()
    #         for bc in self.bcs_homog:
    #             set_bc(dubar.x.petsc_vec, bc)
    #         # Back-substitution to obtain du
    #         du.x.array[:] = self._back_substitute(du, dubar)
    #         du.x.scatter_forward()
    #         # Update the solution: u_(n+1) = u_n + du
    #         u.x.array[:] += du.x.array
    #         ubar.x.array[:] += dubar.x.array
    #         u.x.scatter_forward()
    #         ubar.x.scatter_forward()
    #     print(f"Failed to converge in {self.maximum_iterations} iterations")
    #     return self.maximum_iterations, False

    # def _back_substitute(self, u, ubar):
    #     """Back-substitution, probably buggy, but it does not matter for now"""
    #     A_ll = assemble_matrix(self.J_local)
    #     A_ll.assemble()
    #     self.local_solver.setOperators(A_ll)
    #     A_lg = assemble_matrix(self.J_coupling)
    #     b_l = assemble_vector(self.F_local)
    #     # Compute -A_ll^{-1}(A_lg * ubar + b_l)
    #     temp_vec = create_vector(self.F_local)
    #     A_lg.mult(ubar.x.petsc_vec, temp_vec)
    #     temp_vec.axpy(1.0, b_l)
    #     temp_vec.scale(-1.0)
    #     # Solve the local system
    #     u_local = create_vector(self.F_local)
    #     self.local_solver.solve(temp_vec, u_local)
    #     return u_local.array
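For reference, here is the minimal sketch mentioned above: a NumPy-only illustration (no FFCx/Numba, toy matrix sizes of my own choosing) of the operation I would like the condensed kernel to perform when the local block A_ll receives contributions from several integrals, i.e. tabulate each contribution into its own array, sum them, and only then condense. This is what the commented lines such as A_ll += A_ll_surf in new_newton.py are meant to achieve.

import numpy as np

def condense(A_ll_parts, A_lg, A_gl, A_gg):
    """Sum the local-local contributions, then eliminate the local unknowns:
    A_cond = A_gg - A_gl @ A_ll^{-1} @ A_lg."""
    A_ll = sum(A_ll_parts)  # e.g. the cell integral plus the cell-boundary contributions
    return A_gg - A_gl @ np.linalg.solve(A_ll, A_lg)

# Toy sizes standing in for V.element.space_dimension / Vbar.element.space_dimension
rng = np.random.default_rng(0)
nl, ng = 4, 3
A_ll_parts = [rng.random((nl, nl)) + nl * np.eye(nl) for _ in range(3)]
A_lg, A_gl, A_gg = rng.random((nl, ng)), rng.random((ng, nl)), rng.random((ng, ng))
print(condense(A_ll_parts, A_lg, A_gl, A_gg).shape)  # (3, 3)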
Any guidance would be greatly appreciated.
Sincerely,
Paul