Spark on HBase: reading and writing
This article covers two ways for Spark to read and write HBase:
1. Using the newAPIHadoopRDD API provided by Spark to read and write HBase.
2. SparkOnHbase, which uses the HBaseContext utility class open-sourced by Cloudera Labs to let Spark read and write HBase in bulk through RDDs.
The HBase table layout is as follows:
(screenshot: HBase table layout)
A sample of the data:
(screenshot: sample rows from the table)
The Spark version used in this article is 2.3.2, and the HBase version is 1.2.6.
Because the records that come back from HBase are wrapped in classes that Spark's default Java serializer cannot handle, Spark reports a serialization error when reading HBase data. Whichever approach you use, configure Kryo serialization for Spark before reading from HBase, as shown below:
(screenshot: serializer configuration)
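The configuration amounts to switching Spark to the Kryo serializer, which the code in both examples below also does. A minimal sketch of just that setup (the class name KryoConfigSketch is made up for illustration, and registering the HBase classes is optional):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.SparkConf;

public class KryoConfigSketch {
    public static SparkConf configure() {
        // Switch Spark from Java serialization to Kryo so that HBase classes
        // such as ImmutableBytesWritable and Result can travel inside RDDs.
        SparkConf sconf = new SparkConf()
                .setAppName("spark on hbase")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Optional: registering the classes keeps Kryo from writing their
        // fully qualified names into every serialized record.
        sconf.registerKryoClasses(new Class<?>[]{ImmutableBytesWritable.class, Result.class});
        return sconf;
    }
}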
newAPIHadoopRDD
This approach reads from and writes to HBase through TableInputFormat and TableOutputFormat.
Reading from HBase looks like this (a sketch of the write path follows the example):
package com.zjc.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

/**
 * Created by zjc on 2019/1/2.
 */
public class sparkApplication11 {
    public static void main(String[] args) {
        SparkConf sconf = new SparkConf();
        sconf.setAppName("spark on hbase");
        // Kryo is required because the HBase classes carried by the RDD are
        // not handled by Spark's default Java serializer.
        sconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "");   // set to your ZooKeeper quorum
        hconf.set(TableInputFormat.INPUT_TABLE, "person");
        hconf.set(TableInputFormat.SCAN_COLUMNS,
                "basic:name basic:birth social:address social:idnum social:phone");
        // Optionally restrict the scan range:
        // hconf.set(TableInputFormat.SCAN_ROW_START, "0");
        // hconf.set(TableInputFormat.SCAN_ROW_STOP, "100000000");

        JavaSparkContext spark = new JavaSparkContext(sconf);
        // Each record is a (row key, Result) pair produced by TableInputFormat.
        JavaPairRDD<ImmutableBytesWritable, Result> pairRDD = spark.newAPIHadoopRDD(
                hconf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        System.out.println("count-----" + pairRDD.count());
        List<Result> list = pairRDD.map(t -> t._2()).take(10);
        System.out.println("list size---" + list.size());
        for (Result result : list) {
            List<Cell> cells = result.listCells();
            // Print the row key once, then every cell value in the row.
            System.out.println(Bytes.toString(CellUtil.cloneRow(cells.get(0))));
            for (Cell cell : cells) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        spark.stop();
    }
}
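The original code only shows the read path; the write direction mentioned above (TableOutputFormat) is not listed. A minimal, hypothetical sketch of it could look like this: it writes a couple of made-up rows into the basic:name column of the person table via saveAsNewAPIHadoopDataset. The class name, row keys, and values are all illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;

public class SparkWriteHBaseSketch {
    public static void main(String[] args) throws IOException {
        SparkConf sconf = new SparkConf()
                .setAppName("spark write hbase")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext spark = new JavaSparkContext(sconf);

        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "");   // set to your ZooKeeper quorum
        hconf.set(TableOutputFormat.OUTPUT_TABLE, "person");

        // TableOutputFormat is configured through a Hadoop Job object.
        Job job = Job.getInstance(hconf);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // Build (row key, Put) pairs; the row keys and values are illustrative.
        JavaPairRDD<ImmutableBytesWritable, Put> puts = spark
                .parallelize(Arrays.asList("10001", "10002"))
                .mapToPair(rowKey -> {
                    Put put = new Put(Bytes.toBytes(rowKey));
                    put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"),
                            Bytes.toBytes("name-" + rowKey));
                    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
                });

        puts.saveAsNewAPIHadoopDataset(job.getConfiguration());
        spark.stop();
    }
}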
The output of running the read example above:
(screenshot: job output)
sparkOnHbase
Pull in the hbase-spark jar through Maven:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.2.0-cdh5.16.1</version>
</dependency>
Because hbase-spark was built against Spark 1.6 while we are running Spark 2.3.2, submitting the job fails with a "class not defined" error for org.apache.spark.Logging: in Spark 2.x that class was moved (it now lives at org.apache.spark.internal.Logging), so the name the library expects no longer exists. The workaround is to recreate the org.apache.spark.Logging class, package it into a jar, and drop that jar into Spark's jars directory.
The read path looks like this:
package com.zjc.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.List;

/**
 * Created by zjc on 2019/1/2.
 */
public class sparkApplication11 {
    public static void main(String[] args) {
        SparkConf sconf = new SparkConf();
        sconf.setAppName("spark on hbase");
        sconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "");   // set to your ZooKeeper quorum

        JavaSparkContext spark = new JavaSparkContext(sconf);
        // JavaHBaseContext manages the HBase connections for the executors.
        JavaHBaseContext jbc = new JavaHBaseContext(spark, hconf);

        Scan scan = new Scan();
        // Optionally restrict the scan range:
        // scan.setStartRow(Bytes.toBytes("1000"));
        // scan.setStopRow(Bytes.toBytes("10000"));

        // hbaseRDD turns the scan directly into an RDD of (row key, Result) pairs.
        JavaRDD<Tuple2<ImmutableBytesWritable, Result>> pairRDD =
                jbc.hbaseRDD(TableName.valueOf("person"), scan);

        List<Tuple2<ImmutableBytesWritable, Result>> t = pairRDD.take(10);
        for (Tuple2<ImmutableBytesWritable, Result> tuple2 : t) {
            // Print the row key once, then every cell value in the row.
            System.out.println(Bytes.toString(tuple2._1().copyBytes()));
            Result result = tuple2._2();
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        spark.stop();
    }
}
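The article stops at the read path for sparkOnHbase as well. For the write direction, JavaHBaseContext exposes bulk operations such as bulkPut, which takes an RDD, a table name, and a function that maps each element to a Put. A minimal sketch (class name, row keys, and values are made up; the bulkPut signature is the one shipped with the hbase-spark module declared above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class HBaseContextBulkPutSketch {
    public static void main(String[] args) {
        SparkConf sconf = new SparkConf()
                .setAppName("spark on hbase bulkPut")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext spark = new JavaSparkContext(sconf);

        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "");   // set to your ZooKeeper quorum
        JavaHBaseContext jbc = new JavaHBaseContext(spark, hconf);

        // Each element of the RDD is turned into a Put by the mapping function;
        // JavaHBaseContext handles the HBase connection on the executors.
        JavaRDD<String> rowKeys = spark.parallelize(Arrays.asList("10001", "10002"));
        jbc.bulkPut(rowKeys, TableName.valueOf("person"),
                rowKey -> {
                    Put put = new Put(Bytes.toBytes(rowKey));
                    put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"),
                            Bytes.toBytes("name-" + rowKey));
                    return put;
                });

        spark.stop();
    }
}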
Advantages of sparkOnHbase over the first approach:
1> Seamless use of HBase connections
2> Seamless integration with Kerberos
3> RDDs can be created directly from gets or scans
4> Supports any combination of HBase operations on RDDs
5> Provides simple methods for common operations while allowing unrestricted, unknown advanced operations through the API
6> Supports both Java and Scala
7> Offers similar APIs for Spark and Spark Streaming