HBase Study Notes (6): Using HBase with MapReduce
2020-08-11
做个合格的大厂程序员
Requirement: use MapReduce to read data from an HBase table, keep only the data of one column family, and write it into another table.
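Before running the job, the destination table myuser2 must already exist with column family f1, and the source table myuser must contain data. Below is a minimal sketch for creating the destination table, assuming the HBase 1.x client API and the same ZooKeeper quorum used in the job below; the class name CreateTargetTable is only an illustrative placeholder.
package hbaseMR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        // Point the client at the same ZooKeeper quorum used by the MR job (assumption)
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("myuser2");
            // Create myuser2 with column family f1 if it does not exist yet
            if (!admin.tableExists(tableName)) {
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                descriptor.addFamily(new HColumnDescriptor("f1"));
                admin.createTable(descriptor);
            }
        }
    }
}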
Main
package hbaseMR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HbaseMain extends Configured implements Tool{
    public int run(String[] args) throws Exception {
        /**
         * The eight steps of a MapReduce job:
         * 1. Read the input and parse it into key/value pairs (k1, v1).
         * 2. Custom mapper logic: take k1/v1 and emit k2/v2.
         * 3. Partitioning: records with the same key go to the same reducer.
         * 4. Sorting: records are sorted by k2.
         * 5. Combining (an optional tuning step): pre-merge values with the same k2 on the map side.
         * 6. Grouping: values with the same key are grouped together.
         * 7. Custom reducer logic: take k2/v2 and emit k3/v3.
         * 8. Write out k3/v3.
         */
        Scan scan = new Scan();
        Job job = Job.getInstance(super.getConf(), "hbase_mapreduce");
        /*
         * initTableMapperJob parameters:
         *   TableName table,
         *   Scan scan,
         *   Class<? extends TableMapper> mapper,
         *   Class<?> outputKeyClass,
         *   Class<?> outputValueClass,
         *   Job job
         */
        TableMapReduceUtil.initTableMapperJob("myuser", scan, HbaseReadMapper.class, Text.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("myuser2", HbaseWriteReducer.class, job);
        job.setNumReduceTasks(1);
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Create an HBase configuration and point it at the ZooKeeper quorum
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        int run = ToolRunner.run(configuration, new HbaseMain(), args);
        System.exit(run);
    }
}
Mapper
package hbaseMR;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.List;
public class HbaseReadMapper extends TableMapper<Text, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Get the row key as a byte array
        byte[] rowkeyBytes = key.get();
        String rowkey = Bytes.toString(rowkeyBytes);
        Put put = new Put(rowkeyBytes);
        // From all cells of this row, keep only the name and age columns of family f1
        List<Cell> cells = value.listCells();
        for (Cell cell : cells) {
            // Column family name (CellUtil avoids the deprecated cell.getFamily())
            byte[] family = CellUtil.cloneFamily(cell);
            // Column qualifier
            byte[] qualifier = CellUtil.cloneQualifier(cell);
            if ("f1".equals(Bytes.toString(family))) {
                if ("name".equals(Bytes.toString(qualifier)) || "age".equals(Bytes.toString(qualifier))) {
                    // This cell is one we want to keep
                    put.add(cell);
                }
            }
        }
        // Only emit the Put if it actually contains cells
        if (!put.isEmpty()) {
            context.write(new Text(rowkey), put);
        }
    }
}
Reducer
package hbaseMR;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class HbaseWriteReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // The reduce key is the row key; re-wrap it as the output key
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
        immutableBytesWritable.set(key.toString().getBytes());
        for (Put put : values) {
            context.write(immutableBytesWritable, put);
        }
    }
}
Importing data from HDFS into HBase and exporting data from HBase to HDFS
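The job below reads text files from HDFS and writes them into the HBase table myuser2. The mapper and reducer split each line on tab characters, so the input is assumed to be tab-separated with three fields per line: row key, name, age. A hypothetical sample input file might look like this:
0001	zhangsan	18
0002	lisi	25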
Main
package Hdfs2Hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Hbase2HDFS extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "hdfs2Hbase");
        // Read plain text files from HDFS
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/hbase/input"));
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Write the reducer output into the HBase table myuser2
        TableMapReduceUtil.initTableReducerJob("myuser2", HbaseReducer.class, job);
        job.setNumReduceTasks(1);
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        int run = ToolRunner.run(configuration, new Hbase2HDFS(), args);
        System.exit(run);
    }
}
Mapper
package Hdfs2Hbase;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class HDFSMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line through unchanged; the reducer parses it
        context.write(value, NullWritable.get());
    }
}
Reducer
package Hdfs2Hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class HbaseReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Each key is one tab-separated input line: rowkey, name, age
        String[] split = key.toString().split("\t");
        // split[0] is the row key
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(split[2]));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
    }
}
Using BulkLoad to generate HFiles directly for import into HBase
Main
package bulkLoad;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BulkLoadHbase extends Configured implements Tool {
@Override
    public int run(String[] args) throws Exception {
        final String INPUT_PATH = "hdfs://hadoop01:9000/hbase/input";
        final String OUTPUT_PATH = "hdfs://hadoop01:9000/hbase/output_hfile";
        // Reuse the configuration passed in through ToolRunner
        Configuration conf = super.getConf();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(BulkLoadHbase.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Write HFiles instead of going through the normal HBase write path
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        boolean b = job.waitForCompletion(true);
        connection.close();
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new BulkLoadHbase(), args);
        System.exit(run);
    }
}
Mapper
package bulkLoad;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is tab-separated: rowkey, name, age
        String[] split = value.toString().split("\t");
        // split[0] is the row key
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(Integer.parseInt(split[2])));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
    }
}
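The job above only writes HFiles to /hbase/output_hfile; they still have to be loaded into the table. Below is a minimal sketch of that last step, assuming the HBase 1.x LoadIncrementalHFiles class in org.apache.hadoop.hbase.mapreduce (in HBase 2.x this class was moved under org.apache.hadoop.hbase.tool); the class name LoadHFiles is only an illustrative placeholder.
package bulkLoad;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
public class LoadHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(TableName.valueOf("myuser2"))) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            // Point the loader at the HFile output directory written by the bulk-load job above
            loader.doBulkLoad(new Path("hdfs://hadoop01:9000/hbase/output_hfile"),
                    admin, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        }
    }
}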