Airflow Local Course v1.0

2024-02-08  山猪打不过家猪

1. Getting to Know Airflow

2. Common Commands

1. List the running Docker containers and find the Airflow one

docker ps

2. Open a bash shell inside the Airflow container

docker exec -it 1aa536546bb6 /bin/bash

3. List all DAGs

airflow dags list

4. Show the runs of a specific DAG

airflow dags list-runs -d dag_python_v6

5. Backfill DAG runs that were never executed: airflow dags backfill -s <start date> -e <end date> <dag_id>

airflow dags backfill -s 2024-02-06 -e 2024-02-08 dag_python_v6

6. List the tasks of a DAG: airflow tasks list <dag_id>

airflow tasks list dag_python_v6

7. Test a single task of a DAG for a given date: airflow tasks test <dag_id> <task_id> <date>

airflow tasks test forex_v1 is_forex_available 2024-02-01

3. The First DAG

Workflow


What is a DAG?

Simply put, a DAG is a pipeline.

The minimal DAG skeleton:

from airflow import DAG
from datetime import datetime,timedelta

# Not DAG-level arguments; these defaults are applied to every task
default_args = {
    'owner':'lg',
    'retries':1,
    'retry_delay':timedelta(minutes=2)
}


with DAG(
    dag_id= 'forex_v1',
    default_args=default_args,
    start_date=datetime(2024,2,9),
    schedule_interval="@daily",
    catchup=False # True would backfill every scheduled run between start_date and now
) as dag:
    pass

catchup=False: with catchup=True, Airflow backfills every scheduled run between start_date and now. For example, with start_date=2024-02-01 and an @daily schedule, if today is 2024-02-09 it would run all 8 missed daily runs.

What is an operator?

Simply put, an operator defines a task.
There are three kinds of operator (see the sketch after this list):
1. Action: executes something; Python, bash and SQL each have a dedicated operator.
2. Transfer: moves data from a source system to a sink.
3. Sensor: a sensor lets you verify that a condition is met before moving forward, i.e. it waits for something to happen before the next task runs, e.g. wait until all the files have landed in a folder before kicking off the next step.
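
A minimal sketch with one operator of each kind, assuming it sits inside a DAG context; the file path and task names are made up, and the transfer kind is only referenced in a comment because its parameters are provider-specific:

from airflow.operators.bash import BashOperator    # action: runs a bash command
from airflow.sensors.filesystem import FileSensor  # sensor: waits for a condition
# transfer operators (e.g. the S3-to-Redshift operator in the Amazon provider)
# move data from a source system to a sink

wait_for_file = FileSensor(
    task_id='wait_for_file',
    fs_conn_id='fs_default',     # Airflow's default filesystem connection
    filepath='data/input.csv',   # hypothetical path
    poke_interval=30,            # check every 30 seconds
)

run_script = BashOperator(
    task_id='run_script',
    bash_command="echo 'action operator: do some work'",
)

wait_for_file >> run_script      # the sensor gates the action task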

1. Create an HTTP sensor to check that the API is available

1. Create the DAG
from airflow import DAG
from datetime import datetime,timedelta
from airflow.providers.http.sensors.http import HttpSensor

# Not DAG-level arguments; these defaults are applied to every task
default_args = {
    'owner':'lg',
    'retries':1,
    'retry_delay':timedelta(minutes=2)
}


with DAG(
    dag_id= 'forex_v1',
    default_args=default_args,
    start_date=datetime(2024,2,9),
    schedule_interval="@daily",
    catchup=False # True would backfill every scheduled run between start_date and now
) as dag:
    is_forex_available = HttpSensor(
        task_id ='is_forex_available' , 
        http_conn_id='forex_api',  # must match the connection id configured in the UI
        endpoint= 'marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b',
        response_check=lambda response: 'rate' in response.text,
        poke_interval = 5,
        timeout = 10
    )


    is_forex_available

2. Configure the connection in the Airflow UI (Admin > Connections); the connection id must be forex_api. Connections can also be created from the CLI with airflow connections add.



3. Test

airflow@1aa536546bb6:/opt/airflow$ airflow tasks test forex_v1 is_forex_available 2024-02-08

2. Use a FileSensor to check that a file exists

1. Add the file-detection task

    # sensor that checks whether the file exists
    # (requires: from airflow.sensors.filesystem import FileSensor)
    is_forex_csv_available = FileSensor(
        task_id = 'is_forex_csv_available',
        fs_conn_id='forex_path',
        filepath='forex_currencies.csv',
        poke_interval = 5, # check every 5 seconds
        timeout = 10  # fail the sensor after 10 seconds
    )

3. Check where the files are located

airflow@1aa536546bb6:/opt/airflow$ ls

4. Set up the connection



3. Use a PythonOperator to run the CSV download

Official documentation
1. Add the Python download function

def download_rates():
  pass

2. Add the PythonOperator

    # the PythonOperator runs a Python callable
    down_load_csv = PythonOperator(
        task_id = 'down_load_csv',
        python_callable= download_rates, # name of the Python function to call
        ## op_kwargs={'age':20} # passes arguments to the callable; not needed here (see the sketch after the test step)
    )
3. Run a test; once it succeeds, the file is downloaded
airflow tasks test forex_v1 down_load_csv 2024-02-01
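For reference, a small sketch of op_kwargs (commented out above): it forwards keyword arguments to the Python callable. The function and values here are made up for illustration and assume a DAG context:

from airflow.operators.python import PythonOperator

def greet(name, age):
    print(f"Hello {name}, you are {age} years old")

greet_task = PythonOperator(
    task_id='greet_task',
    python_callable=greet,
    op_kwargs={'name': 'lg', 'age': 20},  # forwarded to greet() as keyword arguments
)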

4. Complete code

import csv
import requests
import json
from datetime import datetime,timedelta

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor # HTTP sensor
from airflow.sensors.filesystem import FileSensor # file sensor
from airflow.operators.python import PythonOperator # Python operator

# Not DAG-level arguments; these defaults are applied to every task
default_args = {
    'owner':'lg',
    'retries':1,
    'retry_delay':timedelta(minutes=2)
}

def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/__pycache__/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/__pycache__/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')

with DAG(
    dag_id= 'forex_v1',
    default_args=default_args,
    start_date=datetime(2024,2,9),
    schedule_interval="@daily",
    catchup=False # True would backfill every scheduled run between start_date and now
) as dag:
    # sensor that checks whether the API is available
    is_forex_api_available = HttpSensor(
        task_id ='is_forex_api_available' , # task id
        http_conn_id='forex_api',  # connection id configured in the UI
        endpoint= 'marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b', # path appended to the connection's host
        response_check=lambda response: 'rate' in response.text, # check that the response contains 'rate'
        poke_interval = 5, # check every 5 seconds
        timeout = 10  # fail the sensor after 10 seconds
    )

    # sensor that checks whether the file exists
    is_forex_csv_available = FileSensor(
        task_id = 'is_forex_csv_available',
        fs_conn_id='forex_path',
        filepath='forex_currencies.csv',
        poke_interval = 5, # check every 5 seconds
        timeout = 10  # fail the sensor after 10 seconds
    )

    # the PythonOperator runs a Python callable
    down_load_csv = PythonOperator(
        task_id = 'down_load_csv',
        python_callable= download_rates, # name of the Python function to call
        ## op_kwargs={'age':20} # passes arguments to the callable; not needed here
    )
5. Improvement: use a proper project layout



4. Use a BashOperator to store the file in HDFS

The files in a data pipeline are usually large, so the data should be stored in a data lake; here we push the JSON file into HDFS.

from airflow.operators.bash import BashOperator

    # use a BashOperator to store the JSON file in HDFS
    saving_rates_to_hdfs = BashOperator(
        task_id = 'saving_rates_to_hdfs',
        bash_command="""
            hdfs dfs -mkdir /forex && \
            hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
        """
    )

5. Load the data into a Hive table

pass
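
The original notes leave this step empty. A minimal sketch using the Hive provider, assuming the apache-airflow-providers-apache-hive package is installed and a Hive connection named hive_conn exists; it creates the forex_rates table that the Spark job below appends to:

from airflow.providers.apache.hive.operators.hive import HiveOperator

    # create the Hive table that will hold the rates
    creating_forex_rates_table = HiveOperator(
        task_id='creating_forex_rates_table',
        hive_cli_conn_id='hive_conn',   # assumed connection id, configured in the UI
        hql="""
            CREATE EXTERNAL TABLE IF NOT EXISTS forex_rates(
                base STRING,
                last_update DATE,
                eur DOUBLE,
                usd DOUBLE,
                nzd DOUBLE,
                gbp DOUBLE,
                jpy DOUBLE,
                cad DOUBLE
            )
            ROW FORMAT DELIMITED
            FIELDS TERMINATED BY ','
            STORED AS TEXTFILE
        """,
    )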

6. Process the data with Spark

Airflow is only an orchestration tool; heavy data processing should be done by the appropriate engine, so here we hand the processing over to Spark.
The stock Airflow image does not ship with the Spark provider package, so we have to install it ourselves.
1. Create a Dockerfile

FROM apache/airflow:2.8.1
COPY requirements.txt requirements.txt
RUN pip install --user --upgrade pip
RUN pip install apache-airflow-providers-apache-spark
RUN pip install -r requirements.txt
2. Check the current images
docker images
3. Build the new image with a tag that does not clash with the one above (the docker-compose file then typically needs its image setting pointed at this new tag)
docker build . --tag extending_airflow:latest

4. After installation, import the operator and add the task

    # process the data with Spark
    # (requires: from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator)
    forex_processing= SparkSubmitOperator(
        task_id = "forex_processing",
        application="/opt/airflow/dags/source/processing_by_spark.py", # must be a .py file
        conn_id="spark_conn",
        verbose=False
    )

5. Then write the Spark processing script

from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

warehouse_location = abspath('spark-warehouse')

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Forex processing") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# Read the file forex_rates.json from the HDFS
df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')

# Drop the duplicated rows based on the base and last_update columns
forex_rates = df.select('base', 'last_update', 'rates.eur', 'rates.usd', 'rates.cad', 'rates.gbp', 'rates.jpy', 'rates.nzd') \
    .dropDuplicates(['base', 'last_update']) \
    .fillna(0, subset=['EUR', 'USD', 'JPY', 'CAD', 'GBP', 'NZD'])

# Export the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")

7. Send an email when a task fails

Dump the default configuration so the [smtp] section (smtp_host, smtp_user, smtp_password, smtp_mail_from, ...) can be filled in before sending mail:

/opt/airflow$ airflow config list --defaults >  ${AIRFLOW_PROJ_DIR:-.}/config/airflow.cfg

    # add the EmailOperator (requires: from airflow.operators.email import EmailOperator)
    send_email = EmailOperator(
        task_id = "send_email",
        to = "airflow@gmail.com",
        subject= "forex_pipeline_status",
        html_content="<h>forex_pipeline_fail</h>"
    )
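
One way to wire it in (a sketch, not from the original notes): the same task with trigger_rule='one_failed' so it only fires when an upstream task has failed, hung off the end of the pipeline (forex_processing is the Spark task defined earlier); trigger rules are covered in more detail further below.

    send_email = EmailOperator(
        task_id = 'send_email',
        to = 'airflow@gmail.com',
        subject= 'forex_pipeline_status',
        html_content='<h3>forex_pipeline_fail</h3>',
        trigger_rule='one_failed'  # fire only when an upstream task has failed
    )

    forex_processing >> send_email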

8. Set up Slack alerts

pass
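
The original notes leave this as a placeholder. One simple approach (a sketch; the webhook URL is a placeholder and the helper name is made up) is an on_failure_callback that posts to a Slack incoming webhook; the apache-airflow-providers-slack package also ships dedicated Slack operators and notifiers.

import requests

SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXX/YYY/ZZZ'  # placeholder

def _notify_slack_on_failure(context):
    # the callback receives the task context; pull out the dag and task ids
    ti = context['task_instance']
    requests.post(SLACK_WEBHOOK_URL, json={'text': f'DAG {ti.dag_id} task {ti.task_id} failed'})

# then register it on the DAG (or in default_args):
# with DAG(..., on_failure_callback=_notify_slack_on_failure) as dag: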

4. DAG Settings

4.1 DAG time settings

Daylight saving time: Airflow schedules in UTC. During daylight saving the local clock shifts by an hour relative to a fixed UTC schedule, so if you want the run to land at 5 a.m. on the NZ wall clock you have to set the schedule an hour earlier (at 4); the run then shows up at 5 on the local clock.
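
One common way to sidestep manual DST arithmetic (a sketch, not from the original notes) is a timezone-aware start_date via pendulum; Airflow then interprets the cron schedule in that timezone, daylight saving included:

import pendulum
from airflow import DAG

local_tz = pendulum.timezone('Pacific/Auckland')   # assumed timezone; pick your own

with DAG(
    dag_id='nz_five_am_dag',
    start_date=pendulum.datetime(2024, 2, 1, tz=local_tz),
    schedule_interval='0 5 * * *',    # 5 a.m. in the start_date's timezone
    catchup=False,
) as dag:
    pass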

4.2 Dependencies between DAG runs

Suppose a DAG has 3 tasks. On day 1 everything runs fine; on day 2 task2 fails, so the tasks after it never run; on days 3 and 4 task2 succeeds again. Skipping over a failed day like this leaves the pipeline in an inconsistent, unreliable state. Setting
depends_on_past = True on task2 means that once task2 fails on some day, it will not run on any later day until the failed run is fixed (a full code example is in the depends_on_past section under Airflow Dependencies below).

4.3 Managing the DAG folder

Package the DAG you want to run together with the scripts it needs. Note that the scripts folder and my_dag.py sit in the same directory, and the scripts folder should contain an __init__.py file; see the layout sketch after the command below.

zip -rm package_dag.zip my_dag.py scripts/
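
A sketch of what the packaged layout and import might look like (helper.py and process are hypothetical names):

# layout inside package_dag.zip:
#   my_dag.py
#   scripts/
#       __init__.py
#       helper.py        # hypothetical module with a process() function
#
# my_dag.py can then import the bundled code as a normal package:
from scripts.helper import process   # resolved from inside the zip by Airflow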

5. DAG Error Handling

Add a timeout

with DAG(
    dag_id = 'our_first_dag_lg_v1',
    default_args= _default_args,
    start_date = datetime(2024,2,3,1),
    schedule_interval = '@daily',
    dagrun_timeout= timedelta(seconds=30) # set the timeout based on the DAG's average run time
) as dag:

Add success and failure callbacks

def _on_success_callback(context):
    print("on_success_callback")
    print(context)

def _on_failure_callback(context):
    print("on_failure_callback")
    print(context)

with DAG(
    dag_id = 'our_first_dag_lg_v1',
    default_args= _default_args,
    start_date = datetime(2024,2,3,1),
    schedule_interval = '@daily',
    dagrun_timeout= timedelta(seconds=30), # set the timeout based on the DAG's average run time
    on_success_callback=_on_success_callback, # callback invoked when the DAG run succeeds
    on_failure_callback=_on_failure_callback  # callback invoked when the DAG run fails
    
) as dag:

Add retries after failure

_default_args= {
    'owner':'lg',
    'retries':2,  # number of retries
    'retry_delay':timedelta(minutes=3) # delay between retries
}

Add email notifications

_default_args= {
    'owner':'lg',
    'retries':2,  # number of retries
    'retry_delay':timedelta(minutes=3), # delay between retries
    'email':['pjj@gmail.com'],  # the default_args key is 'email'
    'email_on_failure':True,
    'email_on_retry':False
}

Testing DAGs

Learn pytest; a basic DAG-integrity test sketch follows.
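
A minimal sketch of a DAG-integrity test with pytest, assuming the DAG files live under dags/:

import pytest
from airflow.models import DagBag

@pytest.fixture()
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_no_import_errors(dagbag):
    # every .py file in dags/ must parse without raising
    assert dagbag.import_errors == {}

def test_forex_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id='forex_v1')
    assert dag is not None
    assert len(dag.tasks) > 0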

6. Variables

variable

1. Add the variable in the web UI



2. Read the variable

from airflow.models import Variable

support_email = Variable.get("support_email")
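
Variable.get also supports a default value and JSON deserialization (a sketch; the variable names are examples):

from airflow.models import Variable

# plain string variable, with a fallback if it is not defined in the UI
support_email = Variable.get('support_email', default_var='ops@example.com')

# JSON variable, e.g. {"bucket": "forex", "retries": 3}, returned as a dict
pipeline_settings = Variable.get('pipeline_settings', deserialize_json=True, default_var={})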

XCom: passing data between tasks

Two ways to push an XCom:

1. Return the value from the callable; it is pushed automatically under the key return_value:

from random import uniform

def _training_model():
    accuracy = uniform(0.1, 10.0)
    print(f"model's accuracy: {accuracy}")
    # push the xcom by returning the value
    return accuracy

2. Push it explicitly through the task instance:

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f"model's accuracy: {accuracy}")
    ti.xcom_push(key='model_accuracy', value=accuracy)

To stop an operator from pushing its output automatically, set do_xcom_push=False:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push = False
    )

Pulling XCom values:

def _choose_best_model(ti):
    print('choose best model')
    # use task_id for a single task, task_ids for several tasks
    all_accuracies = ti.xcom_pull(key='model_accuracy', task_ids=['training_model_A','training_model_B','training_model_C'])
    print(all_accuracies)

7. Sending Email

from airflow.operators.email import EmailOperator

    send_email = EmailOperator(
        task_id = 'send_email',
        to = 'qq394967886@163.com',
        subject='dag error',
        html_content="""
            <h3>Dag has error</h3>
                """
    )

8. Airflow Dependencies

1. Use Datasets to build dependencies

Datasets give you a direct, data-driven way to express dependencies between DAGs.
For example: there are two DAGs. dag_A updates two datasets in S3, and dag_B must run only after both datasets have been updated. Wiring this up the traditional way is clumsy, so we use Datasets instead.
producer.py, the DAG that updates the two datasets:

from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime

data_a = Dataset("s3://bucket_a/data_a") # dataset 1
data_b = Dataset("s3://bucket_b/data_b") # dataset 2

@dag(start_date=datetime(2023, 1 ,1), schedule='@daily', catchup=False)
def producer():

    @task(outlets=[data_a])
    def update_a():
        print("update A")

    @task(outlets=[data_b])
    def update_b():
        print("update B")

    update_a() >> update_b()

producer()

consumer.py, the DAG that consumes the two datasets and runs the next step:

from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime

data_a = Dataset("s3://bucket_a/data_a")
data_b = Dataset("s3://bucket_b/data_b")

@dag(start_date=datetime(2023, 1 ,1), schedule=[data_a, data_b], catchup=False)
def consumer():

    @task
    def run():
        print('run')

    run()

consumer()

Here schedule=[data_a, data_b] means the consumer DAG only runs after both datasets have been updated (emitted as outlets).

2. TriggerDagRunOperator

Suppose there are dag_A and dag_B. dag_A has three tasks: task1 fetches data from an API, task2 triggers dag_B, and task3 stores the data. dag_B is a download DAG with many tasks of its own and may run for 5 minutes or well over 10. Only after dag_B has finished should dag_A carry on and clean the data.

    # in most cases you want all of the parameters below
    trigger_download = TriggerDagRunOperator(
        task_id='trigger_target',
        trigger_dag_id='down_loading_v1',
        execution_date='{{ ds }}', # trigger the target DAG run with the same logical date
        reset_dag_run=True, # allow re-running for the same date
        wait_for_completion=True, # wait until down_loading_v1 finishes; without it the downstream storaging/cleaning tasks would run immediately
        poke_interval= 10  # check every 10 seconds whether down_loading_v1 has finished
    )

down_load_data.py: the download DAG; it can also contain multiple tasks.

from airflow import DAG
from airflow.decorators import dag,task
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

import time
from datetime import datetime,timedelta

_default_args= {
    'owner':'lg',
    'retries':2,  # number of retries
    'retry_delay':timedelta(minutes=3), # delay between retries
}

# the dag_id defaults to the function name (down_loading_v1); this DAG only runs when triggered
@dag(default_args=_default_args, start_date=datetime(2024,2,12), schedule=None, catchup=False)
def down_loading_v1():
    @task
    def down_load_ali_data():
        time.sleep(10)
        print('down_loading_ali_data succeeded')

    down_load_ali_data()

down_loading_v1()

pl_ali_data.py: the data-pipeline DAG that downloads, migrates, stores and cleans the data.

from airflow import DAG
from airflow.decorators import dag,task
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from datetime import datetime,timedelta

_default_args= {
    'owner':'lg',
    'retries':2,  # number of retries
    'retry_delay':timedelta(minutes=3), # delay between retries
}

def _get_api():
    print('getting down_load data api')

def _storaging():
    print('Storing data from down_loading_dag')

def _cleaning():
    print('Cleaning data')


with DAG(
    dag_id='pl_ali_data_v1',
    start_date=datetime(2024,2,12),
    schedule='@daily',
    catchup=False,
    default_args= _default_args
) as dag:

    get_api_task = PythonOperator(
        task_id= 'get_api_task',
        python_callable=_get_api,
    )

    storaging_task = PythonOperator(
        task_id= 'storaging_task',
        python_callable=_storaging,
    )

    cleaning_task = PythonOperator(
        task_id= 'cleaning_task',
        python_callable=_cleaning,
    )

    # in most cases you want all of the parameters below
    trigger_download = TriggerDagRunOperator(
        task_id='trigger_target',
        trigger_dag_id='down_loading_v1',
        execution_date='{{ ds }}', # trigger the target DAG run with the same logical date
        reset_dag_run=True, # allow re-running for the same date
        wait_for_completion=True, # wait until down_loading_v1 finishes; without it the downstream storaging/cleaning tasks would run immediately
        poke_interval= 10  # check every 10 seconds whether down_loading_v1 has finished
    )

    get_api_task >> trigger_download >> storaging_task >> cleaning_task

Note: the decorator style (@dag/@task) and the classic style of defining DAGs should not be mixed.

3. depends_on_past = True

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


default_args = {
    'start_date': datetime(2024, 1, 1),
    'owner': 'lg'
}

def second_task():
    print('Hello from second_task')
    # raise ValueError('This will turn the python task into the failed state')

def third_task():
    print('Hello from third_task')
    # raise ValueError('This will turn the python task into the failed state')

with DAG(dag_id='depends_task', schedule_interval="@daily", default_args=default_args,catchup=False) as dag:
    
    # Task 1
    bash_task_1 = BashOperator(task_id='bash_task_1', bash_command="echo 'first task'")
    
    # Task 2
    python_task_2 = PythonOperator(task_id='python_task_2', python_callable=second_task,depends_on_past = True)

    # Task 3
    python_task_3 = PythonOperator(task_id='python_task_3', python_callable=third_task)

    bash_task_1 >> python_task_2 >> python_task_3

4. wait_for_downstream = True

Set wait_for_downstream = True on task2: before task2 runs in a new DAG run, task2 and its immediate downstream tasks in the previous run must have succeeded. If any of them failed, task2 and everything after it will not run, but task1 still runs.

    python_task_2 = PythonOperator(task_id='python_task_2', python_callable=second_task,wait_for_downstream= True)

5. Trigger rules

In Apache Airflow, the trigger_rule parameter specifies when a task fires relative to the state of its upstream (parent) tasks. The commonly used trigger rules are:

  1. all_success: fires when all parent tasks have succeeded (the default).
  2. all_failed: fires when all parent tasks have failed.
  3. all_done: fires when all parent tasks have finished, regardless of success or failure.
  4. one_success: fires as soon as at least one parent task has succeeded.
  5. one_failed: fires as soon as at least one parent task has failed.
  6. dummy (now called "always"): upstream state is ignored and the task is triggered unconditionally.
import airflow
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

default_args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
}

def download_website_a():
    # print("download_website_a")
    raise ValueError("error")

def download_website_b():
    print("download_website_b")
    #raise ValueError("error")

def download_failed():
    print("download_failed")
    #raise ValueError("error")

def download_status():
    print('This is download status')

def download_succeed():
    print("download_succeed")
    #raise ValueError("error")

def process():
    print("process data")
    #raise ValueError("error")

def notif_a():
    print("all_success")
    #raise ValueError("error")

def notif_b():
    print("one_failed")
    #raise ValueError("error")

with DAG(dag_id='trigger_rules', 
    default_args=default_args, 
    schedule_interval="@daily") as dag:

    download_status_task = PythonOperator(
        task_id='download_status',
        python_callable=download_status,
        trigger_rule = 'all_success'
    )

    download_website_a_task = PythonOperator(
        task_id='download_website_a',
        python_callable=download_website_a,
    )

    download_website_b_task = PythonOperator(
        task_id='download_website_b',
        python_callable=download_website_b,
    )

    process_task = PythonOperator(
        task_id='process',
        python_callable=process,
        trigger_rule = 'one_success'
    )

    notif_all_success_task = PythonOperator(
        task_id='notif_all_success_task',
        python_callable=notif_a,
        trigger_rule = 'all_success'
    )

    notif_one_failed_task = PythonOperator(
        task_id='notif_one_failed_task',
        python_callable=notif_b,
        trigger_rule = 'one_failed'
    )

    [download_website_a_task,download_website_b_task] >> download_status_task >> process_task>>[notif_all_success_task,notif_one_failed_task]
    

6. ExternalTaskSensor

Suppose there are two DAGs, DAG A and DAG B. A task in DAG A needs to wait for a specific task in DAG B to finish before it can run; in that case, use an ExternalTaskSensor in DAG A to wait for that task. This is a task-to-task dependency, so the other DAG does not have to finish completely, only the one task does. By default the sensor matches DAG runs with the same logical date (use execution_delta or execution_date_fn when the schedules differ). TriggerDagRunOperator, by contrast, expresses a dependency between whole DAGs.

import airflow.utils.dates
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
        "owner": "airflow", 
        "start_date": airflow.utils.dates.days_ago(1)
    }

with DAG(dag_id="externaltasksensor_dag", default_args=default_args, schedule_interval="@daily") as dag:
    sensor = ExternalTaskSensor(
        task_id='sensor',  
        external_dag_id='sleep_dag',
        external_task_id='t2'    
    )

    last_task = DummyOperator(task_id="last_task")

    sensor >> last_task
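
For completeness, a sketch of what the sleep_dag being waited on could look like (not part of the original notes); the sensor above waits for its task t2, and by default ExternalTaskSensor matches runs with the same logical date, so both DAGs are scheduled @daily:

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1)
}

with DAG(dag_id='sleep_dag', default_args=default_args, schedule_interval='@daily') as dag:
    t1 = BashOperator(task_id='t1', bash_command='sleep 10')
    t2 = BashOperator(task_id='t2', bash_command="echo 'done'")

    t1 >> t2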