I met this error when I try to evaluate the funcion V:
V = FunctionSpace(mesh, v_cg2)
u_ = Function(V)
u_.eval(inlet_cylinder_coord, inlet_cylinder_cell)
and inlet_cylinder_coord
and inlet_cylinder_cell
are defined by:
obstacle_cell = locate_dofs_topological(V, fdim, ft.find(self.obstacle_marker))
inlet_cell = locate_dofs_topological(V, fdim, ft.find(self.inlet_marker))
inlet_cylinder_cell = np.concatenate([inlet_cell, obstacle_cell], axis=0)
inlet_cylinder_coord = mesh.geometry.x[inlet_cylinder_cell]
Here are the full error info:
Traceback (most recent call last):
File "/home/wulong/pycharm-community-2021.3.3/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_exec2.py", line 3, in Exec
exec(exp, global_vars, local_vars)
File "<input>", line 1, in <module>
File "/home/wulong/anaconda3/envs/drl/lib/python3.10/site-packages/dolfinx/fem/function.py", line 328, in eval
self._cpp_object.eval(_x, _cells, u)
RuntimeError: Newton method failed to converge for non-affine geometry
And here are the full code:
import gmsh
import numpy as np
import sys
from mpi4py import MPI
from fluid_mechanics.area import *
from petsc4py import PETSc
import dolfinx
from dolfinx import geometry
from dolfinx.fem import (Constant, Function, FunctionSpace,
assemble_scalar, dirichletbc, form, locate_dofs_topological, set_bc)
from dolfinx.fem.petsc import (apply_lifting, assemble_matrix, assemble_vector,
create_vector, create_matrix, set_bc)
from dolfinx.geometry import BoundingBoxTree, compute_collisions, compute_colliding_cells
from dolfinx.io import (XDMFFile, distribute_entity_data, gmshio)
from utils.encoder import normalization
from ufl import (FacetNormal, FiniteElement, Identity, Measure, TestFunction, TrialFunction, VectorElement,
as_vector, div, dot, ds, dx, inner, lhs, grad, nabla_grad, rhs, sym)
class simulation:
def __init__(self, c_x, c_y, o_x, o_y, r, r2,
obstacle_type='cylinder',
save_mesh='false',
save='false',
visualize='false'):
self.L = 2.2
self.H = 0.41
self.c_x = c_x
self.c_y = c_y
self.r = r
self.gdim = 2
self.model_rank = 0
self.mesh_comm = MPI.COMM_WORLD
self.save = save
self.o_x = o_x
self.o_y = o_y
self.r2 = r2
self.obstacle_type = obstacle_type
self.save_mesh = save_mesh
self.visualize = visualize
def generate_mesh(self):
# L*H:computational domain's size
# (c_x,c_y):the coordinates of big cylinder
# r:radius of the big cylinder
# r2:radius of the small cylinder
gmsh.initialize()
gmsh.option.setNumber('General.Verbosity', 1)
rectangle = gmsh.model.occ.addRectangle(0, 0, 0, self.L, self.H, tag=1)
obstacle1 = gmsh.model.occ.addDisk(self.c_x, self.c_y, 0, self.r, self.r)
pre_fluid = gmsh.model.occ.cut([(self.gdim, rectangle)], [(self.gdim, obstacle1)])
if self.r2 == 0:
fluid = pre_fluid
else:
if self.obstacle_type == 'cylinder':
obstacle2 = gmsh.model.occ.addDisk(self.o_x, self.o_y, 0, self.r2, self.r2)
elif self.obstacle_type == 'rectangular':
obstacle2 = gmsh.model.occ.addRectangle(self.o_x, self.o_y, 0, self.r2, self.r2)
fluid = gmsh.model.occ.cut([(self.gdim, rectangle)], [(self.gdim, obstacle2)])
gmsh.model.occ.synchronize()
fluid_marker = 1
volumes = gmsh.model.getEntities(dim=self.gdim)
assert (len(volumes) == 1)
gmsh.model.addPhysicalGroup(volumes[0][0], [volumes[0][1]], fluid_marker)
gmsh.model.setPhysicalName(volumes[0][0], fluid_marker, "Fluid")
self.inlet_marker, self.outlet_marker, self.wall_marker, self.obstacle_marker = 2, 3, 4, 5
inflow, outflow, walls, obstacle = [], [], [], []
boundaries = gmsh.model.getBoundary(volumes, oriented=False)
for boundary in boundaries:
center_of_mass = gmsh.model.occ.getCenterOfMass(boundary[0], boundary[1])
if np.allclose(center_of_mass, [0, self.H / 2, 0]):
inflow.append(boundary[1])
elif np.allclose(center_of_mass, [self.L, self.H / 2, 0]):
outflow.append(boundary[1])
elif np.allclose(center_of_mass, [self.L / 2, self.H, 0]) or np.allclose(center_of_mass, [self.L / 2, 0, 0]):
walls.append(boundary[1])
else:
obstacle.append(boundary[1])
gmsh.model.addPhysicalGroup(1, walls, self.wall_marker)
gmsh.model.setPhysicalName(1, self.wall_marker, "Walls")
gmsh.model.addPhysicalGroup(1, inflow, self.inlet_marker)
gmsh.model.setPhysicalName(1, self.inlet_marker, "Inlet")
gmsh.model.addPhysicalGroup(1, outflow, self.outlet_marker)
gmsh.model.setPhysicalName(1, self.outlet_marker, "Outlet")
gmsh.model.addPhysicalGroup(1, obstacle, self.obstacle_marker)
gmsh.model.setPhysicalName(1, self.obstacle_marker, "Obstacle")
res_min = self.r / 3
distance_field = gmsh.model.mesh.field.add("Distance")
gmsh.model.mesh.field.setNumbers(distance_field, "EdgesList", obstacle)
threshold_field = gmsh.model.mesh.field.add("Threshold")
gmsh.model.mesh.field.setNumber(threshold_field, "IField", distance_field)
gmsh.model.mesh.field.setNumber(threshold_field, "LcMin", res_min)
gmsh.model.mesh.field.setNumber(threshold_field, "LcMax", 0.25 * self.H)
gmsh.model.mesh.field.setNumber(threshold_field, "DistMin", self.r)
gmsh.model.mesh.field.setNumber(threshold_field, "DistMax", 2 * self.H)
min_field = gmsh.model.mesh.field.add("Min")
gmsh.model.mesh.field.setNumbers(min_field, "FieldsList", [threshold_field])
gmsh.model.mesh.field.setAsBackgroundMesh(min_field)
gmsh.option.setNumber("Mesh.Algorithm", 8)
gmsh.option.setNumber("Mesh.RecombinationAlgorithm", 2)
gmsh.option.setNumber("Mesh.RecombineAll", 1)
gmsh.option.setNumber("Mesh.SubdivisionAlgorithm", 1)
gmsh.model.mesh.generate(self.gdim)
gmsh.model.mesh.setOrder(2)
gmsh.model.mesh.optimize("Netgen")
mesh, _, ft = gmshio.model_to_mesh(gmsh.model, self.mesh_comm, self.model_rank, gdim=self.gdim)
ft.name = "facet markers"
if self.visualize == 'true':
if '-nopopup' not in sys.argv:
gmsh.fltk.run()
if self.save_mesh == 'true':
gmsh.write("cylinders.msh")
return mesh, ft
def compute(self, mesh, ft, probes, jet):
jet_position = jet.get("position")
jet_x = jet.get("jet_x")
jet_y = jet.get("jet_y")
inlet_cylinder = probes.get("inlet_cylinder")
outlet = probes.get("outlet")
probes = probes.get("probes")
bb_tree = geometry.BoundingBoxTree(mesh, mesh.topology.dim)
domain = mesh.geometry.x # The coordinates of all mesh
jet_cells = []
jet_points = []
probes_cells = []
domain_cells = []
probes_points_on_pro = []
domain_points_on_pro = []
domain_cell_candidates = geometry.compute_collisions(bb_tree, np.array(domain))
probes_cell_candidates = geometry.compute_collisions(bb_tree, probes.T)
jet_cell_candidates = geometry.compute_collisions(bb_tree, jet_position.T)
domain_colliding_cells = geometry.compute_colliding_cells(mesh, domain_cell_candidates, np.array(domain))
probes_colliding_cells = geometry.compute_colliding_cells(mesh, probes_cell_candidates, probes.T)
jet_colliding_cells = geometry.compute_colliding_cells(mesh, jet_cell_candidates, jet_position.T)
for i, point in enumerate(np.array(domain)):
if len(domain_colliding_cells.links(i)) > 0:
#print(len(domain_colliding_cells.links(i)))
domain_points_on_pro.append(point)
domain_cells.append(domain_colliding_cells.links(i)[0])
for i, point in enumerate(probes.T):
if len(probes_colliding_cells.links(i)) > 0:
probes_points_on_pro.append(point)
probes_cells.append(probes_colliding_cells.links(i)[0])
for i, jet_point in enumerate(jet_position.T):
if len(jet_colliding_cells.links(i)) > 0:
jet_points.append(jet_point)
jet_cells.append(jet_colliding_cells.links(i)[0])
t = 0
T = 7 # Final time
dt = 1 / 1600 # Time step size
self.num_steps = 50
k = Constant(mesh, PETSc.ScalarType(dt))
mu = Constant(mesh, PETSc.ScalarType(0.001)) # Dynamic viscosity
rho = Constant(mesh, PETSc.ScalarType(1)) # Density
v_cg2 = VectorElement("CG", mesh.ufl_cell(), 2)
s_cg1 = FiniteElement("CG", mesh.ufl_cell(), 1)
V = FunctionSpace(mesh, v_cg2)
Q = FunctionSpace(mesh, s_cg1)
fdim = mesh.topology.dim - 1
# Define boundary conditions
class InletVelocity():
def __init__(self, t):
self.t = t
def __call__(self, x):
values = np.zeros((2, x.shape[1]), dtype=PETSc.ScalarType)
values[0] = 4 * 1.5 * x[1] * (0.41 - x[1]) / (0.41 ** 2)
return values
class JetVelocity():
def __init__(self, t, jet_x, jet_y):
self.t_jet = t
self.jet_x = jet_x
self.jet_y = jet_y
def __call__(self, x):
values = np.zeros((2, x.shape[1]), dtype=PETSc.ScalarType)
values[1] = self.jet_y
index = np.where(x[1, :] < 0.2)
values[1, index] = -self.jet_y
values[0] = self.jet_x
return values
# Inlet
u_inlet = Function(V)
u_jet = Function(V)
inlet_velocity = InletVelocity(t)
jet_velocity = JetVelocity(t, jet_x, jet_y)
u_inlet.interpolate(inlet_velocity)
u_jet.interpolate(jet_velocity)
bcu_inflow = dirichletbc(u_inlet, locate_dofs_topological(V, fdim, ft.find(self.inlet_marker)))
# Jets
bcu_jets = dirichletbc(u_jet, locate_dofs_topological(V, fdim + 1, jet_cells))
# Walls
u_nonslip = np.array((0,) * mesh.geometry.dim, dtype=PETSc.ScalarType)
bcu_walls = dirichletbc(u_nonslip, locate_dofs_topological(V, fdim, ft.find(self.wall_marker)), V)
# Obstacle
bcu_obstacle = dirichletbc(u_nonslip, locate_dofs_topological(V, fdim, ft.find(self.obstacle_marker)), V)
bcu = [bcu_inflow, bcu_obstacle, bcu_walls, bcu_jets]
# Outlet
bcp_outlet = dirichletbc(PETSc.ScalarType(0), locate_dofs_topological(Q, fdim, ft.find(self.outlet_marker)), Q)
bcp = [bcp_outlet]
obstacle_cell = locate_dofs_topological(V, fdim, ft.find(self.obstacle_marker))
inlet_cell = locate_dofs_topological(V, fdim, ft.find(self.inlet_marker))
inlet_cylinder_cell = np.concatenate([inlet_cell, obstacle_cell], axis=0)
inlet_cylinder_coord = mesh.geometry.x[inlet_cylinder_cell]
outlet_cells = locate_dofs_topological(V, fdim, ft.find(self.outlet_marker))
outlet_coord = mesh.geometry.x[outlet_cells]
u = TrialFunction(V)
v = TestFunction(V)
u_ = Function(V)
u_.name = "u"
u_s = Function(V)
u_n = Function(V)
u_n1 = Function(V)
p = TrialFunction(Q)
q = TestFunction(Q)
p_ = Function(Q)
p_.name = "p"
phi = Function(Q)
# the first step
f = Constant(mesh, PETSc.ScalarType((0, 0)))
F1 = rho / k * dot(u - u_n, v) * dx
F1 += inner(dot(1.5 * u_n - 0.5 * u_n1, 0.5 * nabla_grad(u + u_n)), v) * dx
F1 += 0.5 * mu * inner(grad(u + u_n), grad(v)) * dx - dot(p_, div(v)) * dx
F1 += dot(f, v) * dx
a1 = form(lhs(F1))
L1 = form(rhs(F1))
A1 = create_matrix(a1)
b1 = create_vector(L1)
# the second step
a2 = form(dot(grad(p), grad(q)) * dx)
L2 = form(-rho / k * dot(div(u_s), q) * dx)
A2 = assemble_matrix(a2, bcs=bcp)
A2.assemble()
b2 = create_vector(L2)
# the last step
a3 = form(rho * dot(u, v) * dx)
L3 = form(rho * dot(u_s, v) * dx - k * dot(nabla_grad(phi), v) * dx)
A3 = assemble_matrix(a3)
A3.assemble()
b3 = create_vector(L3)
# Solver for step 1
solver1 = PETSc.KSP().create(mesh.comm)
solver1.setOperators(A1)
solver1.setType(PETSc.KSP.Type.BCGS)
pc1 = solver1.getPC()
pc1.setType(PETSc.PC.Type.JACOBI)
# Solver for step 2
solver2 = PETSc.KSP().create(mesh.comm)
solver2.setOperators(A2)
solver2.setType(PETSc.KSP.Type.MINRES)
pc2 = solver2.getPC()
pc2.setType(PETSc.PC.Type.HYPRE)
pc2.setHYPREType("boomeramg")
# Solver for step 3
solver3 = PETSc.KSP().create(mesh.comm)
solver3.setOperators(A3)
solver3.setType(PETSc.KSP.Type.CG)
pc3 = solver3.getPC()
pc3.setType(PETSc.PC.Type.SOR)
# -
u_probes = []
p_probes = []
inlet_cylinder_probes = []
outlet_probes = []
time_list = []
eval_time = []
init_domain_v_probes = []
init_domain_p_probes = []
xdmf = XDMFFile(mesh.comm, f"active_{jet_position}_{jet_x}_{jet_y}.xdmf", "w")
xdmf.write_mesh(mesh)
for i in range(self.num_steps):
# Update current time step
t += dt
# Update inlet velocity
inlet_velocity.t = t
u_inlet.interpolate(inlet_velocity)
# Step 1: Tentative velocity step
A1.zeroEntries()
assemble_matrix(A1, a1, bcs=bcu)
A1.assemble()
with b1.localForm() as loc:
loc.set(0)
assemble_vector(b1, L1)
apply_lifting(b1, [a1], [bcu])
b1.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
set_bc(b1, bcu)
solver1.solve(b1, u_s.vector)
u_s.x.scatter_forward()
# Step 2: Pressure corrrection step
with b2.localForm() as loc:
loc.set(0)
assemble_vector(b2, L2)
apply_lifting(b2, [a2], [bcp])
b2.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
set_bc(b2, bcp)
solver2.solve(b2, phi.vector)
phi.x.scatter_forward()
p_.vector.axpy(1, phi.vector)
p_.x.scatter_forward()
# Step 3: Velocity correction step
with b3.localForm() as loc:
loc.set(0)
assemble_vector(b3, L3)
b3.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
solver3.solve(b3, u_.vector)
u_.x.scatter_forward()
# write the data of probes
if i / 256 == 0:
u_values = u_.eval(probes_points_on_pro, probes_cells)
p_values = p_.eval(probes_points_on_pro, probes_cells)
u_probes.append(u_values)
p_probes.append(p_values)
xdmf.write_function(u_, t)
xdmf.write_function(p_, t)
if i == 0:
#print(V.tabulate_dof_coordinates())
init_v_values = u_.eval(domain_points_on_pro, domain_cells)
init_p_values = p_.eval(domain_points_on_pro, domain_cells)
init_domain_v_probes.append(init_v_values)
init_domain_p_probes.append(init_p_values)
if i == self.num_steps - 1:
inlet_cylinder_values = u_.eval(inlet_cylinder_coord, inlet_cylinder_cell)
outlet_values = p_.eval(outlet_coord, outlet_cells)
inlet_cylinder_probes.append(inlet_cylinder_values)
outlet_probes.append(outlet_values)
time_list.append(i)
eval_time.append(i)
gmsh.clear()
xdmf.close()
# Update variable with solution form this time step
with u_.vector.localForm() as loc_, u_n.vector.localForm() as loc_n, u_n1.vector.localForm() as loc_n1:
loc_n.copy(loc_n1)
loc_.copy(loc_n)
time = np.repeat(np.array(time_list), 4)
eval_time = np.repeat(np.array(eval_time), len(V.tabulate_dof_coordinates()))
domain_eval = np.tile(np.array(domain), (50, 1))
u0 = np.array(u_probes)[:, :, 0]
u1 = np.array(u_probes)[:, :, 1]
inlet_u0 = np.array(inlet_cylinder_probes)[:, :, 0]
inlet_u1 = np.array(inlet_cylinder_probes)[:, :, 1]
init_u0 = np.array(init_domain_v_probes)[:, :, 0]
init_u1 = np.array(init_domain_v_probes)[:, :, 1]
probes_points_on_pro = np.tile(np.transpose(probes_points_on_pro), 50)
obstacle_cell = locate_dofs_topological(V, fdim, ft.find(self.obstacle_marker))
inlet_cell = locate_dofs_topological(V, fdim, ft.find(self.inlet_marker))
inlet_cylinder_cell = np.concatenate([inlet_cell, obstacle_cell], axis=0)
inlet_cylinder_coord = mesh.geometry.x[inlet_cylinder_cell]
probes = {
"p": np.array(p_probes).reshape(-1, 1).flatten(),
"u0": u0.reshape(-1, 1).flatten(),
"u1": u1.reshape(-1, 1).flatten(),
"t": time,
"x": probes_points_on_pro[0],
"y": probes_points_on_pro[1]
}
jet_config = {
"jet_x": jet.get("jet_x"),
"jet_y": jet.get("jet_y")
}
domain_train = {
"x": domain[:, 0],
"y": domain[:, 1],
"z": domain[:, 2]
}
cylinder_inlet = {
"x": inlet_cylinder_coord[:, 0],
"y": inlet_cylinder_coord[:, 1],
"u0": inlet_u0.reshape(-1, 1).flatten(),
"u1": inlet_u1.reshape(-1, 1).flatten()
}
outlet = {
"x": outlet_coord[:, 0],
"y": outlet_coord[:, 1],
"p": np.array(outlet_probes).reshape(-1, 1).flatten()
}
IC_0_1 = {
"x": domain[:, 0],
"y": domain[:, 1],
"z": domain[:, 2],
"p": np.zeros((len(domain))),
"u0": init_u0.reshape(-1, 1).flatten(),
"u1": init_u1.reshape(-1, 1).flatten(),
}
eval = {
"x": domain_eval[:, 0],
"y": domain_eval[:, 1],
"t": eval_time
}
return cylinder_inlet, outlet, domain_train, IC_0_1, probes, jet_config, eval
if __name__ == "__main__":
from fluid_mechanics.area import *
from math import sin, cos, pi
import os
import pandas as pd
dataset_dir = "datasets"
ic_dir = "datasets/initial"
probes_dir = "datasets/probe"
if not os.path.exists(dataset_dir):
os.makedirs(dataset_dir)
if not os.path.exists(probes_dir):
os.makedirs(probes_dir)
if not os.path.exists(ic_dir):
os.makedirs(ic_dir)
r = 0.05
x1 = r * cos(pi / 2) + 0.3
y1 = r * sin(pi / 2) + 0.2
x2 = r * cos(3 / 2 * pi) + 0.3
y2 = r * sin(3 * pi / 2) + 0.2
jet_positions = [(x1, y1, 0), (x2, y2, 0)]
jet_coordinates = positions(jet_positions)
jet = {
"jet_x": 0,
"jet_y": 0,
"position": jet_coordinates
}
input_points = pinn_inlet()
outlet_points = pinn_outlet()
cylinder_points = pinn_cylinder()
inlet_cylinder_points = add_points(input_points, cylinder_points)
probes_points = circle_probes(0.05, 4)
points = {
"inlet_cylinder": inlet_cylinder_points,
"outlet": outlet_points,
"probes": probes_points
}
jet_x = jet.get("jet_x")
jet_y = jet.get("jet_y")
simu = simulation(c_x=0.3, c_y=0.2, o_x=0.3, o_y=0.2, r=0.05, r2=0.02)
mesh, ft = simu.generate_mesh()
cylinder_inlet, outlet, domain_train, IC_0_1, probes, jet_config, eval = simu.compute(mesh=mesh, ft=ft,
probes=points,
jet=jet)