Spring Cloud

大数据平台 —— 调度系统 - Azkaban

2020-11-15  本文已影响0人  端碗吹水

Azkaban介绍

常见的开源调度框架:

开源调度框架对比:


image.png

Azkaban简介:

Azkaban优点:

Azkaban适用场景:


Azkaban架构与调度流程

Azkaban架构图如下:


image.png

其中AzkabanWebServer可以说是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。

同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。

并且Azkaban使用方便,Azkaban使用以.job为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip的方式通过Azkaban UI上传到Web服务器上。

Azkaban有三种部署模式:

Azkaban执行流程图:


image.png
  1. 用户通过界面或者API提交任务到Webserver,Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个合适的executor下发工作流;
  2. executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;
  3. 分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;
  4. 被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。

Azkaban核心交互流程:


image.png
  1. AzkabanServer主动调用Executor的API获取状态信息
  2. 根据计算规则选择执行的Executor Server(任务数量、内存和CPU等资源、最近分配的时间)
  3. 调度WorkFlow到Executor执行,Executor执行并监控任务

Azkaban安装部署

这里采用的是Two mode部署模式,因为Multi mode只不过是在该基础上部署了多个ExecutorServer,也就是说在Two mode基础上增加ExecutorServer节点就是Multi mode了。

编译Azkaban源码

首先,准备好Java和Maven:

[root@azkaban01 ~]# java -version
java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)
[root@azkaban01 ~]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 1.8.0_261, vendor: Oracle Corporation, runtime: /usr/local/jdk/1.8/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@azkaban01 ~]# 

安装一些工具:

[root@azkaban01 ~]# yum install -y git gcc-c++

然后从GitHub上拉取Azkaban的源码:

[root@hadoop01 ~]# cd /usr/local/src
[root@hadoop01 /usr/local/src]# git clone https://github.com/azkaban/azkaban.git

进入源码目录,在settings.gradle文件的开头增加插件仓库配置:

[root@hadoop01 /usr/local/src]# cd azkaban
[root@azkaban01 /usr/local/src/azkaban]# vim settings.gradle 
pluginManagement {
    repositories {
        maven {
            url 'https://maven.aliyun.com/repository/gradle-plugin'
        }
        gradlePluginPortal()
    }
}

...

然后修改build.gradle文件中的仓库配置:

[root@azkaban01 /usr/local/src/azkaban]# vim build.gradle
buildscript {
  repositories {
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
    maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
  }
  ...
}

...

allprojects {
  apply plugin: 'jacoco'

  repositories {
    mavenLocal()
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
    maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
  }
}

gradle/wrapper/gradle-wrapper.properties文件中会定义从远程下载gradle,如果下载不下来的话,可以通过别的方式下载,然后上传到相应的目录下,并在该文件指定从本地文件系统中加载gradle的安装包:

[root@azkaban01 /usr/local/src/azkaban]# vim gradle/wrapper/gradle-wrapper.properties
distributionUrl=file:///usr/local/src/gradle-4.6-all.zip

完成以上的修改后,就可以执行如下命令开始编译安装了:

[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test

打包编译的过程中,有可能会报如下错误:

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':azkaban-web-server:nodeSetup'.
> Could not resolve all files for configuration ':azkaban-web-server:detachedConfiguration1'.
   > Could not download node-linux-x64.tar.gz (org.nodejs:node:8.10.0)
      > Could not get resource 'https://nodejs.org/dist/v8.10.0/node-v8.10.0-linux-x64.tar.gz'.
         > Read timed out

这是因为系统中没有安装NodeJS,而azkaban-web-server这个模块需要用到NodeJS来编译web代码。由于无法通过远程下载NodeJS的安装包就会报这个错。解决方式也简单,在系统中安装NodeJS即可。步骤如下:

[root@azkaban01 /usr/local/src/azkaban]# curl --silent --location https://rpm.nodesource.com/setup_14.x | bash -
[root@azkaban01 /usr/local/src/azkaban]# yum install -y nodejs
[root@azkaban01 /usr/local/src/azkaban]# npm -v
6.14.8
[root@azkaban01 /usr/local/src/azkaban]# node -v
v14.15.0
[root@azkaban01 /usr/local/src/azkaban]# 

设置npm使用淘宝镜像仓库:

[root@azkaban01 /usr/local/src/azkaban]# npm config set registry https://registry.npm.taobao.org
[root@azkaban01 /usr/local/src/azkaban]# npm config get registry 
https://registry.npm.taobao.org/
[root@azkaban01 /usr/local/src/azkaban]# 

打开azkaban-web-server模块下的build.gradle文件,修改原本的仓库配置,并注释掉node相关的配置。如下所示:

[root@azkaban01 /usr/local/src/azkaban]# vim azkaban-web-server/build.gradle
buildscript {
    repositories {
        maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
        maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
        maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
        maven { url 'https://maven.aliyun.com/repository/google' }
        maven { url 'https://maven.aliyun.com/repository/jcenter' }
        mavenCentral()
    }
    ...
}

...

//node {
    // Version of node to use.
    //version = '8.10.0'

    // Version of npm to use.
    //npmVersion = '5.6.0'

    // Base URL for fetching node distributions (change if you have a mirror).
    //distBaseUrl = 'https://nodejs.org/dist'

    // If true, it will download node using above parameters.
    // If false, it will try to use globally installed node.
    //download = true

    // Set the work directory for unpacking node
    //workDir = file("${project.buildDir}/nodejs")

    // Set the work directory where node_modules should be located
    //nodeModulesDir = file("${project.projectDir}")
//}

然后重新执行打包编译命令:

[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test

最终打包编译成功:


image.png

此时在核心组件的build/distributions目录下,可以看到打包好的安装包:

[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-exec-server/build/distributions/
azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz  azkaban-exec-server-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-web-server/build/distributions/
azkaban-web-server-0.1.0-SNAPSHOT.tar.gz  azkaban-web-server-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-db/build/distributions/
azkaban-db-0.1.0-SNAPSHOT.tar.gz  azkaban-db-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# 

安装部署Azkaban

解压安装包:

[root@azkaban01 /usr/local/src/azkaban]# mkdir /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-db/build/distributions/azkaban-db-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-exec-server/build/distributions/azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-web-server/build/distributions/azkaban-web-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban

为了查看方便,将解压后的目录重命名:

[root@azkaban01 /usr/local/src/azkaban]# cd /usr/local/azkaban/
[root@azkaban01 /usr/local/azkaban]# mv azkaban-db-0.1.0-SNAPSHOT/ azkaban-db
[root@azkaban01 /usr/local/azkaban]# mv azkaban-exec-server-0.1.0-SNAPSHOT/ azkaban-exec-server
[root@azkaban01 /usr/local/azkaban]# mv azkaban-web-server-0.1.0-SNAPSHOT/ azkaban-web-server

首先,到MySQL中创建azkaban数据库,然后将azkaban-db目录下的create-all-sql-0.1.0-SNAPSHOT.sql文件给导入到MySQL中:

create database azkaban;
use azkaban;
source /usr/local/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql

然后配置azkaban-exec-server

[root@azkaban01 /usr/local/azkaban]# cd azkaban-exec-server/
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim conf/azkaban.properties
# webserver的连接地址
azkaban.webserver.url=http://localhost:8081

database.type=mysql
mysql.port=3306
mysql.host=192.168.1.11
# MySQL8.x需要加时区参数,5.x则不需要
mysql.database=azkaban?serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=123456a.
mysql.numconnections=100

由于azkaban-exec-server默认使用的是5.x版本的MySQL驱动,而我这部署的MySQL是8.x版本的,所以还得替换一下MySQL驱动包:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# rm -rf lib/mysql-connector-java-5.1.28.jar 

启动azkaban-exec-server:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh

检查azkaban-exec-server进程是否正常运行:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# jps
2005 Jps
1982 AzkabanExecutorServer
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# netstat -lntp |grep 1982
tcp6       0      0 :::35195                :::*               LISTEN      1982/java           
tcp6       0      0 :::36304                :::*               LISTEN      1982/java           
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# 

通过API手动激活Executor Server:

$ curl http://localhost:35195/executor?action=activate

接着配置azkaban-webserver

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# vim conf/azkaban.properties 
database.type=mysql
mysql.port=3306
mysql.host=192.168.1.11
# MySQL8.x需要加时区参数,5.x则不需要
mysql.database=azkaban?serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=123456a.
mysql.numconnections=100

替换MySQL驱动包:

[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# rm -rf lib/mysql-connector-java-5.1.28.jar

启动azkaban-webserver:

[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh

检查azkaban-webserver进程是否正常运行:

[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# jps
2201 Jps
2172 AzkabanWebServer
1982 AzkabanExecutorServer
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# netstat -lntp |grep 2172
tcp6       0      0 :::46136                :::*              LISTEN      2172/java           
tcp6       0      0 :::8081                 :::*              LISTEN      2172/java           
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# 

使用浏览器访问webserver的页面,会进入到登录页,默认的用户名和密码都是azkaban

image.png

登录成功进入到首页,如下:


image.png

提交Azkaban任务

关于Job的官方文档:

Azkaban工作流:

Azkaban任务类型:

单个任务

我们来通过WebServer的可视化界面提交一个最简单的command任务,首先创建任务定义文件:

$ vim cmd_test.job
type=command
command=sh job1.sh

编写一个简单的shell脚本:

$ vim job1.sh
#!/bin/sh
echo "hello azkaban"

将这两个文件打成一个zip包:


image.png

到WebServer页面上创建一个Project:


image.png

上传压缩包:


image.png

上传成功后,点击“Execute Flow” -> “Schedule”,通过配置crontab表达式定义调度的时间:

image.png

配置好表达式点击“Schedule”后,可以在“Scheduling”看到正在调度的任务:


image.png

点击“Flow”下的“cmd_test”,可以查看该任务的执行情况:


image.png

多个任务

以上演示了单个任务的定义、提交和调度,接下来演示下多个任务的定义、提交和调度,并且这多个任务之间还存在依赖关系,也就是任务之间的调度存在先后顺序。首先,创建任务文件:

$ vim job1.job
type=command
command=sh job1.sh

----------

$ vim job2.job
type=command
command=sh job2.sh
# 依赖job1,当job1调度执行完才会执行job2
dependencies=job1

----------

$ vim job3.job
type=command
command=sh job3.sh
dependencies=job1

----------

$ vim job4.job
type=command
command=sh job4.sh
dependencies=job2,job3

编写与任务对应的shell脚本:

$ vim job1.sh
#!/bin/sh
echo "job1 exec over"

----------

$ vim job2.sh
#!/bin/sh
echo "job2 exec over"

----------

$ vim job3.sh
#!/bin/sh
echo "job3 exec over"

----------

$ vim job4.sh
#!/bin/sh
echo "job4 exec over"

同样,将这些文件打成一个压缩包:


image.png

在WebServer上新建一个Project,并将压缩包上传:


image.png

此时展开job可以看到一个树状结构:


image.png

点击“Execute Flow”,可以看到任务之间的依赖图:


image.png

点击“Execute”执行任务,该方式是单次执行,不会调度执行。所有任务节点均执行成功,图中的节点都是绿色的:


image.png

在“Job List”可以看到任务列表,以及一些执行信息:


image.png

Azkaban用户代理

Azkaban代理用户:

编译用户代理模块:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# mkdir extlib
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# gcc /usr/local/src/azkaban/az-exec-util/src/main/c/execute-as-user.c -o extlib/execute-as-user
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# chmod 6050 extlib/execute-as-user

创建配置文件:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim plugins/jobtypes/commonprivate.properties
execute.as.user=true
azkaban.native.lib=/usr/local/azkaban/azkaban-exec-server/extlib/
azkaban.group.name=root

重启ExecuteServer:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/shutdown-exec.sh 
Killing executor. [pid: 1982], attempt: 1
shutdown succeeded
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh 

激活ExecutorServer:

$ curl http://localhost:46176/executor?action=activate

重启WebServer:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/shutdown-web.sh 
Killing web-server. [pid: 2172], attempt: 1
shutdown succeeded
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh

接下来提交任务测试一下,创建任务定义文件:

$ vim proxy.job
type=command
command=sh test.sh

编写对应的shell脚本:

$ vim test.sh
#!/bin/sh
echo "----------------"
whoami
echo "----------------"

将其打成压缩包:


image.png

创建“Project”,并上传压缩包:


image.png

然后点击“Execute Flow” -> “Execute”执行该任务,此时会发现执行失败了:


image.png

查看日志可以看到不允许代理‘azkaban’用户:


image.png

到操作系统上,新建一个用户:

$ useradd hadoop

修改任务配置文件,指定代理用户,如下所示:

$ vim proxy.job
type=command
command=sh test.sh
user.to.proxy=hadoop

然后重新打包上传,重新执行该任务。这次任务执行成功,输出的日志如下:


image.png

以上的示例都是简单的执行一个shell脚本,如果想真正调度起一个MR任务其实也很简单,就只需要配置执行相应的命令就可以了。如下示例:

type=command
command=yarn jar /soft/home/hadoop-2.8.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.5.jar pi 16 1000
user.to.proxy=hadoop

关于Java操作Azkaban Api

除了可以在可视化的Azkaban WebServer界面上进行项目的创建、任务的上传/提交等操作外,Azkaban还支持通过HTTP API来完成这些操作。因为我们如果要开发自己的大数据平台,可能并不会使用Azkaban WebServer的可视化界面,而是希望在自己的大数据平台界面去与Azkaban进行交互,完成任务的调度管理。所以Azkaban提供了HTTP Api的支持,让我们可以轻松实现与自研平台的整合。

关于Azkaban Api的官方文档地址如下:

我这里准备了一个示例代码仓库,可以简单参考下:

上一篇下一篇

猜你喜欢

热点阅读