Hi Community,
I have a problem in my code, I don’t know what is the cause. My project is in Deep Reinforcement Learning, I want to optimize the drag coefficient with PPO. So created 4 files : generate_mesh, Env2DCylinder (for main functions to that I need), FlowSolver(for solving the navier stokes equation and calculate the drag) and run_simulation (for PPO algorithme). The broblem is that the drag is always 0 and the algorithme is working normally.
I need your help , below is the flow solver class :
import dolfinx
import ufl
from mpi4py import MPI
from petsc4py import PETSc
from dolfinx.io import XDMFFile
from dolfinx.fem import (functionspace, Function, dirichletbc, form, locate_dofs_topological, Constant, assemble_scalar)
from ufl import TestFunction, TrialFunction, inner, div, grad, lhs, rhs, dx
from dolfinx.fem.petsc import (apply_lifting, assemble_matrix, assemble_vector,
create_vector, create_matrix, set_bc)
from basix.ufl import element
import numpy as np
class InletVelocity:
def __init__(self, V, t=0.0):
"""
Initialise la classe InletVelocity.
Args:
V : l'espace fonctionnel dans lequel la condition de vitesse sera définie.
t : le temps initial, utilisé pour des conditions d'inflow dépendantes du temps.
"""
self.V = V
self.t = t
def __call__(self, x):
"""
Définir le profil de vitesse d'inflow.
Args:
x : coordonnées spatiales où la condition aux limites sera appliquée.
Returns:
Un tableau numpy contenant les valeurs de la vitesse pour chaque point de x.
"""
values = np.zeros((self.V.mesh.geometry.dim, x.shape[1]), dtype=PETSc.ScalarType)
# Exemple de profil de vitesse parabolique en 2D (ajustez selon vos besoins)
values[0, :] = 4 * 1.5 * x[1] * (0.41 - x[1]) / (0.41**2)
values[1, :] = 0.0 # Composante y de la vitesse, modifiez si nécessaire
print("Shape of values:", values.shape)
n_points = values.shape[1]
print("Number of points (n_points):", n_points)
return values
def update_time(self, t):
"""
Mettre à jour le temps (si nécessaire pour des conditions dépendantes du temps).
Args:
t : Le nouveau temps à définir.
"""
self.t = t
class JetVelocity:
def __init__(self, jet_x, jet_y):
self.jet_x = jet_x
self.jet_y = jet_y
def __call__(self, x):
values = np.zeros((2, x.shape[1]), dtype=PETSc.ScalarType)
values[0] = self.jet_x
values[1] = self.jet_y
return values
class FlowSolver:
"""IPCS scheme with jets acting on a static cylinder."""
def __init__(self, flow_params, geometry_params, solver_params):
# Fluid parameters
mu = flow_params['mu'] # Dynamic viscosity
rho = flow_params['rho'] # Density
comm = MPI.COMM_WORLD
mesh = None
self.cylinder_noslip_tag = 4
with XDMFFile(comm, geometry_params['mesh'], "r") as xdmf:
mesh = xdmf.read_mesh()
facet_tags = xdmf.read_meshtags(mesh,name = "Facet tags")
fdim = mesh.topology.dim - 1
gdim = mesh.topology.dim
mesh.topology.create_connectivity(fdim, gdim)
print("Topological dimension (tdim):", mesh.topology.dim)
print("Geometric dimension (gdim):", mesh.geometry.dim)
# Time step and physical constants
dt = solver_params['dt']
k = Constant(mesh, PETSc.ScalarType(dt))
mu = Constant(mesh, PETSc.ScalarType(mu))
rho = Constant(mesh, PETSc.ScalarType(rho))
# Define function spaces for velocity and pressure
v_cg2 = element("Lagrange", mesh.topology.cell_name(), 2, shape=(mesh.geometry.dim, ))
s_cg1 = element("Lagrange", mesh.topology.cell_name(), 1)
V = functionspace(mesh, v_cg2)
Q = functionspace(mesh, s_cg1)
# Define test and trial functions
u = TrialFunction(V)
v = TestFunction(V)
p = TrialFunction(Q)
q = TestFunction(Q)
# Previous and current solution functions
u_n = Function(V)
u_ = Function(V)
p_ = Function(Q)
p_n = Function(Q)
# Define the inflow velocity using the InletVelocity class
inflow_velocity = InletVelocity(V)
u_inlet = Function(V)
u_inlet.interpolate(inflow_velocity) # Interpolating the inflow velocity profile
# Boundary conditions
inlet_marker, outlet_marker, wall_marker, obstacle_marker, jet_marker = 2, 3, 4, 5, 6
inlet_facets = facet_tags.find(inlet_marker)
wall_facets = facet_tags.find(wall_marker)
cylinder_facets = facet_tags.find(obstacle_marker)
outlet_facets = facet_tags.find(outlet_marker)
print(f"Number of cylinder facets found: {len(cylinder_facets)}")
bcu_inlet = dirichletbc(u_inlet, locate_dofs_topological(V, fdim, inlet_facets))
bcu_wall = dirichletbc(PETSc.ScalarType(np.array((0.0, 0.0), dtype=PETSc.ScalarType)), locate_dofs_topological(V, fdim, wall_facets), V)
bcu_cylinder = dirichletbc(PETSc.ScalarType(np.array((0.0, 0.0), dtype=PETSc.ScalarType)), locate_dofs_topological(V, fdim, cylinder_facets), V)
bcp_outlet = dirichletbc(PETSc.ScalarType(0), locate_dofs_topological(Q, fdim, outlet_facets), Q)
# Jet boundary conditions (initially set to zero)
self.bcu_jet = []
jet_tags = range(5, 5 + len(geometry_params['jet_positions']))
self.jet_functions = []
for index, tag in enumerate(jet_tags):
jet_facets = facet_tags.find(jet_marker)
print(f"Jet {index} is applied to {len(jet_facets)} facets.")
jet_bc = Function(V)
self.jet_functions.append(jet_bc)
self.bcu_jet.append(dirichletbc(jet_bc, locate_dofs_topological(V, fdim, jet_facets)))
# Combine boundary conditions
self.bcu = [bcu_inlet, bcu_cylinder, bcu_wall] + self.bcu_jet
self.bcp = [bcp_outlet]
# Variational forms for the IPCS scheme
F1 = (rho * inner((u - u_n) / k, v) * dx +
rho * inner(grad(u_n) * u_n, v) * dx +
mu * inner(grad(u), grad(v)) * dx -
inner(p_n, div(v)) * dx)
a1 = form(lhs(F1))
L1 = form(rhs(F1))
A1 = create_matrix(a1)
assemble_matrix(A1, a1, bcs=self.bcu)
A1.assemble()
b1 = create_vector(L1)
# Step 2: Pressure equation
a2 = form(inner(grad(p), grad(q)) * dx)
L2 = form(-rho / k * div(u_) * q * dx)
A2 = assemble_matrix(a2, bcs=self.bcp)
A2.assemble()
b2 = create_vector(L2)
# Step 3: Velocity correction
a3 = form(inner(u, v) * dx)
L3 = form(inner(u_, v) * dx - k * inner(grad(p_ - p_n), v) * dx)
A3 = assemble_matrix(a3)
A3.assemble()
b3 = create_vector(L3)
# PETSc setup for the linear solvers
self.solver1 = PETSc.KSP().create(mesh.comm)
self.solver1.setOperators(A1)
self.solver1.setType(PETSc.KSP.Type.BCGS)
pc1 = self.solver1.getPC()
pc1.setType(PETSc.PC.Type.JACOBI)
self.solver2 = PETSc.KSP().create(mesh.comm)
self.solver2.setOperators(A2)
self.solver2.setType(PETSc.KSP.Type.MINRES)
pc2 = self.solver2.getPC()
pc2.setType(PETSc.PC.Type.HYPRE)
pc2.setHYPREType("boomeramg")
self.solver3 = PETSc.KSP().create(mesh.comm)
self.solver3.setOperators(A3)
self.solver3.setType(PETSc.KSP.Type.CG)
pc3 = self.solver3.getPC()
pc3.setType(PETSc.PC.Type.SOR)
# Store attributes for the simulation
self.mu = mu
self.rho = rho
self.u_, self.u_n = u_, u_n
self.p_, self.p_n = p_, p_n
self.dt = dt
self.mesh = mesh
self.obstacle_marker = obstacle_marker
self.facet_tags = facet_tags
self.a1,self.A1, self.L1, self.b1 = a1,A1, L1, b1
self.a2,self.A2, self.L2, self.b2 = a2,A2, L2, b2
self.a3,self.A3, self.L3, self.b3 = a3,A3, L3, b3
def sample_pressure_at_location(self, location):
"""
Sample the pressure at a specific location in the mesh.
Args:
location (tuple): The (x, y) coordinates of the location where pressure is to be sampled.
Returns:
float: The pressure value at the specified location.
"""
# Ensure the location is in the correct shape
point = np.array(location, dtype=np.float64)
# Create a point source at the specified location
V = self.p_.function_space
p_sample = Function(V)
def interpolate_function(x):
# Handle the dimensionality mismatch
if x.shape[0] == 3: # If x has a third dimension, assume it's irrelevant and ignore it
x = x[:2, :]
# Compute the distance from the point to all points in x
distances = np.linalg.norm(x - point.reshape(-1, 1), axis=0)
# Return a boolean array where the closest point has the pressure
return np.where(distances < 1e-10, 1.0, 0.0)
# Interpolate the function at the specific location
p_sample.interpolate(interpolate_function)
# Extract the pressure value at that point
p_value = p_sample.vector.array
# Since we're sampling a single point, just return the first value that is non-zero
return np.max(p_value)
def evolve(self, jet_bc_values):
# Update the jet boundary conditions if needed
for Q, jet_function in zip(jet_bc_values, self.jet_functions):
# print(f"Applying jet with value {Q} to boundary condition {jet_function}")
# Update the jet velocity with the given control value
jet_velocity = JetVelocity(Q, 0.0) # Assuming the jet only has a component in one direction, adjust as needed
jet_function.interpolate(jet_velocity)
# Check which facets are being affected
for i, jet_function in enumerate(self.jet_functions):
affected_facets = jet_function.function_space.mesh.topology.index_map(1).size_local
print(f"Jet {i} is applied to {affected_facets} facets.")
self.u_n.x.array[:] = self.u_.x.array[:] # Update the previous velocity
self.p_n.x.array[:] = self.p_.x.array[:] # Update the previous pressure
# Step 1: Tentative velocity step
self.A1.zeroEntries()
assemble_matrix(self.A1, self.a1, bcs=self.bcu)
self.A1.assemble()
with self.b1.localForm() as loc_b1:
loc_b1.set(0)
assemble_vector(self.b1, self.L1)
apply_lifting(self.b1, [self.a1], [self.bcu])
self.b1.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
set_bc(self.b1, self.bcu)
u_s = Function(self.u_.function_space) # Tentative velocity
self.solver1.solve(self.b1, u_s.vector)
u_s.x.scatter_forward()
# Step 2: Pressure correction step
with self.b2.localForm() as loc_b2:
loc_b2.set(0)
assemble_vector(self.b2, self.L2)
apply_lifting(self.b2, [self.a2], [self.bcp])
self.b2.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
set_bc(self.b2, self.bcp)
phi = Function(self.p_.function_space) # Pressure correction
self.solver2.solve(self.b2, phi.vector)
phi.x.scatter_forward()
self.p_.vector.axpy(1.0, phi.vector) # p_ = p_ + phi
self.p_.x.scatter_forward()
# Step 3: Velocity correction step
with self.b3.localForm() as loc_b3:
loc_b3.set(0)
assemble_vector(self.b3, self.L3)
self.b3.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
self.solver3.solve(self.b3, self.u_.vector)
self.u_.x.scatter_forward()
# Update the solution for the next time step
self.u_n.x.array[:] = self.u_.x.array[:]
self.p_n.x.array[:] = self.p_.x.array[:]
def compute_drag(self):
# Normal vector on the boundary, pointing outward
n = -ufl.FacetNormal(self.mesh)
# Define the measure on the obstacle boundary
dObs = ufl.Measure("ds", domain=self.mesh, subdomain_data=self.facet_tags, subdomain_id=self.obstacle_marker)
# Tangential velocity component
u_t = inner(ufl.as_vector((n[1], -n[0])), self.u_)
# Drag force formulation (integral)
drag_form= form((2 / 0.1) * (self.mu / self.rho * inner(grad(u_t), n) * n[1]) * dObs)
drag = assemble_scalar(drag_form)
MPI.COMM_WORLD.barrier()
return drag
def compute_lift(self):
n = -ufl.FacetNormal(self.mesh)
dObs = ufl.Measure("ds", domain=self.mesh, subdomain_data=self.facet_tags, subdomain_id=self.cylinder_noslip_tag)
u_t = inner(ufl.as_vector((n[1], -n[0])), self.u_)
lift_form = form((2 / 0.1) * (self.mu / self.rho * inner(grad(u_t), n) * n[0] - self.p_ * n[1]) * dObs)
lift = assemble_scalar(lift_form)
MPI.COMM_WORLD.barrier()
return lift