Adaptive Mesh Refinement using DWR

I’m trying to run goal-oriented AMR for a 2D cantilever using dolfinx + dolfin_dg.dolfinx.dwr.LinearAPosterioriEstimator. The primal solve works and residual-based AMR works, but every attempt to construct the DWR estimator fails depending on how I pass arguments.

I’ve seen these two errors (on different edits):

  • Type object not supported.
  • '_BlockedElement' object has no attribute 'mesh'

From the logs of my minimal code (below), I currently get:
[WARN] DWR estimator path failed (Type object not supported.). Using residual-based indicators…

Following is the self contained code, please suggest relevant changes

import time
import numpy as np
from mpi4py import MPI
from petsc4py import PETSc

import ufl
from ufl import dx, ds, dS, inner, sym, grad, tr, Identity, as_vector, div, FacetNormal, jump, avg, CellDiameter

from dolfinx import mesh, fem, io, default_scalar_type
from dolfinx.fem.petsc import assemble_matrix, assemble_vector, apply_lifting, set_bc

_dwr_ok = True
try:
    from dolfin_dg.dolfinx.dwr import LinearAPosterioriEstimator
except Exception:
    _dwr_ok = False

def build_beam_mesh(L=1.0, H=0.2, nx=40, ny=8, cell=mesh.CellType.triangle, comm=MPI.COMM_WORLD):
    return mesh.create_rectangle(comm, points=[[0.0, 0.0], [L, H]], n=[nx, ny], cell_type=cell)


def mark_facets(domain, L, tol=1e-12):
    tdim, fdim = domain.topology.dim, domain.topology.dim - 1
    domain.topology.create_connectivity(tdim, fdim)
    domain.topology.create_connectivity(fdim, tdim)

    is_right = lambda x: np.isclose(x[0], L, atol=tol)
    is_left  = lambda x: np.isclose(x[0], 0.0, atol=tol)

    right_facets = mesh.locate_entities_boundary(domain, fdim, is_right).astype(np.int32)
    left_facets  = mesh.locate_entities_boundary(domain, fdim, is_left ).astype(np.int32)

    all_facets = np.concatenate((right_facets, left_facets)).astype(np.int32)
    facet_vals = np.concatenate((np.full(right_facets.size, 1, dtype=np.int32),
                                 np.full(left_facets.size,  2, dtype=np.int32)))
    order = np.argsort(all_facets, kind="mergesort")
    return mesh.meshtags(domain, fdim, all_facets[order], facet_vals[order])


def elasticity_forms(domain, V, facet_tags, params, traction_tag=1):
    E, nu = params["E"], params["nu"]
    plane_strain = params.get("plane_strain", True)

    gdim = domain.geometry.dim
    u, v = ufl.TrialFunction(V), ufl.TestFunction(V)

    mu = E / (2.0 * (1.0 + nu))
    lmbda = E * nu / ((1 + nu) * (1 - 2 * nu)) if plane_strain else E * nu / (1 - nu**2)

    eps = lambda w: sym(grad(w))
    sigma = lambda w: 2.0 * mu * eps(w) + lmbda * tr(eps(w)) * Identity(gdim)

    ds_f = ufl.Measure("ds", domain=domain, subdomain_data=facet_tags)

    Tx, Ty = params["Tx"], params.get("Ty", 0.0)
    tvec = fem.Constant(domain, default_scalar_type((Tx, Ty)))

    a = inner(sigma(u), eps(v)) * dx
    Lform = inner(tvec, v) * ds_f(traction_tag)

    fdim = domain.topology.dim - 1
    left_facets = facet_tags.find(2)
    dofs_all = fem.locate_dofs_topological(V, fdim, left_facets)
    bcs = [fem.dirichletbc(np.zeros((gdim,), dtype=default_scalar_type), dofs_all, V)]

    ex = as_vector((1.0, 0.0)) if gdim == 2 else as_vector((1.0, 0.0, 0.0))
    j_u = inner(ex, u) * ds(domain=domain)  # DWR goal (linear in TrialFunction)

    return a, Lform, bcs, j_u, sigma

def solve_primal(domain, a, Lform, bcs, V):
    A = assemble_matrix(fem.form(a), bcs=bcs); A.assemble()
    b = assemble_vector(fem.form(Lform))
    apply_lifting(b, [fem.form(a)], [bcs])
    b.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
    set_bc(b, bcs)

    ksp = PETSc.KSP().create(domain.comm)
    ksp.setOperators(A)
    ksp.setType(PETSc.KSP.Type.CG)
    pc = ksp.getPC()
    try:
        pc.setType(PETSc.PC.Type.HYPRE); pc.setHYPREType("boomeramg")
    except PETSc.Error:
        pc.setType(PETSc.PC.Type.GAMG)

    uh = fem.Function(V, name="displacement")
    ksp.solve(b, uh.vector)
    uh.vector.ghostUpdate(addv=PETSc.InsertMode.INSERT, mode=PETSc.ScatterMode.FORWARD)
    return uh

def compute_goal(domain, facet_tags, uh, traction_tag=1):
    ex = as_vector((1.0, 0.0)) if domain.geometry.dim == 2 else as_vector((1.0, 0.0, 0.0))
    ds_f = ufl.Measure("ds", domain=domain, subdomain_data=facet_tags)
    J = fem.assemble_scalar(fem.form(inner(ex, uh) * ds_f(traction_tag)))
    length = fem.assemble_scalar(fem.form(fem.Constant(domain, default_scalar_type(1.0)) * ds_f(traction_tag)))
    return J, J / max(length, 1e-14)

def top_fraction_markers_from_indicators(domain, indicators, frac=0.3):
    nc_local = domain.topology.index_map(domain.topology.dim).size_local
    cells = np.arange(nc_local, dtype=np.int32)
    kth = max(1, int(np.ceil((1.0 - frac) * indicators.size))) - 1
    thresh = np.partition(indicators, kth)[kth]
    return np.unique(cells[indicators >= thresh].astype(np.int32))


def edges_to_refine_from_cells(domain, marked_cells):
    tdim = domain.topology.dim
    domain.topology.create_connectivity(tdim, 1)
    edge_ids = []
    c_to_e = domain.topology.connectivity(tdim, 1)
    for c in marked_cells:
        edge_ids.extend(c_to_e.links(c))
    return np.unique(np.array(edge_ids, dtype=np.int32)) if edge_ids else np.empty((0,), dtype=np.int32)

def refine_mesh(domain, marked_cells, redistribute=True):
    edges = edges_to_refine_from_cells(domain, marked_cells)
    return mesh.refine(domain, edges=edges, redistribute=redistribute) if edges.size else domain

def _get_dwr_indicators(domain, a, Lform, j_u, uh, V_star):
    # Prefer dolfinx forms; fall back to raw UFL. Use [] for dual BCs.
    try:
        lape = LinearAPosterioriEstimator(fem.form(a), fem.form(Lform), fem.form(j_u), uh, V_star, [])
    except Exception:
        lape = LinearAPosterioriEstimator(a, Lform, j_u, uh, V_star, [])

    for name in ("compute_cell_indicators", "compute_cell_estimator", "compute_cell_residuals"):
        if hasattr(lape, name):
            fn = getattr(lape, name)
            try:
                arr = fn(domain)
            except TypeError:
                arr = fn()
            if isinstance(arr, list):
                arr = np.asarray(arr, dtype=np.float64)
            if isinstance(arr, np.ndarray):
                return arr
    raise RuntimeError("Could not obtain DWR cell indicators from estimator.")

def _get_residual_indicators(domain, V, uh, sigma, facet_tags, traction_tag, Tx, Ty):
    W = fem.functionspace(domain, ("DG", 0))
    w = ufl.TestFunction(W)

    h = CellDiameter(domain)
    n = FacetNormal(domain)
    gdim = domain.geometry.dim

    f_body = ufl.as_vector((0.0,)*gdim)
    r_cell = -(div(sigma(uh))) - f_body
    t_int  = sigma(uh) * n
    jump_t = jump(t_int)
    t_presc = ufl.as_vector((Tx, Ty)) if gdim == 2 else ufl.as_vector((Tx, Ty, 0.0))

    eta2 = (inner(r_cell, r_cell) * h**2 * w * dx
            + 0.5 * avg(h) * inner(jump_t, jump_t) * avg(w) * dS
            + h * inner(t_int, t_int) * w * ds(2)
            + h * inner(t_int - t_presc, t_int - t_presc) * w * ds(traction_tag))

    eta = fem.Function(W, name="indicator")
    fem.petsc.assemble_vector(eta.vector, fem.form(eta2))
    eta.vector.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)

    return np.sqrt(np.maximum(eta.vector.array, 0.0)).astype(np.float64)

def run_amr_cantilever(L=1.0, H=0.2, nx0=40, ny0=8,
                       E=210e9, nu=0.3, Tx=1e6, Ty=0.0,
                       max_refine=5, frac=0.3,
                       plane_strain=True,
                       cell_type=mesh.CellType.triangle,
                       out_prefix="beam_amr"):
    comm, rank = MPI.COMM_WORLD, MPI.COMM_WORLD.rank
    domain = build_beam_mesh(L, H, nx0, ny0, cell_type, comm=comm)

    t0_all = time.time()
    for it in range(max_refine + 1):
        facet_tags = mark_facets(domain, L)
        gdim = domain.geometry.dim

        V = fem.functionspace(domain, ("Lagrange", 1, (gdim,)))
        params = dict(E=E, nu=nu, plane_strain=plane_strain, Tx=Tx, Ty=Ty)
        a, Lform, bcs, j_u, sigma = elasticity_forms(domain, V, facet_tags, params, traction_tag=1)

        uh = solve_primal(domain, a, Lform, bcs, V)
        J, Javg = compute_goal(domain, facet_tags, uh, traction_tag=1)

        with io.XDMFFile(comm, f"{out_prefix}_it{it}.xdmf", "w") as xdmf:
            xdmf.write_mesh(domain)
            try:
                xdmf.write_function(uh, it)
            except TypeError:
                xdmf.write_function(uh)

        ndofs = V.dofmap.index_map.size_global * V.dofmap.index_map_bs
        if rank == 0:
            print(f"[it={it}] DOFs={ndofs}  J(u_h)={J: .6e}  J_avg={Javg: .6e}")

        if it == max_refine:
            break

        marked_cells = None
        if _dwr_ok:
            try:
                V_star = fem.functionspace(domain, ("Lagrange", 2, (gdim,)))
                indicators = _get_dwr_indicators(domain, a, Lform, j_u, uh, V_star)
                marked_cells = top_fraction_markers_from_indicators(domain, indicators, frac=frac)
                if rank == 0:
                    kept = int(np.ceil(frac * len(indicators)))
                    print(f"  DWR marking: selecting ~{kept} / {len(indicators)} cells.")
            except Exception as e:
                if rank == 0:
                    print(f"  [WARN] DWR estimator path failed ({e}). Using residual-based indicators...")

        if marked_cells is None:
            indicators = _get_residual_indicators(domain, V, uh, sigma, facet_tags, traction_tag=1, Tx=Tx, Ty=Ty)
            marked_cells = top_fraction_markers_from_indicators(domain, indicators, frac=frac)
            if rank == 0:
                kept = int(np.ceil(frac * len(indicators)))
                print(f"  Residual marking: selecting ~{kept} / {len(indicators)} cells.")

        domain = refine_mesh(domain, marked_cells, redistribute=True)

    if rank == 0:
        print(f"Total wall time: {time.time() - t0_all:.2f} s")


if __name__ == "__main__":
    run_amr_cantilever(
        L=1.0, H=0.2,
        nx0=36, ny0=6,
        E=70e9, nu=0.33,
        Tx=1e6, Ty=0.0,
        plane_strain=True,
        max_refine=5, frac=0.3,
        cell_type=mesh.CellType.triangle,
        out_prefix="beam_amr"
    )

I think dolfin dg is just a bit out of date with API. It might be dealt with by the developer when he has time.
In the meantime, you could consider Adaptive mesh refinement with NetGen and DOLFINx — FEniCSx tutorial
as an alternative (maybe)?

Thank you for sharing, I’ll go through it.