Hbase学习笔记(六) Hbase与MR的结合使用

2020-08-11  本文已影响0人  做个合格的大厂程序员

需求:使用 MapReduce 读取 HBase 表 myuser 的数据,只保留列族 f1 中的 name 和 age 两列,并把结果写入另一张表 myuser2。

Main

package hbaseMR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HbaseMain  extends Configured implements Tool{

    public int run(String[] args) throws Exception {
        /**
         * Mr 执行的8个步骤
         * 1, 读取文件,解析成key,value对 K1,v1
         * 2.自定义mapper逻辑,接受k1,v2,转换成,k2,v2输出
         * 3.分区,相同的key的数据发送到同一个reduce里面去
         * 4.排序,对数据按照key2进行排序
         * 5.规约,调优的步骤,在map端相同的key2进行提前合并
         * 6.分组,相同的key的数据合并
         * 7.自定义reduce逻辑,接受k2,v2转换成k3,v3
         * 8.输出k3,v3保存
         * */

        Scan scan = new Scan();

        Job job = Job.getInstance(super.getConf(), "hbase_mapreduce");


        /*

        TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      job
        * */
        TableMapReduceUtil.initTableMapperJob("myuser",scan,HbaseReadMapper.class, Text.class, Put.class,job);


        TableMapReduceUtil.initTableReducerJob("myuser2",HbaseWriteReducer.class,job);

        job.setNumReduceTasks(1);

        boolean b = job.waitForCompletion(true);

        return b?0:1;

    }



    public static void main(String[] args) throws Exception {
        //创造一个hbase的配置
        Configuration configuration = HBaseConfiguration.create();

        configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");

        int run = ToolRunner.run(configuration,new HbaseMain(),args);

        System.exit(run);
    }
}

Mapper

package hbaseMR;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.List;

/**
 * Mapper that reads rows from an HBase table and keeps only the {@code f1:name} and
 * {@code f1:age} cells of each row.
 *
 * <p>For every row that has at least one of those cells it emits the row key as {@link Text}
 * and a {@link Put} holding the retained cells. Rows without any of them emit nothing.
 */
public class HbaseReadMapper extends TableMapper<Text, Put> {

    private static final byte[] FAMILY = Bytes.toBytes("f1");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] AGE = Bytes.toBytes("age");

    /**
     * Filters one row down to the wanted cells and emits it.
     *
     * @param key     row key of the current row
     * @param value   cells of the current row
     * @param context MapReduce context used to emit the (row key, Put) pair
     * @throws IOException          if adding a cell or writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {

        // listCells() returns null (not an empty list) when the Result holds no cells.
        List<Cell> cells = value.listCells();
        if (cells == null) {
            return;
        }

        // copyBytes() honours the writable's offset and length; get() returns the whole
        // backing array, which is only the row key when the writable spans all of it.
        byte[] rowkeyBytes = key.copyBytes();
        Put put = new Put(rowkeyBytes);

        for (Cell cell : cells) {
            if (isWanted(cell)) {
                put.add(cell);
            }
        }

        // Only emit rows that kept at least one cell.
        if (!put.isEmpty()) {
            context.write(new Text(Bytes.toString(rowkeyBytes)), put);
        }
    }

    /** Returns true when the cell is {@code f1:name} or {@code f1:age}. */
    private static boolean isWanted(Cell cell) {
        // Compare against the cell's backing arrays directly: this avoids the deprecated
        // Cell.getFamily()/getQualifier() copies and a String allocation per cell.
        if (!Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
                FAMILY, 0, FAMILY.length)) {
            return false;
        }
        return qualifierIs(cell, NAME) || qualifierIs(cell, AGE);
    }

    /** Returns true when the cell's qualifier bytes equal {@code expected}. */
    private static boolean qualifierIs(Cell cell, byte[] expected) {
        return Bytes.equals(
                cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                expected, 0, expected.length);
    }
}

Reducer

package hbaseMR;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Reducer that forwards every {@link Put} received for a row key to the output table.
 */
public class HbaseWriteReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

    /**
     * Writes each incoming {@link Put} unchanged, keyed by the row key bytes.
     *
     * @param key     row key as text
     * @param values  the Puts emitted for this row key
     * @param context MapReduce context used to write to the output table
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {

        // Use the Text's own UTF-8 bytes instead of toString().getBytes(), which would
        // re-encode with the JVM's platform default charset. The backing array is only
        // valid up to getLength(), so pass the length explicitly.
        ImmutableBytesWritable rowKey =
                new ImmutableBytesWritable(key.getBytes(), 0, key.getLength());

        for (Put put : values) {
            context.write(rowKey, put);
        }
    }
}

HDFS导入数据到Hbase和Hbase导出到HDFS中(本节代码只演示从HDFS导入Hbase;驱动类虽然名为 Hbase2HDFS,实际方向是 HDFS → Hbase)

Main

package Hdfs2Hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Hbase2HDFS extends Configured implements Tool {


    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(super.getConf(),"hdfs2Hbase");

        job.setInputFormatClass(TextInputFormat.class);

        TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hbase/input"));

        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        TableMapReduceUtil.initTableReducerJob("myuser2",HbaseReducer.class,job);

        job.setNumReduceTasks(1);

        boolean bl = job.waitForCompletion(true);

        return bl?0:1;
    }

    public static void main(String[] args) throws Exception {

        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
        int run = ToolRunner.run(configuration,new Hbase2HDFS(),args);
        System.exit(run);
    }
}

Mapper

package Hdfs2Hbase;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Pass-through mapper: emits every input line as the output key, paired with an empty
 * {@link NullWritable} value.
 */
public class HDFSMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Forwards one input line to the reducer.
     *
     * @param key     input key; ignored
     * @param value   the line of text, emitted unchanged as the output key
     * @param context MapReduce context used to emit the pair
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final NullWritable noValue = NullWritable.get();
        context.write(value, noValue);
    }
}

Reducer

package Hdfs2Hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Reducer that turns one tab-separated text line into an HBase {@link Put}.
 *
 * <p>The line is expected to be {@code rowkey<TAB>name<TAB>age}. The name and age are stored
 * as UTF-8 strings in {@code f1:name} and {@code f1:age}.
 *
 * <p>NOTE(review): {@code BulkLoadMapper} stores {@code f1:age} as a 4-byte int, whereas this
 * class stores it as text. Confirm which encoding readers of {@code myuser2} expect.
 */
public class HbaseReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

    private static final byte[] FAMILY = Bytes.toBytes("f1");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] AGE = Bytes.toBytes("age");

    /** Number of tab-separated fields a valid line must have: row key, name, age. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Parses the line carried in {@code key} and writes the resulting Put.
     *
     * <p>Lines with fewer than three fields or an empty row key are skipped and counted
     * under the {@code HbaseReducer / MALFORMED_LINES} counter instead of failing the task.
     *
     * @param key     the input line
     * @param values  placeholder values; ignored
     * @param context MapReduce context used to write to the output table
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {

        String[] fields = key.toString().split("\t");

        // A blank or truncated line would otherwise throw ArrayIndexOutOfBoundsException
        // (or be rejected by Put for a zero-length row) and kill the whole job.
        if (fields.length < REQUIRED_FIELDS || fields[0].isEmpty()) {
            context.getCounter("HbaseReducer", "MALFORMED_LINES").increment(1);
            return;
        }

        // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes() which depends on
        // the JVM's platform default charset.
        byte[] rowKey = Bytes.toBytes(fields[0]);

        Put put = new Put(rowKey);
        put.addColumn(FAMILY, NAME, Bytes.toBytes(fields[1]));
        put.addColumn(FAMILY, AGE, Bytes.toBytes(fields[2]));

        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}

用Bulkload的方式直接输出Hfile供Hbase导入

Main

package bulkLoad;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * Driver for a MapReduce job that converts text files on HDFS into HFiles laid out for the
 * regions of table {@code myuser2}.
 *
 * <p>This job only generates the HFiles under the output path; nothing in this class loads
 * them into the table.
 */
public class BulkLoadHbase extends Configured implements Tool {

    /**
     * Configures the HFile-generation job, submits it and waits for it to finish.
     *
     * @param args remaining command-line arguments; not used
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the HBase connection or the job cannot be set up
     */
    @Override
    public int run(String[] args) throws Exception {
        final String INPUT_PATH = "hdfs://hadoop01:9000/hbase/input";
        final String OUTPUT_PATH = "hdfs://hadoop01:9000/hbase/output_hfile";

        // Reuse the configuration handed over by ToolRunner so that generic options
        // (-D key=value, -conf ...) take effect; fall back to a fresh HBase configuration
        // when run() is called without one.
        Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("myuser2");

        // Connection, Table and RegionLocator all hold resources and must be closed,
        // including when job setup or execution throws.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(tableName);
             RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

            Job job = Job.getInstance(conf);
            job.setJarByClass(BulkLoadHbase.class);

            job.setMapperClass(BulkLoadMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);

            job.setOutputFormatClass(HFileOutputFormat2.class);
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

            FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

    /**
     * Entry point: runs the job through {@link ToolRunner} with an HBase configuration and
     * exits with the job's status code.
     *
     * @param args command-line arguments forwarded to {@link ToolRunner}
     * @throws Exception if the job fails to run
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int exitCode = ToolRunner.run(configuration, new BulkLoadHbase(), args);
        System.exit(exitCode);
    }
}

Mapper

package bulkLoad;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that turns one tab-separated text line into an HBase {@link Put} for bulk loading.
 *
 * <p>The line is expected to be {@code rowkey<TAB>name<TAB>age}. The name is stored as a UTF-8
 * string in {@code f1:name}; the age is parsed as an integer and stored as a 4-byte int in
 * {@code f1:age}.
 */
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final byte[] FAMILY = Bytes.toBytes("f1");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] AGE = Bytes.toBytes("age");

    /** Number of tab-separated fields a valid line must have: row key, name, age. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Parses one input line and emits the (row key, Put) pair.
     *
     * <p>Lines with fewer than three fields, an empty row key or a non-numeric age are skipped
     * and counted under the {@code BulkLoadMapper / MALFORMED_LINES} counter instead of
     * failing the task.
     *
     * @param key     input key; ignored
     * @param value   the line of text to parse
     * @param context MapReduce context used to emit the pair
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        // A blank or truncated line would otherwise throw ArrayIndexOutOfBoundsException.
        if (fields.length < REQUIRED_FIELDS || fields[0].isEmpty()) {
            context.getCounter("BulkLoadMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        final int age;
        try {
            age = Integer.parseInt(fields[2]);
        } catch (NumberFormatException ignored) {
            // One bad age value should not abort the whole load; count the line and move on.
            context.getCounter("BulkLoadMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        // Bytes.toBytes always encodes strings as UTF-8, unlike String.getBytes() which
        // depends on the JVM's platform default charset.
        byte[] rowKey = Bytes.toBytes(fields[0]);

        Put put = new Put(rowKey);
        put.addColumn(FAMILY, NAME, Bytes.toBytes(fields[1]));
        put.addColumn(FAMILY, AGE, Bytes.toBytes(age));

        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
上一篇下一篇

猜你喜欢

热点阅读