Hello everyone,
I’m working with a mixed element function space in FEniCS Legacy, and I’m trying to implement checkpointing (both reading and writing) to save and reload simulation states. However, after reading the checkpoint files, the solution vectors on the mesh seem to be incorrect.
Here’s an outline of my approach:
- I have a mixed element function space consisting of two fields.
- For checkpointing, I write the mesh and the solution functions (
c
andphi
) into XDMF files. - When reloading the solution, I use
FunctionAssigner
to assign the values to the mixed function.
I suspect that something is going wrong during the reading or reassignment process, but I’m unsure of the correct procedure. Below is a part of my checkpointing code:
def writecheckpoint(self):
with fe.XDMFFile("mesh.xdmf") as mesh_file:
mesh_file.write(self.mesh)
c_, phi_ = self.sf0.split(deepcopy=True)
with fe.XDMFFile("phi_.xdmf") as file:
file.write_checkpoint(phi_, "phi_", self.Time, fe.XDMFFile.Encoding.HDF5, append=False)
with fe.XDMFFile("c_.xdmf") as file:
file.write_checkpoint(c_, "c_", self.Time, fe.XDMFFile.Encoding.HDF5, append=False)
def readcheckpoint(self):
phi_ = fe.Function(self.spacephi_)
c_ = fe.Function(self.spacec_)
with fe.XDMFFile("phi_.xdmf") as file:
file.read_checkpoint(phi_, "phi_")
with fe.XDMFFile("c_.xdmf") as file:
file.read_checkpoint(c_, "c_")
assigner = fe.FunctionAssigner(self.ME, [self.spacec_, self.spacephi_])
assigner.assign(self.sf0, [c_, phi_])
The issue arises after reading the files—the solution vectors seem incorrect, and the results don’t match what I expect from the saved state.
What is the correct way to write checkpoint functions for a mixed element function space in FEniCS Legacy? Am I missing any crucial steps in reading or writing these files?
Any insights or suggestions would be greatly appreciated!
This is the minimal code for solving a diffusion equation using mixed elemnts. i am solving for C and phi or the second argument is allways zero.
import fenics as fe
import numpy as np
from tqdm import tqdm
fe.set_log_level(fe.LogLevel.ERROR)
#################### Define Parallel Variables ####################
comm = fe.MPI.comm_world
rank = fe.MPI.rank(comm)
size = fe.MPI.size(comm)
############################# END ################################
class Diffusion:
def __init__(self, params_dict):
self.params_dict = params_dict
self.mesh = self.params_dict["mesh"]
self.dt = self.params_dict["dt"]
self.D = self.params_dict["D"] # Diffusion Coefficient
self.T_hot = self.params_dict["T_hot"]
self.T_cold = self.params_dict["T_cold"]
self.Center = self.params_dict["Center"]
self.rad = self.params_dict["rad"]
self.Time = self.params_dict["Time"]
# Initialize
self.funcspace()
self.form()
self.define_solver()
if self.params_dict["checkpoint"]== True:
self.readcheckpoint()
else:
self.InitialCondition()
self.define_solver()
def funcspace(self):
P1 = fe.FiniteElement("Lagrange", self.mesh.ufl_cell(), 1)
element = fe.MixedElement([P1, P1])
self.ME = fe.FunctionSpace(self.mesh, element)
self.v_c , self.v_phi = fe.TestFunctions(self.ME)
self.sf = fe.Function(self.ME)
self.sf0 = fe.Function(self.ME)
self.c , self.phi = fe.split(self.sf) # current time step
self.c_ , self.phi_ = fe.split(self.sf0) # previous time step
self.spacec_, _ = self.ME.sub(0).collapse(collapsed_dofs=True)
self.spacephi_, _ = self.ME.sub(1).collapse(collapsed_dofs=True)
def form(self):
# 1. ∫( (dc/dt * v_c) + D * (∇c · ∇v_c) ) dx = 0
dcdt = (self.c - self.c_) / self.dt
gradc = fe.grad(self.c)
eqdiff = (
fe.inner(dcdt,self.v_c)
+self.D*fe.inner(gradc, fe.grad(self.v_c ))
) * fe.dx
# 2. phi is always zero
dphidt = (self.phi-self.phi_)/self.dt
eqphi = fe.inner(dphidt,self.v_phi)*fe.dx
self.F = eqdiff + eqphi
def define_solver(self):
J = fe.derivative(self.F, self.sf)
problem = fe.NonlinearVariationalProblem(self.F, self.sf, J=J)
solver = fe.NonlinearVariationalSolver(problem)
prm = solver.parameters
prm["newton_solver"]["absolute_tolerance"] = 1E-8
prm["newton_solver"]["relative_tolerance"] = 1E-7
prm["newton_solver"]["maximum_iterations"] = 100
self.solver = solver
def solve(self):
self.solver.solve()
self.sf0.assign(self.sf)
def InitialCondition(self):
class InitialConditions(fe.UserExpression):
def __init__(self, rad, center, T_hot, T_cold, **kwargs):
super().__init__(**kwargs)
self.rad = rad
self.center = center
self.T_hot = T_hot
self.T_cold = T_cold
def eval(self, values, x):
xc, yc = self.center
x, y = x[0], x[1]
dist = (x-xc)**2+(y-yc)**2
if dist <= self.rad**2:
values[0] = self.T_hot
else:
values[0] = self.T_cold
values[1] = 0
def value_shape(self):
return (2,)
Expr = InitialConditions(rad= self.rad, center = self.Center, T_hot= self.T_hot, T_cold= self.T_cold)
self.sf0.interpolate(Expr)
def writecheckpoint(self):
with fe.XDMFFile("mesh.xdmf") as mesh_file:
mesh_file.write(self.mesh)
c_, phi_ = self.sf0.split(deepcopy=True)
with fe.XDMFFile("phi_.xdmf") as file:
file.write_checkpoint(phi_, "phi_", self.Time, fe.XDMFFile.Encoding.HDF5, append=False)
with fe.XDMFFile("c_.xdmf") as file:
file.write_checkpoint(c_, "c_", self.Time, fe.XDMFFile.Encoding.HDF5, append=False)
def readcheckpoint(self):
if self.params_dict["checkpoint"]== True:
phi_ = fe.Function(self.spacephi_)
c_ = fe.Function(self.spacec_)
with fe.XDMFFile("phi_.xdmf") as file:
file.read_checkpoint(phi_, "phi_")
with fe.XDMFFile("c_.xdmf") as file:
file.read_checkpoint(c_, "c_")
assigner = fe.FunctionAssigner(self.ME, [self.spacec_, self.spacephi_])
assigner.assign(self.sf0, [c_, phi_])
############### Define Parameters ############
params = {
"dt" : 1E-2,
"dy": 1E-1,
"D" : 0.4,
"Nx" : 15,
"Ny" : 15,
"Center" :[7.5,7.5],
"rad": 2,
"T_cold": 0,
"T_hot": 20,
"Time": 0,
"checkpoint": False
}
############## Define Mesh ################
nx = (int)(params["Nx"]/params["dy"])
ny = (int)(params["Ny"]/params["dy"])
mesh = fe.RectangleMesh(fe.Point(0, 0), fe.Point(params["Nx"], params["Ny"]), nx, ny)
# if checkpoint is true, read mesh from file and start from previous solution
if params["checkpoint"]== True:
mesh = fe.Mesh()
with fe.XDMFFile("mesh.xdmf") as file:
file.read(mesh)
params["mesh"] = mesh
################# END ######################
################# Define file ##############
file = fe.XDMFFile("Diff.xdmf")
file.parameters["rewrite_function_mesh"] = True
file.parameters["flush_output"] = True
file.parameters["functions_share_mesh"] = True
################# END ######################
################# Define Diffusion Object ###
diff_problem = Diffusion(params)
Time = params["Time"]
dt = params["dt"]
for it in (range(int(1e4))):
diff_problem.solve()
Time += dt
params["Time"] = Time
if it%20 == 0:
c, phi = diff_problem.sf0.split(deepcopy=True)
c.rename("c", "c")
phi.rename("phi", "phi")
file.write(c, Time)
file.write(phi, Time)
if rank == 0:
print(f"iteration = ",it , flush=True)
# write checkpoint
if it%20 == 0:
diff_problem.writecheckpoint()