Hello, I am currently working on a code to simulate the incompressible NavierStokes equations. The definition of the problem can be found at this topic. Only the geometry changed, having three cylinders instead of one. The problem I face concerns multiprocessing. I have to say that I tested the cylinder tutorial and multiprocessing works perfectly, clearly speeding up the simulation.
With my code, multiprocessing does not throw error, but makes the computing several times slower, and I don’t know why. I am clearly missing something concerning the use of multiprocessing in FEniCSx, so I put my code below. It is a bit long indeed, but it contains everything (parameters, mesh generation etc…) and the code is spaced and well commented. The code should be directly launchable via the command mpirun np n_proc python my_script.py
where n_proc
is the number of processors you want to run in parallel and my_script.py
is the python file in which you should wrote (if you have time to try) my code (which is written below).
My code is divided in 6 parts:

All the imports required.

A class
FluidDynamics_env
that represents my fluid environment, it is a base to built a GYM environment later. It has three main methods: a
__init__
method that defines the parameters of my 2D flow simulation, along with the type of elements and the function definitions (mixedelement function, velocitypressure).  a
init_unsteady_ns
method that defines the problem to solve, that is to say the discretization scheme (described at this topic)  a
one_time_step
method that, as the name suggests, calculates the solution one time step further. It is also where the solver is defined.
 a

The definition of classes for boundary conditions. Only Dirichlet conditions:
 for velocity: no slip at the walls and cylinders (obstacles, that can rotate around their respective axis), and Poseuille velocity profile at the inlet.
 for pressure, always zero at the outlet.

The mesh creation, well commented. There are two lines commented, you can uncomment them to see the mesh in a GUI.

Definition of the boundary conditions using classes of part 3.

The parameters of my simulation (gathered in a dictionary named
parameters
), instanciation ofFluidDynamics_env
object (whose class is defined in part 2), and flow calculation with afor
loop, the calculation time is printed.
My script generates a xdmf file called solver_non_linear_test.xdmf
. You can watch the result easily in paraview. I set the number of steps num_steps
at 10 in the part 6 of my code, so that it is fast. With one processor, it runs for me in 3 seconds, but it take 3 times longer when running with 10 procs.
So my whole question is to know how I could fix this. The code is written just below, parts are indicated.
############################################
# 1 Imports #
############################################
import gmsh
import os
import numpy as np
import matplotlib.pyplot as plt
import tqdm.autonotebook
from mpi4py import MPI
from petsc4py import PETSc
from dolfinx.cpp.mesh import to_type, cell_entity_type
from dolfinx.fem import (Constant, Function, FunctionSpace, VectorFunctionSpace, assemble_scalar,
dirichletbc, form, locate_dofs_topological, Expression)
from dolfinx.fem.petsc import (apply_lifting, assemble_matrix, assemble_vector, create_vector, create_matrix, set_bc, LinearProblem, NonlinearProblem)
from dolfinx.graph import create_adjacencylist
from dolfinx.geometry import BoundingBoxTree, compute_collisions, compute_colliding_cells
from dolfinx.io import (VTXWriter, distribute_entity_data, gmshio, XDMFFile)
from dolfinx.mesh import create_mesh, meshtags_from_entities
from ufl import (FacetNormal, FiniteElement, Identity, Measure, TestFunction, TestFunctions, TrialFunction, VectorElement, MixedElement, as_vector, div, dot, ds, dx, inner, lhs, grad, nabla_grad, rhs, sym, curl, derivative, split)
from dolfinx.nls.petsc import NewtonSolver
from os import path, mkdir
from dolfinx import log
import gym
import decimal
import sys
import math
mesh_comm = MPI.COMM_WORLD
############################################
# 2 Custom solver #
############################################
class FluidDynamics_env():
"""
Class that will be used to start the simulation by initializing everything (__init__ and init_unsteady_ns methods),
each time step will be performed by one_time_step method.
"""
def __init__(self, parameters):
"""
Constructor method. We retrieve the parameters of the simulation, makes them instance attributes,
create a mixed function space for velocity and pressure and specify the boundary conditions. The path
for the eventual exported data is retrieved.
Parameters

parameters : dict
dictionary that contains parameters of our simulation.
"""
# retrieve the parameters dictionary and its features
self.parameters = parameters
[self.dt, self.num_steps, self.nu, self.BDF, self.mesh, self.ft]\
= [parameters[key] for key in ["dt", "num_steps", "nu", "BDF", "mesh", "ft"]]
# time initialization (beware of python's addition!)
d = decimal.Decimal(str(self.dt))
self.n_decimal = abs(d.as_tuple().exponent)
self.t = 0.0
# time at which the simulation will stop
self.T = int(self.dt * self.num_steps)
# creating the space on which the solution will be defined, here it is TaylorHood element P2P1.
velocity_element = VectorElement("Lagrange", self.mesh.ufl_cell(), 2)
pressure_element = FiniteElement("Lagrange", self.mesh.ufl_cell(), 1)
W = FunctionSpace(self.mesh, MixedElement([velocity_element, pressure_element]))
self.velocity_element = velocity_element
self.pressure_element = pressure_element
self.W = W
# boundary conditions
bcs = []
fdim = self.mesh.topology.dim  1
for bc in parameters["bcs"]:
i = {"velocity":0, "pressure":1}[bc[0]]
boundary_facets = self.ft.find(bc[2])
boundary_dofs = locate_dofs_topological((W.sub(i), W.sub(i).collapse()[0]), self.mesh.topology.dim1, boundary_facets)
user_defined_bc = Function(W.sub(i).collapse()[0])
user_defined_bc.interpolate(bc[1])
bcs.append(dirichletbc(user_defined_bc, boundary_dofs, W.sub(i)))
self.bcs = bcs
# relaxation constant (see discretization scheme)
self.relax = Constant(self.mesh, PETSc.ScalarType(1e6))
# initialization of the number of steps
self.n_steps = 0
# making the parameters dictionary an instance attribute
self.parameters = parameters
# writing solutions to file
self.xdmf = XDMFFile(mesh_comm, "solver_non_linear_test.xdmf", "w")
self.xdmf.write_mesh(self.mesh)
# definition of the assign function (not yet implemented in dolfinx)
def custom_assign(self, copy_func, original_func):
"""
Function to copy the nodal values from a function to another.
Parameters

copy_func : Function
function that will receive the nodal values of original_func
original_func : Function
function from which nodal values are copied past into copy_func
"""
copy_func.x.array[:] = original_func.x.array
def write_function_to_file(self, file, func, *names):
"""
method to write the solution. Since w is a mixed function space, we name each subfunction
(velocity and pressure) and write them in a file for visualization in paraview
Parameters

file : str
name of the file where solutions will be written
func : Function
function to write in file
names : list of str
names of the functions (in our case "velocity" and "pressure")
"""
if len(names) > 1:
for i, name in enumerate(names):
sub_func = func.sub(i)
sub_func.name = name
file.write_function(sub_func, self.t)
else:
func.name = names[0]
file.write_function(func, self.t)
def init_unsteady_ns(self):
"""
Initialization of the unsteady, incompressible NavierStokes problem. The temporal scheme (BDF for
Backward Differential Formula) is defined (for time step 1, BDF1 is used then BDF2 is used).
Initial conditions are set and the discretization scheme F (without BDF) is defined.
"""
# initiate finite element test functions, v for the velocity field and q for the pressure
v, q = TestFunctions(self.W)
# initiate solution vector, it is a function of a mixed function space, the first one is for velocity field
# and the second for pressure field (u on V and p on Q respectively, with W:=V*Q)
self.w = Function(self.W)
u, p = split(self.w)
# only to visualize first step in file as it is self.w that will be written
set_bc(self.w.vector, self.bcs)
# write solution on step 0 (initial solution given by boundary conditions and zero everywhere else)
if self.n_steps == 0:
self.write_function_to_file(self.xdmf, self.w, "velocity", "pressure")
# definition of the temporal scheme (here a BDF). A little trick is used to create an adaptative temporal
# scheme. Meaning that at step 1, BDF1 is used, and after that, BDF2 is used. To better understand, see
# one_time_step method.
if self.BDF >= 1: # BDF1
self.w_prev1 = Function(self.W)
u_prev1, p_prev1 = split(self.w_prev1)
temporal_scheme_list = [inner(u  u_prev1, v)]
assign_list = ['self.custom_assign(self.w_prev1, self.w)']
if self.BDF >= 2: #BDF2
self.w_prev2 = Function(self.W)
u_prev2, p_prev2 = split(self.w_prev2)
temporal_scheme_list.append(inner(Constant(self.mesh, PETSc.ScalarType(1.5)) * u\
 Constant(self.mesh, PETSc.ScalarType(2.0)) * u_prev1\
+ Constant(self.mesh, PETSc.ScalarType(0.5)) * u_prev2, v))
assign_list.append('self.custom_assign(self.w_prev2, self.w_prev1)')
self.temporal_scheme_list = temporal_scheme_list
assign_list.reverse()
self.assign_list = assign_list
# definition of the discretization scheme (without BDF)
self.F = (inner(dot(grad(u), u_prev1), v) + inner(dot(grad(u_prev1), u), v)  inner(dot(grad(u_prev1), u_prev1), v)\
+ self.nu * inner(grad(u), grad(v))\
 p * div(v)\
 q * div(u)\
 self.relax * p * q) * self.dt * dx
def one_time_step(self):
"""
Method that performs one time step forward (time step is self.dt)
For the first time step, BDF1 is used as there is only one solution known: the initial condition.
"""
if self.n_steps < self.BDF:
printr0("changing solver")
if self.n_steps == 0:
self.init_unsteady_ns()
set_bc(self.w_prev1.vector, self.bcs)
# defining the temporal scheme for the first steps of the simulation
temporal_scheme = self.temporal_scheme_list[self.n_steps]
# reunion of the BDF (tempora scheme) and F to form the complete discretization scheme
F_BDF = temporal_scheme * dx + self.F
# defining the problem
problem = NonlinearProblem(F_BDF, self.w, bcs=self.bcs)
self.solver = NewtonSolver(mesh_comm, problem)
# ksp = self.solver.krylov_solver
# opts = PETSc.Options()
# option_prefix = ksp.getOptionsPrefix()
# opts[f"{option_prefix}ksp_type"] = "preonly"
# opts[f"{option_prefix}pc_type"] = "lu"
# opts[f"{option_prefix}pc_factor_mat_solver_type"] = "mumps"
# ksp.setFromOptions()
# incrementing the time by one time step
self.t = round(self.t + self.dt, self.n_decimal)
# incrementing the number of steps
self.n_steps += 1
# now that w_prev1 and w_prev2 are defined (thanks to the previous assignation) as the solutions at
# 1 and 2 previous time steps, we can use the discretizaton scheme to solve the problem for the unknown
# w !
tic = time.perf_counter()
n, converged = self.solver.solve(self.w)
assert(converged)
toc = time.perf_counter()
printr0("solving took {} seconds".format(toc  tic))
# now the assignation takes place (w_prev1 becomes w_prev2 and w becomes w_prev1)
tic = time.perf_counter()
for assignation in self.assign_list:
eval(assignation)
toc = time.perf_counter()
printr0("assigning took {} seconds".format(toc  tic))
# here the little trick previously seen in init_unsteady_ns is used. For the first time step BDF1 is used,
# the unknown is w, and the kwnow solution is w_prev1: the initial condition (v and p equal zero), this is
# done by evaluating the first element of assign_list. For the other steps, BDF2 is used, the unknown is w,
# and the two previously known solutions are w_prev1 and w_prev2.
tic = time.perf_counter()
self.write_function_to_file(self.xdmf, self.w, "velocity", "pressure")
toc = time.perf_counter()
printr0("write to file took {} seconds".format(toc  tic))
############################################
# 3 Classes for Boundary conditions #
############################################
class Inlet_Velocity():
def __init__(self, U_0, H, gdim):
"""
Parameters

U_0 : float
max velocity of the Poiseuille profile
H : float
spanwise length
gdim : int
dimension of the velocity field (here 2)
"""
self.U_0 = U_0
self.H = H
self.gdim = gdim
def __call__(self, x):
"""
Parameters

x : ?
coordinate at which the boundary condition will be set
Returns

numpy.ndarray
velocity vector at coordinate x
"""
values = np.zeros((self.gdim, x.shape[1]), dtype=PETSc.ScalarType)
values[0] = self.U_0*(self.H + x[1])*(self.H  x[1])/(self.H**2)
return values
# Cylinder velocity profile
class Obstacle_Velocity():
def __init__(self, Omega, center, gdim):
"""
Parameters

center : list
coordinates of the center of the cylinder
gdim : int
dimension of the velocity field (here 2)
Omega : float
rotation velocity
"""
self.center = center
self.Omega = Omega
self.gdim = gdim
def __call__(self, x):
"""
Parameters

x : ?
coordinate at which the boundary condition will be set
Returns

numpy.ndarray
velocity vector at coordinate x
"""
values = np.zeros((self.gdim, x.shape[1]), dtype=PETSc.ScalarType)
values[0] =  self.Omega * (x[1]  self.center[1])
values[1] = self.Omega * (x[0]  self.center[0])
return values
# Constant condition
class Constant_Dirichlet():
def __init__(self, *values):
"""
Parameters

values : float or list of floats
values of the field at the locations of the boundary condition
"""
self.values = values
def __call__(self, x):
"""
Parameters

x : ?
coordinate at which the boundary condition will be set
Returns

numpy.ndarray
field vector at coordinate x
"""
values = np.zeros((len(self.values), x.shape[1]), dtype=PETSc.ScalarType)
for i, value in enumerate(self.values):
values[i] = value
return values
############################################
# 4 Mesh Creation #
############################################
import time
wall_marker, inlet_marker, outlet_marker = range(1, 4)
obstacle_markers = list(range(4, 7))
### LOADING MESH AND BOUNDARY MARKERS
## As we have generated the mesh, we now need to load the mesh and corresponding
## facet markers into DOLFINx. See https://jsdokken.com/src/tutorial_gmsh.html for explanations
# from mpi4py import MPI
# mesh_comm = MPI.COMM_WORLD
gdim = 2
# mesh, cell_tags, facet_tags = gmshio.read_from_msh("pinball_mesh.msh", mesh_comm, 0, gdim=gdim)
def printr0(*message):
"""
Function to print a message only for the processor with rank 0
Parameters

message :
the message to print
"""
if mesh_comm.rank == 0:
print(*message)
# Calculating the time that mesh generation will take
tic = time.perf_counter()
gmsh.initialize()
r = 0.5 # radius of the 3 cylinders
c_x, c_y = [3*math.sqrt(3)*r/2, 0, 0], [0, 3*r/2,3*r/2] # coordinates of the centers of the three cylinders (front, top and bottom)
mesh_comm = MPI.COMM_WORLD
model_rank = 0
gdim = 2
if mesh_comm.rank == model_rank:
gmsh.model.add("fluidic_pinball")
# target mesh size for any point
lc = 1
if mesh_comm.rank == model_rank:
## 0 dim : Points definition
# Rectangle delimiting the flow
gmsh.model.geo.addPoint(12*r, 12*r, 0, lc, 1)
gmsh.model.geo.addPoint(40*r, 12*r, 0, lc, 2)
gmsh.model.geo.addPoint(40*r, 12*r, 0, lc, 3)
gmsh.model.geo.addPoint(12*r, 12*r, 0, lc, 4)
# Points to create the circles for the three cylinders (4 points for each cylinder)
for i in range(3):
gmsh.model.geo.addPoint(c_x[i], c_y[i], 0, lc, 5 + 5*i) # center
gmsh.model.geo.addPoint(c_x[i] + r, c_y[i], 0, lc, 6 + 5*i)
gmsh.model.geo.addPoint(c_x[i], c_y[i] + r, 0, lc, 7 + 5*i)
gmsh.model.geo.addPoint(c_x[i]  r, c_y[i], 0, lc, 8 + 5*i)
gmsh.model.geo.addPoint(c_x[i], c_y[i]  r, 0, lc, 9 + 5*i)
# Points to create the polygon (pentagon in this case) that will include the cylinders and the wake
# In that pentagon, the mesh will be refined
gmsh.model.geo.addPoint(c_x[0]  2*r, c_y[0], 0, lc, 20)
gmsh.model.geo.addPoint(c_x[1]  2*r, c_y[1] + 2*r, 0, lc, 21)
gmsh.model.geo.addPoint(40*r  4*r, c_y[1] + 2*r, 0, lc, 22)
gmsh.model.geo.addPoint(40*r  4*r, c_y[2]  2*r, 0, lc, 23)
gmsh.model.geo.addPoint(c_x[2]  2*r, c_y[2]  2*r, 0, lc, 24)
## 1 dim: Lines and Curves
# Lines for the rectangle
gmsh.model.geo.addLine(1, 2, 1)
gmsh.model.geo.addLine(2, 3, 2)
gmsh.model.geo.addLine(3, 4, 3)
gmsh.model.geo.addLine(4, 1, 4)
# Lines for the pentagone
gmsh.model.geo.addLine(20, 21, 5)
gmsh.model.geo.addLine(21, 22, 6)
gmsh.model.geo.addLine(22, 23, 7)
gmsh.model.geo.addLine(23, 24, 8)
gmsh.model.geo.addLine(24, 20, 9)
# Creation of the circles for the three cylinders (4 CircleArcs for each circle)
for i in range(3):
gmsh.model.geo.addCircleArc(6 + 5*i, 5 + 5*i, 7 + 5*i, tag=10 + 4*i)
gmsh.model.geo.addCircleArc(7 + 5*i, 5 + 5*i, 8 + 5*i, tag=11 + 4*i)
gmsh.model.geo.addCircleArc(8 + 5*i, 5 + 5*i, 9 + 5*i, tag=12 + 4*i)
gmsh.model.geo.addCircleArc(9 + 5*i, 5 + 5*i, 6 + 5*i, tag=13 + 4*i)
## 2 dim
# Creation of the plane surface: rectangle  pentagone
gmsh.model.geo.addCurveLoop(list(range(1, 5)), 1)
gmsh.model.geo.addCurveLoop(list(range(5, 10)), 2)
gmsh.model.geo.addPlaneSurface([1, 2], 1)
# Creation of a curve for the polygon
curved_loop_list = [2]
if mesh_comm.rank == model_rank:
# Creation of curves loops for the three cylinders
for i in range(3):
gmsh.model.geo.addCurveLoop([10 + 4*i, 11 + 4*i, 12 + 4*i, 13 + 4*i], 3 + i)
curved_loop_list.append(3 + i)
# Creation of the plane surface: polygon  cylinders
gmsh.model.geo.addPlaneSurface(curved_loop_list, 2)
gmsh.model.geo.synchronize()
## Fields
# Distance field to calculate distance from the three cylinders
gmsh.model.mesh.field.add("Distance", 1)
gmsh.model.mesh.field.setNumbers(1, "CurvesList", list(range(10, 22)))
gmsh.model.mesh.field.setNumber(1, "Sampling", 100)
# Threshold field to refine the mesh around the three cylinders
gmsh.model.mesh.field.add("Threshold", 2)
gmsh.model.mesh.field.setNumber(2, "InField", 1)
gmsh.model.mesh.field.setNumber(2, "SizeMin", lc / 20)
gmsh.model.mesh.field.setNumber(2, "SizeMax", lc)
gmsh.model.mesh.field.setNumber(2, "DistMin", r/2)
gmsh.model.mesh.field.setNumber(2, "DistMax", r)
# Constant mesh size inside polygon
gmsh.model.mesh.field.add("Constant", 3)
gmsh.model.mesh.field.setNumber(3, "VIn", lc/5)
gmsh.model.mesh.field.setNumber(3, "VOut", lc)
gmsh.model.mesh.field.setNumbers(3, "SurfacesList", [2])
# Final field as in t10.py, take the minimum mesh size for all Threshold fields.
gmsh.model.mesh.field.add("Min", 5)
gmsh.model.mesh.field.setNumbers(5, "FieldsList", [2, 3])
gmsh.model.mesh.field.setAsBackgroundMesh(5)
## Create Physical Groups
gmsh.model.addPhysicalGroup(1, [1, 3], name="walls") # walls: up and down sides of the rectangle
gmsh.model.addPhysicalGroup(1, [4], name="inlet") # inlet: left side of the rectangle
gmsh.model.addPhysicalGroup(1, [2], name="outlet") # outlet: right side of the rectangle
gmsh.model.addPhysicalGroup(1, list(range(10, 14)), name="front_cylinder") # front obstacle (left most)
gmsh.model.addPhysicalGroup(1, list(range(14, 18)), name="top_cylinder") # top obstacle (the up most)
gmsh.model.addPhysicalGroup(1, list(range(18, 22)), name="bottom_cylinder") # bottom cylinder (the down most)
gmsh.model.addPhysicalGroup(2, [1, 2], name="control_volume") # Reunion of the two plane surfaces: where fluid will flow
gmsh.model.geo.synchronize()
# Generate and write to file
gmsh.model.mesh.generate(gdim)
gmsh.write("pinball_mesh.msh")
gmsh.write("pinball_mesh.vtk")
# Launch the GUI to see the results:
# if 'nopopup' not in sys.argv:
# gmsh.fltk.run()
mesh, cell_tags, facet_tags = gmshio.model_to_mesh(gmsh.model, mesh_comm, model_rank, gdim=gdim)
mesh.name = "Pinball"
cell_tags.name = "Cell markers"
facet_tags.name = "Facet markers"
# printing how long the mesh creation took
toc = time.perf_counter()
printr0("Mesh generation took {} secondes !".format(toctic))
############################################
# 5 Boundary conditions definition #
############################################
# rotation velocities for the three cylinders
Omegas = [12, 12, 12]
# inlet velocity profile (Poiseuille)
bcu_inlet = ["velocity", Inlet_Velocity(2, 12*r, gdim), inlet_marker]
# velocity is null at the walls
bcu_walls = ["velocity", Constant_Dirichlet(0,0), wall_marker]
# no slip condition according to the rotation velocities
bcu_obstacles = []
for i in range(len(Omegas)):
bcu_obstacles.append(["velocity", Obstacle_Velocity(Omegas[i], [c_x[i], c_y[i]], gdim), obstacle_markers[i]])
# pressure is null at the outlet
bcp_outlet = ["pressure", Constant_Dirichlet(0), outlet_marker]
############################################
# Parameters definition and lauching #
############################################
parameters = {
"num_steps": 10, # number of steps (number of times the pde system will be solved)
"dt": 0.1, # time step
"nu": 1/200, # a parameter in the NavierStokes equations (the pde system to solve)
"BDF": 2, # Backward Differentation Formula order
"bcs": [bcu_inlet, bcu_walls, *bcu_obstacles, bcp_outlet], # boundary conditions
"mesh": mesh,
"ft": facet_tags,
"c": cell_tags,
}
# Instantiation of a FluidDynamics_env object
simulation = FluidDynamics_env(parameters)
# calculating how long the simulation took
tic = time.perf_counter()
for i in range(parameters["num_steps"]):
printr0("#***********#\n Step number {} #**********#".format(i))
# performing on step
simulation.one_time_step()
if i == 2:
tic = time.perf_counter()
toc = time.perf_counter()
printr0("Simulation took {} secondes !".format(toctic))
gmsh.finalize()
I would be grateful if you could run my code. I don’t think there is a need to understand everything about it, but with the great scheme of its functioning (the 6 parts), I think it can be relatively easy to find out why multiprocessing fails at speeding up the simulation (provided one have a better understanding than me of how multiprocessing works with FEniCSx).
Thanks and regards.