mpi4py 中的 futures 模块
在上一篇中我们介绍了 mpi4py 中的 profiling,下面我们将介绍 mpi4py 中的 futures 模块。
mpi4py.futures 提供了一个由多个工作进程使用 MPI 进程间通信来异步执行任务的高级别接口。mpi4py.futures 是建立在 Python 标准库中的 concurrent.futures 的基础之上的。这里先简要介绍一下 concurrent.futures 模块。
concurrent.futures
concurrent.futures 提供了一个异步并行执行任务的高级别接口。异步执行的任务可以通过线程来完成(使用 ThreadPoolExecutor),也可以通过不同的进程来完成(使用 ProcessPoolExecutor),它们都继承自抽象的 Executor 类。
为了有助于理解和使用 moi4py.futures,下面简要给出 concurrent.futures 中的主要类,方法和函数,但是不做详细的讲解,想要了解更多的读者可以参见其文档。
Executor 类
class concurrent.futures.Executor
提供异步执行任务的抽象类,不能直接使用,而是使用其具体的子类。下面是其几个主要的方法接口:
submit(fn, *args, **kwargs)
提交执行 fn(*args **kwargs),返回一个 Future 对象(将在下面介绍)表示提交执行的结果。
map(func, *iterables, timeout=None, chunksize=1)
类似于 Python 中的 map(func, *iterables),不同的是 func
是异步并发执行的。
shutdown(wait=True)
通知任务执行器在当前挂起的 future 任务完成后释放所占用的资源。
ThreadPoolExecutor 类
ThreadPoolExecutor 是 Executor 的一个子类,使用一个线程池来异步地执行任务,下面是其类原型:
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
ProcessPoolExecutor 类
ProcessPoolExecutor 是 Executor 的一个子类,使用一个进程池来异步地执行任务,使用 multiprocessing 模块,可以避开 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的对象才能被执行并返回。下面是其类原型:
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
Future 对象
Future 类包装异步执行的任务结果。Future 类对象由 Executor.submit() 创建。
class concurrent.futures.Future()
Future 类原型,下面使其主要方法:
cancel()
尝试取消调用。
cancelled()
如果调用被成功取消则返回 True。
running()
如果调用正在执行而不能被取消则返回 True。
done()
如果调用被成功取消或已执行完成则返回 True。
result(timeout=None)
返回调用的结果。
exception(timeout=None)
返回调用抛出的异常。
模块函数
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待 Future 对象 fs
完成。
concurrent.futures.as_completed(fs, timeout=None)
返回一个由一系列 Future 对象中完成(执行完或者被取消)的那些组成的迭代器。
mpi4py.futures
mpi4py.futures 是基于 concurrent.futures 的。具体来说,mpi4py.futures 提供了 Executor 类的子类实现 MPIPoolExecutor 和 MPICommExecutor。
MPIPoolExecutor 类
MPIPoolExecutor 使用一个 MPI 进程池来异步执行任务。同 ProcessPoolExecutor,MPIPoolExecutor 可以避开 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的对象才能被执行并返回。因为工作进程必须导入 __main__ 模块,因此 MPIPoolExecutor 不能工作在交互的环境下(如 Ipython shell 中)。
MPIPoolExecutor 使用 MPI-2 标准中引进的动态进程管理特性。主进程使用 MPI.COMM_SELF 的 Spawn() 方法启动新的工作进程。主进程会分别使用一个单独的线程同每个工作进程进行通信。工作进程在其唯一的主线程内执行所分配到的任务。为了避免工作进程再启动新的工作进程(会导致无限递归),一种简单的方式是将 MPIPoolExecutor 执行代码放到主脚本的 if __name__ == '__main__': 语句下。
下面是其类原型:
class mpi4py.futures.MPIPoolExecutor(max_workers=None,**kwargs)
Executor 的子类,使用至多 max_workers
个进程组成的进程池来异步执行任务。如果 max_workers
为 None (默认值),则其值由环境变量 MPI4PY_MAX_WORKERS (如果设置了)决定,或者由 MPI universe 的大小(如果设置了)决定,否则只会生成单个工作进程。如果 max_workers
的值小于或等于 0,则会抛出 ValueError 异常。其它可设置参数有:
-
python_exe
:Python 执行程序路径。 -
python_args
:列表或者可迭代对象,用来向 Python 执行程序传递额外的命令行参数。 -
mpi_info
:字典或可产生 (key, value) 对的迭代对象。这些 (key, value) 对会通过 MPI.Info 对象传递给 MPI.Intracomm.Spawn() 调用以启动工作进程。可以通过此机制告诉 MPI 在什么地方及怎样启动新的进程。 -
globals
:字典或可产生 (name, value) 对的迭代对象,用来初始化工作进程的主模块命名空间。 -
main
:如果为 False,则不会在工作进程中导入 __main__ 模块。 -
path
:列表或可迭代对象,向 sys.path 中追加一系列工作进程搜寻路径。 -
wdir
:设置工作进程的当前工作目录。 -
env
:字典可产生 (name, value) 对的迭代对象,用来更新工作进程的 os.environ。
submit(func, *args, **kwargs)
以 func(*args, **kwargs) 提交执行任务,返回一个 Future 对象作为提交结果。简单的使用例程如下:
executor = MPIPoolExecutor(max_workers=1)
future = executor.submit(pow, 321, 1234)
print(future.result())
map(func, *iterables, timeout=None, chunksize=1, **kwargs)
等价于 map(func, *iterables),不同的是 func
是被异步地执行,对 func
的多个调用可以在多个进程中并发地无序地执行。返回的迭代器会抛出一个 TimeoutError 如果 __next__() 调用后 timout
秒还没有得到结果,timeout
可以为一个整数或一个浮点数。如果 timeout
为 None,则等待的时间没有限制。如果某个调用抛出了异常,则在获取返回的迭代器中该值时会重新抛出该异常。该方法会将 iterables
分割成若干块分别提交到进程池中执行,块的近似大小由 chunksize
设置。对非常长的 iterables
,使用一个大的 chunksize
可以显著地提高执行性能。在默认情况下,返回的迭代器会产生与原 iterables
相同顺序的结果,等待提交的任务依次完成,如果传递并设置关键字参数 unordered
为 True,则返回的迭代器会尽快地返回任意已经完成的任务。
starmap(func, iterable, timeout=None, chunksize=1, **kwargs)
等价于 itertools.starmap(func, iterable)。如果 iterable
已经 "zip" 过了,则使用该方法更方便。map(func, *iterable) 等价于 starmap(func, zip(*iterable))。
shutdown(wait=True)
通知任务执行器在当前挂起的 future 任务完成后释放所占用的资源。在该方法执行后的 submit() 和 map() 调用会抛出 RuntimeError。如果 wait
为 True (默认),则该方法会一直等到所有挂起的 future 任务都执行完并且执行器的相关资源都已释放后才返回。如果 wait
为 False,则该方法会立即返回,但执行器的相关资源则会等到所有挂起的 future 任务都完成后才释放。不管 wait
的值是什么,整个 Python 程序都会在所有挂起的 future 任务都完成后才会结束退出。使用 with 语句可以避免显式地调用该方法。with 语句相当于设置 wait
为 True 调用 shutdown(),举例如下:
import time
with MPIPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 2) assert future.done()
bootup(wait=True)
通知执行器尽早分配所需的资源(特别是 MPI 进程)。如果 wait
为 True,则该方法会直到资源已经分配好才返回。在第一次调用 submit() 时会自动分配所需的资源,因此很少需要显式地调用该方法。
需要注意的是,因为主进程要使用单独的线程同每一个工作进程进行 MPI 通信,因此所使用的 MPI 环境需要提供 MPI.THREAD_MULTIPLE 级别的多线程支持。如果 MPI 环境所支持的线程级别比 MPI.THREAD_MULTIPLE 低,则 mpi4py.futures 会使用一个全局锁来系列化 MPI 调用。如果支持的线程级别比 MPI.THREAD_SERIALIZED 低, 则 mpi4py.futures 会发出 RuntimeWarning 警告。
MPICommExecutor 类
对只支持 MPI-1 标准的 MPI 实现,无法使用 MPI-2 标准中才引入的动态进程管理特性,另外在一些超算平台上对调用 MPI_Comm_spawn() 方法可能有额外的限制或引起额外的复杂度。针对这些情况,mpi4py.futures 支持另一种更加传统的类似于 SPMD 的使用方式,这种使用方式只用到 MPI-1 的相关特性。用户使用 mpiexec 命令来启动 Python 应用程序,在程序里面集合调用 MPICommExecutor 上下文管理器将启动起来的若干 MPI 进程分割成一个主进程和多个工作进程。主进程访问 MPICommExecutor 实例以提交任务,与此同时,工作进程沿着另一个不同的执行路径执行主进程提交的任务。
下面是 MPICommExecutor 的原型:
class mpi4py.futures.MPICommExecutor(comm=None,root=0)
MPICommExecutor 的上下文管理器,将一个 MPI 组内通信子 comm
(默认值 None 表示 MPI.COMM_WORLD) 分割成两个无交集的集合:单个主进程(rank 为 root
的进程)和其它的所有进程作为工作进程。这两个集合通过组间通信子连接在一起。with 语句的目标要么是一个 MPICommExecutor 实例(对主进程),要么是 None (对工作进程)。简短的使用例程如下:
from mpi4py import MPI
from mpi4py.futures import MPICommExecutor
with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
if executor is not None:
future = executor.submit(abs, -42)
assert future.result() == 42
answer = set(executor.map(abs, [-42, 42]))
assert answer == {42}
需要注意的是,如果向 MPICommExecutor 传递了一个 size 为 1 的通信子(如 MPI.COMM_SELF),with 语句的目标将会将所有提交的任务在一个单独的工作线程上完成,用于保证任务会被异步地执行。但是,Python 的 Global Interpreter Lock (GIL) 会阻止主线程和工作线程并发地执行,即使是运行在多核处理器上。线程的频繁切换可能会降低程序的性能,因此一般不建议使用一个 size 为 1 的通信子来执行 MPICommExecutor,如果确实要使用的话,可以考虑使用 concurrent.futures 的 ThreadPoolExecutor。
命令行执行方法
当所使用的 MPI 实现不支持动态进程管理特性时,可以用另一种方式来使用 mpi4py.futures:在命令行方式下传递 -m mpi4py.futures 给 python 执行程序,此外 mpi4py.futures 接受 -m mod 以执行一个模块,-c cmd 以执行一条 Python 语句,或者 - 从标准输入(sys.stdin)读取 Python 命令语句。总的来说,可以使用下面 4 种命令行方式来运行 mpi4py.futures:
- $ mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures -m mod [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures -c cmd [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures - [arg] ...
在开始执行主脚本之前,mpi4py.futures 会将 MPI.COMM_WORLD 分割成一个主进程(MPI.COMM_WORLD 中 rank 为 0 的进程)和 numprocs - 1 个工作进程,这些进程会通过一个 MPI 组间通信子连接起来。然后,主进程执行用户脚本代码,最终会创建 MPIPoolExecutor 实例以提交计算任务,与此同时,工作进程沿着一个不同的执行路径以服务于主进程。当主进程顺利地执行完主脚本结束时,整个 MPI 执行环境会合适地退出,但是在遇到没有处理的异常情况时,主进程会调用 MPI.COMM_WORLD.Abort(1) 以避免死锁并强制整个 MPI 执行环境退出。
例程
下面给出相应的使用例程。
# julia.py
"""
Demonstrates the usage of mpi4py.futures.MPIPoolExecutor.
Run this with 1 processes like:
$ mpiexec -n 1 -usize 17 python julia.py
or 17 processes like:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
"""
from mpi4py.futures import MPIPoolExecutor
x0, x1, w = -2.0, +2.0, 640*2
y0, y1, h = -1.5, +1.5, 480*2
dx = (x1 - x0) / w
dy = (y1 - y0) / h
c = complex(0, 0.65)
def julia(x, y):
z = complex(x, y)
n = 255
while abs(z) < 3 and n > 1:
z = z**2 + c
n -= 1
return n
def julia_line(k):
line = bytearray(w)
y = y1 - k * dy
for j in range(w):
x = x0 + j * dx
line[j] = julia(x, y)
return line
if __name__ == '__main__':
with MPIPoolExecutor() as executor:
image = executor.map(julia_line, range(h))
with open('julia.pgm', 'wb') as f:
f.write(b'P5 %d %d %d\n' % (w, h, 255))
for line in image:
f.write(line)
推荐以 1 个 MPI 进程并设置所需的 universe size 的方式使用 mpiexec 命令以启动并执行以上脚本:
$ mpiexec -n 1 -usize 17 python julia.py
注意以上 -usize 标志(或者等价的设置 MPIEXEC_UNIVERSE_SIZE 环境变量)只适用于 MPICH。对 OPenMPI,则需要设置 OMPI_UNIVERSE_SIZE 环境变量来给定 universe size。
在以上执行方式中,mpiexec 命令启动单个 MPI 进程(主进程)以执行主脚本,当需要时,mpi4py.futures 生成另外的 16 个 MPI 进程以组成一个工作进程池。主进程提交任务到工作进程池并等待其返回结果。工作进程接收来自主进程提交的任务,执行并返回结果给主进程。
另外,用户还可以以一种更加传统的方式来执行以上脚本,即一次启动所有需要的 MPI 进程。这种执行方式类似下面的命令:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
此时,启动的 17 个进程会被分割成一个主进程和 16 个工作进程。主进程执行主脚本并提交任务,工作进程执行提交的任务并返回结果给主进程。
程序执行后的结果如下:
julia 集以上介绍了 mpi4py 中的 futures 模块,在下一篇中我们将介绍 mpi4py 中的 run 模块。