I want to solve the equation
∂ₜρ + div(w ⋅ ρ)
say on an interval. Here ρ is a mass density that evolves over time
and w is a given “windfield”, that pushes ρ around.
The boundary conditions are that ρ(t=0) is prescribed and wind can blow ρ out of the domain, but never into the domain.
It took me quite a while to come up with something working. I googled quite a while for advection examples.
But all examples that I found were either out of date, incorrect or advection was only a small piece of the equations.
So I am looking for feedback. Is this implementation sane? Can it be simplified, made more readable etc?
import matplotlib.pyplot as plt
from fenics import *
class OnBoundaryNear(SubDomain):
def __init__(self, x, **kw):
self.x = x
super().__init__(**kw)
def inside(self, x, on_boundary):
return on_boundary and near(x[0],self.x, DOLFIN_EPS)
def pospart(x):
return 0.5*(x+abs(x))
def negpart(x):
return 0.5*(x-abs(x))
class SimpleAdvection():
"""
Solve the basic advection
∂ₜρ + div(w ⋅ ρ)
on an interval. Here
* w is a given "windfield"
* ρ is a mass density that is moved by w
* ρ(t = 0) is prescribed
* The quantity can move out of the domain, but cannot move into the domain
"""
def __init__(self,
dtmax,
rho0,
windfield,
mesh,
):
self.V = FunctionSpace(mesh, "DG",0)
self.dtmax = dtmax
self.mesh = mesh
self.rho = Function(self.V)
self.rho.assign(rho0)
self.windfield = windfield
self.t = 0
# bdry
self.bdry_markers = MeshFunction("size_t", mesh, dim=mesh.topology().dim()-1)
self.LEFT_BOUNDARY = 1
self.RIGHT_BOUNDARY = 2
OnBoundaryNear(0).mark(self.bdry_markers, self.LEFT_BOUNDARY)
OnBoundaryNear(1).mark(self.bdry_markers, self.RIGHT_BOUNDARY)
self.drho_dt = Function(self.V)
# precompute
self.build_F()
def build_F(self):
phi = TestFunction(self.V)
rho = self.rho
drho_dt = self.drho_dt
n = FacetNormal(self.mesh)
cond = dot(n("+"), self.windfield) <= 0
windstrength = abs(dot(n("+"), self.windfield))
# evaluate quantities on out / in flow cells
rho_out = conditional(cond, rho("-"), rho("+"))
phi_in = conditional(cond, phi("+"), phi("-"))
phi_out = conditional(cond, phi("-"), phi("+"))
Q_out = phi_out * rho_out * windstrength * dS
Q_in = phi_in * rho_out * windstrength * dS
Q_rhs = phi * drho_dt * dx
# bdry
n_left = Constant(("-1",))
n_right = Constant(("1",))
wind_left = dot(self.windfield, n_left)
wind_right = dot(self.windfield, n_right)
ds_left = ds(self.LEFT_BOUNDARY , subdomain_data=self.bdry_markers)
ds_right = ds(self.RIGHT_BOUNDARY, subdomain_data=self.bdry_markers)
Q_bdry_left = phi*pospart(wind_left )*rho*ds_left
Q_bdry_right = phi*pospart(wind_right)*rho*ds_right
F = -Q_rhs - Q_out + Q_in - Q_bdry_left - Q_bdry_right
self.F = F
def update_drho_dt(self):
solve(self.F == 0, self.drho_dt)
def timestep(self, tmax):
if tmax == self.t:
return
assert tmax > self.t
self.update_drho_dt()
dt = min(tmax - self.t, self.dtmax)
t_new = self.t + dt
assert t_new > self.t
self.t = t_new
self.rho.assign(self.rho + dt * self.drho_dt)
def rununtil(self, tmax):
while self.t < tmax:
self.timestep(tmax)
ncells = 5
rho0 = Expression("x[0]", degree=1)
mesh = UnitIntervalMesh(ncells)
windfield = Expression(("1",), degree=1)
o = SimpleAdvection(mesh=mesh, windfield=windfield, rho0=rho0, dtmax=1.0/ncells)
# plot(o.drho_dt)
for i in range(ncells):
plot(o.rho)
o.rununtil(i/ncells)
plot(o.rho)