Scalability for adaptive time-stepping

Hey Fenics-Community, I am solving a simple diffusion Problem. Because of the instability at the beginning of my problem (I am using a zero-concentration profile at timestep 0) and the dirichlet boundary 1 at the marked domain, i opted into an adaptive time-step algorithm, so that the computational cost can be minimized.

However, i have a problem if i try to run the simulation in parallel using the command:

mpirun -n 4 python3 test_script.py

If i run it with this command, it takes a very long time i haven’t even gotten solutions yet. If i run it without the mpirun, it converges pretty quickly. I will paste the code here, i have a mechanism which allows me to turn of the adaptive time-stepping. If i run the program without the adaptive time-stepping in parallel it works fine and speeds up the computation.
I used classes to structure my code. Here is the code:

import numpy as np

from dolfinx import fem, io, mesh
import ufl

from mpi4py import MPI
from petsc4py import PETSc

from core.util.helper import errornorm

class BoundaryConditions:
    def __init__(self, boundary_conditions):
        self._dirichlet = []
        self._neumann = []
        self._empty = []
        self._transmission = []

        for key, items in boundary_conditions.items():
            if not isinstance(items, list):
                items = [items]
            for item in items:
                if key.strip().lower() == "dirichlet":
                    if isinstance(item, tuple):
                        self._dirichlet.append(item)
                    elif isinstance(item, list):
                        for val in item:
                            self._dirichlet.append(val)
                    else:
                        raise TypeError("Value in boundary-dict is not list or tuple")
                elif key.strip().lower() == "neumann":
                    if isinstance(item, tuple):
                        self._neumann.append(item)
                    elif isinstance(item, list):
                        for val in item:
                            self._neumann.append(val)
                    else:
                        raise TypeError("Value in boundary-dict is not list or tuple")
                elif key.strip().lower() == "empty":
                    if isinstance(item, int):
                        self._empty.append(item)
                    elif isinstance(item, list):
                        for val in item:
                            self._empty.append(val)
                    else:
                        raise TypeError("Value in boundary-dict in empty is not list or int")

    @property
    def dirichlet(self):
        return self._dirichlet
    
    @dirichlet.setter
    def dirichlet(self, value):
        self._original_dirichlet = self._dirichlet
        self._dirichlet = value

    @property
    def neumann(self):
        return self._neumann
    
    @property
    def empty(self):
        return self._empty
    
    @property
    def transmission(self):
        return self._transmission

class Base(object):

    _results_path = "./calculations/"

    characteristics = {"c": 1,
                       "l": 1,
                       "t": 1,
                       "D": 1}
    

    solver_parameters = {}


    def __init__(self, msh, facet_tag, l_c, p_deg=1) -> None:
        self.msh = msh
        self._facet_tag = facet_tag

        
        self._save_file = io.XDMFFile(self.msh.comm, self._results_path + "concentration.xdmf", "w")
        self._save_file.write_mesh(msh)

        self.characteristics["l"] = l_c

        self.msh.topology.create_connectivity(self.msh.topology.dim - 1, self.msh.topology.dim)
        with io.XDMFFile(self.msh.comm, self._results_path + "tags.xdmf", "w") as save_file:
            save_file.write_mesh(msh)
            save_file.write_meshtags(self._facet_tag)


        L = ufl.FiniteElement("Lagrange", self.msh.ufl_cell(), p_deg)

        self._V = fem.FunctionSpace(self.msh, L)

        self._c = fem.Function(self._V, name="Concentration")
        self._c_n = fem.Function(self._V)

        self._c_np1 = ufl.TrialFunction(self._V)
        self._dc = ufl.TestFunction(self._V)

    @property
    def results_path(self):
        return self._results_path
    
    @results_path.setter
    def results_path(self, path):
        if path[-1] == "/":
            self._results_path = path
        else:
            self._results_path = path + "/"
        self._save_file = io.XDMFFile(self.msh.comm, self._results_path + "concentration.xdmf", "w")
        self._save_file.write_mesh(self.msh)
    
    @property
    def c(self):
        _c = fem.Function(self._V, name="Concentration")
        _c.interpolate(self._c * self.characteristics["c"])
        return _c
        
    @property
    def V(self):
        return self._V
    
    @property
    def coefficients(self):
        coeffs = {}
        coeffs["D"] = self._coefficients["D"] * self.characteristics["D"]
        coeffs["h"] = self._coefficients["h"] / self.characteristics["h"]
        return coeffs
    
    def _material_law(self, c):
        j = - self._coefficients["D"] * ufl.grad(c)
        return j

    def _nondimensionalization(self):
        D_c = 1
        c_c = 1

        t_c = (self.characteristics["l"] ** 2) / D_c

        self.characteristics["c"] = c_c
        self.characteristics["t"] = t_c
        self.characteristics["D"] = D_c
        
        self._coefficients["D"] /= self.characteristics["D"]
        if "h" in self._coefficients.keys():
            h_c = self.characteristics["l"] / self.characteristics["D"]

            self.characteristics["h"] = h_c

            self._coefficients["h"] *= self.characteristics["h"]
        
    def assemble_boundary_conditions(self, boundary_conditions):
        self._bcs = boundary_conditions
        dirichlet = []
        for condition in self._bcs.dirichlet:
            facets = self._facet_tag.find(condition[1])
            dofs = fem.locate_dofs_topological(self._V, self.msh.topology.dim - 1, facets)
            u_D = fem.Function(self._V)
            u_D.interpolate(condition[0])
            dirichlet.append(fem.dirichletbc(u_D, dofs))

        self._bcs.dirichlet = dirichlet

    def _save_to_file(self, phi, t):
        self._save_file.write_function(phi, t)

    def calculate(self, coefficients, diffusion_time, theta=0.5, dt=None, c_0=None, save_solution=False, tol=0.1):

        if c_0:
            self._c_n.interpolate(c_0)
        else:
            self._c_n.interpolate(lambda x: x[0] * 0)

        self._coefficients = coefficients
        if self._bcs.transmission and not "h" in self._coefficients.keys():
            raise ValueError("If a transmission condition is applied a transmission coefficient must be given!")
        
        self._nondimensionalization()
        if not dt:
            if theta == 0.5:
                p = 2
            else:
                p = 1

            t = 0
            dt = 1 * 1e-8
            while t < diffusion_time / self.characteristics["t"]:
                print(t * self.characteristics["t"])
                if t + dt > diffusion_time / self.characteristics["t"]:
                    dt = (diffusion_time / self.characteristics["t"]) - t

                c_low = self._calculate_low(dt, theta)
                c_high = self._calculate_high(dt, theta)

                eta = errornorm(c_high, c_low, norm_type="L2", msh=self.msh) / ((2 ** p) - 1)
                if eta < 1e-14:
                    dt *= 10
                    continue

                safety = 0.5
                dt_new = (((safety * tol) / eta) ** (1 / p)) * dt
                if eta <= tol:
                    self._c.interpolate(c_high)
                    self._c_n.interpolate(c_high)
                    t += dt
                    if save_solution:
                        self._save_to_file(self._c, t * self.characteristics["t"])
                dt = dt_new
        else:
            t = 0
            dt /= self.characteristics["t"]
            while t < diffusion_time / self.characteristics["t"]:
                print(t * self.characteristics["t"])
                if t < 10 * dt:
                    c = self._calculate(dt, 1)
                else:
                    c = self._calculate(dt, theta)
                self._c.interpolate(c)
                self._c_n.interpolate(c)
                t += dt
                if save_solution:
                    self._save_to_file(self._c, t * self.characteristics["t"])
        self._save_file.close()
        print(f"Done calculating")

    def _weak_form(self, dt, theta):
        n = ufl.FacetNormal(self.msh)
        dV = ufl.Measure("dx", domain=self.msh)
        ds = ufl.Measure('ds', domain=self.msh, subdomain_data=self._facet_tag)

        res = ufl.inner(self._c_np1, self._dc) * dV - ufl.inner(self._c_n, self._dc) * dV
        res -= dt * ufl.inner((theta * self._material_law(self._c_np1)) + ((1 - theta) * self._material_law(self._c_n)), ufl.grad(self._dc)) * dV

        for item in self._bcs.empty:
            res += dt * self._dc * ufl.inner((theta * self._material_law(self._c_np1)) + \
                                        ((1 - theta) * self._material_law(self._c_n)), n) * ds(item)

        for item in self._bcs.neumann:
            res += dt * self._dc * (theta * item[0] + \
                                    (1 - theta) * ufl.inner(self._material_law(self._c_n), n)) * ds(item[1])

        for item in self._bcs.transmission:
            res += dt * self._coefficients["h"] * self._dc * (theta * (self._c_np1 - item[0]) + \
                                                              (1 - theta) * (self._c_n - item[0])) * ds(item[1])

            
        bilinear_form = fem.form(ufl.lhs(res))
        linear_form = fem.form(ufl.rhs(res))
        return bilinear_form, linear_form

    def _setup_solver(self):
        self.solver = PETSc.KSP().create(self.msh.comm)
        self.solver.setOperators(self._A)
        self.solver.setType(PETSc.KSP.Type.PREONLY)
        self.solver.getPC().setType(PETSc.PC.Type.LU)

    def _calculate_low(self, dt, theta):
        c_low = self._calculate(dt, theta)
        return c_low

    def _calculate(self, dt, theta):
        a, L = self._weak_form(dt, theta)
        self._A = fem.petsc.assemble_matrix(a, bcs=self._bcs.dirichlet)
        self._A.assemble()
        self._b = fem.petsc.create_vector(L)

        with self._b.localForm() as loc_b:
            loc_b.set(0)
        
        fem.petsc.assemble_vector(self._b, L)

        fem.petsc.apply_lifting(self._b, [a], [self._bcs.dirichlet])
        self._b.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
        fem.petsc.set_bc(self._b, self._bcs.dirichlet)

        self._setup_solver()
        c = fem.Function(self._V)
        self.solver.solve(self._b, c.vector)
        return c

    def _calculate_high(self, dt, theta):
        c_n = fem.Function(self._V)
        c_n.interpolate(self._c_n)
        c_half = self._calculate(dt / 2, theta)
        self._c_n.interpolate(c_half)
        c_high = self._calculate(dt / 2, theta)
        self._c_n.interpolate(c_n)
        return c_high

With the errornorm function:

def errornorm(u, uh, norm_type, msh):
    comm = uh.function_space.mesh.comm
    if norm_type == "L2":
        error = fem.form((uh - u) ** 2 * ufl.dx(msh))
        norm = np.sqrt(comm.allreduce(fem.assemble_scalar(error), MPI.SUM))
    else:
        raise NotImplementedError


    return norm

It is used as follows:

width = 10
length = 30
depth = 1
res = 30
p_deg = 1

temperature = 200
concentration = 1
time = 8 #h

D_0 = fits.diffusion_coeff(temperature)

msh = setup_test_mesh(width, length, depth, res)

facet_tag = setup_test_tags(msh, width)

diff = Base(msh, facet_tag=facet_tag, l_c=width, p_deg=p_deg)

u_ex = lambda x: x[0] * 0 + concentration
bcs = {"dirichlet": [(u_ex, 0)]}
boundary = BoundaryConditions(in_bcs)

diff.assemble_boundary_conditions(in_boundary)
diff.calculate({"D": float(D_0)}, time_in * 3600, dt=dt, save_solution=True, tol=tol)

with the helper functions:

def setup_test_tags(msh, width):
    boundaries = [(0, lambda x:np.isclose(x[1], 0)),
                  (1, lambda x: np.isclose(x[0], -0.5)),
                  (2, lambda x: np.isclose(x[0], +0.5)),
                  (3, lambda x: np.isclose(x[1], -1.))]

    facet_tag = helper.generate_facet_tags(msh, boundaries)

    return facet_tag

def generate_facet_tags(msh, boundaries):
    facet_indices, facet_markers = [], []
    fdim = msh.topology.dim - 1
    for (marker, locator) in boundaries:
        facets = mesh.locate_entities(msh, fdim, locator)
        facet_indices.append(facets)
        facet_markers.append(np.full_like(facets, marker))
    facet_indices = np.hstack(facet_indices).astype(np.int32)
    facet_markers = np.hstack(facet_markers).astype(np.int32)
    sorted_facets = np.argsort(facet_indices)
    facet_tag = mesh.meshtags(msh, fdim, facet_indices[sorted_facets], facet_markers[sorted_facets])
    return facet_tag

def setup_test_mesh(width, length, depth, res):
    nondim_width = 1
    nondim_length = length / width 
    nondim_depth = depth / width
    if depth < 1e-14:
        msh = mesh.create_rectangle(MPI.COMM_WORLD,
                                    [(-nondim_width / 2, -nondim_length),
                                     (nondim_width / 2, 0)], [res, res])
    else:
        msh = mesh.create_box(MPI.COMM_WORLD, 
                            [(-nondim_width / 2, -nondim_length, 0), 
                            (nondim_width / 2, 0, nondim_depth)], [res, res, 5])
    return msh

I would really appreciate it, if i get some help fix the issues i have in my code! Thank you already in advance for reading through my post. I hope you have a wonderful day!

You should set mumps or superlu_dist as backend for the LU solver, as you need a direct solver that supports parallel runs.
See; https://github.com/FEniCS/dolfinx/blob/200e94c046aa4dcc42e93c87c6cee35ef97600b2/python/demo/demo_stokes.py#L506

I dont have time to Give you any further comments, as your code is really quite large.

Thank you so much for taking the time to look over the code i provided. I know it is quite large :confused:.

I have rewritten the _setup_solver function:

def _setup_solver(self):
    self.solver = PETSc.KSP().create(self.msh.comm)
    self.solver.setOperators(self._A)
    self.solver.setType(PETSc.KSP.Type.PREONLY)

    pc = self.solver.getPC()
    pc.setType("lu")
    pc.setFactorSolverType("mumps")
    pc.setFactorSetUpSolverType()
    pc.getFactorMatrix().setMumpsIcntl(icntl=24, ival=1)
    pc.getFactorMatrix().setMumpsIcntl(icntl=25, ival=0)

I have used the code you provided. But still i get the following results:

no-mpi:
2023-08-28 11:45:28.310 ( 166.569s) [main            ]             loguru.cpp:526   INFO| atexit

mpi: 
2023-08-28 11:50:27.154 ( 223.722s) [main            ]         TimeLogger.cpp:28    INFO| Elapsed wall, usr, sys time: 0.016760, 0.010000, 0.000000

Where the process with mpi did not finish and was still calculating.

I think you should start with something smaller and more self contained. Make sure that you can get a simple simulation on a coarse grid to run in parallel, then add more and more complexity gradually.

2 Likes