大数据后端砖头java基础

Yarn上运行Hello World

2022-11-23  本文已影响0人  pq217

前言

上文提到yarn类似一个分布式操作系统,那么我们就可以自定义写一些应用在这个操作系统上运行

当然也不能太过随意写,我们要运行在操作系统上就必然要遵守操作系统本身的规矩

Yarn

Yarn体系中,用户的主程序被称作ApplicationMaster,当然我们可以在ApplicationMaster中继续向RM申请资源来执行子程序,比如MapReduce中的MapTask和ReduceTask都属于子程序。

这就好比我们平时写java,在main方法主线程中可以创建子线程跑一些逻辑

Hello World

接下来我们就尝试写一个简单应用(输出Hello World),运行在yarn中,我们先不考虑使用子程序,直接在ApplicationMaster中输出Hello World

ApplicationMaster

写一个Hello World应用再简单不过了:

public class MyAppMaster {
    public static void main(String[] args) {
        System.out.println("HELLO WORLD");
    }
}

但还是那句话,在yarn上运行就要遵守人家的规矩,而yarn规定:

ApplicationMaster程序运行前需要向RM注册,运行结束后需要取消注册

也就是说程序不是你想跑就能跑,你得告诉人家资源管理器一声,否则人家队伍怎么带?

注册的相关逻辑如果真自己写还挺复杂,但好在hadoop为我们提供了客户端工具,我们引入依赖就方便了

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.3</version>
</dependency>

最终ApplicationMaster代码如下(就是增加了注册到RM和取消注册)

public class MyAppMaster {

    /**
     * AppMaster 程序入口
     * @param args 执行参数
     */
    public static void main(String[] args) {
        MyAppMaster master = new MyAppMaster();
        master.run();
    }

    /**
     * AppMaster 运行
     */
    public void run() {
        try {
            // 开启am-rm client,建立rm-am的通道,用于注册AM
            AMRMClientAsync amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, null);
            amRmClient.init(new Configuration());
            amRmClient.start();
            String hostName = NetUtils.getHostname();
            // 注册至RM
            amRmClient.registerApplicationMaster(hostName, -1, null);
            // 运行程序
            doRun();
            // 解除注册
            amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 实际运行程序,就一个输出
     */
    private void doRun() {
        System.out.println("HELLO WORLD");
    }
}

到此我们的应用程序就写完了,并且遵守了yarn的规矩

YarnClient

应用程序写完了,怎么把程序部署到yarn上运行呐?

yarn又有规定了:

想让你的程序在我的平台上跑,需要你在RM上创建应用,并指定好应用名称、运行环境、程序(jar包)位置、启动命令、所需资源等

当然这些数据的提交是有一定格式的,就像我们前端对接后端api,肯定是有一个json格式

索性我们不要考虑这复杂的格式,因为hadoop-yarn-client依赖同样帮我们封装好了,就好似有了sdk,写写代码就可以和RM对接了,而这个负责对接RM上传应用程序和启动参数的代码,一般我们叫它:YarnClient

我们开始写代码实现这个YarnClient

1.配置

首先我们要与RM沟通创建应用,首先要搞清楚RM在哪才能和它交互,所以先配置一下RM的IP地址

Configuration conf = new Configuration();
// 设置rm所在的ip地址
conf.set("yarn.resourcemanager.hostname", "192.168.10.101");

其中192.168.10.101就是你运行RM的机器IP地址

2.申请应用

有了地址,就可以申请应用,这一步直接使用hadoop-yarn-client依赖的工具即可

// 创建客户端
YarnClient yarnClient = YarnClient.createYarnClient();
// 初始配置
yarnClient.init(conf);
// 开启(建立连接)
yarnClient.start();
// 向RM发送请求创建应用
YarnClientApplication application = yarnClient.createApplication();
// 准备应用提交上下文(RM要求你提交的信息格式)
ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
// 获取分配的应用id
ApplicationId appId = applicationSubmissionContext.getApplicationId();
log.info("appId: {}", appId);

其中ApplicationId就是RM给我们分配的应用ID,ApplicationSubmissionContext就是我们要提交的应用相关信息的载体

所以接下来就是给applicationSubmissionContext填充应用名称、运行环境、程序(jar包)位置、启动命令、所需资源等信息再次提交给RM

3.设置应用名称

应用名称就起个"Hello World"

// 设置应用名称
applicationSubmissionContext.setApplicationName("Hello World");
4.设置程序(jar包)位置

这一步最重要,你得告诉RM你得程序在哪,一般都存在HDFS上,因为我懒着去上传,写了一个本地传送到HDFS的方法

// 即上一步写的AppMaster jar包的本地位置
String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
String jarName = "my-yarn-app.jar";
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
    put(jarName, addLocalToHdfs(jarPath, jarName));
}};

其中addLocalToHdfs就是上传到HDFS,并获取HDFS路径

private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
    //获取文件系统
    Configuration configuration = new Configuration();
    //NameNode的ip和端口
    FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
    // 目标路径
    String dst =
            "hello/" + jarName;
    Path dstPath =
            new Path(fs.getHomeDirectory(), dst);
    // 上传
    fs.copyFromLocalFile(new Path(jarPath), dstPath);
    FileStatus scFileStatus = fs.getFileStatus(dstPath);
    // 关闭
    fs.close();
    LocalResource scRsrc = LocalResource.newInstance(
                    URL.fromURI(dstPath.toUri()),
                    LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                    scFileStatus.getLen(), scFileStatus.getModificationTime());
    return scRsrc;
}

这一步需要引入hdfs-client依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.1.3</version>
</dependency>
5.设置程序环境

这一步同样比较重要,我们需要设置程序运行的环境,jdk、yarn包什么的,设置了CLASSPATH

Map<String, String> env = new HashMap<>();
// 任务的运行依赖jar包的准备
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
        .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
// yarn依赖包
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
    classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
6.设置启动脚本

这一步一样至关重要,我们要告诉RM我们的程序怎么启动,因为Yarn不光支持java包这一种程序,所以我们要写java的启动命令,可以通过-Xms -Xmx等设置启动jvm参数

List<String> commands = new ArrayList<String>() {{
    add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
}};
7.配置Container启动上下文

资源、环境、启动命令等就组成了一个Container(AM的Container)启动的所需参数,把它们打包为container启动上下文,通过setAMContainerSpec设置到要提交的参数中

ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        localResources, env, commands, null, null, null);
// 准备am Container的运行环境
applicationSubmissionContext.setAMContainerSpec(amContainer);
8.设置am程序所需硬件资源

准备好了所有启动程序的信息,下一步就是告诉RM你这个AppMaster需要多少硬件资源,这样RM才能给你找合适的节点运行你的程序,通过setResource设置到要提交的参数中

int memory = 1024;
int vCores = 2;
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
9.提交作业

完事具备,提交给RM你的程序就会被跑起来了

yarnClient.submitApplication(applicationSubmissionContext);
完整代码

YarnClient完整代码如下

package me.pq.yarn;

/**
 * @Author pq217
 * @Date 2022/11/18 17:47
 * @Description
 */
public class MyYarnClient {

    private static Logger log = LoggerFactory.getLogger(MyYarnClient.class);

    public static void main(String[] args) {
        MyYarnClient client = new MyYarnClient();
        try {
            client.run();
        } catch (Exception e) {
            log.error("client run exception , please check log file.", e);
        }
    }

    /**
     * 客户端运行
     * @throws IOException
     * @throws YarnException
     * @throws URISyntaxException
     * @throws InterruptedException
     */
    public void run() throws IOException, YarnException, URISyntaxException, InterruptedException {
        /**=====1.配置=====**/
        Configuration conf = new Configuration();
        // 设置rm所在的ip地址
        conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
        /**=====2.申请app=====**/
        // 创建YarnClient和ResourceManager进行交互
        YarnClient yarnClient = YarnClient.createYarnClient();
        // 初始配置
        yarnClient.init(conf);
        // 开启(建立连接)
        yarnClient.start();
        // 向RM发送请求创建应用
        YarnClientApplication application = yarnClient.createApplication();
        // 准备应用提交上下文(RM要求你提交的信息格式)
        ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
        // 获取分配的应用id
        ApplicationId appId = applicationSubmissionContext.getApplicationId();
        log.info("appId: {}", appId);
        /**=====3.设置应用名称=====**/
        // 设置应用名称
        applicationSubmissionContext.setApplicationName("Hello World");
        /**=====4.准备程序(jar包)=====**/
        String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
        String jarName = "my-yarn-app.jar";
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
            put(jarName, addLocalToHdfs(jarPath, jarName));
        }};
        /**=====5.准备程序环境=====**/
        Map<String, String> env = new HashMap<>();
        // 任务的运行依赖jar包的准备
        StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
                .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
        // yarn依赖包
        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
            classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
            classPathEnv.append(c.trim());
        }
        env.put("CLASSPATH", classPathEnv.toString());

        /**=====6.准备启动命令=====**/
        List<String> commands = new ArrayList<String>() {{
            add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
        }};

        /**=====7.构造am container运行资源+环境+脚本=====**/
        ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
                localResources, env, commands, null, null, null);
        // 准备am Container的运行环境
        applicationSubmissionContext.setAMContainerSpec(amContainer);
        /**=====8.设置am程序所需资源=====**/
        int memory = 1024;
        int vCores = 2;
        applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
        /**=====9.提交并开始作业=====**/
        yarnClient.submitApplication(applicationSubmissionContext);
        /**=====10.查询作业是否完成=====**/
        for (;;) {
            Thread.sleep(500);
            ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = applicationReport.getYarnApplicationState();
            FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
            if (state.equals(YarnApplicationState.FINISHED)) {
                if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
                    log.info("程序运行成功!");
                    break;
                } else  {
                    log.error("程序运行失败!");
                    break;
                }
            } else if (state.equals(YarnApplicationState.FAILED) || state.equals(YarnApplicationState.KILLED) ) {
                log.error("程序运行失败!");
                break;
            }
            log.info("计算中...");
        }
    }

    /**
     * 上传本地jar包到hdfs
     * @param jarPath
     * @param jarName
     * @throws IOException
     */
    private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
        //获取文件系统
        Configuration configuration = new Configuration();
        //NameNode的ip和端口
        FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
        // 目标路径
        String dst =
                "hello/" + jarName;
        Path dstPath =
                new Path(fs.getHomeDirectory(), dst);
        // 上传
        fs.copyFromLocalFile(new Path(jarPath), dstPath);
        FileStatus scFileStatus = fs.getFileStatus(dstPath);
        // 关闭
        fs.close();
        LocalResource scRsrc = LocalResource.newInstance(
                        URL.fromURI(dstPath.toUri()),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        scFileStatus.getLen(), scFileStatus.getModificationTime());
        return scRsrc;
    }

}

测试

应用写好了,上传应用的client也写好了,下面测一下

首先使用maven-assembly插件给程序打jar包

mvn clean package

其次,本地idea直接运行YarnClient的main方法

注意替换一下代码中的jar包地址和名称,以及AppMaster的全路径名,以及hadoop的ip地址等信息

MyYarnClient的运行结果idea输出如下

MyYarnClient

打开yarn-web再看一下日志

yarn-web

成功实现了一个运行在Yarn上的小程序!

分布式计算

以上,我们完成了一个简单的程序运行在yarn上,但其实这个应用程序实际上只在一个节点上实际运行了System.out.println的代码,这就像去了一趟沃尔玛,买了瓶矿泉水

yarn的优势是可以让我们的计算程序分给多个机器节点去执行,我们继续改造一下AppMaster,实现如下功能:

ChildTask

首先编写子任务,我为了省事,直接和AppMaster放一个项目中了,很简单的代码,创建一个/child/+服务器hostName的文件夹

public class ChildTask {

    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        //获取文件系统
        Configuration configuration = new Configuration();
        //NameNode的ip和端口
        FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
        // hostName
        String hostName = NetUtils.getHostname();
        // 创建一个文件夹
        fs.mkdirs(new Path("/child/"+hostName));
        fs.close();
    }

}

AppMaster

接下来要改造AppMaster,原来只是输出Hello World,现在要向RM申请Container用来执行子任务

container请求

首先申请Container需要向RM申请,所以使用amRmClient即可发出请求

// 两个子任务,对应两个container
int childTaskNum = 2;
for (int i = 0; i < childTaskNum; i++) {
    // 向rm申请一个1M内存,1个CPU的资源容器
    int memory = 1024;
    int vCores = 1;
    AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
    amRmClient.addContainerRequest(containerRequest);
}
rm回调

申请成功后,当rm分配出container时还要进行相关回调处理,所以amRmClient定义时要加上一个回调处理类

// rm回调处理器
AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
// 开启am-rm client,建立rm-am的通道,用于注册AM, allocListener负责处理AM的响应
AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);

RMCallBackHandler是rm响应的处理器

private class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {

重点要实现两个方法

onContainersCompleted

这个方法主要是子任务运行完成,我们在AppMaster加几个内部变量控制所有子任务完成再输出"Hello World"

// 充当锁
private Object lock = new Object();
// 任务个数
private int childTaskNum = 2;
// 已完成任务个数
private int childTaskCompletedNum = 0;

RMCallBackHandler的onContainersCompleted方法实现如下:

@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
    for (ContainerStatus status : statuses) {
        synchronized (lock) {
            System.out.println(++childTaskCompletedNum + " container completed");
            // 子任务全部完成
            if (childTaskCompletedNum == childTaskNum) {
                lock.notify();
            }
        }
    }
}

doRun方法修改为如下

private void doRun(AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient) throws InterruptedException {
    // 申请两个资源容器
    for (int i = 0; i < childTaskNum; i++) {
        // 向rm申请一个1M内存,1个CPU的资源容器
        int memory = 1024;
        int vCores = 1;
        AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
        amRmClient.addContainerRequest(containerRequest);
    }
    synchronized (lock) {
        // 等待子任务完成
        lock.wait();
    }
    System.out.println("HELLO WORLD");
}

到此即可实现申请两个container,两个container运行完后再执行输出"HELLO WORLD"

onContainersAllocated

这是RMCallBackHandler中要实现的重点方法,当container分配成功后要做什么?

思路很简单,container分配之后当然要在对应的容器上运行我们的子任务:ChildTask,而子任务的运行一定是在container所指定的NM节点上,所以我们要提前初始化一个NM客户端:
加一个内部属性以供AppMaster整个类使用

NMClientAsyncImpl nmClientAsync;

此时AppMaster run方法修改如下

public void run() {
    try {
        // rm回调处理器
        AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
        // 开启am-rm client,建立rm-am的通道,用于注册AM, allocListener负责处理AM的响应
        AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
        amRmClient.init(new Configuration());
        amRmClient.start();
        String hostName = NetUtils.getHostname();
        // 注册至RM
        amRmClient.registerApplicationMaster(hostName, -1, null);
        // 初始化nmClient
        nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
        nmClientAsync.init(conf);
        nmClientAsync.start();
        // 运行程序
        doRun(amRmClient);
        // 解除注册
        amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
        // am-rm客户端关闭
        amRmClient.stop();
        // nm客户端关闭
        nmClientAsync.stop();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

其中NMCallBackHandler是一个NM响应的Callback,可以通过实现其方法在container声明周期加入一些逻辑

private class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
}

接下来就是实现onContainersAllocated,代码如下

@Override
public void onContainersAllocated(List<Container> containers) {
    try {
        for (Container container : containers) {
            System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
            // 构建AM<->NM客户端并开启
            // 还是YarnClient containerLaunchContext那一套,这把直接去HDFS系统取文件,因为和YarnClient打包到一个jar上传
            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
                //NameNode的ip和端口
                FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), conf, "root");
                URI appUri = new URI("/user/root/hello/my-yarn-app.jar");
                FileStatus fileStatus = fs.getFileStatus(new Path(appUri));
                put("my-yarn-app.jar", LocalResource.newInstance(
                        URL.fromURI(appUri),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        fileStatus.getLen(), fileStatus.getModificationTime()));
            }};
            Map<String, String> env = new HashMap<>();
            StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
                    .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
                classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
                classPathEnv.append(c.trim());
            }
            env.put("CLASSPATH", classPathEnv.toString());
            List<String> commands = new ArrayList<String>() {{
                // 传入ip地址作为参数
                add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx200m me.pq.yarn.ChildTask");
            }};
            ContainerLaunchContext containerLaunchContext = ContainerLaunchContext.newInstance(
                    localResources, env, commands, null, null, null);
            // nm节点启动container
            nmClientAsync.startContainerAsync(container, containerLaunchContext);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

代码就不详解了,和YarnClient提交的ContainerLaunchContext写法基本一致(最终运行me.pq.yarn.ChildTask而不是MyAppMaster),最后使用NM客户端的startContainerAsync方法让子任务运行在NM上

值得一提的是我的ChildTask和AppMaster都在一个jar包下,所以这里不用上传了,直接去HDFS取即可

测试

代码写完了,测试一下,mvn clean package然后执行MyYarnClient main方法

idea输出

MyYarnClient

HDFS-WEB上看一下子任务的文件夹创建是否成功

HDFS-WEB

可见文件夹创建出来了

YARN-WEB看一下AppMaster的日志

YARN-WEB AppMaster

到此,实现了一个运行在yarn上的简单分布式计算程序~

上一篇下一篇

猜你喜欢

热点阅读