AirFlow_本机课程v 1.0
1.认识airflow
-
airflow的组成
image.png -
operator的分类
image.png
2. 常用命令
1.docker 查看当前运行的images,并且找到airflow
docker ps
image.png
- 进入到aiflow的bash命令行里
docker exec -it 1aa536546bb6 /bin/bash
3.查看目前所有的dags
airflow dags list
4.查看指定的dag信息
airflow dags list-runs -d dag_python_v6
- 触发之前未运行的dag,
airflow dags backfill -s 起始日期 -e 结束日期 Dag_id
airflow dags backfill -s 2024-02-06 -e 2024-02-08 dag_python_v6
6.查看指定dag的tasksairflow tasks list dag_id
airflow tasks list dag_python_v6
7.测试指定dag在某天的运行情况airflow tasks test dag_id task_id 日期
airflow tasks test forex_v1 is_forex_available 2024-02-01
3.第一个DAG
流程
image.png什么是DAG
简单理解就是pipeline
最简单的dag的格式
from airflow import DAG
from datetime import datetime,timedelta
#不是dag的参数,而是task任务的参数
default_args = {
'owner':'lg',
'retries':1,
'retry_delay':timedelta(minutes=2)
}
with DAG(
dag_id= 'forex_v1',
default_args=default_args,
start_date=datetime(2024,2,9),
schedule_interval="@daily",
catchup=False #True会运行设定start_date之前的所有任务
) as dag:
pass
catchup = False
:True会运行设定start_date之前的所有任务,例如设置的start_date
是2024-2-1
且是@daily
执行的方式,如果今天是2024-2-9
那么他会执行这8天的所有任务
什么是operator
简单理解就是task
有三种类型的operator:
1.action: 例如执行Python,bash,sql都有对应的Operator
2.transfer: 迁移转换,实现source到sink的迁移
3.sensor:sensor allow you to verify if a condition is mer or not before moving forward.实现了等待某以任务执行后,在执行另一个,例如你等待所有文件到了指定文件夹后,在执行另外一个。
1.创建一个api的sensor来check api available
- api的网址是
https://gist.github.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b
- 创建dag
from airflow import DAG
from datetime import datetime,timedelta
from airflow.providers.http.sensors.http import HttpSensor
#不是dag的参数,而是task任务的参数
default_args = {
'owner':'lg',
'retries':1,
'retry_delay':timedelta(minutes=2)
}
with DAG(
dag_id= 'forex_v1',
default_args=default_args,
start_date=datetime(2024,2,9),
schedule_interval="@daily",
catchup=False #True会运行设定start_date之前的所有任务
) as dag:
is_forex_available = HttpSensor(
task_id ='is_forex_available' ,
http_conn_id='forex_api', #必须和页面里connection的设置一样
endpoint= 'marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b',
response_check=lambda response: 'rate' in response.text,
poke_interval = 5,
timeout = 10
)
is_forex_available
2.去客户端设置connections
image.png
3.测试
airflow@1aa536546bb6:/opt/airflow$ airflow tasks test forex_v1 is_forex_available 2024-02-08
2.使用file sensor检测文件是否存在
1.添加文件检测的task
#检测文件是否存在的sensor
is_forex_csv_available = FileSensor(
task_id = 'is_forex_csv_available',
fs_conn_id='forex_path',
filepath='forex_currencies.csv',
poke_interval = 5, #检测链接是否可以用5s一次
timeout = 10 #链接超时时间
)
3.查看flies的位置
airflow@1aa536546bb6:/opt/airflow$ ls
4.设置好connecions
image.png
3.使用python operator执行下载csv的程序
官方文档地址
1.添加python的下载程序
def download_rates():
pass
2.添加Python operator
#python operator用来执行python程序
down_load_csv = PythonOperator(
task_id = 'down_load_csv',
python_callable= download_rates, #python function的name;
## op_kwargs={'age':20} #用来传递参数,这里无参,不用传递
)
- 执行测试,测试成功后,文件将会被下载
airflow tasks test forex_v1 down_load_csv 2024-02-01
image.png
4.完整代码
import csv
import requests
import json
from datetime import datetime,timedelta
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor #http sensor
from airflow.sensors.filesystem import FileSensor #file sensor
from airflow.operators.python import PythonOperator #引入python operator
#不是dag的参数,而是task任务的参数
default_args = {
'owner':'lg',
'retries':1,
'retry_delay':timedelta(minutes=2)
}
def download_rates():
BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
ENDPOINTS = {
'USD': 'api_forex_exchange_usd.json',
'EUR': 'api_forex_exchange_eur.json'
}
with open('/opt/airflow/dags/__pycache__/dags/files/forex_currencies.csv') as forex_currencies:
reader = csv.DictReader(forex_currencies, delimiter=';')
for idx, row in enumerate(reader):
base = row['base']
with_pairs = row['with_pairs'].split(' ')
indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
for pair in with_pairs:
outdata['rates'][pair] = indata['rates'][pair]
with open('/opt/airflow/dags/__pycache__/dags/files/forex_rates.json', 'a') as outfile:
json.dump(outdata, outfile)
outfile.write('\n')
with DAG(
dag_id= 'forex_v1',
default_args=default_args,
start_date=datetime(2024,2,9),
schedule_interval="@daily",
catchup=False #True会运行设定start_date之前的所有任务
) as dag:
#检测api是否可用的sensor
is_forex_api_available = HttpSensor(
task_id ='is_forex_api_available' , #id
http_conn_id='forex_api', #设置链接ID
endpoint= 'marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b', #域名具体地址
response_check=lambda response: 'rate' in response.text, #检测是否存在rate字段
poke_interval = 5, #检测链接是否可以用5s一次
timeout = 10 #链接超时时间
)
#检测文件是否存在的sensor
is_forex_csv_available = FileSensor(
task_id = 'is_forex_csv_available',
fs_conn_id='forex_path',
filepath='forex_currencies.csv',
poke_interval = 5, #检测链接是否可以用5s一次
timeout = 10 #链接超时时间
)
#python operator用来执行python程序
down_load_csv = PythonOperator(
task_id = 'down_load_csv',
python_callable= download_rates, #python function的name;
## op_kwargs={'age':20} #用来传递参数,这里无参,不用传递
)
-
改进,使用好的项目结构
image.png
4.使用bash operator将文件存储到hdfs里
一般的数据流的文件很大, 我们应该将数据存储去datalake里,将文件
from airflow.operators.bash import BashOperator
#使用bash operator将json文件存储到hdfs里
saving_rates_to_hdfs = BashOperator(
task_id = 'saving_rates_to_hdfs',
bash_command="""
hdfs dfs -mkdir /forex && \
hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
"""
)
5.将数据存放区hive的表里
pass
6. 用spark处理数据
airflow只是一个调度工具,所有的大数据处理都应该用相应的工具完成,所以这里我们需要直接spark处理,
原始版本的airflow是没有处理spark的包的需要我们自己安装
1.新建Dockerfile
FROM apache/airflow:2.8.1
COPY requirements.txt requirements.txt
RUN pip install --user --upgrade pip
RUN pip install apache-airflow-providers-apache-spark
RUN pip install -r requirements.txt
- 查看当前的images
docker images
- build 新的images,不能和上面的重名
docker build . --tag extending_airflow:latest
4.安装完成后,引入operator,添加task
#将数据用spark处理
forex_processing= SparkSubmitOperator(
task_id = "forex_processing",
application="/opt/airflow/dags/source/processing_by_spark.py",#必须是.py文件
conn_id="spark_conn",
verbose=False
)
5.然后编写spark处理的文件
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
warehouse_location = abspath('spark-warehouse')
# Initialize Spark Session
spark = SparkSession \
.appName("Forex processing") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# Read the file forex_rates.json from the HDFS
df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')
# Drop the duplicated rows based on the base and last_update columns
forex_rates = df.select('base', 'last_update', 'rates.eur', 'rates.usd', 'rates.cad', 'rates.gbp', 'rates.jpy', 'rates.nzd') \
.dropDuplicates(['base', 'last_update']) \
.fillna(0, subset=['EUR', 'USD', 'JPY', 'CAD', 'GBP', 'NZD'])
# Export the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")
7.任务失败发送邮件
- 需要配置
airflow.cfg
文件,但是docker里是没有的,我们需要将docker环境下的airflow.cfg
重定向到本地
/opt/airflow$ airflow config list --defaults > ${AIRFLOW_PROJ_DIR:-.}/config/airflow.cfg
- 设置
airflow.cfg
image.png - 添加email的Operator
#添加email operator
send_email = EmailOperator(
task_id = "send_email",
to = "airflow@gmail.com",
subject= "forex_pipeline_status",
html_content="<h>forex_pipeline_fail</h>"
)
8.设置slack提醒
pass
4.DAG的设置
4.1DAG的时间设置
夏令时,如果你想让dag在UTC早上5点执行的话,需要设置到4点,其实际运行时间是,NZ表显的UTC 5点中
4.2 Dag之间的依赖关系
如果一个DAG有3个任务,第一天运行正常,第二天运行task2失败了,导致之后的task都没运行,第三天,第四天又成功了,第五天因为task2任务失败,又运行成功,这样会导致程序非常不稳定,所以我们可以设置
depends_on_past = True
的话,如果任何一天的task2失败了,之后的所有日期都不会执行,直到task2的问题解决。
4.3 管理Dag文件夹
将需要运行的dag和需要处理的scripts脚本文件一起打包,注意scripts和my_dags.py都是在同一个目录下,scripts文件夹里应该有__init__
文件
zip -rm package_dag.zip my_dag.py scripts/
5. Dag出错处理
添加超时
- 比如一个dag平时运行的时间是15s,组多不会超过25s,我们就可以设置dag的超时时间
dagrun_timeout = timedelta(seconds = 30)
with DAG(
dag_id = 'our_first_dag_lg_v1',
default_args= _default_args,
start_date = datetime(2024,2,3,1),
schedule_interval = '@daily',
dagrun_timeout= timedelta(seconds=30) #添加timeout根据dag的平均运行时间
) as dag:
添加失败后的回调函数
def _on_success_callback(dict):
print("on_success_callback")
print(dict)
def _on_failure_callback(dict):
print("on_success_callback")
print(dict)
with DAG(
dag_id = 'our_first_dag_lg_v1',
default_args= _default_args,
start_date = datetime(2024,2,3,1),
schedule_interval = '@daily',
dagrun_timeout= timedelta(seconds=30), #添加timeout根据dag的平均运行时间
on_success_callback=_on_success_callback, #成功后的回调函数
on_failure_callback=_on_failure_callback #失败后的回调函数
) as dag:
添加失败后的retry
_default_args= {
'owner':'lg',
'retries':'2', #retry次数
'retry_delay':timedelta(minutes=3) #retry的delay
}
添加邮件发送
- 首先需要早airflow.cfg里面配置smtp
_default_args= {
'owner':'lg',
'retries':'2', #retry次数
'retry_delay':timedelta(minutes=3), #retry的delay
'emails':['pjj@gmail.com'],
'email_on_failure':True,
'email_on_retry':False
}
测试DAGs
-
使用Pytest来进行dags的单元测试
1.validation test:用来测试,是否有拼写错误,参数设置错误,等常规错误
2.pipeline definition test:测试DAGs的数据,依赖关系是否正确
3.unit test: 检查程序逻辑
4.integration test: 测试task是否意义获取正确的数据,外部资源是否正确
5.end to end piplien test:
image.png
生产环境流程
image.png
pytest学习
6. 变量
variable
1.在页面里添加变量
image.png
2.获取变量
from airflow.models import Variable
support_email = Variable.get("support_email"),
xcom 传递dag之间的数据
上传xcom的2种方式
- 使用return直接push
def _training_model():
accuracy = uniform(0.1, 10.0)
print(f"model's accuracy: {accuracy}")
#push xcom
return accuracy
image.png
- 自定义xcom的名称
def _training_model(ti):
accuracy = uniform(0.1, 10.0)
print(f"model's accuracy: {accuracy}")
ti.xcom_push(key='model_accuracy',value =accuracy)
image.png
- bash的operator会自动上传一个空的xcom,我们需要关闭他
downloading_data = BashOperator(
task_id='downloading_data',
bash_command='sleep 3',
do_xcom_push = False
)
image.png
拉去xcom的值
def _choose_best_model(ti):
print('choose best model')
#一个id用task_id,多个id用task_ids
all_accuracies = ti.xcom_pull(key='model_accuracy',task_ids =['training_model_A','training_model_B','training_model_C'])
print(all_accuracies)
7.发送邮件
- 配置
airflow.cfg
image.png - 添加emailoperator
from airflow.operators.email import EmailOperator
send_email = EmailOperator(
task_id = 'send_email',
to = 'qq394967886@163.com',
subject='dag error',
html_content="""
<h3>Dag has error</h3>
"""
)
8. airflow dependency
1.使用dataset建立denpendency
使用dataset可以直观的通过数据集来建立dag之间的依赖关系。
例如:现在有2个dag, dag_A的功能是更新两个s3的数据集,dag_B的任务需要dag_A两个数据集都更新后才可以执行,如果使用传统的方式会比较麻烦,所以我们使用新的方式
producer.py
用来处理2个数据集的dag
from airflow.decorators import dag, task
from datetime import datetime
data_a = Dataset("s3://bucket_a/data_a") #数据集1
data_b = Dataset("s3://bucket_b/data_b") #数据集2
@dag(start_date=datetime(2023, 1 ,1), schedule='@daily', catchup=False)
def producer():
@task(outlets=[data_a])
def update_a():
print("update A")
@task(outlets=[data_b])
def update_b():
print("update B")
update_a() >> update_b()
producer()
costomer.py
使用处理好的2个数据集,进行下一步任务的dag
from airflow.decorators import dag, task
from datetime import datetime
data_a = Dataset("s3://bucket_a/data_a")
data_b = Dataset("s3://bucket_b/data_b")
@dag(start_date=datetime(2023, 1 ,1), schedule=[data_a, data_b], catchup=False)
def consumer():
@task
def run():
print('run')
run()
consumer()
这里的schedule=[data_a, data_b]
就是必须等待2个数据集outlets输出后,才能执行这个
2.TriggerDagRunOperator
假设:有一个dag_A和dag_B,其中dag_A里面有3个task分别是task1获取API得数据,task2 trigger dag_B,和保存数据 ,dag_ B则是一个down_load数据的task里面也有众多的task,可能执行5分钟,也可能执行十几分钟,只想完dag_B之后,需要在dag_A里,清洗数据
- 我们通过设置TriggerDagRunOperator来设置两个dag的依赖
#一般情况下,下面的参数都需要
trigger_download = TriggerDagRunOperator(
task_id='trigger_target',
trigger_dag_id='down_loading_v1',
execution_date='{{ ds }}', #只有2个dag在相同日期的时候才能执行
reset_dag_run=True, #允许同一个日期多次执行
wait_for_completion=True, #等待down_loading_v1完成后,执行后面的dag,如果不加的话后面的storagin等三个任务会一直执行
poke_interval= 10 #检测down_loading_v1是否完成10s一次
)
down_load_data.py
:用来下载的程序,里面也可以多个task
from airflow import DAG
from airflow.decorators import dag,task
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import time
from datetime import datetime,timedelta
_default_args= {
'owner':'lg',
'retries':'2', #retry次数
'retry_delay':timedelta(minutes=3), #retry的delay
}
@dag(_default_args)
def down_loading_v1():
@task
def down_load_ali_data():
time.sleep(10)
print('down_loading_ali_data successed')
down_load_ali_data()
down_loading_v1()
pl_ali_data.py
:data pipeline 数据下载,迁移,存储,清洗的dag
from airflow import DAG
from airflow.decorators import dag,task
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime,timedelta
_default_args= {
'owner':'lg',
'retries':'2', #retry次数
'retry_delay':timedelta(minutes=3), #retry的delay
}
def _get_api():
print('getting down_load data api')
def _storaging():
print('Storaging data from down_loading_dag ')
def _cleaning():
print('Clearning data')
with DAG(
dag_id='pl_ali_data_v1',
start_date=datetime(2024,2,12),
schedule='@daily',
catchup=False,
default_args= _default_args
) as dag:
get_api_task = PythonOperator(
task_id= 'get_api_task',
python_callable=_get_api,
)
storaging_task = PythonOperator(
task_id= 'storaging_task',
python_callable=_storaging,
)
cleaning_task = PythonOperator(
task_id= 'cleaning_task',
python_callable=_cleaning,
)
#一般情况下,下面的参数都需要
trigger_download = TriggerDagRunOperator(
task_id='trigger_target',
trigger_dag_id='down_loading_v1',
execution_date='{{ ds }}', #只有2个dag在相同日期的时候才能执行
reset_dag_run=True, #允许同一个日期多次执行
wait_for_completion=True, #等待down_loading_v1完成后,执行后面的dag,如果不加的话后面的storagin等三个任务会一直执行
poke_interval= 10 #检测down_loading_v1是否完成10s一次
)
get_api_task >> trigger_download >> storaging_task >> cleaning_task
注意:①装饰器的写法和普通的写法不能混用;
3. depend_on_past = True
- 设置task2的
depend_on_past = True
,有且仅当task2出错的时候,之后剩下的schedule都会在task2时不执行,当解决了一个出错的task2之后,会自动补全所有的schedule
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'start_date': datetime(2024, 1, 1),
'owner': 'lg'
}
def second_task():
print('Hello from second_task')
# raise ValueError('This will turns the python task in failed state')
def third_task():
print('Hello from third_task')
# raise ValueError('This will turns the python task in failed state')
with DAG(dag_id='depends_task', schedule_interval="@daily", default_args=default_args,catchup=False) as dag:
# Task 1
bash_task_1 = BashOperator(task_id='bash_task_1', bash_command="echo 'first task'")
# Task 2
python_task_2 = PythonOperator(task_id='python_task_2', python_callable=second_task,depends_on_past = True)
# Task 3
python_task_3 = PythonOperator(task_id='python_task_3', python_callable=third_task)
bash_task_1 >> python_task_2 >> python_task_3
4. wait_for_downstream = True
设置task2的wait_for_downstream = True
,如果任何一个task出现了问题,将不会在执行包括task2之后的所有task,但是task1会执行
python_task_2 = PythonOperator(task_id='python_task_2', python_callable=second_task,wait_for_downstream= True)
5. trigger rules
在Apache Airflow中,trigger_rule
参数用于指定任务的触发规则,决定了任务何时被触发。下面是Airflow中常用的触发规则(trigger_rule
):
-
all_success
: 所有父任务都成功完成时触发。 -
all_failed
: 所有父任务都失败时触发。 -
all_done
: 所有父任务都完成时触发(无论成功或失败)。 -
one_success
: 至少有一个父任务成功完成时触发。 -
one_failed
: 至少有一个父任务失败时触发。 -
dummy
: 永远不触发,这在构建DAG时很有用,以防止不希望被触发的任务被错误地执行。
-
例如:现有一个从2个api 下载数据的pipeline,当两个现在成功,则发从成功的邮件,任务失败则发送失败邮件,流程如下
image.png - 示例代码:
import airflow
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
default_args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(1),
}
def download_website_a():
# print("download_website_a")
raise ValueError("error")
def download_website_b():
print("download_website_b")
#raise ValueError("error")
def download_failed():
print("download_failed")
#raise ValueError("error")
def download_status():
print('This is download status')
def download_succeed():
print("download_succeed")
#raise ValueError("error")
def process():
print("process data")
#raise ValueError("error")
def notif_a():
print("all_success")
#raise ValueError("error")
def notif_b():
print("one_failed")
#raise ValueError("error")
with DAG(dag_id='trigger_rules',
default_args=default_args,
schedule_interval="@daily") as dag:
download_status_task = PythonOperator(
task_id='download_status',
python_callable=download_status,
trigger_rule = 'all_success'
)
download_website_a_task = PythonOperator(
task_id='download_website_a',
python_callable=download_website_a,
)
download_website_b_task = PythonOperator(
task_id='download_website_b',
python_callable=download_website_b,
)
process_task = PythonOperator(
task_id='process',
python_callable=process,
trigger_rule = 'one_success'
)
notif_all_success_task = PythonOperator(
task_id='notif_all_success_task',
python_callable=notif_a,
trigger_rule = 'all_success'
)
notif_one_failed_task = PythonOperator(
task_id='notif_one_failed_task',
python_callable=notif_b,
trigger_rule = 'one_failed'
)
[download_website_a_task,download_website_b_task] >> download_status_task >> process_task>>[notif_all_success_task,notif_one_failed_task]
6. ExternalTaskSensor
-假设有两个DAG,DAG A和DAG B。DAG A的某个任务(task)需要等待DAG B中的某个任务(task)完成后才能执行,那么就可以在DAG A中使用ExternalTaskSensor来等待DAG B的特定任务完成。task和task之间的依赖,所以并不需要等待另一个dag完全执行成功,只需要里面的一个task完成即可。TriggerRunOperator则是dag之间的依赖。
import airflow.utils.dates
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1)
}
with DAG(dag_id="externaltasksensor_dag", default_args=default_args, schedule_interval="@daily") as dag:
sensor = ExternalTaskSensor(
task_id='sensor',
external_dag_id='sleep_dag',
external_task_id='t2'
)
last_task = DummyOperator(task_id="last_task")
sensor >> last_task