In parallel computation (mpirun), how to communicate (scatter & gather) data between the local processes and a global process?

hi~ :slight_smile:
This question is an extension of a posting I asked before (The problem in a simple example I made and how to 'project' the value assigned to 'FunctionSpace' into a numpy array in dolfinx environment?).
The dolfinx version on my laptop is

0.5.0
unknown

Thankfully, through the answer there I got the desired u_sol array, as follows
(run in a single process):

[ 0.00000000e+00  0.00000000e+00 -1.51458886e+01 -2.69893899e+01
  0.00000000e+00  0.00000000e+00  4.77395901e-15 -2.81564987e+01
  0.00000000e+00  0.00000000e+00  1.51458886e+01 -2.69893899e+01
  6.66133815e-15 -6.44827586e+01 -2.19098143e+01 -6.77188329e+01
  2.19098143e+01 -6.77188329e+01]

After that, in order to perform the calculation in parallel, u_sol was computed with 2 local processes using MPI (command: mpirun -n 2 ~~), which gives:

[ 0.00000000e+00  0.00000000e+00 -1.51458886e+01 -2.69893899e+01
  0.00000000e+00  0.00000000e+00 -2.19098143e+01 -6.77188329e+01
 -4.63518113e-15 -2.81564987e+01 -1.06581410e-14 -6.44827586e+01
  1.51458886e+01 -2.69893899e+01]
[ 0.00000000e+00  0.00000000e+00  1.51458886e+01 -2.69893899e+01
 -4.63518113e-15 -2.81564987e+01 -1.06581410e-14 -6.44827586e+01
  2.19098143e+01 -6.77188329e+01  0.00000000e+00  0.00000000e+00
 -1.51458886e+01 -2.69893899e+01]

I would like to know how to make the result obtained when running the code under mpirun match the global array obtained from a single process (gather in MPI), and, conversely, how to send the u_sol array computed in a single process out to each local process (scatter in MPI).
(I am going to use the gather and scatter methods to perform various analyses with the u_sol array in my code.)
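(For clarity, here is a minimal mpi4py-only sketch of the two communication patterns I mean, independent of dolfinx; the array contents are just dummy values:)

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

# gather: every rank sends its local array, the root receives a list of them
local = np.full(3, comm.rank, dtype=np.float64)  # dummy local data
gathered = comm.gather(local, root=0)            # list of arrays on rank 0, None elsewhere
if comm.rank == 0:
    global_array = np.concatenate(gathered)

# scatter: the root splits a global array and sends one chunk to each rank
chunks = np.array_split(np.arange(6.0), comm.size) if comm.rank == 0 else None
local_chunk = comm.scatter(chunks, root=0)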

I checked the posting (Gather solutions in parallel in FEniCSX) for help, but an error occurred and I could not use it. The error occurred at the following line of code.

dolfinx.cpp.la.scatter_forward(u.x)

When I stepped through the code, it seemed that the problem was caused by the scatter_forward command no longer existing.
Also, as was asked in that posting, the degrees of freedom are assigned to the mesh differently in parallel, so if I simply use the gather command I inevitably get values ordered differently from the single-process result. (I think there should be a simple way to solve this, but I do not know how to do it.)

Note: the code is slightly modified from the previous posting (the parts relevant to each description are the same).

import ufl
import numpy as np
from dolfinx import fem, mesh
from mpi4py import MPI
from petsc4py import PETSc

nelx = 2
nely = 2
volfrac = 0.5
penal = 3

sigma = lambda _u: 2.0 * mu * ufl.sym(ufl.grad(_u)) + lmd * ufl.tr(ufl.sym(ufl.grad(_u))) * ufl.Identity(len(_u))
psi = lambda _u: lmd / 2 * (ufl.tr(ufl.sym(ufl.grad(_u))) ** 2) + mu * ufl.tr(ufl.sym(ufl.grad(_u)) * ufl.sym(ufl.grad(_u)))

mu, lmd = PETSc.ScalarType(0.4), PETSc.ScalarType(0.6)

msh = mesh.create_rectangle(MPI.COMM_WORLD, ((0.0, 0.0), (nelx, nely)), (nelx, nely), cell_type=mesh.CellType.triangle, diagonal=mesh.DiagonalType.right_left)
U = fem.VectorFunctionSpace(msh, ("CG", 1))
D = fem.FunctionSpace(msh, ("DG", 0))
u, v = ufl.TrialFunction(U), ufl.TestFunction(U)
u_sol, density = fem.Function(U), fem.Function(D)
density.x.array[:] = volfrac

def left_clamp(x):
    return np.isclose(x[0], 0.0)
f_dim = msh.topology.dim - 1
bc_facets = mesh.locate_entities_boundary(msh, f_dim, left_clamp)
u_zero = np.array([0.0, 0.0], dtype=PETSc.ScalarType)
bc_l = fem.dirichletbc(u_zero, fem.locate_dofs_topological(U, f_dim, bc_facets), U)
bcs = [bc_l]

load_points = [(1, lambda x: np.logical_and(x[0]==nelx, x[1]<=2))]

facet_indices, facet_markers = [], []
for (marker, locator) in load_points:
    facets = mesh.locate_entities(msh, f_dim, locator)
    facet_indices.append(facets)
    facet_markers.append(np.full(len(facets), marker))
facet_indices = np.array(np.hstack(facet_indices), dtype=np.int32)
facet_markers = np.array(np.hstack(facet_markers), dtype=np.int32)
sorted_facets = np.argsort(facet_indices)
facet_tag = mesh.meshtags(msh, f_dim, facet_indices[sorted_facets], facet_markers[sorted_facets])

ds = ufl.Measure("ds", domain=msh, subdomain_data=facet_tag)
f = ufl.dot(v, fem.Constant(msh, PETSc.ScalarType((0.0, -1.0)))) * ds(1)

k = ufl.inner(density ** penal * sigma(u), ufl.grad(v)) * ufl.dx
problem = fem.petsc.LinearProblem(k, f, bcs)
u_sol = problem.solve()

As you can see in: https://github.com/FEniCS/dolfinx/blob/114c268aab752b6669abc2718837116cec90dba6/python/test/unit/fem/test_dof_coordinates.py#L16
the syntax has changed to u.x.scatter_forward().
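For example, after updating the owned entries of a dolfinx.fem.Function u on each process:

# u is assumed to be a dolfinx.fem.Function (dolfinx 0.5.x)
u.x.scatter_forward()  # update ghost values with the values from the owning processes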

You can use something along the lines of:

import dolfinx
from mpi4py import MPI
Nx, Ny, Nz = 1, 1, 1
P = 2
mesh = dolfinx.mesh.create_unit_cube(
    MPI.COMM_WORLD, Nx, Ny, Nz, ghost_mode=dolfinx.mesh.GhostMode.none)
print(mesh.comm.rank, mesh.topology.original_cell_index,
      len(mesh.topology.original_cell_index))
o_indices = mesh.topology.original_cell_index
V = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
string = ""
for i in range(mesh.topology.index_map(mesh.topology.dim).size_local):
    string += f"{mesh.comm.rank}: {o_indices[i]}, {V.dofmap.cell_dofs(i)}\n"
print(string, flush=True)

to figure out which local index corresponds to the dofs of the serially executed program.


Sorry for the late reply due to personal issues. :pray:
Two additional questions came up while applying the code @dokken gave me, so I am asking them here.

1] The result of running the answer code on a single process is

0 [0 1 5 3 4 2 6 7] 8
0: 0, [0 1 2]
0: 1, [1 2 3]
0: 5, [2 4 5]
0: 3, [1 3 6]
0: 4, [2 3 5]
0: 2, [1 7 6]
0: 6, [3 6 5]
0: 7, [6 5 8]

Using ParaView, I checked the node & cell indexing as follows,

1core_node_cell

and confirmed that it corresponds well (except for the nodes assigned to cells 2 & 5).

However, when running with 2 processes (mpirun -n 2 ~~), the output was as follows,

0 [0 1 3 2] 4
1 [5 4 6 7] 4
0: 0, [0 1 2]
0: 1, [1 2 4]
0: 3, [1 4 5]
0: 2, [1 3 5]

1: 5, [5 0 1]
1: 4, [5 2 1]
1: 6, [2 3 1]
1: 7, [3 1 4]

As in the single-process case, I checked the node & cell indexing in ParaView; it is shown in the figure below.

2core_node_cell

I had difficulty matching the printed output with the picture above. How should the node & cell indexing be interpreted in the multi-process case? (Shouldn't it be interpreted using the result obtained from ParaView?)

2] I applied the approach from the previously referenced posting to my MWE. The code is as follows.
(As an aside, while analyzing the code I confirmed that the u_sol obtained from a single process and the multi-process u_sol cannot be perfectly identical, because the indexing order is completely different.)

import ufl
import numpy as np
from dolfinx import fem, mesh
from mpi4py import MPI
from petsc4py import PETSc

nelx = 2
nely = 2
volfrac = 0.5
penal = 3

comm = MPI.COMM_WORLD
rank = comm.rank

sigma = lambda _u: 2.0 * mu * ufl.sym(ufl.grad(_u)) + lmd * ufl.tr(ufl.sym(ufl.grad(_u))) * ufl.Identity(len(_u))
psi = lambda _u: lmd / 2 * (ufl.tr(ufl.sym(ufl.grad(_u))) ** 2) + mu * ufl.tr(ufl.sym(ufl.grad(_u)) * ufl.sym(ufl.grad(_u)))

mu, lmd = PETSc.ScalarType(0.4), PETSc.ScalarType(0.6)

msh = mesh.create_rectangle(MPI.COMM_WORLD, ((0.0, 0.0), (nelx, nely)), (nelx, nely), cell_type=mesh.CellType.triangle, ghost_mode=mesh.GhostMode.none, diagonal=mesh.DiagonalType.right_left)

# Added based on the answer code
print(msh.comm.rank, msh.topology.original_cell_index, len(msh.topology.original_cell_index))

U = fem.VectorFunctionSpace(msh, ("CG", 1))
D = fem.FunctionSpace(msh, ("DG", 0))
u, v = ufl.TrialFunction(U), ufl.TestFunction(U)
u_sol, density = fem.Function(U), fem.Function(D)
density.x.array[:] = volfrac

if rank == 0:
    # Mesh on one(root) process
    msh_self = mesh.create_rectangle(MPI.COMM_SELF, ((0.0, 0.0), (nelx, nely)), (nelx, nely), cell_type=mesh.CellType.triangle, ghost_mode=mesh.GhostMode.none, diagonal=mesh.DiagonalType.right_left)
    U_self = fem.VectorFunctionSpace(msh_self, ("CG", 1))
    x_self = U_self.tabulate_dof_coordinates()

def left_clamp(x):
    return np.isclose(x[0], 0.0)
f_dim = msh.topology.dim - 1
bc_facets = mesh.locate_entities_boundary(msh, f_dim, left_clamp)
u_zero = np.array([0.0, 0.0], dtype=PETSc.ScalarType)
bc_l = fem.dirichletbc(u_zero, fem.locate_dofs_topological(U, f_dim, bc_facets), U)
bcs = [bc_l]

load_points = [(1, lambda x: np.logical_and(x[0]==nelx, x[1]<=2))]

facet_indices, facet_markers = [], []
for (marker, locator) in load_points:
    facets = mesh.locate_entities(msh, f_dim, locator)
    facet_indices.append(facets)
    facet_markers.append(np.full(len(facets), marker))
facet_indices = np.array(np.hstack(facet_indices), dtype=np.int32)
facet_markers = np.array(np.hstack(facet_markers), dtype=np.int32)
sorted_facets = np.argsort(facet_indices)
facet_tag = mesh.meshtags(msh, f_dim, facet_indices[sorted_facets], facet_markers[sorted_facets])

ds = ufl.Measure("ds", domain=msh, subdomain_data=facet_tag)
f = ufl.dot(v, fem.Constant(msh, PETSc.ScalarType((0.0, -1.0)))) * ds(1)

k = ufl.inner(density ** penal * sigma(u), ufl.grad(v)) * ufl.dx
problem = fem.petsc.LinearProblem(k, f, bcs)
u_sol = problem.solve()

# Added based on the answer code
string = ""
for i in range(msh.topology.index_map(msh.topology.dim).size_local):
    string += f"{msh.comm.rank}: {msh.topology.original_cell_index[i]}, {U.dofmap.cell_dofs(i)}\n"
print(string, flush=True)

# Get local ranges and global size of array
imap = u_sol.function_space.dofmap.index_map
local_range = np.asarray(imap.local_range, dtype=np.int32) * u_sol.function_space.dofmap.index_map_bs
size_global = imap.size_global * u_sol.function_space.dofmap.index_map_bs

# Communicate ranges and local data
ranges = comm.gather(local_range, root=0)
data = comm.gather(u_sol.vector.array, root=0)

# Communicate local dof coordinates
x = U.tabulate_dof_coordinates()[:imap.size_local]
x_glob = comm.gather(x, root=0)

# Gather
if comm.rank == 0:
    # Create array received all values
    global_array = np.zeros(size_global)
    for r, d in zip(ranges, data):
        global_array[r[0]:r[1]] = d

    # Create array with all coordinates
    global_x = np.zeros((size_global, 3))
    for r, x_ in zip(ranges, x_glob):
        global_x[r[0]:r[1], :] = x_
    serial_to_global = []
    for coord in x_self:
        serial_to_global.append(np.abs(global_x-coord).sum(axis=1).argmin())

    # Create sorted array from
    u_from_global = np.zeros(global_array.shape)
    for serial, glob in enumerate(serial_to_global):
        u_from_global[serial] = global_array[glob]
    print(f"Gathered from parallel array: \n{u_from_global}")
    # NOTE: np.allclose needs two arrays; to verify, one could compare u_from_global
    # against a serial reference solution (e.g. one computed on msh_self / U_self above).

When I ran this code with 2 processes (mpirun -n 2 ~~), an error occurred.

Traceback (most recent call last):
  File "~~~ file directory location ~~~", line 103, in <module>
    global_x[r[0]:r[1], :] = x_
ValueError: could not broadcast input array from shape (4,3) into shape (8,3)
--------------------------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:

  Process name: [[14691,1],0]
  Exit code:    1
--------------------------------------------------------------------------

I think the cause of this error is that the array sizes do not match.
It seems the code would only work if the locally computed u_sol array (1 by 12) were laid out according to the global array size (1 by 18).

Is there a way to solve this problem?
(Or is there a way to obtain, on each local process, an array containing the node & cell information corresponding to that process's u_sol? If I knew this, it would be a clue for gathering the dof-dependent u_sol into the global array.) → You can ignore this part if you have a simpler solution.
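For reference, the kind of per-process bookkeeping I mean, as a rough sketch using the variable names from the MWE above (the owned dof coordinates plus the original indices of the locally owned cells):

# Sketch: gather per-rank dof/cell bookkeeping to rank 0 (variable names as in the MWE above)
owned_coords = U.tabulate_dof_coordinates()[:imap.size_local]       # coordinates of owned dof nodes
c_map = msh.topology.index_map(msh.topology.dim)
owned_cells = msh.topology.original_cell_index[:c_map.size_local]   # input-mesh ids of owned cells
coords_per_rank = comm.gather(owned_coords, root=0)
cells_per_rank = comm.gather(owned_cells, root=0)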

P.S.) I tried to make the question as simple as possible, but I could not…
Sorry for writing too much. :cry:

There is way too much going on in your code for me to be able to parse it.
There are several things you can do to simplify it.
You could consider u being a solution created by interpolation, see the following code:

import ufl
import numpy as np
from dolfinx import fem, mesh
from mpi4py import MPI
from petsc4py import PETSc

nelx = 2
nely = 2
msh = mesh.create_rectangle(MPI.COMM_WORLD, ((0.0, 0.0), (nelx, nely)), (nelx, nely), cell_type=mesh.CellType.triangle,
                            ghost_mode=mesh.GhostMode.none, diagonal=mesh.DiagonalType.right_left)


U = fem.VectorFunctionSpace(msh, ("CG", 1))
u = fem.Function(U)

u.interpolate(lambda x: (x[0], np.sin(x[1])))

If you can pose your problem using the data from this u, it would be a lot easier to help you.


Thank you for the reply! :smile:
Using the example presented by @dokken, I analyzed the part where I am currently having a problem.
Executing the example on a single process gives

u array: 
[ 0.00000000e+00  2.15422807e-17  1.00000000e+00  2.15422807e-17
  2.15422807e-17  8.41470985e-01  1.00000000e+00  8.41470985e-01
 -2.15422807e-17  9.09297427e-01  1.00000000e+00  9.09297427e-01
  2.00000000e+00  8.41470985e-01  2.00000000e+00 -2.15422807e-17
  2.00000000e+00  9.09297427e-01]

(I will refer to this array as A.)

If the code is executed with 2 processes (mpirun -n 2 ~~), the following result is obtained.

Each local u array: 
[ 0.00000000e+00  2.15422807e-17  1.00000000e+00  2.15422807e-17
  8.94800217e-17  8.41470985e-01  2.00000000e+00 -2.15422807e-17]

Each local u array: 
[-2.15422807e-17  9.09297427e-01  1.00000000e+00  9.09297427e-01
  1.00000000e+00  8.41470985e-01  2.00000000e+00  8.41470985e-01
  2.00000000e+00  9.09297427e-01]

(I will refer to these arrays as B.)
The mesh (node & cell) information corresponding to u was the same as in the figures I posted above.

My final goal, in the mpirun -n 2 ~~ situation, is to gather the arrays B into one array like array A.
(Note: based on the result checked in ParaView, I expect the ordering of the gathered array may differ.)

So I added code like this:

import numpy as np
from dolfinx import fem, mesh
from mpi4py import MPI

# Input parameters
nelx = 2
nely = 2

# Declare mesh & function
msh = mesh.create_rectangle(MPI.COMM_WORLD, ((0.0, 0.0), (nelx, nely)), (nelx, nely), cell_type=mesh.CellType.triangle, ghost_mode=mesh.GhostMode.none, diagonal=mesh.DiagonalType.right_left)
U = fem.VectorFunctionSpace(msh, ("CG", 1))
u = fem.Function(U)
u.interpolate(lambda x: (x[0], np.sin(x[1])))

# Mesh on one(root) process(To create a global array criterion)
if MPI.COMM_WORLD.rank == 0:
    msh_self = mesh.create_rectangle(MPI.COMM_SELF, ((0.0, 0.0), (nelx, nely)), (nelx, nely), cell_type=mesh.CellType.triangle, ghost_mode=mesh.GhostMode.none, diagonal=mesh.DiagonalType.right_left)
    U_self = fem.VectorFunctionSpace(msh_self, ("CG", 1))
    x_self = U_self.tabulate_dof_coordinates()

# Get local ranges and global size of array
imap = u.function_space.dofmap.index_map
local_range = np.asarray(imap.local_range, dtype=np.int32) * u.function_space.dofmap.index_map_bs
size_global = imap.size_global * u.function_space.dofmap.index_map_bs

# Communicate ranges and local data
ranges = MPI.COMM_WORLD.gather(local_range, root=0)
data = MPI.COMM_WORLD.gather(u.vector.array, root=0)

# Communicate local DOF coordinates
x = U.tabulate_dof_coordinates()[:imap.size_local]
x_glob = MPI.COMM_WORLD.gather(x, root=0)

# gather each local array
if MPI.COMM_WORLD.rank == 0:
    # Create array received all values
    global_array = np.zeros(size_global)
    for r, d in zip(ranges, data):
        global_array[r[0]:r[1]] = d
    # Create array with all coordinates
    global_x = np.zeros((size_global, 3))
    for r, x_ in zip(ranges, x_glob):
        global_x[r[0]:r[1], :] = x_
    serial_to_global = []
    for coord in x_self:
        serial_to_global.append(np.abs(global_x - coord).sum(axis=1).argmin())
    # Create sorted array from
    u_from_global = np.zeros(global_array.shape)
    for serial, glob in enumerate(serial_to_global):
        u_from_global[serial] = global_array[glob]
    print(f"Gathered from parallel array: \n{u_from_global}")
    # NOTE: np.allclose needs two arrays; to verify, one could compare u_from_global against
    # a serial reference, e.g. a Function on U_self interpolated with the same lambda.

When running the added algorithm through mpirun, the following error occurred.

Traceback (most recent call last):
  File "~~~ file directory location ~~~/test.py", line 36, in <module>
    global_x[r[0]:r[1], :] = x_
ValueError: could not broadcast input array from shape (4,3) into shape (8,3)
--------------------------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:

  Process name: [[8633,1],0]
  Exit code:    1
--------------------------------------------------------------------------

Same error as mentioned above.

How should I solve the problem?

To @dokken.
The error mentioned in the previous reply was resolved by carefully debugging the code.
The reason for the error message was that the sizes of the global_x array and of the x_glob array obtained from the local DOF coordinates were different. The problem was solved by making the sizes match with the np.repeat command.
I could have found this by debugging the code in more detail. Sorry for asking a simple question. :pray: :smiling_face_with_tear:
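For anyone hitting the same error, a minimal sketch of how the sizes can be made to match with np.repeat (assuming the vector space U with block size 2 from the code above; tabulate_dof_coordinates() returns one coordinate per node, while local_range counts scalar dofs):

# repeat each node coordinate block-size times so it lines up with the scalar dof range
bs = U.dofmap.index_map_bs                          # 2 for the 2D vector space
x = U.tabulate_dof_coordinates()[:imap.size_local]  # one row per owned node
x = np.repeat(x, bs, axis=0)                        # one row per owned scalar dof
x_glob = MPI.COMM_WORLD.gather(x, root=0)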

While resolving the previous error, another question came up while analyzing the code from the posting I referenced, run with mpirun -n 2 ~~. The modified referenced code is

import dolfinx
import numpy
from mpi4py import MPI

# Parallel mesh
mesh = dolfinx.mesh.create_unit_square(MPI.COMM_WORLD, 2, 2, dolfinx.mesh.CellType.quadrilateral)
V = dolfinx.fem.FunctionSpace(mesh, ("CG", 1))

def func(x):
    return x[0] + 3 * x[1]

if MPI.COMM_WORLD.rank == 0:
    # Mesh on one proc
    mesh0 = dolfinx.mesh.create_unit_square(MPI.COMM_SELF, 2, 2, dolfinx.mesh.CellType.quadrilateral)
    V0 = dolfinx.fem.FunctionSpace(mesh0, ("CG", 1))
    x0 = V0.tabulate_dof_coordinates()
    u0 = dolfinx.fem.Function(V0)
    u0.interpolate(func)
    # Reference solution
    print(f"Serial interpolation: \n{u0.vector.array}\n")

# Parallel interpolation
u = dolfinx.fem.Function(V)
u.interpolate(func)
# u.x.scatter_forward()

# Get local ranges and global size of array
imap = u.function_space.dofmap.index_map
local_range = numpy.asarray(imap.local_range, dtype=numpy.int32) * u.function_space.dofmap.index_map_bs
size_global = imap.size_global * u.function_space.dofmap.index_map_bs

# Communicate ranges and local data
ranges = MPI.COMM_WORLD.gather(local_range, root=0)
data = MPI.COMM_WORLD.gather(u.vector.array, root=0)

# Communicate local dof coordinates
x = V.tabulate_dof_coordinates()[:imap.size_local]
x_glob = MPI.COMM_WORLD.gather(x, root=0)

# Declare gathered parallel arrays
global_array = numpy.zeros(size_global)
global_x = numpy.zeros((size_global, 3))
u_from_global = numpy.zeros(global_array.shape)

# Gather
if MPI.COMM_WORLD.rank == 0:
    # Create array with all values (received)
    for r, d in zip(ranges, data):
        global_array[r[0]:r[1]] = d
    print(f"Global array: \n{global_array}\n")
    # Create array with all coordinates
    for r, x_ in zip(ranges, x_glob):
        global_x[r[0]:r[1], :] = x_
    serial_to_global = []
    for coord in x0:
        serial_to_global.append(numpy.abs(global_x-coord).sum(axis=1).argmin())
    # Create sorted array from
    for serial, glob in enumerate(serial_to_global):
        u_from_global[serial] = global_array[glob]
    print(f"Gathered from parallel: \n{u_from_global}\n")

# Calculated process
u_cal = numpy.multiply(3, u_from_global)
print(f"Calculated global array: \n{u_cal}\n")

I wonder if there is a way to scatter u_cal back to each parallel local process. Does such a method exist?
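For concreteness, a rough, untested sketch of what I have in mind, reusing ranges and serial_to_global from the code above:

# Hypothetical sketch: send slices of u_cal back to the ranks that own them
if MPI.COMM_WORLD.rank == 0:
    # reorder the serially-ordered u_cal back into the parallel (global dof) ordering
    u_cal_parallel = numpy.zeros_like(u_cal)
    u_cal_parallel[serial_to_global] = u_cal
    # one chunk per rank, following each rank's owned dof range
    chunks = [u_cal_parallel[r[0]:r[1]] for r in ranges]
else:
    chunks = None
local_values = MPI.COMM_WORLD.scatter(chunks, root=0)
u.x.array[:local_values.size] = local_values  # owned entries come first in u.x.array
u.x.scatter_forward()                         # then update the ghost entries

I am not sure whether assigning into u.x.array and calling scatter_forward like this is the intended way.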