自定义operator

2019-10-16  本文已影响0人  john瀚

默认$AIRFLOW_HOME/plugins存放定义的plugins,自定义组件。可以自定义operator,hook等等。我们希望可以直接使用这种模式定义机器学习的一个算子。下面定义了一个简单的加法算子。

# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Will show up under airflow.operators.plus_plugin.PluginOperator
class PlusOperator(BaseOperator):

    @apply_defaults
    def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs):
        super(PlusOperator, self).__init__(*args, **kwargs)
        self.params = params or {}
        self.set_context = set_context

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            self.op_kwargs = context

        puls = self.op_kwargs['a'] + self.op_kwargs['b']
        print "a =", self.op_kwargs['a'], ". b=", self.op_kwargs['a']
        return_value = self.main()
        context[self.task_id].xcom_push(key='return_value', value=return_value)
        return puls


# Defining the plugin class
class PlusPlugin(AirflowPlugin):
    name = "plus_plugin"
    operators = [PlusOperator]

在dag中使用案例如下

from airflow.operators.plus_plugin import PlusOperator
plus_task = PlusOperator(task_id='plus_task', provide_context=True, params={'a': 1,'b':2},dag=dag)
上一篇 下一篇

猜你喜欢

热点阅读