
Writing a MapReduce program that reads from and writes to HBase tables

2018-05-07  明明德撩码

Reference: http://hbase.apache.org/book.html#mapreduce

The driver class extends Configured and implements Tool (it is run through ToolRunner in the main method below), which requires these imports:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;


Three components

Create the Mapper class
Create the Reducer class
Create the Driver
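These three pieces typically live in a single job class. A minimal skeleton (the class name User2BasicMapReduce is taken from the main method later in this article; the method bodies are filled in section by section below):

public class User2BasicMapReduce extends Configured implements Tool {

    // Mapper: reads each row of the source 'user' table
    public static class ReadUserMapper
            extends TableMapper<ImmutableBytesWritable, Put> {
        // filled in below
    }

    // Reducer: writes Put mutations into the target 'basic' table
    public static class WriteBasicReducer
            extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
        // filled in below
    }

    // Driver: configures and submits the job
    public int run(String[] args) throws Exception {
        // filled in below
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // filled in below
    }
}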

Create the Mapper class

The logic of the map method is as follows. Code snippet inside the class:
    // Read rows from the 'user' table. ImmutableBytesWritable: the row key; Put: one row's worth of cells
    public static class ReadUserMapper extends TableMapper<ImmutableBytesWritable, Put> {

        @Override
        protected void map(ImmutableBytesWritable row, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                        throws IOException, InterruptedException {
            // Wrap the scanned row's cells in a Put and emit it, keyed by the row key
            context.write(row, resultToPut(row, value));
        }

        // Equivalent to the shell command: put 'user','10001','info:address','shanghai'
        private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
            Put put = new Put(key.get());
            // Copy every KeyValue of the scanned row into the Put
            for (KeyValue kv : result.raw()) {
                put.add(kv);
            }
            return put;
        }
    }
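Result.raw() and Put.add(KeyValue) match the HBase 0.98 API this article targets, but both are deprecated in HBase 1.x and later in favor of the Cell interface. The same helper written against the Cell API (a sketch; behavior is identical):

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
        Put put = new Put(key.get());
        // rawCells() returns Cell[] rather than the deprecated KeyValue[]
        for (Cell cell : result.rawCells()) {
            put.add(cell);
        }
        return put;
    }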

Create the Reducer class

    // Write into the 'basic' table: forward every Put received from the map phase unchanged
    public static class WriteBasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
                        throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(key, put);
            }
        }
    }
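Note that TableReducer takes only three type parameters; its output value type is fixed to Mutation (the parent class of Put and Delete), which is why the Context above is parameterized with Mutation. Its declaration in the HBase API:

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}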

Create the Driver

Job job = Job.getInstance(getConf(), this.getClass().getName());
job.setJarByClass(this.getClass());

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs

A bare Scan performs a full-table scan. setCaching controls how many rows each RPC fetches from the RegionServer at once (the default of 1 is far too low for a MapReduce job), and setCacheBlocks controls server-side block caching; block caching must never be enabled for MapReduce jobs, so it is set to false.
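The Scan instance passed to initTableMapperJob below also controls which column families and columns are read. For example, assuming only the info family (the one used in the put command above) is needed:

// Scan only the 'info' column family:
scan.addFamily(Bytes.toBytes("info"));
// or narrow further to a single column:
// scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"));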

TableMapReduceUtil.initTableMapperJob(
          "user",                        // input HBase table name
          scan,                          // Scan instance to control CF and attribute selection
          ReadUserMapper.class,          // mapper
          ImmutableBytesWritable.class,  // mapper output key
          Put.class,                     // mapper output value
          job);
TableMapReduceUtil.initTableReducerJob(
          "basic",                  // output table
          WriteBasicReducer.class,  // reducer class
          job);

Set the number of reduce tasks. Passing 0 makes the job map-only: the reduce phase is skipped and the Put objects emitted by the mapper are written directly to the output table.

job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
        
Putting it all together, the complete run method:

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), this.getClass().getName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs

        TableMapReduceUtil.initTableMapperJob(
                  "user",                        // input HBase table name
                  scan,                          // Scan instance to control CF and attribute selection
                  ReadUserMapper.class,          // mapper
                  ImmutableBytesWritable.class,  // mapper output key
                  Put.class,                     // mapper output value
                  job);

        TableMapReduceUtil.initTableReducerJob(
                  "basic",                  // output table
                  WriteBasicReducer.class,  // reducer class
                  job);

        // 0 reduce tasks: map-only job, the mapper's Puts go straight to the output table
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

Write the main method

public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
        System.exit(status);
    }

Verification

export HBASE_HOME=/opt/sofewares/hbase/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`  $HADOOP_HOME/bin/yarn jar  $HADOOP_HOME/jars/hbase-mr-user2basic.jar
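After the job finishes, a quick way to confirm that the rows arrived is to scan the target table from the HBase shell:

$ ${HBASE_HOME}/bin/hbase shell
hbase> scan 'basic'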