Parallel for-loop with multiple solve-calls, MPI

Hello,

I'd like to run my optimisation problem, based on the time-distributed control demo, in parallel. I modified the demo to have multiple PDEs (of the same type but with different coefficients), which are solved in a for-loop:

for i in range(PDEs_numb):
    solve(a[i] == L[i], u[i])

I'd like to run this for-loop in parallel. As I understand it, one of the options is MPI. I tested my code without modifications and got the following results for 30 PDEs with a single optimization iteration allowed, restarting the Docker container each time:
Without MPI: 86 seconds
MPI, 2 cores: 95 seconds
MPI, 4 cores: 105 seconds
MPI, 10 cores: 150 seconds

So it didn't get faster.
The runtime without MPI would be acceptable if I didn't need to consider many more PDEs (300 or even 1000).

I have a few questions:

  1. From another post I can see that it is always being solved in parallel, even without MPI. Is that really so?
  2. Can MPI handle this for-loop in parallel? And if it can, how can I make it work?
  3. What else would you advise to speed up the solution of this optimization in terms of MPI?

Thanks in advance!

My MWE:

from fenics import *
from fenics_adjoint import *
from collections import OrderedDict
import numpy as np
import time
set_log_active(False)

data = Expression("16*x[0]*(x[0]-1)*x[1]*(x[1]-1)*sin(pi*t)", t=0, degree=4)
nu = Constant(1e-5)

PDEs_numb = 30
# 30 samples
kappa = np.array([1.38584602, 0.74351727, 0.73387536, 0.98770648, 1.27195437,
                  0.70672424, 0.75884729, 1.20655541, 0.87772363, 1.10871134,
                  0.83404077, 0.70594961, 1.40019136, 0.89766789, 0.99397856,
                  1.00917521, 1.20975742, 1.27970173, 1.26007072, 1.19429403,
                  0.69197102, 0.77906883, 0.85317978, 0.70669097, 1.22831455,
                  0.87615114, 1.03976916, 0.83006619, 0.82888496, 1.33441591])

mesh = UnitSquareMesh(8, 8)
V = FunctionSpace(mesh, "CG", 1)
dt = Constant(0.1)
T = 2

ctrls = OrderedDict()
t = float(dt)
while t <= T:
    ctrls[t] = Function(V)
    t += float(dt)

def solve_heat(ctrls):
    u = []
    v = []
    u_0 = []

    for i in range(PDEs_numb):
        u.append(TrialFunction(V))
        v.append(TestFunction(V))
        u_0.append(Function(V, name="solution"))

    f = Function(V, name="source")
    d = Function(V, name="data")

    F = []
    for i in range(PDEs_numb):
        F.append(((u[i] - u_0[i]) / dt * v[i] + kappa[i] * nu * inner(grad(u[i]), grad(v[i])) - f * v[i]) * dx)

    a = []
    L = []

    for i in range(PDEs_numb):
        a.append(lhs(F[i]))
        L.append(rhs(F[i]))

    bc = DirichletBC(V, 0, "on_boundary")

    t = float(dt)

    j = 0

    for i in range(PDEs_numb):
        j += 0.5 * float(dt) * assemble((u_0[i] - d) ** 2 * dx)

    while t <= T:
        f.assign(ctrls[t])
        data.t = t
        d.assign(interpolate(data, V))

        for i in range(PDEs_numb):
            solve(a[i] == L[i], u_0[i])

        for i in range(PDEs_numb):
            j += 0.5 * float(dt) * assemble((u_0[i] - d) ** 2 * dx)
        t += float(dt)

    return u_0, d, j

u, d, j = solve_heat(ctrls)

alpha = Constant(1e-1)
regularisation = alpha/2*sum([1/dt*(fb-fa)**2*dx for fb, fa in
    zip(list(ctrls.values())[1:], list(ctrls.values())[:-1])])


J = j + assemble(regularisation)
m = [Control(c) for c in ctrls.values()]

start_time = time.time()

rf = ReducedFunctional(J, m)
opt_ctrls = minimize(rf, options={"maxiter": 1})

total_time = (time.time() - start_time)

with open("Runtime_parallel_test.txt", "w") as f:
    f.write(str(total_time))
    f.write(" seconds")

I would suggest having a look at: How to solve in parallel for parametric study


Thank you for your reply!

I've made an extension of what was suggested in the post you linked by gathering the solutions from the processes in order to use them in the forward model (solve_heat). There are three gather methods in MPI: Gather, Gatherv and gather. Gatherv is not provided by dolfin, Gather can't handle object-type data, and gather, which is supposed to work with object-type data, gives me an error:

TypeError: can’t pickle dolfin.cpp.function.Function objects
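
To illustrate the difference between the two variants, here is a minimal mpi4py sketch, independent of FEniCS (the buffers are just placeholders):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# upper-case Gather is buffer-based: it needs NumPy arrays (or similar buffers)
sendbuf = np.array([float(rank)])
recvbuf = np.empty(size, dtype='d') if rank == 0 else None
comm.Gather(sendbuf, recvbuf, root=0)

# lower-case gather pickles arbitrary Python objects instead,
# which is exactly what fails for dolfin Functions
obj = {"rank": rank}
gathered = comm.gather(obj, root=0)

if rank == 0:
    print("Gather:", recvbuf, " gather:", gathered)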

The code:

from dolfin import *
import numpy as np

size = comm.Get_size()
rank = comm.Get_rank()

numDataPerRank = 1 

def poisson(alpha):
    mesh = UnitSquareMesh(256, 256)
    V = FunctionSpace(mesh, 'CG', 1)
    u, v = TrialFunction(V), TestFunction(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = inner(Constant(alpha)*grad(u), grad(v))*dx
    L = inner(Constant(1), v)*dx
    uh = Function(V)
    solve(a == L, uh, bc)
    return uh
   
alphas = [3, 8, 9, 10]
assert len(alphas) >= MPI.size(MPI.comm_world)

# Get alpha based on global rank
my_alpha = alphas[MPI.rank(MPI.comm_world)]
u_sol = poisson(my_alpha)  

# Gathered solution 
u_sol_gath = None
if rank == 0:
    u_sol_gath =  np.empty(numDataPerRank*size, dtype='f')  

u_sol_gath = MPI.comm_world.gather(u_sol, root=0)
# u_sol_gath = comm.gather(u_sol, root=0)
# comm.Gather(u_sol, u_sol_gath, root=0)

if rank == 0:
    print('Rank: ',rank, ', Gathered solutions: ', len(u_sol_gath))

I also tested this gathering on non-FEniCS objects, and it works fine:

from dolfin import *
import numpy as np

comm = MPI.comm_world
size = comm.Get_size()
rank = comm.Get_rank()

numDataPerRank = 1 

def function1(alpha):
    return alpha*2
   
alphas = [3, 8, 9, 10]

assert len(alphas) >= MPI.size(MPI.comm_world)
# Get alpha based on global rank
my_alpha = alphas[MPI.rank(MPI.comm_world)]
u_sol = function1(my_alpha)  

# print('Rank: ',rank, ', sendbuf: ',u_sol)
# Gathered solution 

u_sol_gath = None
if rank == 0:
    u_sol_gath =  np.empty(numDataPerRank*size, dtype='f')  

u_sol_gath = MPI.comm_world.gather(u_sol, root=0)
# u_sol_gath = comm.gather(u_sol, root=0)
# comm.Gather(u_sol, u_sol_gath, root=0)

if rank == 0:
    print('Rank: ',rank, ', Gathered solutions: ', u_sol_gath)

Rank: 0 , Gathered solutions: [6, 16, 18, 20]

It seems like this issue also occurs with the multiprocessing library, and it was previously discussed. Apparently I can try saving the solutions from the processes to disk and loading them back later to overcome this issue, and I will try that.
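
For the save-to-disk idea, I imagine something like the following untested sketch (the file names are just placeholders, and each process writes and reads its own file):

from dolfin import *

rank = MPI.rank(MPI.comm_world)

# each process owns its full problem on comm_self, so it writes its own file
mesh = UnitSquareMesh(MPI.comm_self, 8, 8)
V = FunctionSpace(mesh, "CG", 1)
u = interpolate(Constant(rank), V)

h5 = HDF5File(MPI.comm_self, "u_rank%d.h5" % rank, "w")
h5.write(u, "u")
h5.close()

# later, any process that builds the same function space can read it back
u_back = Function(V)
h5 = HDF5File(MPI.comm_self, "u_rank%d.h5" % rank, "r")
h5.read(u_back, "u")
h5.close()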

But are there other options or ideas in terms of MPI?

Sorry, I missed one line of code.

Running this with mpirun -n 4 python3 filename.py, the error can be reproduced:

from dolfin import *
import numpy as np

comm = MPI.comm_world
size = comm.Get_size()
rank = comm.Get_rank()

numDataPerRank = 1 

def poisson(alpha):
    mesh = UnitSquareMesh(256, 256)
    V = FunctionSpace(mesh, 'CG', 1)
    u, v = TrialFunction(V), TestFunction(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = inner(Constant(alpha)*grad(u), grad(v))*dx
    L = inner(Constant(1), v)*dx
    uh = Function(V)
    solve(a == L, uh, bc)
    return uh
   
alphas = [3, 8, 9, 10]
assert len(alphas) >= MPI.size(MPI.comm_world)

# Get alpha based on global rank
my_alpha = alphas[MPI.rank(MPI.comm_world)]
u_sol = poisson(my_alpha)  

# Gathered solution 
u_sol_gath = None
if rank == 0:
    u_sol_gath =  np.empty(numDataPerRank*size, dtype='f')  

u_sol_gath = MPI.comm_world.gather(u_sol, root=0)

if rank == 0:
    print('Rank: ',rank, ', Gathered solution: ', len(u_sol_gath))

It still seems like you haven't read the previous post I linked to. In your example you are creating the mesh with the global MPI communicator, i.e. a distributed mesh.
What you could do is use the comm_self communicator to solve one problem, with one mesh, per processor. Then you can gather the local vector (which is the array containing your solution) and post-process it.
See the example below:

from dolfin import *
import numpy as np

comm = MPI.comm_world
size = comm.Get_size()
rank = comm.Get_rank()

numDataPerRank = 1

def poisson(alpha):
    mesh = UnitSquareMesh(MPI.comm_self, 10, 10)
    V = FunctionSpace(mesh, 'CG', 1)
    u, v = TrialFunction(V), TestFunction(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = inner(Constant(alpha)*grad(u), grad(v))*dx
    L = inner(Constant(1), v)*dx
    uh = Function(V)
    print(len(uh.vector().get_local()))
    solve(a == L, uh, bc)
    return uh

alphas = [3, 8, 9, 10]
assert len(alphas) >= MPI.size(MPI.comm_world)

# Get alpha based on global rank
my_alpha = alphas[MPI.rank(MPI.comm_world)]
u_sol = poisson(my_alpha)

# Gathered solution
u_sol_gath = None
if rank == 0:
    u_sol_gath =  np.empty(numDataPerRank*size, dtype='f')

u_sol_gath = MPI.comm_world.gather(u_sol.vector().get_local(), root=0)


if rank == 0:
    print('Rank: ',rank, ', Gathered solution: ', len(u_sol_gath))
    u_output = Function(u_sol.function_space())
    out = File(MPI.comm_self, "output.pvd")
    for i in range(size):
        u_output.vector()[:] = u_sol_gath[i]
        out << u_output

I've read that post carefully and tried mesh = UnitSquareMesh(MPI.comm_self, ...) as well. Now I understand that this is the main point of running my code in parallel, but it didn't make any difference in terms of the error I was getting, which is why I excluded it from the MWE.

But what I didn't know is that we can create new dolfin objects from the old ones, not directly but via an array, and in this way avoid the pickle error.
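
In other words, the round trip that avoids the pickle problem looks roughly like this (short sketch):

from dolfin import *

mesh = UnitSquareMesh(MPI.comm_self, 8, 8)
V = FunctionSpace(mesh, "CG", 1)
u_old = interpolate(Expression("x[0]*x[1]", degree=2), V)

# a plain NumPy array of dof values can be gathered without pickling dolfin objects
arr = u_old.vector().get_local()

# ... communicate arr with MPI ...

# on the receiving side, rebuild a Function on an identical (comm_self) function space
u_new = Function(V)
u_new.vector()[:] = arr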

Thank you very much for your help!

The following code shows how to deal with more parameters (more than the available processes). I scatter all parameters over the processes, solve the PDEs in a for-loop on each process, and gather the solutions at the end:

from dolfin import *
import numpy as np

comm = MPI.comm_world
size = comm.Get_size() 
rank = comm.Get_rank()

def poisson(alpha):
    mesh = UnitSquareMesh(MPI.comm_self, 256, 256)
    V = FunctionSpace(mesh, 'CG', 1)
    u, v = TrialFunction(V), TestFunction(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = inner(Constant(alpha)*grad(u), grad(v))*dx
    L = inner(Constant(1), v)*dx
    uh = Function(V)
    solve(a == L, uh, bc)
    return uh 

PDEsnumb = 100
numDataPerRank = int(PDEsnumb/size)

alpha = None

if rank == 0:
    # here there should be an np.array of parameters;
    # an array of ones is used to show that assembling the solutions works fine
    alpha = np.ones(PDEsnumb)

my_alpha = np.empty(numDataPerRank, dtype='d')
comm.Scatter(alpha, my_alpha, root=0)

u_sol_single = poisson(my_alpha[0])
u_sol_function_space = u_sol_single.function_space()
u_sol_array =  u_sol_single.vector().get_local()
u_sol_length = len(u_sol_array) 

u_sol = u_sol_array

for i in range(1, numDataPerRank):
    u_sol = np.concatenate((u_sol, poisson(my_alpha[i]).vector().get_local()))
    
u_sol_gath = None

if rank == 0:
    u_sol_gath = np.empty(PDEsnumb*u_sol_length, dtype='d') 

comm.Gather(u_sol, u_sol_gath, root=0)

if rank == 0:
    u_output = []

    for i in range(PDEsnumb):
        u_output.append(Function(u_sol_function_space)) 
        u_output[i].vector()[:] = u_sol_gath[i*u_sol_length : i*u_sol_length + u_sol_length]
    
    k = u_output[0]

    for i in u_output:
        # this shows that for the same parameters solutions are equal 
        print(assemble(((k-i)**2)*dx))

Unfortunately this gathering procedure isn't what I need for the optimization, because in time-distributed control we have the while-loop for time-stepping, which must be computed in serial, and inside this loop we have the for-loop which should run in parallel. I.e. it should be something like this:

if rank == 0:
    # some code
    while t <= T:
        # the code that must be run on all processes,
        # i.e. it can't be inside the 'if rank == 0' statement
        t += dt

So it seems impossible with MPI.

Another idea is to compute the forward model completely in parallel and optimize in serial. I scatter the PDE parameters and broadcast the controls, compute the forward model on each process, and then gather the objective function values on a single process and run the optimization there.
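
The communication pattern I have in mind is roughly the following untested sketch (the objective computation is a placeholder, and n_params is assumed to be divisible by the number of processes):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n_params = 8  # assumed divisible by size
params = np.arange(1.0, n_params + 1) if rank == 0 else None
my_params = np.empty(n_params // size, dtype='d')

# 1. scatter the PDE parameters over the processes
comm.Scatter(params, my_params, root=0)

# 2. broadcast the control values (as a plain array) from rank 0
ctrl_array = np.linspace(0.0, 1.0, 10) if rank == 0 else None
ctrl_array = comm.bcast(ctrl_array, root=0)

# 3. each process evaluates its share of the objective
#    (placeholder here; in reality the forward model for the local parameters)
local_j = float(np.sum(my_params)) * float(np.sum(ctrl_array))

# 4. sum the local objective values on rank 0, where the optimizer would run
total_j = comm.reduce(local_j, op=MPI.SUM, root=0)
if rank == 0:
    print("objective:", total_j)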

Do you think it can work out? Can dolfin-adjoint record such a forward model and use it further?

I've already spent a lot of time trying to implement it, and there are still a lot of open questions, so I'm not sure it's worth spending more time on MPI without knowing that the idea is right.

I do not quite follow what you are trying to achieve with the time dependent problem.
For each separate parameter input, you can solve the PDE on a separate processor.
You cannot distribute time steps over multiple processors (as I think you also state in your setting).

In the setting of the optimization, what kind of descent method/optimization algorithm are you using?
Could you make a simple problem illustrating the issue (without it exceeding 50 lines)?

This is the initial problem:

from fenics import *
from fenics_adjoint import *
from collections import OrderedDict
import numpy as np

data = Expression("16*x[0]*(x[0]-1)*x[1]*(x[1]-1)*sin(pi*t)", t=0, degree=4)
nu = Constant(1e-5)

# number of parameters (i.e. PDEs)
PDEs_numb = 30
# this is an array of 30 parameters, which produces 30 PDEs
kappa = np.arange(1, PDEs_numb + 1)    

mesh = UnitSquareMesh(8, 8)
V = FunctionSpace(mesh, "CG", 1)
dt = Constant(0.1)
T = 2

ctrls = OrderedDict()
t = float(dt)
while t <= T:
    ctrls[t] = Function(V)
    t += float(dt)

# forward model
def solve_heat(ctrls):
    # for each PDE we create functions and variables separately, filling out the arrays
    u, v, u_0 = [], [], []

    for i in range(PDEs_numb):
        u.append(TrialFunction(V))
        v.append(TestFunction(V))
        u_0.append(Function(V, name="solution"))

    # we have a single control which affects all PDEs, and we want to find the optimal control for all of the PDEs;
    # in this setting it of course can't be optimal for each one, but that is outside the scope of this post
    f = Function(V, name="source")
    d = Function(V, name="data")
    
    # Here we create empty lists for the weak formulations and their lhs and rhs for each parameter kappa
    F, a, L = [], [], []
    # and fill them
    for i in range(PDEs_numb):
        F.append(((u[i] - u_0[i]) / dt * v[i] + kappa[i] * nu * inner(grad(u[i]), grad(v[i])) - f * v[i]) * dx)
        a.append(lhs(F[i]))
        L.append(rhs(F[i]))

    bc = DirichletBC(V, 0, "on_boundary")

    t = float(dt)

    j = 0
    
    # in the objective function we compute the mean value over the PDE solutions, that's why we have a sum
    for i in range(PDEs_numb):
        j += 1 / PDEs_numb * float(dt) * assemble((u_0[i] - d) ** 2 * dx)

    while t <= T:
        f.assign(ctrls[t])
        data.t = t
        d.assign(interpolate(data, V))
        
        # in this loop we solve the PDEs, and this is the part I want to do in parallel,
        # so I thought I could distribute the PDEs over the processes here
        for i in range(PDEs_numb):
            solve(a[i] == L[i], u_0[i])
        # and here gather all solutions to use them in the step below

        # we compute the mean from the computed solutions
        for i in range(PDEs_numb):
            j += 1 / PDEs_numb * float(dt) * assemble((u_0[i] - d) ** 2 * dx)
        t += float(dt)

    return u_0, d, j

u, d, j = solve_heat(ctrls)

m = [Control(c) for c in ctrls.values()]
rf = ReducedFunctional(j, m)
opt_ctrls = minimize(rf, options={"maxiter": 1})

And sorry, I don't know how to make it any smaller.

When you run this code, you'll see that the more PDEs we have, the longer a single iteration of the optimization takes.

So the first idea was to distribute the parameters (i.e. PDEs) over the processes to solve them in parallel, and after that gather all solutions and compute the objective function value from them. All of this parallel part was supposed to be inside the while-loop, but the while-loop itself must run on a single process, i.e. inside if rank == 0:, where we can compute things only on the process with rank 0. This is the issue I can't resolve.

The second idea, which I'm trying to implement now, is:

  1. Distribute the parameters and controls from a single process over all processes before computing the forward model.
  2. Then compute the complete forward model solve_heat(ctrls) on each process for the distributed parameters and controls (i.e. inside the while-loop we solve the PDEs in a for-loop, as in the initial code above). I don't distribute the time steps, i.e. each process computes the forward model for all time steps. And, as in the initial code, within one process the PDEs have the same parameters but different forcing terms (controls) at each time step.
  3. Finally, gather the objective function values from all processes as sums and add these sums up on a single process (since I still only need the mean value), where the optimization runs.

As for the algorithm, I use L-BFGS-B, because I have control bounds in the main problem.
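
For completeness, this is roughly how I pass the bounds to the optimizer (a sketch based on the dolfin-adjoint minimize interface, with placeholder bound values; it may need adjusting for a list of controls):

# rf is the ReducedFunctional from the code above; 0 and 1 are placeholder bounds
lb, ub = 0.0, 1.0
opt_ctrls = minimize(rf, method="L-BFGS-B",
                     bounds=(lb, ub),
                     options={"maxiter": 50, "disp": True})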

I don’t see the immediate problem with the while-loop, as you can add an else statement for the processes that are not rank 0, and have them wait for data from the main process’ while loop.
I can see how it would be very inefficient with regard to the many data transfers between processes, though, and I don't know if you can transfer/pickle UFL forms.

Your second idea sounds more reasonable. Dividing up the work at the very beginning and only gathering data at the end should work for the forward model.

The gradient computations I’m a bit more unsure about. The tapes will be local to each process, so you cannot just gather the functionals and define a normal ReducedFunctional unless you define some custom functions to handle MPI communication (but I think that is tricky).

Instead I think the easiest path is to let each process define their own ReducedFunctional for their parts of the functional, and then have the main process also define a MainFunctional that calls each process for the forward/adjoint model and gathers the result.
Defining the MainFunctional as a subclass of the ReducedFunctional would allow you to run it through the pyadjoint optimization interface.

The main process also needs to know about the controls of the other processes in order to give the optimizer the correct information (dimension of the parameter space) and to distribute new parameter values to the correct processes (forward model) and return the gradient with the correct order of (sub) gradients.
Thus, the main process needs to know the number of controls for each process and be consistent in the ordering of inputs for the forward model and the ordering of outputs for the gradients.
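
A rough, untested skeleton of what I mean (the MPI communication itself is left as TODO comments; the method names follow pyadjoint's ReducedFunctional):

from fenics_adjoint import ReducedFunctional

class MainFunctional(ReducedFunctional):
    # Skeleton: combines the per-process ReducedFunctionals into one functional.

    def __call__(self, values):
        # evaluate the local part of the functional on this process
        local_j = super().__call__(values)
        # TODO: distribute the new values to the other processes and
        #       sum the local values over all processes (e.g. with an allreduce)
        return local_j

    def derivative(self):
        # gradient with respect to this process' controls
        local_grad = super().derivative()
        # TODO: gather/sum the gradient contributions over all processes,
        #       keeping the same ordering of controls as in __call__
        return local_grad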


First of all thank you for your detailed answer!

I don't get how an else-statement could help construct the forward model on a single process, and what should go under this else-statement?

The first idea was to use the code I previously posted just for finding the PDE solutions in parallel, and to construct the forward model once and in serial (i.e. inside the if rank == 0 statement). In order to do so, the whole forward model should be under the if rank == 0 statement, but the code responsible for the parallel computation of the PDEs can't be under it.

The only place I can find to put the else-statement is after the while-loop, i.e. after the forward model, but that means the forward model on process 0 wouldn't take into account the parameters and PDEs distributed over processes other than 0; in order to use them, we have to create and compute forward models on each process, which is already the second idea.

Or did I get it wrong and you meant something else?

I haven't worked with MPI directly, so I might be wrong or doing something that is bad practice, but in pseudo-code this is what I imagined.
Note that I do not advocate for this model; as I said previously, I think it is better to split the work up before the time while-loop.

# Setup problem
...

if rank == 0:
    while t <= T:
        # Setup things
        ...

        # Send a command to all processes
        MPI_Send command = 1

        # Send function vectors
        MPI_Send u[i]

        # Receive solutions / functional
        MPI_Recv u[i] or J[i]

        # Advance timestep
        ...
    # Signal that we are done.
    MPI_Send command = 0
else:
    MPI_Recv command
    while command != 0
        MPI_Recv u[i]
        
        # Solve PDE
        ...

        MPI_Send u[i] or J[i]

        # Wait for next command
        MPI_Recv command

Optionally you could have the main process also do some work by defining a function for solving the PDE at a time step and then call that also on the main process.

Thank you for your reply and your willingness to help.

About the pseudo-code: it's the same as if we executed the whole while-loop in parallel, i.e. without the if- and else-statements each process would run the while-loop for all time steps but only for certain PDE parameters, so the forward model would run in parallel. And that would already be the second idea.

Before diving into the jungle of ReducedFunctional subclasses, I tried to implement the first idea with the multiprocessing library and its map function, but got stuck at the problem you mentioned: I can't distribute the control as a UFL object over the processes. I tried the dill and pathos libraries, and unfortunately they can't handle it either.

Then I tried not to distribute the control directly but to distribute only the time as an argument of the poisson function, and to look up the control from the main process inside the poisson function. So the poisson function runs in parallel, but it reads the control from the main process. It seems like it should work, but when running the optimization I get the error:

Traceback (most recent call last):
  File "TDCmult.py", line 105, in <module>
    opt_ctrls = minimize(rf, options={"maxiter": 1})
  File "/usr/local/lib/python3.6/dist-packages/pyadjoint/tape.py", line 46, in wrapper
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pyadjoint/optimization/optimization.py", line 254, in minimize
    opt = algorithm(rf_np, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pyadjoint/optimization/optimization.py", line 63, in minimize_scipy_generic
    m_global = rf_np.obj_to_array(m)
  File "/usr/local/lib/python3.6/dist-packages/pyadjoint/reduced_functional_numpy.py", line 104, in obj_to_array
    return self.get_global(obj)
  File "/usr/local/lib/python3.6/dist-packages/pyadjoint/reduced_functional_numpy.py", line 54, in get_global
    m_global += self.controls[i].control._ad_to_list(v)
  File "/usr/local/lib/python3.6/dist-packages/fenics_adjoint/types/function.py", line 231, in _ad_to_list
    m_v = m.vector()
AttributeError: 'NoneType' object has no attribute 'vector'

It seems like the control I'm passing to the reduced functional has the wrong type. What do you think, is it related to the way I transfer the control to the poisson function, or did I just miss/forget something?

And again, sorry for the size of the MWE, but I can't make it smaller and still get the error.

from fenics import *
from fenics_adjoint import *
from collections import OrderedDict
import numpy as np
import multiprocessing
import itertools
set_log_active(False)

# Number of processes
size = 10
# Number of PDEs
PDEs_numb = 100
#Parameters
alphas = np.arange(1,PDEs_numb + 1)

assert (PDEs_numb % size) == 0    
data = Expression("16*x[0]*(x[0]-1)*x[1]*(x[1]-1)*sin(pi*t)", t=0, degree=4)
dt = Constant(0.1)
T = 2

def poisson(alpha, u_input, tf, flag = True):
    '''
    alpha is a PDE parameter, i.e. an entry of alphas
    u_input is the previously computed uh.vector().get_local()
    tf is the time in the while-loop
    '''
    mesh = UnitSquareMesh(MPI.comm_world, 20, 20)  
    V = FunctionSpace(mesh, 'CG', 1)
    u, v = TrialFunction(V), TestFunction(V)
    f = Function(V)
    u_0 = Function(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = (u*v + dt*inner(Constant(alpha)*grad(u), grad(v)))*dx
    # with flag == True this is the function to be computed in parallel
    if flag == True:
        u_0.vector()[:] = u_input
        # in the following line I read the control from the main process to avoid the pickle error for the control,
        # and that's what I suspect is causing the error
        f.assign(ctrls[tf])     
        L = (u_0 + dt*f)*v*dx 
        uh = Function(V)
        solve(a == L, uh, bc) 
        return uh.vector().get_local()
    # this part is only used to get the function space, as described below
    else:
        L = (u_0 + dt*f)*v*dx 
        uh = Function(V)
        solve(a == L, uh, bc)
        return uh

# only the function space created in the poisson function can be used further for computing the functional
u_sol = poisson(1, 1, 1, flag = False)
u_function_space = u_sol.function_space() 
# since u_0 = Function(V) can't be transferred to poisson as the initial state (in the first iteration of the while-loop),
# I construct the initial u_0[i] as a vector of zeros of the same length as u_sol
u_sol_length = len(u_sol.vector().get_local())
sol_of_zeros = [0 for i in range(u_sol_length)]

ctrls = OrderedDict()
t = float(dt)
while t <= T:
    ctrls[t] = Function(u_function_space)
    t += float(dt)

def solve_heat(ctrls):
    f = Function(u_function_space, name="source")
    d = Function(u_function_space, name="data")    
    t = float(dt)

    j = 0
    for i in range(PDEs_numb):
        j += 1 / PDEs_numb * float(dt) * assemble((Function(u_function_space) - d) ** 2 * dx)
    # so here I construct u_0 as an array of zero arrays for each PDE and pass it to poisson in the first iteration of the while-loop
    u_0 = []
    for i in range(PDEs_numb):
        u_0.append(sol_of_zeros)
    # after the first iteration u_0 is obtained from the poisson function and used as its u_input argument in the next time iteration
    pool = multiprocessing.Pool(processes=size)
    while t <= T:
        data.t = t
        d.assign(interpolate(data, u_function_space))
        # here I run poisson in parallel through starmap:
        # I pass all parameters alpha, all initial states u_0 and a single control for the given time iteration to the poisson function
        u_output = pool.starmap(poisson, zip(alphas, u_0, itertools.repeat(t)))
        u_0 =[]
        # here I construct a vector of solutions to be used for functional computation
        for i in range(PDEs_numb):
            u_0.append(Function(u_function_space)) 
            u_0[i].vector()[:] = u_output[i]
            # and compute mean value of functional from all solutions
            j += 1 / PDEs_numb * float(dt) * assemble((u_0[i] - d) ** 2 * dx)
             
        u_0 = u_output
        t += float(dt)

    return u_0, d, j

u, d, j = solve_heat(ctrls)
m = [Control(c) for c in ctrls.values()]
rf = ReducedFunctional(j, m)
opt_ctrls = minimize(rf, options={"maxiter": 1})

Could you explain how to

… return the gradient with the correct order of (sub) gradients

If I got it right, I have to redefine two methods of the ReducedFunctional class in the MainFunctional subclass, because they are used in ReducedFunctionalNumPy and then in minimize_scipy_generic.

I see that I can gather the ReducedFunctional values from each process and sum them up in the __call__ method of MainFunctional, which will produce the mean of the functional I need. But how do I correctly implement the derivative method? It's clear how to gather the subgradients from the processes:

# the code below should be implemented inside the MainFunctional subclass
# on each process compute the gradient of the local reduced functional
rf = ReducedFunctional(j, m)
array_of_gradient_values = []
for i in range(len(m)):
    array_of_gradient_values.append(rf.derivative()[i].vector().get_local())
# send array_of_gradient_values to the main process
# on the main process rebuild dolfin Functions from all array_of_gradient_values (from each process):
sub_gradient = []
for i in range(len(m)):
    sub_gradient.append(Function(V))
    sub_gradient[i].vector()[:] = array_of_gradient_values[i]

So the number of subgradients will be equal to the number of processes in use. Each subgradient will have the same number of elements (equal to the number of controls/time steps).

What should I do with these subgradients to get the single gradient to be returned by the derivative method of MainFunctional?

Another question is, why

the main process needs to know the number of controls for each process

Isn’t the dimension of the parameter space the same for all processes?

Thank you in advance.

The following does what it should, but it could probably be done in a more efficient way, because right now the same optimization problem is solved on each process. I tried to use a single process, but didn't manage to. I don't know how to call functions on processes other than 0 from inside process 0, i.e. as long as the optimization is running on process 0, the other processes can't be used.

from fenics import *
from fenics_adjoint import *
from collections import OrderedDict
import numpy as np
from mpi4py import MPI as MPI1
set_log_active(False)

comm = MPI1.COMM_WORLD
size = comm.Get_size() 
rank = comm.Get_rank()

data =Constant(2)
nu = Constant(1e-5)
dt = Constant(0.1)
T = 2

PDEs_numb = 10
numDataPerRank = int(PDEs_numb/size)

alphas = None
if rank == 0:
    alphas = np.ones(PDEs_numb)

my_alpha = np.empty(numDataPerRank, dtype='d')
comm.Scatter(alphas, my_alpha, root=0)
nx = ny = 20
mesh = UnitSquareMesh(MPI1.COMM_SELF, nx, ny) # MPI1.COMM_SELF,
V = FunctionSpace(mesh, "CG", 1)

ctrls = OrderedDict()
t = float(dt)
while t <= T:
    ctrls[t] = Function(V) 
    t += float(dt)

def poisson(alpha, u_0, f):
    u, v = TrialFunction(V), TestFunction(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = (u*v + dt*inner(Constant(alpha)*nu*grad(u), grad(v)))*dx
    L = (u_0 + dt*f)*v*dx
    uh = Function(V)
    solve(a == L, uh, bc) 
    return uh

def solve_heat(ctrls):
    u_0 = []
    f = Function(V, name="source")
    d = Function(V, name="data")
    d.assign(project(data, V))
    t = float(dt)
    j = 0

    for i in range(numDataPerRank):
        u_0.append(Function(V, name="solution"))
        j += 0.5 * float(dt) * assemble((u_0[i] - d) ** 2 * dx)

    while t <= T:
        f.assign(ctrls[t])
        
        for i in range(numDataPerRank):
            u_0[i] = poisson(my_alpha[i], u_0[i], f)
            j += 0.5 * float(dt) * assemble((u_0[i] - d) ** 2 * dx)

        t += float(dt)

    return u_0, d, j

u, d, j = solve_heat(ctrls)
u_sol_length = len(u[0].vector().get_local())

alpha1 = Constant(1e-5)
regularisation = alpha1/2*sum([1/dt*(fb-fa)**2*dx for fb, fa in
    zip(list(ctrls.values())[1:], list(ctrls.values())[:-1])])


J = j + assemble(regularisation)
m = [Control(c) for c in ctrls.values()]

# because of MPI, a MainFunctional instance will be created on each process;
# the optimization runs on each process as well, and the results on all processes are equal

class MainFunctional(ReducedFunctional):
    def __call__(self, values):
        # MainFunctional takes the functional and controls from self and computes
        # the ReducedFunctional value on each process
        rf = ReducedFunctional(self.functional, self.controls)
        func_value = np.array(rf([i for i in values]))
        # the MainFunctional value is the sum of the ReducedFunctional values,
        sum_func_value = np.array(0.0, 'd')
        # obtained with Allreduce so that the sum is available on all processes
        # (here only the mpi4py package can be used)
        comm.Allreduce(func_value, sum_func_value, op=MPI1.SUM)
        return sum_func_value
    
    def derivative(self):
        # MainFunctional takes the functional and controls from self and computes
        # the gradient of the local ReducedFunctional on each process
        rf = ReducedFunctional(self.functional, self.controls)
        rf_der = rf.derivative()
        # but to avoid the pickle error, the gradients must first be transformed into arrays
        # and joined into a single array, der_array
        der_array = np.array(())
        for i in range(len(ctrls)):
            der_array = np.append(der_array, rf_der[i].vector().get_local())
        # and again we sum all the gradients up and distribute these sums over all processes
        sum_der_array = np.empty(u_sol_length*len(ctrls), dtype='d')
        comm.Allreduce(der_array, sum_der_array, op=MPI1.SUM) # MPI.SUM
        # here we need to assemble the gradient arrays back into dolfin Functions
        uflderiv = []
        for i in range(len(ctrls)):
            uflderiv.append(Function(V))
            uflderiv[i].vector()[:] = sum_der_array[i*u_sol_length : i*u_sol_length + u_sol_length]
        return uflderiv

 
mf = MainFunctional(J, m)

# print(mf([Function(V) for i in range(len(ctrls))]))
# print(mf.derivative()[1].vector().get_local())
 
opt_ctrls = minimize(mf, options={"maxiter": (50) })

Runtime with time mpirun -n 10 python3:
real 0m57.565s

For comparison, the same problem solved in serial:

from fenics import *
from fenics_adjoint import *
from collections import OrderedDict
set_log_active(False)
import numpy as np

data =Constant(2)
nu = Constant(1e-5)

dt = Constant(0.1)
T = 2

PDEs_numb = 10
numDataPerRank = int(PDEs_numb)

alphas = np.ones(PDEs_numb)
my_alpha = alphas

nx = ny = 20
mesh = UnitSquareMesh( nx, nx)
V = FunctionSpace(mesh, "CG", 1)

ctrls = OrderedDict()
t = float(dt)
while t <= T: 
    ctrls[t] = Function(V)
    t += float(dt)

def poisson(alpha, u_0, f):
    u, v = TrialFunction(V), TestFunction(V)
    bc = DirichletBC(V, Constant(0), 'on_boundary')
    a = (u*v + dt*inner(Constant(alpha)*nu*grad(u), grad(v)))*dx
    L = (u_0 + dt*f)*v*dx
    uh = Function(V)
    solve(a == L, uh, bc) 
    return uh

def solve_heat(ctrls):
    u_0 = []
    f = Function(V, name="source")
    d = Function(V, name="data")
    d.assign(project(data, V))  
    t = float(dt)
    j = 0

    for i in range(numDataPerRank):
        u_0.append(Function(V, name="solution"))
        j += 0.5 * float(dt) * assemble((u_0[i] - d) ** 2 * dx)   
    
    while t <= T:
        f.assign(ctrls[t])
        
        for i in range(numDataPerRank):
            u_0[i] = poisson(my_alpha[i], u_0[i], f)
            j += 0.5 * float(dt) * assemble((u_0[i] - d) ** 2 * dx)

        t += float(dt)

    return u_0, d, j

u, d, j = solve_heat(ctrls)

alpha1 = Constant(1e-5)
regularisation = alpha1/2*sum([1/dt*(fb-fa)**2*dx for fb, fa in
    zip(list(ctrls.values())[1:], list(ctrls.values())[:-1])])

J = j + assemble(regularisation)
m = [Control(c) for c in ctrls.values()]

rf = ReducedFunctional(J, m)

opt_ctrls = minimize(rf, options={"maxiter": 50, 'disp': True})

Runtime with time python3:
real 4m41.190s