Fine,here are my code:
self.inlet_marker, self.outlet_marker, self.wall_marker, self.obstacle_marker = 2, 3, 4, 5
mesh.topology.create_connectivity(mesh.topology.dim - 1, mesh.topology.dim)
boundary_facets = ft.find(self.inlet_marker)
domain = mesh.geometry.x # The coordinates of all mesh
boundary_vertices = dolfinx.mesh.compute_incident_entities(mesh.topology, boundary_facets, 1, 0)
vertex_to_geometry = dolfinx.cpp.mesh.entities_to_geometry(mesh._cpp_object, 0, boundary_vertices, False)
c_to_v = mesh.topology.connectivity(mesh.topology.dim, 0)
mesh.topology.create_connectivity(0, mesh.topology.dim)
v_to_c = mesh.topology.connectivity(0, mesh.topology.dim)
num_dofs = V.dofmap.index_map.size_local + V.dofmap.index_map.num_ghosts
dof_to_geometry_map = np.full(num_dofs, -1, dtype=np.int32)
dofmap = V.dofmap
layout = dofmap.dof_layout
for i, (vertex, node) in enumerate(zip(boundary_vertices, vertex_to_geometry)):
cell = v_to_c.links(vertex)[0]
cell_dofs = dofmap.cell_dofs(cell)
cvs = c_to_v.links(cell)
local_index = np.flatnonzero(cvs == vertex)[0]
dof = cell_dofs[layout.entity_dofs(0, local_index)]
dof_to_geometry_map[dof] = node
And here are the full code:
import gmsh
import numpy as np
import sys
from mpi4py import MPI
import pandas as pd
from petsc4py import PETSc
import dolfinx
from dolfinx import geometry
from dolfinx.fem import (Constant, Function, FunctionSpace,
assemble_scalar, dirichletbc, form, locate_dofs_topological, set_bc)
from dolfinx.fem.petsc import (apply_lifting, assemble_matrix, assemble_vector,
create_vector, create_matrix, set_bc)
from dolfinx.geometry import BoundingBoxTree, compute_collisions, compute_colliding_cells
from dolfinx.io import (XDMFFile, distribute_entity_data, gmshio)
from utils.encoder import normalization
from ufl import (FacetNormal, FiniteElement, Identity, Measure, TestFunction, TrialFunction, VectorElement,
as_vector, div, dot, ds, dx, inner, lhs, grad, nabla_grad, rhs, sym)
class simulation:
def __init__(self, c_x, c_y, o_x, o_y, r, r2,
obstacle_type='cylinder',
save_mesh='false',
save='false',
visualize='false'):
self.L = 2.2
self.H = 0.41
self.c_x = c_x
self.c_y = c_y
self.r = r
self.gdim = 2
self.model_rank = 0
self.mesh_comm = MPI.COMM_WORLD
self.save = save
self.o_x = o_x
self.o_y = o_y
self.r2 = r2
self.obstacle_type = obstacle_type
self.save_mesh = save_mesh
self.visualize = visualize
def generate_mesh(self):
# L*H:computational domain's size
# (c_x,c_y):the coordinates of big cylinder
# r:radius of the big cylinder
# r2:radius of the small cylinder
gmsh.initialize()
gmsh.option.setNumber('General.Verbosity', 1)
rectangle = gmsh.model.occ.addRectangle(0, 0, 0, self.L, self.H, tag=1)
obstacle1 = gmsh.model.occ.addDisk(self.c_x, self.c_y, 0, self.r, self.r)
pre_fluid = gmsh.model.occ.cut([(self.gdim, rectangle)], [(self.gdim, obstacle1)])
if self.r2 == 0:
fluid = pre_fluid
else:
if self.obstacle_type == 'cylinder':
obstacle2 = gmsh.model.occ.addDisk(self.o_x, self.o_y, 0, self.r2, self.r2)
elif self.obstacle_type == 'rectangular':
obstacle2 = gmsh.model.occ.addRectangle(self.o_x, self.o_y, 0, self.r2, self.r2)
fluid = gmsh.model.occ.cut([(self.gdim, rectangle)], [(self.gdim, obstacle2)])
gmsh.model.occ.synchronize()
fluid_marker = 1
volumes = gmsh.model.getEntities(dim=self.gdim)
assert (len(volumes) == 1)
gmsh.model.addPhysicalGroup(volumes[0][0], [volumes[0][1]], fluid_marker)
gmsh.model.setPhysicalName(volumes[0][0], fluid_marker, "Fluid")
self.inlet_marker, self.outlet_marker, self.wall_marker, self.obstacle_marker = 2, 3, 4, 5
inflow, outflow, walls, obstacle = [], [], [], []
boundaries = gmsh.model.getBoundary(volumes, oriented=False)
for boundary in boundaries:
center_of_mass = gmsh.model.occ.getCenterOfMass(boundary[0], boundary[1])
if np.allclose(center_of_mass, [0, self.H / 2, 0]):
inflow.append(boundary[1])
elif np.allclose(center_of_mass, [self.L, self.H / 2, 0]):
outflow.append(boundary[1])
elif np.allclose(center_of_mass, [self.L / 2, self.H, 0]) or np.allclose(center_of_mass, [self.L / 2, 0, 0]):
walls.append(boundary[1])
else:
obstacle.append(boundary[1])
gmsh.model.addPhysicalGroup(1, walls, self.wall_marker)
gmsh.model.setPhysicalName(1, self.wall_marker, "Walls")
gmsh.model.addPhysicalGroup(1, inflow, self.inlet_marker)
gmsh.model.setPhysicalName(1, self.inlet_marker, "Inlet")
gmsh.model.addPhysicalGroup(1, outflow, self.outlet_marker)
gmsh.model.setPhysicalName(1, self.outlet_marker, "Outlet")
gmsh.model.addPhysicalGroup(1, obstacle, self.obstacle_marker)
gmsh.model.setPhysicalName(1, self.obstacle_marker, "Obstacle")
res_min = self.r / 3
distance_field = gmsh.model.mesh.field.add("Distance")
gmsh.model.mesh.field.setNumbers(distance_field, "EdgesList", obstacle)
threshold_field = gmsh.model.mesh.field.add("Threshold")
gmsh.model.mesh.field.setNumber(threshold_field, "IField", distance_field)
gmsh.model.mesh.field.setNumber(threshold_field, "LcMin", res_min)
gmsh.model.mesh.field.setNumber(threshold_field, "LcMax", 0.25 * self.H)
gmsh.model.mesh.field.setNumber(threshold_field, "DistMin", self.r)
gmsh.model.mesh.field.setNumber(threshold_field, "DistMax", 2 * self.H)
min_field = gmsh.model.mesh.field.add("Min")
gmsh.model.mesh.field.setNumbers(min_field, "FieldsList", [threshold_field])
gmsh.model.mesh.field.setAsBackgroundMesh(min_field)
gmsh.option.setNumber("Mesh.Algorithm", 8)
gmsh.option.setNumber("Mesh.RecombinationAlgorithm", 2)
gmsh.option.setNumber("Mesh.RecombineAll", 1)
gmsh.option.setNumber("Mesh.SubdivisionAlgorithm", 1)
gmsh.model.mesh.generate(self.gdim)
gmsh.model.mesh.setOrder(2)
gmsh.model.mesh.optimize("Netgen")
mesh, _, ft = gmshio.model_to_mesh(gmsh.model, self.mesh_comm, self.model_rank, gdim=self.gdim)
ft.name = "facet markers"
if self.visualize == 'true':
if '-nopopup' not in sys.argv:
gmsh.fltk.run()
if self.save_mesh == 'true':
gmsh.write("cylinders.msh")
return mesh, ft
def compute(self, mesh, ft, points, jet, save='False'):
jet_position = jet.get("position")
jet_x = jet.get("jet_x")
jet_y = jet.get("jet_y")
bb_tree = geometry.BoundingBoxTree(mesh, mesh.topology.dim)
jet_cells = []
jet_points = []
cells = []
points_on_pro = []
cell_candidates = geometry.compute_collisions(bb_tree, points.T)
jet_cell_candidates = geometry.compute_collisions(bb_tree, jet_position.T)
colliding_cells = geometry.compute_colliding_cells(mesh, cell_candidates, points.T)
jet_colliding_cells = geometry.compute_colliding_cells(mesh, jet_cell_candidates, jet_position.T)
for i, point in enumerate(points.T):
if len(colliding_cells.links(i)) > 0:
points_on_pro.append(point)
cells.append(colliding_cells.links(i)[0])
for i, jet_point in enumerate(jet_position.T):
if len(jet_colliding_cells.links(i)) > 0:
jet_points.append(jet_point)
jet_cells.append(jet_colliding_cells.links(i)[0])
t = 0
T = 7 # Final time
dt = 1 / 1600 # Time step size
self.num_steps = 50 # int(T/dt)
k = Constant(mesh, PETSc.ScalarType(dt))
mu = Constant(mesh, PETSc.ScalarType(0.001)) # Dynamic viscosity
rho = Constant(mesh, PETSc.ScalarType(1)) # Density
v_cg2 = VectorElement("CG", mesh.ufl_cell(), 2)
s_cg1 = FiniteElement("CG", mesh.ufl_cell(), 1)
V = FunctionSpace(mesh, v_cg2)
Q = FunctionSpace(mesh, s_cg1)
fdim = mesh.topology.dim - 1
# Define boundary conditions
class InletVelocity():
def __init__(self, t):
self.t = t
def __call__(self, x):
values = np.zeros((2, x.shape[1]), dtype=PETSc.ScalarType)
values[0] = 4 * 1.5 * x[1] * (0.41 - x[1]) / (0.41 ** 2)
return values
class JetVelocity():
def __init__(self, t, jet_x, jet_y):
self.t_jet = t
self.jet_x = jet_x
self.jet_y = jet_y
def __call__(self, x):
values = np.zeros((2, x.shape[1]), dtype=PETSc.ScalarType)
values[1] = self.jet_y
index = np.where(x[1, :] < 0.2)
values[1, index] = -self.jet_y
values[0] = self.jet_x
return values
# Inlet
u_inlet = Function(V)
u_jet = Function(V)
inlet_velocity = InletVelocity(t)
jet_velocity = JetVelocity(t, jet_x, jet_y)
u_inlet.interpolate(inlet_velocity)
u_jet.interpolate(jet_velocity)
bcu_inflow = dirichletbc(u_inlet, locate_dofs_topological(V, fdim, ft.find(self.inlet_marker)))
# Jets
bcu_jets = dirichletbc(u_jet, locate_dofs_topological(V, fdim + 1, jet_cells))
# Walls
u_nonslip = np.array((0,) * mesh.geometry.dim, dtype=PETSc.ScalarType)
bcu_walls = dirichletbc(u_nonslip, locate_dofs_topological(V, fdim, ft.find(self.wall_marker)), V)
# Obstacle
bcu_obstacle = dirichletbc(u_nonslip, locate_dofs_topological(V, fdim, ft.find(self.obstacle_marker)), V)
bcu = [bcu_inflow, bcu_obstacle, bcu_walls, bcu_jets]
# Outlet
bcp_outlet = dirichletbc(PETSc.ScalarType(0), locate_dofs_topological(Q, fdim, ft.find(self.outlet_marker)), Q)
bcp = [bcp_outlet]
u = TrialFunction(V)
v = TestFunction(V)
u_ = Function(V)
u_.name = "u"
u_s = Function(V)
u_n = Function(V)
u_n1 = Function(V)
p = TrialFunction(Q)
q = TestFunction(Q)
p_ = Function(Q)
p_.name = "p"
phi = Function(Q)
# the first step
f = Constant(mesh, PETSc.ScalarType((0, 0)))
F1 = rho / k * dot(u - u_n, v) * dx
F1 += inner(dot(1.5 * u_n - 0.5 * u_n1, 0.5 * nabla_grad(u + u_n)), v) * dx
F1 += 0.5 * mu * inner(grad(u + u_n), grad(v)) * dx - dot(p_, div(v)) * dx
F1 += dot(f, v) * dx
a1 = form(lhs(F1))
L1 = form(rhs(F1))
A1 = create_matrix(a1)
b1 = create_vector(L1)
# the second step
a2 = form(dot(grad(p), grad(q)) * dx)
L2 = form(-rho / k * dot(div(u_s), q) * dx)
A2 = assemble_matrix(a2, bcs=bcp)
A2.assemble()
b2 = create_vector(L2)
# the last step
a3 = form(rho * dot(u, v) * dx)
L3 = form(rho * dot(u_s, v) * dx - k * dot(nabla_grad(phi), v) * dx)
A3 = assemble_matrix(a3)
A3.assemble()
b3 = create_vector(L3)
# Solver for step 1
solver1 = PETSc.KSP().create(mesh.comm)
solver1.setOperators(A1)
solver1.setType(PETSc.KSP.Type.BCGS)
pc1 = solver1.getPC()
pc1.setType(PETSc.PC.Type.JACOBI)
# Solver for step 2
solver2 = PETSc.KSP().create(mesh.comm)
solver2.setOperators(A2)
solver2.setType(PETSc.KSP.Type.MINRES)
pc2 = solver2.getPC()
pc2.setType(PETSc.PC.Type.HYPRE)
pc2.setHYPREType("boomeramg")
# Solver for step 3
solver3 = PETSc.KSP().create(mesh.comm)
solver3.setOperators(A3)
solver3.setType(PETSc.KSP.Type.CG)
pc3 = solver3.getPC()
pc3.setType(PETSc.PC.Type.SOR)
# -
u_probes = []
p_probes = []
for i in range(self.num_steps):
# Update current time step
t += dt
# Update inlet velocity
inlet_velocity.t = t
u_inlet.interpolate(inlet_velocity)
# Step 1: Tentative velocity step
A1.zeroEntries()
assemble_matrix(A1, a1, bcs=bcu)
A1.assemble()
with b1.localForm() as loc:
loc.set(0)
assemble_vector(b1, L1)
apply_lifting(b1, [a1], [bcu])
b1.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
set_bc(b1, bcu)
solver1.solve(b1, u_s.vector)
u_s.x.scatter_forward()
# Step 2: Pressure corrrection step
with b2.localForm() as loc:
loc.set(0)
assemble_vector(b2, L2)
apply_lifting(b2, [a2], [bcp])
b2.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
set_bc(b2, bcp)
solver2.solve(b2, phi.vector)
phi.x.scatter_forward()
p_.vector.axpy(1, phi.vector)
p_.x.scatter_forward()
# Step 3: Velocity correction step
with b3.localForm() as loc:
loc.set(0)
assemble_vector(b3, L3)
b3.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
solver3.solve(b3, u_.vector)
u_.x.scatter_forward()
# write the data of probes
u_values = u_.eval(points_on_pro, cells)
p_values = p_.eval(points_on_pro, cells)
u_probes.append(u_values)
p_probes.append(p_values)
mesh.topology.create_connectivity(mesh.topology.dim - 1, mesh.topology.dim)
boundary_facets = ft.find(self.inlet_marker)
domain = mesh.geometry.x # The coordinates of all mesh
boundary_vertices = dolfinx.mesh.compute_incident_entities(mesh.topology, boundary_facets, 1, 0)
vertex_to_geometry = dolfinx.cpp.mesh.entities_to_geometry(mesh._cpp_object, 0, boundary_vertices, False)
c_to_v = mesh.topology.connectivity(mesh.topology.dim, 0)
mesh.topology.create_connectivity(0, mesh.topology.dim)
v_to_c = mesh.topology.connectivity(0, mesh.topology.dim)
num_dofs = V.dofmap.index_map.size_local + V.dofmap.index_map.num_ghosts
dof_to_geometry_map = np.full(num_dofs, -1, dtype=np.int32)
dofmap = V.dofmap
layout = dofmap.dof_layout
for i, (vertex, node) in enumerate(zip(boundary_vertices, vertex_to_geometry)):
cell = v_to_c.links(vertex)[0]
cell_dofs = dofmap.cell_dofs(cell)
cvs = c_to_v.links(cell)
local_index = np.flatnonzero(cvs == vertex)[0]
dof = cell_dofs[layout.entity_dofs(0, local_index)]
dof_to_geometry_map[dof] = node
# Update variable with solution form this time step
with u_.vector.localForm() as loc_, u_n.vector.localForm() as loc_n, u_n1.vector.localForm() as loc_n1:
loc_n.copy(loc_n1)
loc_.copy(loc_n)
gmsh.clear()
probes = {
"p": p_probes,
"u0": np.array(u_probes)[:, :, 0],
"u1": np.array(u_probes)[:, :, 1]
}
jet_config = {
"jet_x": jet.get("jet_x"),
"jet_y": jet.get("jet_y")
}
domain_train = {
}
cylinder_inlet, outlet, IC_0_1 = {}, {}, {}
return cylinder_inlet, outlet, domain_train, IC_0_1, probes, jet_config
if __name__ == "__main__":
from fluid_mechanics.area import *
from math import sin, cos, pi
r = 0.05
x1 = r * cos(pi / 2) + 0.3
y1 = r * sin(pi / 2) + 0.2
x2 = r * cos(3 / 2 * pi) + 0.3
y2 = r * sin(3 * pi / 2) + 0.2
jet_positions = [(x1, y1, 0), (x2, y2, 0)]
jet_coordinates = positions(jet_positions)
jet = {
"jet_x": 0,
"jet_y": 0,
"position": jet_coordinates
}
points = np.array([[0.35, 0.3, 0.3, 0.25],
[0.2, 0.25, 0.15, 0.2],
[0, 0, 0, 0]])
jet_x = jet.get("jet_x")
jet_y = jet.get("jet_y")
simu = simulation(c_x=0.3, c_y=0.2, o_x=0.3, o_y=0.2, r=0.05, r2=0.02)
mesh, ft = simu.generate_mesh()
cylinder_inlet, outlet, domain_train, IC_0_1, probes, jet_config = simu.compute(mesh=mesh, ft=ft, points=points,
jet=jet, save='True')