大数据大数据,机器学习,人工智能玩转大数据

Kylin构建流程分析-加载HFile到Hbase中(Load

2018-04-11  本文已影响271人  b00d1f0f0afd
麒麟出没,必有祥瑞

环境信息
系统:win10
代码编辑器:IDEA
kylin:2.3.0
hadoop:2.7.1

本文介绍了kylin构建的第四个阶段,根据cuboid文件创建Hfile并且将cuboid Hfile加载到hbase之中,也就是BatchCubingJobBuilder2类中的build方法的第四个阶段。

outputSide.addStepPhase3_BuildCube(result);
@Override
    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
        return new IMRBatchCubingOutputSide2() {
            HBaseMRSteps steps = new HBaseMRSteps(seg);

            @Override
            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
            }

            @Override
            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
            }

            @Override
            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                // nothing to do
            }

            @Override
            public IMROutputFormat getOuputFormat() {
                return new HBaseMROutputFormat();
            }
        };
    }

用到的是HBaseMROutput2Transition类中的内部类,分为两个步骤:

1.createConvertCuboidToHfileStep

2.createBulkLoadStep

下面分析一下bulk load的代码。首先入口是 BatchCubingJobBuilder2 类中createBulkLoadStep方法。

public HadoopShellExecutable createBulkLoadStep(String jobId) {
        //实例化一个hadoop任务
        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);

        StringBuilder cmd = new StringBuilder();
        //设置前面保存的hfile路径
        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
        //设置htable name
        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
        //设置cube name
        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
        //设置cmd 参数
        bulkLoadStep.setJobParams(cmd.toString());
        //设置job 类
        bulkLoadStep.setJobClass(BulkLoadJob.class);
        return bulkLoadStep;
    }

上面代码中生成cmd.toString 参数的实例如下:

 -input hdfs://server1.fibo.com:8020/apps/kylin/kylin_metadata/kylin-c2974055-2ccf-4b06-a98b-6f14e946e1ca/unload/hfile
 -htablename KYLIN_BYH4SABC2Y -cubename unload

进入到BulkLoadJob类中,主要的run方法如下:

   @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();

        options.addOption(OPTION_INPUT_PATH);
        options.addOption(OPTION_HTABLE_NAME);
        options.addOption(OPTION_CUBE_NAME);
        parseOptions(options, args);
        String tableName = getOptionValue(OPTION_HTABLE_NAME);
        // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
        // end with "/"
        String input = getOptionValue(OPTION_INPUT_PATH);
        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
        FsShell shell = new FsShell(conf);
        int exitCode = -1;
        int retryCount = 10;
        // 给上面的hfile文件赋予读权限
        while (exitCode != 0 && retryCount >= 1) {
            exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
            retryCount--;
            Thread.sleep(5000);
        } 
        if (exitCode != 0) {
            logger.error("Failed to change the file permissions: " + input);
            throw new IOException("Failed to change the file permissions: " + input);
        }
        String[] newArgs = new String[2];
        newArgs[0] = input;
        newArgs[1] = tableName;
        logger.debug("Start to run LoadIncrementalHFiles");
        //将Hfile输出格式的输出加载到现有表中的工具。
        int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
        logger.debug("End to run LoadIncrementalHFiles");
        return ret;
    }

可以看到,最后调用的是Hbase包的类LoadIncrementalHFiles,这是一个工具类,官方解释是“Tool to load the output of HFileOutputFormat into an existing table.” 。对于这个类的解释可以参考这个文章 HBase 写优化之 BulkLoad 实现数据快速入库

本文作者: 彭双宝
原文链接: http://blog.lovedata.net/5a9a3875.html

上一篇下一篇

猜你喜欢

热点阅读