
Argo Workflows Quick Start

2023-01-05 · 王勇1024

Introduction

Argo Workflows is an open-source, container-native workflow engine for orchestrating parallel jobs on Kubernetes. It is implemented as a Kubernetes CRD (Custom Resource Definition).

Key features:

Argo lets users define multi-step pipelines in a DSL that reads like an ordinary YAML file. The framework provides loops, conditionals, and dependency management, which makes deploying applications, and wiring up their configuration and dependencies, considerably more flexible.

System Architecture

Init

When a template consumes input artifacts, or is of the script type (which requires the script to be injected), Argo adds an init container to the pod. Its image is argoexec and its command is argoexec init; its main job is to load the artifacts.

Wait

For every template type except resource, Argo also injects a wait container, which waits for the main container to finish, terminates all sidecars, and performs cleanup work (capturing script results, saving logs, output parameters, artifacts, and so on). The wait container likewise uses the argoexec image, with the command argoexec wait. (Resource templates don't need one because they run argoexec directly as the main container.)

Core Concepts

Workflow

Workflow is the most important resource in Argo and serves two functions:

- It defines the workflow to be executed.
- It stores the state of the workflow.

The workflow to execute is defined in the Workflow.spec field, whose two key parts are templates and entrypoint:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-  # name prefix for the generated Workflow
spec:
  entrypoint: whalesay        # run the whalesay template first
  templates:
  - name: whalesay            # defines the whalesay template; must match the entrypoint
    container:                # a container that prints "hello world"
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]

Templates

templates is a list, and its entries fall into two broad classes:

- templates that define concrete work to run (container, script, resource, suspend, and so on);
- templates that invoke other templates and control execution order and parallelism (steps and dag).

WorkflowTemplate

A WorkflowTemplate is effectively a library of templates for Workflows. Like a Workflow, it consists of templates. Once a WorkflowTemplate has been created, you can execute Workflows by submitting it directly.

The definition of a WorkflowTemplate is almost identical to that of a Workflow, apart from the kind field. Precisely because a Workflow is both a definition and a runnable instance, WorkflowTemplate exists to serve as the reusable definition: once defined, it can be submitted to create a Workflow.
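
For example, a minimal WorkflowTemplate (a sketch; the name and message are illustrative):

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-whalesay
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello from a WorkflowTemplate"]

Assuming it is saved as workflow-template.yaml, create it and run it as a Workflow:

kubectl apply -f workflow-template.yaml
argo submit --from workflowtemplate/workflow-template-whalesay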

Adding labels/annotations to Workflows with workflowMetadata

To automatically add labels/annotations to Workflows created from a WorkflowTemplate, use workflowMetadata.
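
A sketch based on the Argo docs (the label name and value are illustrative):

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-submittable
spec:
  workflowMetadata:
    labels:
      example-label: example-value
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]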

ClusterWorkflowTemplates

A ClusterWorkflowTemplate is a cluster-scoped WorkflowTemplate. Like a ClusterRole, it lives at cluster scope and is accessible from all namespaces in the cluster.

apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
  name: cluster-workflow-template-whalesay-template
spec:
  templates:
  - name: whalesay-template
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
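
A Workflow in any namespace can then reference it via templateRef with clusterScope: true:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: cluster-workflow-template-hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    steps:
      - - name: call-whalesay-template
          templateRef:
            name: cluster-workflow-template-whalesay-template
            template: whalesay-template
            clusterScope: true
          arguments:
            parameters:
            - name: message
              value: "hello world"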

CronWorkflow

A CronWorkflow is a Workflow that runs on a preset schedule. It is designed to be easy to convert from a plain Workflow and to mirror the options of a Kubernetes CronJob. In essence, CronWorkflow = Workflow + some cron-specific options.

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 90"]
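
CronWorkflows can be managed with the argo cron subcommand; a brief sketch, assuming the manifest above is saved as test-cron-wf.yaml:

argo cron create test-cron-wf.yaml   # register the CronWorkflow
argo cron list                       # list scheduled CronWorkflows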

Artifacts

You will need to configure an artifact repository (S3 is recommended) to run this example; see the Argo documentation on configuring an artifact repository.

When running workflows, it is very common for steps to generate or consume artifacts. Often, the output artifact of one step is used as the input artifact of a later step.

The workflow spec below consists of two steps that run in sequence.

The first step, generate-artifact, uses the whalesay template to generate an artifact; the second step, consume-artifact, uses the print-message template to consume the artifact generated by the first step.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-passing-
spec:
  entrypoint: artifact-example
  templates:
  - name: artifact-example
    steps:
    - - name: generate-artifact
        template: whalesay
    - - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["cat /tmp/message"]

The whalesay template uses the cowsay command to generate a file named /tmp/hello_world.txt, and then exposes it as an output artifact named hello-art. In general, an artifact's path may be a directory rather than just a single file. The print-message template takes an input artifact named message, unpacks it at the path /tmp/message, and then prints its contents with the cat command. The artifact-example template passes the hello-art output artifact of the generate-artifact step as the message input artifact of the print-message step. DAG templates use the tasks prefix to refer to another task, e.g. {{tasks.generate-artifact.outputs.artifacts.hello-art}}.

Artifacts are packaged as tarballs and gzipped by default. You can customize this behavior by specifying an archive strategy in the archive field. For example:

<... snipped ...>
    outputs:
      artifacts:
        # default behavior - tar+gzip default compression.
      - name: hello-art-1
        path: /tmp/hello_world.txt

        # disable archiving entirely - upload the file / directory as is.
        # this is useful when the container layout matches the desired target repository layout.   
      - name: hello-art-2
        path: /tmp/hello_world.txt
        archive:
          none: {}

        # customize the compression behavior (disabling it here).
        # this is useful for files with varying compression benefits, 
        # e.g. disabling compression for a cached build workspace and large binaries, 
        # or increasing compression for "perfect" textual data - like a json/xml export of a large database.
      - name: hello-art-3
        path: /tmp/hello_world.txt
        archive:
          tar:
            # no compression (also accepts the standard gzip 1 to 9 values)
            compressionLevel: 0
<... snipped ...>

Defining Concrete Work

There are seven template types for defining concrete work, described below: Container, Script, Resource, Suspend, HTTP, Container Set, and Data.

Container

container is the most commonly used template type. It schedules a container, and its spec is identical to the Kubernetes container spec:

- name: whalesay             
  container:                 
    image: docker/whalesay 
    command: [cowsay] 
    args: ["hello world"]   

Script

script is a thin wrapper around container. It is defined the same way, with an added source field holding the script to run:

- name: gen-random-int 
  script: 
    image: python:alpine3.6 
    command: [python] 
    source: | 
      import random 
      i = random.randint(1, 100) 
      print(i) 

The script's output is automatically exported to {{tasks.<NAME>.outputs.result}} or {{steps.<NAME>.outputs.result}}, depending on how the template was invoked.
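
For example, a later step can consume the random number (a sketch; print-int is assumed to be a template that accepts a parameter named n):

- name: main
  steps:
  - - name: gen
      template: gen-random-int
  - - name: print
      template: print-int
      arguments:
        parameters:
        - name: n
          value: "{{steps.gen.outputs.result}}"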

Resource

resource performs operations on cluster resources directly: it can get, create, apply, delete, replace, or patch them. The following creates a ConfigMap resource in the cluster:

- name: k8s-owner-reference 
  resource: 
    action: create 
    manifest: | 
      apiVersion: v1 
      kind: ConfigMap 
      metadata: 
        generateName: owned-eg- 
      data: 
        some: value 

Suspend

suspend pauses execution, either for a set duration or until resumed manually with argo resume. It is defined as follows:

- name: delay 
  suspend: 
    duration: "20s" 
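
A suspended workflow can be resumed manually (before the duration elapses, or when no duration is set) with the argo CLI:

argo resume WORKFLOW_NAME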

HTTP

An HTTP template can be used to execute HTTP requests.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: http-template-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: get-google-homepage
            template: http
            arguments:
              parameters: [{name: url, value: "https://www.google.com"}]
    - name: http
      inputs:
        parameters:
          - name: url
      http:
        timeoutSeconds: 20 # Default 30
        url: "{{inputs.parameters.url}}"
        method: "GET" # Default GET
        headers:
          - name: "x-header-name"
            value: "test-value"
        # Template will succeed if evaluated to true, otherwise will fail
        # Available variables:
        #  request.body: string, the request body
        #  request.headers: map[string][]string, the request headers
        #  response.url: string, the request url
        #  response.method: string, the request method
        #  response.statusCode: int, the response status code
        #  response.body: string, the response body
        #  response.headers: map[string][]string, the response headers
        successCondition: "response.body contains \"google\"" # available since v3.3
        body: "test body" # Change request body

Container Set

A container set template is similar to a normal container or script template, but lets you run multiple containers inside a single pod.

Because these containers live in one pod, they are scheduled onto the same host. You can use emptyDir volumes instead of PVCs to share data between the steps.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: container-set-template-
spec:
  entrypoint: main
  templates:
    - name: main
      volumes:
        - name: workspace
          emptyDir: { }
      containerSet:
        volumeMounts:
          - mountPath: /workspace
            name: workspace
        containers:
          - name: a
            image: argoproj/argosay:v2
          - name: b
            image: argoproj/argosay:v2
          - name: main
            image: argoproj/argosay:v2
            dependencies:
              - a
              - b
      outputs:
        parameters:
          - name: message
            valueFrom:
              path: /workspace/message

Data

Users often fetch and transform data as part of their workflows. The data template provides first-class support for these common operations.

data templates are easiest to understand by analogy with common data sourcing and transformation operations in bash:

find -r . | grep ".pdf" | sed "s/foo/foo.ready/"

Such operations consist of two main parts:

- a source of data, such as the find above;
- a series of transformations applied serially to that source, such as the grep and sed above.

This kind of operation is useful, for example, for finding a candidate list of files to process, then filtering and manipulating that list as needed.

In Argo, this operation would be written as:

- name: generate-artifacts
  data:
    source:             # Define a source for the data, only a single "source" is permitted
      artifactPaths:    # A predefined source: Generate a list of all artifact paths in a given repository
        s3:             # Source from an S3 bucket
          bucket: test
          endpoint: minio:9000
          insecure: true
          accessKeySecret:
            name: my-minio-cred
            key: accesskey
          secretKeySecret:
            name: my-minio-cred
            key: secretkey
    transformation:     # The source is then passed to be transformed by transformations defined here
      - expression: "filter(data, {# endsWith \".pdf\"})"
      - expression: "map(data, {# + \".ready\"})"

A data template must contain a source field. Currently available data sources include S3, Git, HTTP, HDFS, OSS, and GCS.

A data template may contain any number of transformations (including zero); transformations are applied serially, in order. Currently the available transformation is expression, an expr expression, as used in the example above.

The expression transformation is admittedly limited; the Argo project intends to greatly expand this template's capabilities based on community feedback. See the upstream Argo Workflows documentation for where to submit ideas or use cases for this feature.

Invoking Other Templates and Controlling Parallelism

There are two template types for invoking other templates: Steps and DAG.

Steps

A steps template defines its work as a series of steps. Its structure is a "list of lists": the outer lists run sequentially and the inner lists run in parallel. For example:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and whalesay
  templates:
  - name: hello-hello-hello
    # Instead of just running a container
    # This template has a sequence of steps
    steps:
    - - name: hello1            # hello1 is run before the following steps
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello1"
    - - name: hello2a           # double dash => run after previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2a"
      - name: hello2b           # single dash => run in parallel with previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2b"

  # This is the same template as from the previous example
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

Here hello1 and hello2a run sequentially, while hello2a and hello2b run in parallel:

STEP            TEMPLATE           PODNAME                 DURATION  MESSAGE
 ✔ steps-z2zdn  hello-hello-hello
 ├───✔ hello1   whalesay           steps-z2zdn-27420706    2s
 └─┬─✔ hello2a  whalesay           steps-z2zdn-2006760091  3s
   └─✔ hello2b  whalesay           steps-z2zdn-2023537710  3s

You can also branch on conditions using when. For example:

apiVersion: argoproj.io/v1alpha1 
kind: Workflow 
metadata: 
  generateName: coinflip- 
spec: 
  entrypoint: coinflip 
  templates: 
  - name: coinflip 
    steps: 
    - - name: flip-coin 
        template: flip-coin 
    - - name: heads 
        template: heads 
        when: "{{steps.flip-coin.outputs.result}} == heads" 
      - name: tails 
        template: tails 
        when: "{{steps.flip-coin.outputs.result}} == tails" 

  - name: flip-coin 
    script: 
      image: python:alpine3.6 
      command: [python] 
      source: | 
        import random 
        result = "heads" if random.randint(0,1) == 0 else "tails" 
        print(result) 

  - name: heads 
    container: 
      image: alpine:3.6 
      command: [sh, -c] 
      args: ["echo \"it was heads\""] 

  - name: tails 
    container: 
      image: alpine:3.6 
      command: [sh, -c] 
      args: ["echo \"it was tails\""] 

Besides conditional branching with when, you can also loop. For example:

apiVersion: argoproj.io/v1alpha1 
kind: Workflow 
metadata: 
  generateName: loops- 
spec: 
  entrypoint: loop-example 
  templates: 
  - name: loop-example 
    steps: 
    - - name: print-message 
        template: whalesay 
        arguments: 
          parameters: 
          - name: message 
            value: "{{item}}" 
        withItems: 
        - hello world 
        - goodbye world 

  - name: whalesay 
    inputs: 
      parameters: 
      - name: message 
    container: 
      image: docker/whalesay:latest 
      command: [cowsay] 
      args: ["{{inputs.parameters.message}}"] 

DAG

A dag template lets you define your tasks as a graph of dependencies. You list all tasks and specify which other tasks must complete before a given task starts; tasks with no dependencies run immediately.

In this example, A runs first; once it finishes, B and C run in parallel; once both B and C have finished, D runs:

  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
      - name: B
        dependencies: [A]
        template: echo
      - name: C
        dependencies: [A]
        template: echo
      - name: D
        dependencies: [B, C]
        template: echo

Deployment

Controller and Server
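
A minimal install sketch (assuming Argo Workflows v3.3.9 to match the CLI version used below; adjust the version and namespace as needed):

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/install.yaml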

After installing, you should see:

NAME                                   READY   STATUS    RESTARTS   AGE
argo-server-746dc95c84-6pwj2           1/1     Running   0          5d4h
workflow-controller-777b7f45d8-whkdk   1/1     Running   0          5d4h

Argo CLI

Mac

# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-darwin-amd64.gz

# Unzip
gunzip argo-darwin-amd64.gz

# Make binary executable
chmod +x argo-darwin-amd64

# Move binary to path
mv ./argo-darwin-amd64 /usr/local/bin/argo

# Test installation
argo version

Linux

# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-linux-amd64.gz

# Unzip
gunzip argo-linux-amd64.gz

# Make binary executable
chmod +x argo-linux-amd64

# Move binary to path
mv ./argo-linux-amd64 /usr/local/bin/argo

# Test installation
argo version

Common commands:

argo submit hello-world.yaml    # submit a workflow to Kubernetes
argo list                       # list current workflows
argo get hello-world-xxx        # get info about a specific workflow
argo logs hello-world-xxx       # print the logs from a workflow
argo delete hello-world-xxx     # delete a workflow

You can also operate on workflows directly with kubectl, but the Argo CLI provides syntax checking, nicer output, and requires less typing.

kubectl create -f hello-world.yaml
kubectl get wf
kubectl get wf hello-world-xxx
kubectl get po --selector=workflows.argoproj.io/workflow=hello-world-xxx --show-all  # similar to argo
kubectl logs hello-world-xxx-yyy -c main
kubectl delete wf hello-world-xxx

Hello World

Let's start by creating a very simple workflow that uses the docker/whalesay container image from Docker Hub to echo "hello world".

You can run it directly from your shell with a simple docker command:

$ docker run docker/whalesay cowsay "hello world"
 _____________
< hello world >
 -------------
    \
     \
      \
                    ##        .
              ## ## ##       ==
           ## ## ## ##      ===
       /""""""""""""""""___/ ===
  ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ /  ===- ~~~
       \______ o          __/
        \    \        __/
          \____\______/

Hello from Docker!
This message shows that your installation appears to be working correctly.

A simple Workflow example:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-
  labels:
    workflows.argoproj.io/archive-strategy: "false"
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]

Workflow Configuration

Parameters

Let's look at a slightly more complex workflow spec with parameters.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-parameters-
spec:
  # invoke the whalesay template with
  # "hello world" as the argument
  # to the message parameter
  entrypoint: whalesay
  arguments:
    parameters:
    - name: message
      value: hello world

  templates:
  - name: whalesay
    inputs:
      parameters:
      - name: message       # parameter declaration
    container:
      # run cowsay with that message input parameter as args
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

The whalesay template takes an input parameter named message, which is passed as args to the cowsay command. To reference a parameter (e.g. "{{inputs.parameters.message}}"), it must be enclosed in double quotes to escape the curly braces in YAML.

The argo CLI provides a convenient way to override the parameters used to invoke the entrypoint. For example, the following command binds the message parameter to "goodbye world" instead of the default "hello world":

argo submit arguments-parameters.yaml -p message="goodbye world"

When several parameters need to be overridden, the argo CLI can load a parameter file in YAML or JSON format. Here is an example of such a file:

message: goodbye world

Run it with:

argo submit arguments-parameters.yaml --parameter-file params.yaml

Command-line arguments can also override the default entrypoint and invoke any template in the workflow spec. For example, if you add a new version of the whalesay template called whalesay-caps but don't want to change the default entrypoint, you can invoke it from the command line like this:

argo submit arguments-parameters.yaml --entrypoint whalesay-caps

By combining --entrypoint and -p, you can invoke any template in the workflow spec with any parameters you like.

Values set in spec.arguments.parameters are globally scoped and can be accessed via {{workflow.parameters.parameter_name}}. This is useful for passing information to multiple steps in a workflow. For example, if you wanted to run the workflow with a different logging level set in each container's environment, you could use a YAML file like this:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: global-parameters-
spec:
  entrypoint: A
  arguments:
    parameters:
    - name: log-level
      value: INFO

  templates:
  - name: A
    container:
      image: containerA
      env:
      - name: LOG_LEVEL
        value: "{{workflow.parameters.log-level}}"
      command: [runA]
  - name: B
    container:
      image: containerB
      env:
      - name: LOG_LEVEL
        value: "{{workflow.parameters.log-level}}"
      command: [runB]

In this workflow, steps A and B both get the same log level, INFO, which can easily be changed at submission time with the -p flag.
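
For example, assuming the spec above is saved as global-parameters.yaml:

argo submit global-parameters.yaml -p log-level=DEBUG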

Variables

See the Argo Workflows user manual: Workflow Variables.

Access Control

See the Argo Workflows user manual: Access Control.

Secrets

Argo supports the same secrets syntax and mechanisms as Kubernetes Pod specs, allowing secrets to be accessed as environment variables or volume mounts.

# To run this example, first create the secret by running:
# kubectl create secret generic my-secret --from-literal=mypassword=S00perS3cretPa55word
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: secret-example-
spec:
  entrypoint: whalesay
  # To access secrets as files, add a volume entry in spec.volumes[] and
  # then in the container template spec, add a mount using volumeMounts.
  volumes:
  - name: my-secret-vol
    secret:
      secretName: my-secret     # name of an existing k8s secret
  templates:
  - name: whalesay
    container:
      image: alpine:3.7
      command: [sh, -c]
      args: ['
        echo "secret from env: $MYSECRETPASSWORD";
        echo "secret from file: `cat /secret/mountpath/mypassword`"
      ']
      # To access secrets as environment variables, use the k8s valueFrom and
      # secretKeyRef constructs.
      env:
      - name: MYSECRETPASSWORD  # name of env var
        valueFrom:
          secretKeyRef:
            name: my-secret     # name of an existing k8s secret
            key: mypassword     # 'key' subcomponent of the secret
      volumeMounts:
      - name: my-secret-vol     # mount file containing secret at /secret/mountpath
        mountPath: "/secret/mountpath"

Output Parameters

Output parameters provide a general mechanism for using a step's result as a parameter (and not just as an artifact). This lets you use the result of any type of step, not just script, for conditional tests, loops, and arguments. An output parameter works like a script result, except that its value is set to the contents of a generated file rather than the contents of stdout.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: output-parameter-
spec:
  entrypoint: output-parameter
  templates:
  - name: output-parameter
    steps:
    - - name: generate-parameter
        template: whalesay
    - - name: consume-parameter
        template: print-message
        arguments:
          parameters:
          # Pass the hello-param output from the generate-parameter step as the message input to print-message
          - name: message
            value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo -n hello world > /tmp/hello_world.txt"]  # generate the contents of /tmp/hello_world.txt
    outputs:
      parameters:
      - name: hello-param  # name of the output parameter
        valueFrom:
          path: /tmp/hello_world.txt # set the value of hello-param to the contents of this file

  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

DAG templates use the tasks prefix to refer to another task, e.g. {{tasks.generate-parameter.outputs.parameters.hello-param}}.

The result output parameter

The result output parameter captures standard output and is accessed via outputs.result. Only 256 KB of the standard output stream will be captured.

Scripts

The output of a script template is assigned to standard output and captured in the result parameter.

Containers

The standard output of a container template is also captured in the result parameter. With a DAG, given a task named log-int, its result is accessed as {{tasks.log-int.outputs.result}}; with steps, use {{steps.log-int.outputs.result}}.
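
A minimal sketch (the task and template names are illustrative, and print-int is assumed to be a template that accepts a parameter n):

- name: main
  dag:
    tasks:
    - name: log-int
      template: echo-int
    - name: print
      template: print-int
      dependencies: [log-int]
      arguments:
        parameters:
        - name: n
          value: "{{tasks.log-int.outputs.result}}"

- name: echo-int
  container:
    image: alpine:3.6
    command: [sh, -c]
    args: ["echo 42"]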

Loops

When writing workflows, it is often very useful to be able to iterate over a set of inputs, as in this example:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
spec:
  entrypoint: loop-example
  templates:
  - name: loop-example
    steps:
    - - name: print-message
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

We can also iterate over sets of items:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-maps-
spec:
  entrypoint: loop-map-example
  templates:
  - name: loop-map-example
    steps:
    - - name: test-linux
        template: cat-os-release
        arguments:
          parameters:
          - name: image
            value: "{{item.image}}"
          - name: tag
            value: "{{item.tag}}"
        withItems:
        - { image: 'debian', tag: '9.1' }       #item set 1
        - { image: 'debian', tag: '8.9' }       #item set 2
        - { image: 'alpine', tag: '3.6' }       #item set 3
        - { image: 'ubuntu', tag: '17.10' }     #item set 4

  - name: cat-os-release
    inputs:
      parameters:
      - name: image
      - name: tag
    container:
      image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
      command: [cat]
      args: [/etc/os-release]

We can pass lists of items as parameters:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-arg-
spec:
  entrypoint: loop-param-arg-example
  arguments:
    parameters:
    - name: os-list                                     # a list of items
      value: |
        [
          { "image": "debian", "tag": "9.1" },
          { "image": "debian", "tag": "8.9" },
          { "image": "alpine", "tag": "3.6" },
          { "image": "ubuntu", "tag": "17.10" }
        ]

  templates:
  - name: loop-param-arg-example
    inputs:
      parameters:
      - name: os-list
    steps:
    - - name: test-linux
        template: cat-os-release
        arguments:
          parameters:
          - name: image
            value: "{{item.image}}"
          - name: tag
            value: "{{item.tag}}"
        withParam: "{{inputs.parameters.os-list}}"      # parameter specifies the list to iterate over

  # This template is the same as in the previous example
  - name: cat-os-release
    inputs:
      parameters:
      - name: image
      - name: tag
    container:
      image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
      command: [cat]
      args: [/etc/os-release]

We can even dynamically generate the list of items to iterate over!

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    # Iterate over the list of numbers generated by the generate step above
    - - name: sleep
        template: sleep-n-sec
        arguments:
          parameters:
          - name: seconds
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"

  # Generate a list of numbers in JSON format
  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: sleep-n-sec
    inputs:
      parameters:
      - name: seconds
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]

Conditionals

Conditional execution is also supported. The syntax is implemented by govaluate, which supports complex expressions. For example:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                       # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                       # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"
    - - name: flip-again
        template: flip-coin
    - - name: complex-condition
        template: heads-tails-or-twice-tails
        # call heads template if first flip was "heads" and second was "tails" OR both were "tails"
        when: >-
            ( {{steps.flip-coin.outputs.result}} == heads &&
              {{steps.flip-again.outputs.result}} == tails
            ) ||
            ( {{steps.flip-coin.outputs.result}} == tails &&
              {{steps.flip-again.outputs.result}} == tails )
      - name: heads-regex
        template: heads                       # call heads template if ~ "hea"
        when: "{{steps.flip-again.outputs.result}} =~ hea"
      - name: tails-regex
        template: tails                       # call heads template if ~ "tai"
        when: "{{steps.flip-again.outputs.result}} =~ tai"

  # Return heads or tails based on a random number
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]

  - name: heads-tails-or-twice-tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]

Note

If a parameter value contains quotes, it may invalidate the govaluate expression. To handle quoted parameters, embed an expr expression in the condition. For example:

 when: "{{=inputs.parameters['may-contain-quotes'] == 'example'}}"

Retrying Failed or Errored Steps

You can specify a retryStrategy in the Workflow.spec or in a template to dictate how failed or errored steps are retried:

# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
      affinity:
        nodeAntiAffinity: {}
    container:
      image: python:alpine3.6
      command: ["python", "-c"]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]

Providing an empty retryStrategy (i.e. retryStrategy: {}) will cause the container to retry until completion.
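
For example (a sketch: the container exits nonzero at random and is retried until it eventually succeeds):

- name: retry-until-success
  retryStrategy: {}     # empty strategy: retry until the step completes
  container:
    image: python:alpine3.6
    command: ["python", "-c"]
    args: ["import random, sys; sys.exit(random.choice([0, 1]))"]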

Recursion

Templates can recursively invoke each other! In this variation of the coin-flip template above, we keep flipping coins until heads comes up.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-recursive-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

Here are the results of a couple of runs for comparison:

argo get coinflip-recursive-tzcb5

STEP                         PODNAME                              MESSAGE
 ✔ coinflip-recursive-vhph5
 ├───✔ flip-coin             coinflip-recursive-vhph5-2123890397
 └─┬─✔ heads                 coinflip-recursive-vhph5-128690560
   └─○ tails

STEP                          PODNAME                              MESSAGE
 ✔ coinflip-recursive-tzcb5
 ├───✔ flip-coin              coinflip-recursive-tzcb5-322836820
 └─┬─○ heads
   └─✔ tails
     ├───✔ flip-coin          coinflip-recursive-tzcb5-1863890320
     └─┬─○ heads
       └─✔ tails
         ├───✔ flip-coin      coinflip-recursive-tzcb5-1768147140
         └─┬─○ heads
           └─✔ tails
             ├───✔ flip-coin  coinflip-recursive-tzcb5-4080411136
             └─┬─✔ heads      coinflip-recursive-tzcb5-4080323273
               └─○ tails

In the first run, the coin immediately came up heads and we stopped. In the second run, the coin came up tails three times before finally coming up heads, at which point we stopped.

Exit Handlers

An exit handler is a template that always executes at the end of a workflow, regardless of success or failure.

Some common use cases for exit handlers are:

- cleaning up after a workflow runs;
- sending notifications of workflow status (e.g. e-mail or Slack);
- posting the pass/fail status to a webhook result (e.g. a GitHub build result);
- resubmitting or submitting another workflow.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: exit-handlers-
spec:
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-handler template at end of the workflow
  templates:
  # primary workflow template
  - name: intentional-fail
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
  - name: celebrate
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]

Timeouts

To limit a workflow's elapsed time, you can set activeDeadlineSeconds.

# To enforce a timeout for a container template, specify a value for activeDeadlineSeconds.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: timeouts-
spec:
  entrypoint: sleep
  templates:
  - name: sleep
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for 1m; sleep 60; echo done"]
    activeDeadlineSeconds: 10           # terminate container template after 10 seconds

Volumes

Volumes are not Argo's standard way of passing artifacts between steps, but shared storage clearly achieves the same result of sharing outputs. And when using a volume, there is no need for inputs and outputs at all; we simply define a volume template in the Workflow spec.

The following example dynamically creates a volume and then uses it in a two-step workflow.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: volumes-pvc-
spec:
  entrypoint: volumes-pvc-example
  volumeClaimTemplates:                 # define volume, same syntax as k8s Pod spec
  - metadata:
      name: workdir                     # name of volume claim
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi                  # Gi => 1024 * 1024 * 1024

  templates:
  - name: volumes-pvc-example
    steps:
    - - name: generate
        template: whalesay
    - - name: print
        template: print-message

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      # Mount workdir volume at /mnt/vol before invoking docker/whalesay
      volumeMounts:                     # same syntax as k8s Pod spec
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      # Mount workdir volume at /mnt/vol before invoking docker/whalesay
      volumeMounts:                     # same syntax as k8s Pod spec
      - name: workdir
        mountPath: /mnt/vol

Volumes are a very useful way to move large amounts of data from one step of a workflow to another. Depending on the system, some volumes may be accessible concurrently from multiple steps.

In some cases, you will want to access an already existing volume rather than creating/destroying one dynamically:

# Define Kubernetes PVC
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: my-existing-volume
spec:
  accessModes: [ "ReadWriteOnce" ]
  resources:
    requests:
      storage: 1Gi

---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: volumes-existing-
spec:
  entrypoint: volumes-existing-example
  volumes:
  # Pass my-existing-volume as an argument to the volumes-existing-example template
  # Same syntax as k8s Pod spec
  - name: workdir
    persistentVolumeClaim:
      claimName: my-existing-volume

  templates:
  - name: volumes-existing-example
    steps:
    - - name: generate
        template: whalesay
    - - name: print
        template: print-message

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

Existing volumes can also be declared at the template level rather than the workflow level, and a workflow can even generate the volume itself using a resource step:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: template-level-volume-
spec:
  entrypoint: generate-and-use-volume
  templates:
  - name: generate-and-use-volume
    steps:
    - - name: generate-volume
        template: generate-volume
        arguments:
          parameters:
            - name: pvc-size
              # In a real-world example, this could be generated by a previous workflow step.
              value: '1Gi'
    - - name: generate
        template: whalesay
        arguments:
          parameters:
            - name: pvc-name
              value: '{{steps.generate-volume.outputs.parameters.pvc-name}}'
    - - name: print
        template: print-message
        arguments:
          parameters:
            - name: pvc-name
              value: '{{steps.generate-volume.outputs.parameters.pvc-name}}'

  - name: generate-volume
    inputs:
      parameters:
        - name: pvc-size
    resource:
      action: create
      setOwnerReference: true
      manifest: |
        apiVersion: v1
        kind: PersistentVolumeClaim
        metadata:
          generateName: pvc-example-
        spec:
          accessModes: ['ReadWriteOnce', 'ReadOnlyMany']
          resources:
            requests:
              storage: '{{inputs.parameters.pvc-size}}'
    outputs:
      parameters:
        - name: pvc-name
          valueFrom:
            jsonPath: '{.metadata.name}'

  - name: whalesay
    inputs:
      parameters:
        - name: pvc-name
    volumes:
      - name: workdir
        persistentVolumeClaim:
          claimName: '{{inputs.parameters.pvc-name}}'
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    inputs:
        parameters:
          - name: pvc-name
    volumes:
      - name: workdir
        persistentVolumeClaim:
          claimName: '{{inputs.parameters.pvc-name}}'
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

Suspending

A workflow can be suspended with:

argo suspend WORKFLOW

or by specifying a suspend step in the workflow:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: suspend-template-
spec:
  entrypoint: suspend
  templates:
  - name: suspend
    steps:
    - - name: build
        template: whalesay
    - - name: approve
        template: approve
    - - name: delay
        template: delay
    - - name: release
        template: whalesay

  - name: approve
    suspend: {}

  - name: delay
    suspend:
      duration: "20"    # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"

  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]

Daemon Containers

Argo workflows can start containers that run in the background (daemon containers) while the workflow itself continues execution. Note that a daemon is automatically destroyed when the workflow exits the scope of the template that invoked it. Daemon containers are useful for starting services to be tested, or to be used in tests (e.g. fixtures). We also found them useful when running large simulations, to spin up a database as a daemon for collecting and organizing results. The big advantage of daemons compared to sidecars is that they persist across multiple steps, or even the entire workflow.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: daemon-step-
spec:
  entrypoint: daemon-example
  templates:
  - name: daemon-example
    steps:
    - - name: influx
        template: influxdb              # start an influxdb as a daemon (see the influxdb template spec below)

    - - name: init-database             # initialize influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl -XPOST 'http://{{steps.influx.ip}}:8086/query' --data-urlencode "q=CREATE DATABASE mydb"

    - - name: producer-1                # add entries to influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: for i in $(seq 1 20); do curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server01,region=uswest load=$i" ; sleep .5 ; done
      - name: producer-2                # add entries to influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: for i in $(seq 1 20); do curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server02,region=uswest load=$((RANDOM % 100))" ; sleep .5 ; done
      - name: producer-3                # add entries to influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d 'cpu,host=server03,region=useast load=15.4'

    - - name: consumer                  # consume entries from influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl --silent -G http://{{steps.influx.ip}}:8086/query?pretty=true --data-urlencode "db=mydb" --data-urlencode "q=SELECT * FROM cpu"

  - name: influxdb
    daemon: true                        # start influxdb as a daemon
    retryStrategy:
      limit: 10                         # retry container if it fails
    container:
      image: influxdb:1.2
      command:
      - influxd
      readinessProbe:                   # wait for readinessProbe to succeed
        httpGet:
          path: /ping
          port: 8086

  - name: influxdb-client
    inputs:
      parameters:
      - name: cmd
    container:
      image: appropriate/curl:latest
      command: ["/bin/sh", "-c"]
      args: ["{{inputs.parameters.cmd}}"]
      resources:
        requests:
          memory: 32Mi
          cpu: 100m

Steps templates use the steps prefix to refer to another step: e.g. {{steps.influx.ip}}. DAG templates use the tasks prefix: e.g. {{tasks.influx.ip}}.

Sidecars

A sidecar is another container that executes concurrently with the main container in the same pod; it is useful when building multi-container pods.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: sidecar-nginx-
spec:
  entrypoint: sidecar-nginx-example
  templates:
  - name: sidecar-nginx-example
    container:
      image: appropriate/curl
      command: [sh, -c]
      # Try to read from nginx web server until it comes up
      args: ["until `curl -G 'http://127.0.0.1/' >& /tmp/out`; do echo sleep && sleep 1; done && cat /tmp/out"]
    # Create a simple nginx web server
    sidecars:
    - name: nginx
      image: nginx:1.13
      command: [nginx, -g, daemon off;]

In the above example, we create a sidecar container that runs Nginx as a simple web server. The order in which containers come up is not guaranteed, so in this example the main container polls the Nginx container until it is ready to serve requests. This is a good design pattern for multi-container systems: always wait for any services you need before running your main code.

Docker-in-Docker with Sidecars

One use of sidecars is to implement Docker-in-Docker (DIND). DIND is useful when you want to run Docker commands from inside a container, for example to build and push a container image from within a build container. In the following example, we use the docker:dind image to run a Docker daemon in a sidecar and give the main container access to that daemon.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: sidecar-dind-
spec:
  entrypoint: dind-sidecar-example
  templates:
  - name: dind-sidecar-example
    container:
      image: docker:19.03.13
      command: [sh, -c]
      args: ["until docker ps; do sleep 3; done; docker run --rm debian:latest cat /etc/os-release"]
      env:
      - name: DOCKER_HOST               # the docker daemon can be accessed on the standard port on localhost
        value: 127.0.0.1
    sidecars:
    - name: dind
      image: docker:19.03.13-dind          # Docker already provides an image for running a Docker daemon
      command: [dockerd-entrypoint.sh]
      env:
        - name: DOCKER_TLS_CERTDIR         # Docker TLS env config
          value: ""
      securityContext:
        privileged: true                # the Docker daemon can only run in a privileged container
      # mirrorVolumeMounts will mount the same volumes specified in the main container
      # to the sidecar (including artifacts), at the same mountPaths. This enables
      # dind daemon to (partially) see the same filesystem as the main container in
      # order to use features such as docker volume binding.
      mirrorVolumeMounts: true

Hardwired Artifacts

With Argo, you can use any container image you like to generate any kind of artifact. In practice, though, we have found certain kinds of artifacts so common that there is built-in support for git, HTTP, GCS, and S3 artifacts.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hardwired-artifact-
spec:
  entrypoint: hardwired-artifact
  templates:
  - name: hardwired-artifact
    inputs:
      artifacts:
      # Check out the master branch of the argo repo and place it at /src
      # revision can be anything that git checkout accepts: branch, commit, tag, etc.
      - name: argo-source
        path: /src
        git:
          repo: https://github.com/argoproj/argo-workflows.git
          revision: "master"
      # Download kubectl 1.8.0 and place it at /bin/kubectl
      - name: kubectl
        path: /bin/kubectl
        mode: 0755
        http:
          url: https://storage.googleapis.com/kubernetes-release/release/v1.8.0/bin/linux/amd64/kubectl
      # Copy an s3 compatible artifact repository bucket (such as AWS, GCS and MinIO) and place it at /s3
      - name: objects
        path: /s3
        s3:
          endpoint: storage.googleapis.com
          bucket: my-bucket-name
          key: path/in/bucket
          accessKeySecret:
            name: my-s3-credentials
            key: accessKey
          secretKeySecret:
            name: my-s3-credentials
            key: secretKey
    container:
      image: debian
      command: [sh, -c]
      args: ["ls -l /src /bin/kubectl /s3"]