Here my full code:
import ufl
from mpi4py import MPI
from slepc4py import SLEPc
from petsc4py import PETSc
from dolfinx import mesh as m, fem
from dolfinx import default_scalar_type
from dolfinx.fem.petsc import LinearProblem
from dolfinx.fem import (Constant, Function, functionspace, dirichletbc, form)
def read_kle(file_npz, mesh, VD, eps=1e-5):
data = np.load(file_npz)
if 'kle_interpolated' in data:
kle_values = data['kle_interpolated'][::-1]
if 'x' in data and 'y' in data:
x = data['x']
y = data['y']
elif 'X' in data and 'Y' in data:
x = data['X'][0, :]
y = data['Y'][:, 0]
nx = len(x)
ny = len(y)
kle_func = fem.Function(VD, name="Cell_KLE")
mesh_geometry = mesh.geometry
mesh_topology = mesh.topology
tdim = mesh.topology.dim
cell_midpoints = []
num_cells = mesh.topology.index_map(tdim).size_local
cells = mesh_topology.connectivity(tdim, 0)
for cell_idx in range(num_cells):
vertex_indices = cells.links(cell_idx)
vertices = [mesh_geometry.x[v_idx] for v_idx in vertex_indices]
midpoint_x = sum(v[0] for v in vertices) / len(vertices)
midpoint_y = sum(v[1] for v in vertices) / len(vertices)
i = np.where((x < midpoint_x + eps) & (x > midpoint_x - eps))[0][0]
j = np.where((y < midpoint_y + eps) & (y > midpoint_y - eps))[0][0]
i = max(0, min(i, nx-1))
j = max(0, min(j, ny-1))
kle_func.x.array[cell_idx] = kle_values[j, i]
return kle_func
def get_index(i, j, N_x):
return j * N_x + i
class LinearExpression:
def __init__(self):
self.ind = 0
self.h_x = 1.0
self.h_y = 1.0
self.v_x = [0.0, 1.0, 0.0, 1.0]
self.v_y = [0.0, 0.0, 1.0, 1.0]
self.p_1 = [0.0, 0.0]
self.p_2 = [1.0, 1.0]
def set_ind(self, ind, p_1, p_2):
self.ind = ind
self.h_x = p_2[0] - p_1[0]
self.h_y = p_2[1] - p_1[1]
self.p_1 = p_1
self.p_2 = p_2
self.v_x = [p_1[0], p_2[0], p_1[0], p_2[0]]
self.v_y = [p_1[1], p_1[1], p_2[1], p_2[1]]
def __call__(self, x):
"""Make the class callable for dolfinx interpolation"""
return self.eval(x)
def eval(self, x, cell=None):
"""
Evaluate the expression at points x
Args:
x: Point coordinates with shape (dim, num_points)
cell: Cell indices (not used here)
Returns:
Values at x with shape (num_points,)
"""
if x.ndim == 1: # Single point with shape (dim,)
return self._eval_single_point(x)
# x has shape (dim, num_points)
num_points = x.shape[1]
result = np.zeros(num_points)
for i in range(num_points):
point = x[:, i]
result[i] = self._eval_single_point(point)
return result
def _eval_single_point(self, point):
"""Evaluate at a single point with coordinates [x, y]"""
x, y = point[0], point[1]
# Check if point is inside the domain
if (x > self.p_2[0] + 1e-10 or x < self.p_1[0] - 1e-10 or
y > self.p_2[1] + 1e-10 or y < self.p_1[1] - 1e-10):
return 0.0
# Compute normalized coordinates in [0,1]×[0,1]
xi = (x - self.p_1[0]) / self.h_x
eta = (y - self.p_1[1]) / self.h_y
# Bilinear shape functions (hat functions)
if self.ind == 0: # Bottom-left corner (0,0)
return (1.0 - xi) * (1.0 - eta)
elif self.ind == 1: # Bottom-right corner (1,0)
return xi * (1.0 - eta)
elif self.ind == 2: # Top-left corner (0,1)
return (1.0 - xi) * eta
elif self.ind == 3: # Top-right corner (1,1)
return xi * eta
else:
raise ValueError(f"Invalid corner index: {self.ind}")
def main(kle_number, directory_kle, fine_cell, coarse_cell, eigenvectors_count):
mesh = m.create_unit_square(MPI.COMM_WORLD, fine_cell, fine_cell,
cell_type=dolfinx.mesh.CellType.quadrilateral)
subdomRoot = m.meshtags(mesh, mesh.topology.dim,
np.arange(mesh.topology.index_map(mesh.topology.dim).size_local),
np.ones(mesh.topology.index_map(mesh.topology.dim).size_local, dtype=np.int32))
mesh.topology.create_connectivity(mesh.topology.dim, 0)
print('Mesh loaded: ', mesh.topology.index_map(mesh.topology.dim).size_local, mesh.topology.index_map(0).size_local)
V = functionspace(mesh, ("CG", 1))
u = Function(V)
VD = functionspace(mesh, ("DG", 0))
kle_file_path = os.path.join(directory_kle, f'kle_{kle_number}.npz')
print(f"Processing {kle_file_path}...")
kle = read_kle(kle_file_path, mesh, VD)
l = 1.0
h = l / coarse_cell
x = np.linspace(0, l, int(l / h) + 1)
y = np.linspace(0, l, int(l / h) + 1)
xy = np.meshgrid(x, y)
xy = np.dstack([xy[0], xy[1]])
xy = xy.reshape(-1, 2)
eps = 1e-7
w_i = 0
for p in xy:
def in_subdomain(x):
return np.logical_and(
np.logical_and(x[0] >= p[0] - h - eps, x[0] <= p[0] + h + eps),
np.logical_and(x[1] >= p[1] - h - eps, x[1] <= p[1] + h + eps)
)
marked_cells = m.locate_entities(mesh, mesh.topology.dim, in_subdomain)
local_mesh, entity_map, vertex_map = m.create_submesh(mesh, mesh.topology.dim, marked_cells)[:3]
local_mesh.topology.create_connectivity(local_mesh.topology.dim, 0)
local_mesh.topology.create_connectivity(0, local_mesh.topology.dim)
parentCI = np.array(entity_map, dtype=int)
V_local = functionspace(local_mesh, ("DG", 0))
kle_local = Function(V_local)
subdom = m.meshtags(local_mesh, local_mesh.topology.dim,
np.arange(local_mesh.topology.index_map(local_mesh.topology.dim).size_local),
np.ones(local_mesh.topology.index_map(local_mesh.topology.dim).size_local, dtype=np.int32))
for ci in range(local_mesh.topology.index_map(local_mesh.topology.dim).size_local):
gci = parentCI[ci]
kle_local.x.array[ci] = kle.x.array[gci]
kle_ = save_kle(ms_dir + f'kle/kle_{w_i}.npz', local_mesh, kle_local, fine_cell)
line_j, column_i = divmod(w_i, (coarse_cell + 1))
i_0, j_0 = column_i, line_j
i_1, j_1 = column_i, line_j
if column_i != 0:
i_0 = (column_i - 1)
if line_j != 0:
j_0 = (line_j - 1)
if column_i != coarse_cell:
i_1 = (column_i + 1)
if line_j != coarse_cell:
j_1 = (line_j + 1)
linear_expressions = []
for ii in range(i_0, i_1):
for jj in range(j_0, j_1):
c_ind = 0
if get_index(ii, jj, coarse_cell + 1) == w_i:
c_ind = 0
if get_index(ii + 1, jj, coarse_cell + 1) == w_i:
c_ind = 1
if get_index(ii, jj + 1, coarse_cell + 1) == w_i:
c_ind = 2
if get_index(ii + 1, jj + 1, coarse_cell + 1) == w_i:
c_ind = 3
linear_expression = LinearExpression()
linear_expression.set_ind(
c_ind,
[ii * h, jj * h],
[(ii + 1) * h, (jj + 1) * h]
)
linear_expressions.append(linear_expression)
local_V = functionspace(local_mesh, ("CG", 1))
partition_of_unity_function = Function(local_V)
partition_of_unity_function.x.array[:] = 0.0
linear_function = Function(local_V)
linear_function.x.array[:] = 0.0
N_w_i = partition_of_unity_function.x.array.shape[0]
partition_of_unity_array = np.array(partition_of_unity_function.x.array)
for linear_expression in linear_expressions:
linear_function_array = linear_expression.eval(local_mesh.geometry.x.T)
for i in range(N_w_i):
if linear_function_array[i] > 1.0e-5:
partition_of_unity_array[i] = linear_function_array[i]
partition_of_unity_function.x.array[:] = partition_of_unity_array
partition_of_unity_function.x.petsc_vec.ghostUpdate()
u = ufl.TrialFunction(local_V)
v = ufl.TestFunction(local_V)
f = Constant(local_mesh, default_scalar_type(0))
a = ufl.dot(kle_local*ufl.grad(u), ufl.grad(v))*ufl.dx
a = form(a)
L = form(f * v * ufl.dx)
A = fem.petsc.assemble_matrix(a)
A.assemble()
b = fem.petsc.assemble_vector(L)
b.set(0.0)
_px = p[0]
_py = p[1]
def boundary_D(x):
x_max, y_max = x.max(axis=1)[:2]
x_min, y_min = x.min(axis=1)[:2]
arr = np.arange(x.shape[1])
tol = 1e-3
on_bound_1 = np.logical_or(np.isclose(x[0], _px, atol=tol), np.isclose(x[1], _py, atol=tol))
on_bound_2 = np.logical_or(np.isclose(x[0], x_max, atol=tol), np.isclose(x[1], y_max, atol=tol))
on_bound_3 = np.logical_or(np.isclose(x[0], x_min, atol=tol), np.isclose(x[1], y_min, atol=tol))
return arr[np.logical_or(np.logical_or(on_bound_1, on_bound_2), on_bound_3)]
boundary_dofs = boundary_D(local_mesh.geometry.x.T)
u_bc = Function(local_V)
u_bc.interpolate(partition_of_unity_function)
bc = dirichletbc(u_bc, boundary_dofs)
pou_problem = LinearProblem(a, L, bcs=[bc], u=Function(local_V),
petsc_options={"ksp_type": "gmres", "pc_type": "asm"})
pou_solution = pou_problem.solve()
The kle_{kle_number}.npz
file contains a field defined on quadrilateral mesh.