脑核磁共振数据处理——fMRI

Nipype学习笔记(3)——Nipype中的并行

2018-07-06  本文已影响149人  韧心222

7. 并行计算

7.1 Iterables

长期进行神经影像学数据处理的朋友应该都知道,对一批被试进行同样的数据处理是一种很常见的方式,例如要对所有的被试执行:

对于这样的基本需求,Nipype当然也帮我们准备好了一些并行处理的手段,那就是Iterables。此外,如果你还想尝试一下不同的参数对于数据处理的结果有什么影响,那么使用Gretna这样的软件通常会需要执行几遍(麻烦的是不是运行,而是需要重新设定参数,如果没有处理好,运行结果还可能彼此覆盖),这时候如果你能想到用Nipype中的Iterables,那就太好了。废话少说,先看东西:

Example 1 试验不同参数

from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, IsotropicSmooth

# Initiate a skull stripping Node with BET
skullstrip = Node(
      BET(mask=True, in_file='/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz'),
      name="skullstrip")
isosmooth = Node(IsotropicSmooth(), name='iso_smooth')
isosmooth.iterables = ("fwhm", [4, 8, 16])
wf = Workflow(name="smoothflow")
wf.base_dir = "/output"
wf.connect(skullstrip, 'out_file', isosmooth, 'in_file')

# Run it in parallel (one core for each smoothing kernel)
wf.run('MultiProc', plugin_args={'n_procs': 3})

在这个例子中,整个Workflow由两个节点组成,第一个是剥头骨的节点,第二个是进行平滑化的节点,为了测试不同的平滑化参数,因此设置了一个iterables,指定了参数fwhm的不同值,分别是[4, 8, 16]。Workflow在运行时指定使用了MultiProc插件(官方文档中称之为plugin,不知道为什么要这么称呼),插件的参数为{'n_procs': 3},也就是最多可以运行三个进程(如果不指定的话好像是根据系统情况自行选择,但是没说自动选择是个啥概念)。

结果

运行之后的结果可以从上图中看出来,对应于不同的参数,会生成三个不同的文件夹。

Example 2 用同一套计算流程计算所有被试

subject_list = ['01', '02', '03', '04', '05']
from nipype import IdentityInterface
infosource = Node(IdentityInterface(fields=['subject_id']),
                  name="infosource")
infosource.iterables = [('subject_id', subject_list)]

from os.path import join as opj
from nipype.interfaces.io import SelectFiles, DataSink

anat_file = opj('sub-{subject_id}', 'ses-test', 'anat', 'sub-{subject_id}_ses-test_T1w.nii.gz')

templates = {'anat': anat_file}

selectfiles = Node(SelectFiles(templates,
                               base_directory='/data/ds000114'),
                   name="selectfiles")

# Datasink - creates output folder for important outputs
datasink = Node(DataSink(base_directory="/output",
                         container="datasink"),
                name="datasink")

wf_sub = Workflow(name="choosing_subjects")
wf_sub.connect(infosource, "subject_id", selectfiles, "subject_id")
wf_sub.connect(selectfiles, "anat", datasink, "anat_files")
wf_sub.run()

在这个例子中,是用的是IdentityInterface的iterables来处理不同的被试。这是最基本的套路,只要你想用Nipype来运行多个被试,就必然采用这种框架。

7.2 MapNode

在平时的计算中还有一种需求,就是当我们使用Iterable的时候,会生成多个输出,这个时候如果要将这些输出合并成一个列表并作为一个节点的输入(如图所示),那么该怎么办呢?这就需要我们的MapNode出场了。

还是通过一个简单的小例子来看一下:

from nipype import Function
def square_func(x):
    return x ** 2
square = Function(["x"], ["f_x"], square_func)
square.run(x=2).outputs.f_x

以上是一个计算平方的函数,我们用Function将其封装起来,这样当x=2时,square.run(x=2).outputs.f_x的输出结果就是4了。此时,如果我们用MapNode来封装Function,就可以得到一个带有迭代功能的Node了,代码如下:

from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])
square_node.inputs.x = [0, 1, 2, 3]
res = square_node.run()

需要注意的是,我们在构造MapNode对象的时候,指定了一个iterfiled参数,该参数表明哪些参数是需要迭代的。例如,在该示例中,iterfield被指向为x,因此可以在设置input.x的时候将其设置为[0,1,2,3],这样一来,workflow就会在运行的时候依次将0,1,2,3代入其中并运行。

运行之后查看结果:

res.outputs.f_x

会看到结果为[0, 1, 4, 9]。如果你观察够仔细的话,会发现iterfield是一个列表,因此我们可以预见的是,其中可以迭代多个变量。

在下面这个例子中,不妨引入两个变量来看看结果:

def power_func(x, y):
    return x ** y
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = [0, 1, 2, 3]
res = power_node.run()
print(res.outputs.f_xy)

在上面这个例子中,我们将iterfield参数设置为["x", "y"],而xy的变化都是从0到4,这样一来运行之后的结果是[1, 1, 4, 27]

目前,在我接触到的处理中还不涉及这一用途,所以用的还比较少。

7.3 JoinNode

就如官方文档中所说,JoinNode的作用和Iterable完全相反,Iterable会产生多个并行的分支,但是JoinNode则会将多个分支进行合并(如下图所示)

JoinNode
其中,D就是一个JoinNode,按照我目前的理解,Iterables+JoinNode = MapNode,当然,使用Iterables+JoinNode的方式要比MapNode的方式要灵活的多,因为MapNode刚刚分手就和好了嘛,而Iterables+JoinNode却可以像以色列一样,分手千年,仍能复国。

下面的代码是上面这张图的伪代码,在编写程序的时候可以参考一下:

from nipype import Node, JoinNode, Workflow

# Specify fake input node A
a = Node(interface=A(), name="a")

# Iterate over fake node B's input 'in_file?
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])

# Pass results on to fake node C
c = Node(interface=C(), name="c")

# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
             joinsource="b",
             joinfield="in_files",
             name="d")

# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
                  (b, c, [('out_file', 'in_file')])
                  (c, d, [('out_file', 'in_files')])
                  ])

7.3 并行中的一些参数设置

7.3.1 synchronize

synchronize的本意是同步,在Nipype中表示对于两个迭代变量,可以并行执行,这句话比较拗口,直接看图最合适不过了(都是官方的图和代码)。

b.iterables = [("m", [1, 2]), ("n", [3, 4])]
image.png

假设节点b有两个迭代变量mn,其迭代值分别是[1, 2][3, 4],如果我们不设置synchronize变量,那么正常情况下,会生成四条分支,也就是m和n值的组合:[1,3][1,4][2, 3][2,4]
一旦我们设置了synchronize变量:

b.iterables = [("m", [1, 2]), ("n", [3, 4])]
b.synchronize = True

就会生成如下的分支图:


image.png

如图所示,设置synchronize后,就会生成mn两两配对的分支。

7.3.2 itersource

itersource其实可以看成是synchronize的高级版本,能够实现更加复杂的分支控制。关于其功能,官方文档只给出了一句话的描述,对于这句话该如何翻译,我还拿捏的不是很准确,暂且先放在这里"The itersource feature allows you to expand a downstream iterable based on a mapping of an upstream iterable.",编程嘛,还是通过实际例子来看看会比较好。

a = Node(interface=A(), name="a")
b = Node(interface=B(), name="b")
b.iterables = ("m", [1, 2])
c = Node(interface=C(), name="c")
d = Node(interface=D(), name="d")
d.itersource = ("b", "m")
d.iterables = [("n", {1:[3,4], 2:[5,6]})]
my_workflow = Workflow(name="my_workflow")
my_workflow.connect([(a,b,[('out_file','in_file')]),
                     (b,c,[('out_file','in_file')])
                     (c,d,[('out_file','in_file')])
                     ])

上面代码对应的分支图如下图所示:

image.png
关键点在于节点d的设置,其先设置了itersourcebm,也就是说d节点的迭代是依赖于b节点的m值,之后又执行了代码:
d.iterables = [("n", {1:[3,4], 2:[5,6]})]

也就是说变化的是d节点的n变量,当m=1时,n值为13m=2时,n值为56

未完待续

上一篇 下一篇

猜你喜欢

热点阅读