kubernetes

kubeflow功能架构

2024-10-09  本文已影响0人  sknfie

概述

kubeflow提供很多功能,这里按模块介绍下 Kubeflow 的几个核心组件:

Notebook Servers

notebook 完美的发挥了动态语言的交互性,提供了使用 jupyter notebook 快速构建云上的实验环境,这里创建一下自定义的镜像为例:
我们创建了一个test-jupyter名字的镜像,配置了一个 tensorflow 的镜像,点击启动,可以看到在kubeflow命名空间下已经创建我们的应用了:

kubectl get po -nkubeflow
NAME                                               READY   STATUS            RESTARTS   AGE
ml-pipeline-ui-artifact-6d7ffcc4b6-9kxkk           2/2     Running           0          48m
ml-pipeline-visualizationserver-84d577b989-5hl46   2/2     Running           0          48m
test-jupyter-0                                 0/2     PodInitializing   0          44s

创建完成后点击 connect 就可以进入我们创建的应用界面中了。
在 jupyterlab 环境中开发人员可以很方便的进行算法实验,同时由于运行在云上利用 k8s api甚至可以很方便构建k8s资源,比如通过 kfserving 创建一个ML服务。

AutoML

AutoML 是机器学习比较热的领域,主要用来模型自动优化和超参数调整,这里其实是用的 Katib来实现的,一个基于k8s的 AutoML 项目,详细见https://github.com/kubeflow/katib

Katib 主要提供了 超参数调整(Hyperparameter Tuning),早停法(Early Stopping)和神经网络架构搜索(Neural Architecture Search)

这里以一个随机搜索算法为例:

apiVersion: "kubeflow.org/v1beta1"
kind: Experiment
metadata:
  namespace: kubeflow
  name: random-example
spec:
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - Train-accuracy
  algorithm:
    algorithmName: random
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
  parameters:
    - name: lr
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.03"
    - name: num-layers
      parameterType: int
      feasibleSpace:
        min: "2"
        max: "5"
    - name: optimizer
      parameterType: categorical
      feasibleSpace:
        list:
          - sgd
          - adam
          - ftrl
  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
      - name: learningRate
        description: Learning rate for the training model
        reference: lr
      - name: numberLayers
        description: Number of training model layers
        reference: num-layers
      - name: optimizer
        description: Training model optimizer (sdg, adam or ftrl)
        reference: optimizer
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training-container
                image: docker.io/kubeflowkatib/mxnet-mnist:v1beta1-45c5727
                command:
                  - "python3"
                  - "/opt/mxnet-mnist/mnist.py"
                  - "--batch-size=64"
                  - "--lr=${trialParameters.learningRate}"
                  - "--num-layers=${trialParameters.numberLayers}"
                  - "--optimizer=${trialParameters.optimizer}"
            restartPolicy: Never

Experiments and Pipelines

experiments 为我们提供了一个可以创建实验空间功能, pipeline 定义了算法组合的模板,通过 pipeline 我们可以将算法中各处理模块按特定的拓扑图的方式组合起来。

这里可以看看官方提供的几个 pipeline 例子:

kubeflow pipeline 本质是基于 argo workflow 实现,由于我们的kubeflow是基于kind上构建的,容器运行时用的containerd,而workflow默认的pipeline执行器是docker,因此有些特性不兼容,这块可以见 argo workflow 官方说明:https://argoproj.github.io/argo-workflows/workflow-executors/ 这里我是把 workflow 的 containerRuntimeExecutor 改成了 k8sapi。但 k8sapi 由于在 workflow 是二级公民,因此有些功能不能用,比如 kubeflow pipeline 在 input/output 的 artifacts 需要用到 docker cp 命令,可以参考这个issue: argoproj/argo-workflows#2685 (comment)

由于以上原因 kubeflow 默认给的几个案例并没有用 volumes 是无法在 kind 中运行起来,这里我们基于 argo workflow 语法自己实现一个 pipeline

基于pipeline构建一个的工作流水

第一步,构建一个 workflow pipeline 文件:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: kubeflow-test-
spec:
  entrypoint: kubeflow-test
  templates:
  - name: kubeflow-test
    dag:
      tasks:
      - name: print-text
        template: print-text
        dependencies: [repeat-line]
      - {name: repeat-line, template: repeat-line}
  - name: repeat-line
    container:
      args: [--line, Hello, --count, '15', --output-text, /gotest/outputs/output_text/data]
      command:
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def _make_parent_dirs_and_return_path(file_path: str):
            import os
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            return file_path

        def repeat_line(line, output_text_path, count = 10):
            '''Repeat the line specified number of times'''
            with open(output_text_path, 'w') as writer:
                for i in range(count):
                    writer.write(line + '\n')

        import argparse
        _parser = argparse.ArgumentParser(prog='Repeat line', description='Repeat the line specified number of times')
        _parser.add_argument("--line", dest="line", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--count", dest="count", type=int, required=False, default=argparse.SUPPRESS)
        _parser.add_argument("--output-text", dest="output_text_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = repeat_line(**_parsed_args)
      image: python:3.7
      volumeMounts:
      - name: workdir
        mountPath: /gotest/outputs/output_text/
    volumes:
      - name: workdir
        persistentVolumeClaim:
          claimName: kubeflow-test-pv
    metadata:
      annotations: 
  - name: print-text
    container:
      args: [--text, /gotest/outputs/output_text/data]
      command:
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def print_text(text_path): # The "text" input is untyped so that any data can be printed
            '''Print text'''
            with open(text_path, 'r') as reader:
                for line in reader:
                    print(line, end = '')

        import argparse
        _parser = argparse.ArgumentParser(prog='Print text', description='Print text')
        _parser.add_argument("--text", dest="text_path", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = print_text(**_parsed_args)
      image: python:3.7
      volumeMounts:
      - name: workdir
        mountPath: /gotest/outputs/output_text/
    volumes:
      - name: workdir
        persistentVolumeClaim:
          claimName: kubeflow-test-pv
    metadata:
      annotations: 

第二步,定义好 pipeline 文件后可以创建pipeline:

第三步,启动一个pipeline:

启动 pipeline 除了单次运行模式 one-off,也支持定时器循环模式 Recurring,这块可以根据自己的需求确定。

查看运行结果:

运行完后,可以将实验进行归档(Archived)。

MLOps 工作流

这是一个 google 提供的 level 1 级别的机器学习流水线自动化,整个流水线包括以下几部分:

基于上述功能描述我们其实可以基于 kubeflow 的 pipelinekfserving 功能轻松实现一个简单的 MLOps 流水线发布流程。不过,值得注意的是,DevOps 本身并不仅仅是一种技术,同时是一种工程文化,所以在实践落地中需要团队各方的协同分阶段的落地。这块可以参考《MLOps: Continuous delivery and automation pipelines in machine learning》《Hidden Technical Debt in Machine Learning Systems》

上一篇 下一篇

猜你喜欢

热点阅读