luigi使用 - multiple pipeline

2018-07-05  本文已影响0人  _Megamind_

一般地,通常使用luigi框架搭建流程都是只有一个pipeline(暂时没有通过google找到有多个Pipeline的教程)

由于工作需要,需要把之前写好的多个流程串联起来作为一个总的pipeline,并且各个pipeline之间有一定的依赖关系


假设有 pipeline_1, pipeline_2, pipeline_3 三个子流程(可单独运行),结构如下:

  class TaskSon(luigi.Task):
    def run(self):
      pass
    def output(self):
      return luigi.LocalTarget("tmp")

  class workflow(luigi.Task):
    def required(self):
      return TaskSon()

同时,有主流程main_pipeline,结构如下:

  class Pipeline1_Task(luigi.Task):
    def run(self):
      # 执行子流程 pipeline_1
      pass
    def output(self):
      # 返回子流程 pipeline_1 的输出
      pass

  class Pipeline2_Task(luigi.Task):
    def required(self):
      # 依赖于子流程 pipeline_1 的输出
      return Pipeline1_Task()
    def run(self):
      # 执行子流程 pipeline_2
      pass
    def output():
      # 返回子流程 pipeline_2 的输出
      pass

  class Pipeline3_Task(luigi.Task):
    def required(self):
      # 依赖于子流程 pipeline_2 的输出
      return Pipeline2_Task()
    def run(self):
      # 执行子流程 pipeline_3
      pass
    def output(self):
      # 返回子流程 pipeline_3 的输出
      pass

  class workflow(luigi.Task):
    def required(self):
      return Pipeline3_Task()
Screenshot.png

这里需要考虑一个问题

  • 如何将子流程的输入输出跟主流程中对应任务的输入输出对接

为了解决这个问题,首先需要考虑,如何将子流程中所有任务的输出反馈到主流程

  class workflow(luigi.Task):
    def required(self):
      return [otherTask()]
    def output(self):
      # *** 这样就可以将主入口所依赖的所有其他任务的输出返回 ***
      return self.input()

既然能够获取到子流程中所有任务的总输出,那么就需要考虑把输出反馈给主流程

  class Pipeline1_Task()
    def output(self):
      # *** 这样子流程的output就会跟主流程任务的output对接 ***
      # 同时,这样处理在主流程启动时,luigi框架依旧是会检查子流程的输出是否已经完整
      from pipeline1 import workflow as pipeline1
      return pipeline1().output()
  class Pipeline2_Task(luigi.Task):
    def required(self):
      # 由于Pipeline1_Task的输出即为子流程pipeline_1的输出,所以这里luigi会检查到子流程pipeline_1的输出是否完整
      return Pipeline1_Task()

子流程的输入输出已经可以跟主流程的输入输出对应上了,那么就需要考虑如何怎么运行子流程

这里是没有办法通过pipeline1.workflow().run()直接执行,因为入口任务是没有重载run方法

  class Pipeline1_Task(luigi.Task):
    def run(self):
      from pipeline1 import workflow as pipeline1
      # *** 黑箱 ***
      luigi.Build([pipeline1()])

由于需要确保主流程中的任务“挂载”的是统一的一个子流程,则可以定义一个变量来储存子流程对象

  class Pipeline1_Task(luigi.Task):
    pipeline = None
    def run(self):
      luigi.Build([self.pipeline])
    def output(self):
      # 由于每个任务在流程中优先执行的是output方法(当任务被依赖的时候luigi会利用output方法检查输出的完整性),所以self.pipeline的初始化应该在output方法内执行
      from pipeline1 import workflow as pipeline1
      self.pipeline = pipeline1()
      return pipeline1.output()

这样,就可以完整地把子流程装载到主流程的任务中

上一篇下一篇

猜你喜欢

热点阅读