AIRFLOW-SLEEK 安装

2024-02-03  本文已影响0人  山猪打不过家猪

0. 课程地址

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
  1. 查看 docker-compose.yaml 可知需要 3 个文件夹：dags、logs、plugins。在 VS Code 中按快捷键 Ctrl + ` 打开终端，创建这些文件夹：
mkdir -p ./dags ./logs ./plugins
  1. 初始化并启动：先运行 docker compose up airflow-init 初始化数据库，完成后再运行 docker compose up 启动所有服务
  2. 如果需要停止并移除这些容器，直接使用：
docker compose down
  1. 创建admin用户

  1. 关闭所有发行版:wsl --shutdown
  2. 将现有数据导出到指定目录
wsl --export docker-desktop-data D:\DockerDeskTopData\docker-desktop-data.tar
  3. 注销当前的 docker-desktop-data
wsl --unregister docker-desktop-data

4. 从指定目录重新导入

wsl --import docker-desktop-data D:\DockerDeskTopData\ D:\DockerDeskTopData\docker-desktop-data.tar --version 2

1.认识airflow

2. airflow的生命周期

image.png

3.创建第一个dag

from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator


# Default arguments applied to every task in this DAG.
_default_args = {
    'owner': 'lg',
    # Airflow expects an int here; a string like '5' is only accepted via an
    # implicit conversion that logs a warning.
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    dag_id='our_first_dag_lg_v5',
    default_args=_default_args,
    start_date=datetime(2024, 2, 3, 1),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4).
    schedule='@daily',
) as dag:
    task1 = BashOperator(
        task_id='1_task',
        bash_command='echo hello world!',
    )

    task2 = BashOperator(
        task_id='2_task',
        bash_command="echo I am task 2",
    )

    task3 = BashOperator(
        task_id='3_task',
        bash_command="echo I am task3",
    )

    # task1 runs first; task2 and task3 both depend only on task1, so they
    # may run concurrently. (Strictly sequential form: task1 >> task2 >> task3)
    task1 >> [task2, task3]

4. 运行 Python 函数（PythonOperator）

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in this DAG.
_default_args = {
    'owner': 'lg',
    # Airflow expects an int; strings are only implicitly converted with a warning.
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


def great(name, age):
    """Print a greeting built from ``name`` and ``age`` (supplied via op_kwargs)."""
    print(f"Hello World! My name is {name},I am {age}")


with DAG(
    dag_id='dag_python_v4',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4).
    schedule='@daily',
) as dag:
    # Single task, so there are no dependencies to declare.
    task1 = PythonOperator(
        task_id='greate',
        python_callable=great,
        # op_kwargs are passed to the callable as keyword arguments.
        op_kwargs={'name': 'fxx', 'age': 20},
    )
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in this DAG.
_default_args = {
    'owner': 'lg',
    # Airflow expects an int; strings are only implicitly converted with a warning.
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


def great(ti, age):
    """Greet using the name produced by the ``get_name`` task.

    ``ti`` is the task instance Airflow supplies from the task context;
    ``age`` comes from op_kwargs.
    """
    # Pull the value returned by the get_name task (its default XCom).
    name = ti.xcom_pull(task_ids='get_name')
    print(f"Hello World! My name is {name} +{age}")


def get_name():
    """Return a name; PythonOperator pushes this return value to XCom."""
    return 'FXX'


with DAG(
    dag_id='dag_python_v6',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4).
    schedule='@daily',
) as dag:
    task1 = PythonOperator(
        task_id='great',
        python_callable=great,
        # op_kwargs are passed to the callable as keyword arguments.
        op_kwargs={'age': 20},
    )

    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name,
    )

    # get_name must finish before great runs so its XCom value exists.
    task2 >> task1
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in this DAG.
_default_args = {
    'owner': 'lg',
    # Airflow expects an int; strings are only implicitly converted with a warning.
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


def great(ti, age):
    """Greet using the first/last name pushed to XCom by ``get_name``.

    ``ti`` is the task instance Airflow supplies from the task context.
    ``age`` is accepted because op_kwargs passes it, but it is not printed.
    """
    # Pull the two explicitly keyed XCom values pushed by get_name.
    first_name = ti.xcom_pull(task_ids='get_name', key='first_name')
    last_name = ti.xcom_pull(task_ids='get_name', key='last_name')

    print(f"Hello World! My name is {first_name} +{last_name}")


def get_name(ti):
    """Push first and last name to XCom under explicit keys."""
    ti.xcom_push(key='first_name', value='Pjj')
    ti.xcom_push(key='last_name', value='zzz')


with DAG(
    # Must differ from the previous example's 'dag_python_v6': two DAG files
    # sharing one dag_id in dags/ produce a duplicate-DAG-id import error.
    dag_id='dag_python_v7',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4).
    schedule='@daily',
) as dag:
    task1 = PythonOperator(
        task_id='great',
        python_callable=great,
        # op_kwargs are passed to the callable as keyword arguments.
        op_kwargs={'age': 20},
    )

    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name,
    )

    # get_name must finish before great runs so its XCom values exist.
    task2 >> task1

5. 使用 TaskFlow API 快速创建 DAG

from airflow.decorators import dag,task
from datetime import datetime, timedelta


# Default arguments applied to every task in this DAG.
_default_args = {
    'owner': 'lg',
    # Airflow expects an int; strings are only implicitly converted with a warning.
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


@dag(
    dag_id='dag_with_taskflow_01',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4).
    schedule='@daily',
)
def hello_world_etl():
    """TaskFlow DAG: produce a name and an age, then print a greeting."""

    @task()
    def get_name():
        return 'fxx'

    @task()
    def get_age():
        return 10

    @task()
    def greet(name, age):
        print(f'hello! My name is {name} and I am {age}.')

    # Passing task outputs as arguments declares the dependencies and the
    # XCom hand-off: get_name and get_age run before greet.
    name = get_name()
    age = get_age()
    greet(name=name, age=age)


# Calling the decorated function builds the DAG object at module level.
greet_dag = hello_world_etl()

6. run task in the past

方法1：在 DAG 中设置 catchup=True（Airflow 2 默认即为 True），调度器会补跑从 start_date 起所有尚未执行的周期
方法2：使用 backfill 命令，例如 airflow dags backfill -s 2024-02-01 -e 2024-02-03 <dag_id>

image.png

7. install python packages in docker

  1. 创建 requirements.txt 文件，列出需要安装的包（注意 == 左右不要加空格）：
scikit-learn==0.24.2
matplotlib==3.3.3

2.编写Dockerfile

FROM apache/airflow:2.8.1
COPY requirements.txt requirements.txt
RUN pip install --user --upgrade pip
# Pin apache-airflow to the version already in the base image so resolving
# requirements.txt cannot silently upgrade or downgrade Airflow itself.
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r requirements.txt
  3. 在控制台构建一个名为 extending_airflow:latest 的新镜像
docker build . --tag extending_airflow:latest

4.删除之前的images

docker rmi apache/airflow:2.8.1

5. 将 docker-compose.yaml 中的 image 改为 extending_airflow:latest，然后在后台运行 compose up

docker compose up -d

上一篇 下一篇

猜你喜欢

热点阅读