luigi使用 - multiple pipeline
2018-07-05 本文已影响0人
_Megamind_
一般地,通常使用luigi框架搭建流程都是只有一个pipeline(暂时没有通过google找到有多个Pipeline的教程)
由于工作需要,需要把之前写好的多个流程串联起来作为一个总的pipeline,并且各个pipeline之间有一定的依赖关系
假设有 pipeline_1, pipeline_2, pipeline_3 三个子流程(可单独运行),结构如下:
class TaskSon(luigi.Task):
def run(self):
pass
def output(self):
return luigi.LocalTarget("tmp")
class workflow(luigi.Task):
def required(self):
return TaskSon()
同时,有主流程main_pipeline,结构如下:
class Pipeline1_Task(luigi.Task):
def run(self):
# 执行子流程 pipeline_1
pass
def output(self):
# 返回子流程 pipeline_1 的输出
pass
class Pipeline2_Task(luigi.Task):
def required(self):
# 依赖于子流程 pipeline_1 的输出
return Pipeline1_Task()
def run(self):
# 执行子流程 pipeline_2
pass
def output():
# 返回子流程 pipeline_2 的输出
pass
class Pipeline3_Task(luigi.Task):
def required(self):
# 依赖于子流程 pipeline_2 的输出
return Pipeline2_Task()
def run(self):
# 执行子流程 pipeline_3
pass
def output(self):
# 返回子流程 pipeline_3 的输出
pass
class workflow(luigi.Task):
def required(self):
return Pipeline3_Task()
Screenshot.png
这里需要考虑一个问题
- 如何将子流程的输入输出跟主流程中对应任务的输入输出对接
为了解决这个问题,首先需要考虑,如何将子流程中所有任务的输出反馈到主流程
-
一般地,流程的结构设计都是有一个主入口(workflow),由主入口任务(在required方法中)初始化并启动其他任务
-
那么,就需要在workflow任务中把整个流程中其他任务的输出作为一个整体输出:
class workflow(luigi.Task):
def required(self):
return [otherTask()]
def output(self):
# *** 这样就可以将主入口所依赖的所有其他任务的输出返回 ***
return self.input()
既然能够获取到子流程中所有任务的总输出,那么就需要考虑把输出反馈给主流程
- 考虑到workflow任务获取其他任务的总输出的方法,可以直接将workflow的output方法跟对应主流程任务的output方法结合:
class Pipeline1_Task()
def output(self):
# *** 这样子流程的output就会跟主流程任务的output对接 ***
# 同时,这样处理在主流程启动时,luigi框架依旧是会检查子流程的输出是否已经完整
from pipeline1 import workflow as pipeline1
return pipeline1().output()
- 至于主流程中的任务的依赖就比较容易处理了:
class Pipeline2_Task(luigi.Task):
def required(self):
# 由于Pipeline1_Task的输出即为子流程pipeline_1的输出,所以这里luigi会检查到子流程pipeline_1的输出是否完整
return Pipeline1_Task()
子流程的输入输出已经可以跟主流程的输入输出对应上了,那么就需要考虑如何怎么运行子流程
这里是没有办法通过pipeline1.workflow().run()直接执行,因为入口任务是没有重载run方法
- 所以,这里把子流程作为一个黑箱执行:
class Pipeline1_Task(luigi.Task):
def run(self):
from pipeline1 import workflow as pipeline1
# *** 黑箱 ***
luigi.Build([pipeline1()])
由于需要确保主流程中的任务“挂载”的是统一的一个子流程,则可以定义一个变量来储存子流程对象
class Pipeline1_Task(luigi.Task):
pipeline = None
def run(self):
luigi.Build([self.pipeline])
def output(self):
# 由于每个任务在流程中优先执行的是output方法(当任务被依赖的时候luigi会利用output方法检查输出的完整性),所以self.pipeline的初始化应该在output方法内执行
from pipeline1 import workflow as pipeline1
self.pipeline = pipeline1()
return pipeline1.output()
这样,就可以完整地把子流程装载到主流程的任务中