Spark on HBase: Reading and Writing

2019-01-02  Ivan_030c

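This article describes two ways to read and write HBase from Spark:
1. Using the newAPIHadoopRDD API that Spark provides
2. SparkOnHBase, which relies on HBaseContext, a utility class open-sourced by Cloudera Labs, to let Spark read and write HBase in bulk through RDDs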

The HBase table schema is as follows:


image.png

Part of the data set is shown below:


image.png

This article uses Spark 2.3.2 and HBase 1.2.6.

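The HBase classes that carry row data (Result and ImmutableBytesWritable) do not implement java.io.Serializable, so with Spark's default Java serializer a job fails with a serialization error as soon as the rows it has read need to be shipped to the driver or shuffled. Whichever of the two approaches you use, configure Spark to use the Kryo serializer (spark.serializer) before reading from HBase, as shown in the figure: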


image.png
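
To make the cause of that error concrete, here is a minimal JDK-only sketch. It does not touch Spark or HBase: FakeResult and SerializableRow are hypothetical stand-ins introduced only for illustration, and the point is simply that plain Java serialization rejects any class that does not implement java.io.Serializable, which is the situation Spark's default serializer runs into with HBase row objects.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationCheck {
    // Hypothetical stand-in for an HBase row object: holds bytes but is NOT Serializable.
    static final class FakeResult {
        final byte[] value;
        FakeResult(byte[] value) { this.value = value; }
    }

    // Same payload, but explicitly marked Serializable.
    static final class SerializableRow implements Serializable {
        private static final long serialVersionUID = 1L;
        final byte[] value;
        SerializableRow(byte[] value) { this.value = value; }
    }

    /** Returns true if plain Java serialization accepts the object. */
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Thrown for any class in the object graph that lacks Serializable.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = {1, 2, 3};
        System.out.println(javaSerializable(new FakeResult(bytes)));      // false
        System.out.println(javaSerializable(new SerializableRow(bytes))); // true
    }
}
```

Kryo does not require the Serializable marker, which is why switching spark.serializer to org.apache.spark.serializer.KryoSerializer avoids the error.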

newAPIHadoopRDD

This approach reads and writes HBase through TableInputFormat and TableOutputFormat.
The following listing reads from HBase:

package com.zjc.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

/**
 * Created by zjc on 2019/1/2.
 */
public class sparkApplication11 {
    public static void main(String[] args) {
        SparkConf sconf = new SparkConf();
        sconf.setAppName("spark on hbase");
        // Result and ImmutableBytesWritable are not java.io.Serializable, so use Kryo.
        sconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        Configuration hconf = HBaseConfiguration.create();
        // Fill in the ZooKeeper quorum of your HBase cluster.
        hconf.set("hbase.zookeeper.quorum", "");
        hconf.set(TableInputFormat.INPUT_TABLE, "person");
        // Space-separated list of family:qualifier columns to scan.
        hconf.set(TableInputFormat.SCAN_COLUMNS, "basic:name basic:birth social:address social:idnum social:phone");
        // Optional row-key range for the scan:
        // hconf.set(TableInputFormat.SCAN_ROW_START, "0");
        // hconf.set(TableInputFormat.SCAN_ROW_STOP, "100000000");
        JavaSparkContext spark = new JavaSparkContext(sconf);
        // Each record is a (row key, Result) pair for one HBase row.
        JavaPairRDD<ImmutableBytesWritable, Result> pairRDD = spark.newAPIHadoopRDD(hconf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println("count-----" + pairRDD.count());
        // Bring the first 10 rows back to the driver.
        List<Result> list = pairRDD.map(t -> t._2()).take(10);
        System.out.println("list size---" + list.size());
        for (Result result : list) {
            // Print the row key, then every cell value in the row.
            System.out.println(Bytes.toString(result.getRow()));
            for (Cell cell : result.listCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        spark.stop();
    }
}

The output is shown in the figure:

image.png
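
The listing above covers only the read side. For the write side through TableOutputFormat, a minimal sketch might look like the following. It needs the same Spark and HBase dependencies as the read listing and a reachable cluster, so treat it as an outline under stated assumptions rather than a tested program: the class name SparkHBaseWriteSketch, the "zk-host" quorum placeholder, and the two sample rows are made up for illustration, and only the basic:name column of the person table is written.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;

public class SparkHBaseWriteSketch {
    public static void main(String[] args) throws IOException {
        SparkConf sconf = new SparkConf().setAppName("spark write hbase");
        JavaSparkContext spark = new JavaSparkContext(sconf);

        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "zk-host"); // placeholder, not from the article
        hconf.set(TableOutputFormat.OUTPUT_TABLE, "person");

        // The Job object is used only to assemble the output-format configuration.
        Job job = Job.getInstance(hconf);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // Made-up sample input in the form "rowKey,name".
        JavaPairRDD<ImmutableBytesWritable, Put> puts = spark
                .parallelize(Arrays.asList("row1,Alice", "row2,Bob"))
                .mapToPair(line -> {
                    String[] fields = line.split(",");
                    byte[] rowKey = Bytes.toBytes(fields[0]);
                    Put put = new Put(rowKey);
                    put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
                    return new Tuple2<>(new ImmutableBytesWritable(rowKey), put);
                });

        // Each partition writes its Puts to the table through TableOutputFormat.
        puts.saveAsNewAPIHadoopDataset(job.getConfiguration());
        spark.stop();
    }
}
```

Because each Put is created and written within the same stage, nothing is shuffled here; if you add a shuffle before the write, the serializer setting discussed earlier applies to this job as well.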

SparkOnHBase

Add the hbase-spark jar through Maven:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.2.0-cdh5.16.1</version>
</dependency>

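This hbase-spark build targets Spark 1.6, while the Spark version actually in use is 2.3.2, so running the job fails with an error saying that the class org.apache.spark.Logging is not defined. The class was moved in Spark 2.x (it is now org.apache.spark.internal.Logging). The workaround is to recreate the class under its old name, package it as a jar, and put that jar in Spark's jars directory.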
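
Before and after applying that workaround, it can help to confirm which of the two class names is actually visible on the classpath. The following is a small diagnostic sketch, not part of the original setup: LoggingClassCheck and isOnClasspath are hypothetical names, and the check uses only the standard Class.forName call.

```java
class LoggingClassCheck {
    /** Returns true if the named class can be loaded (without initializing it). */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, LoggingClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Spark 1.x location, which this hbase-spark build expects.
        System.out.println("org.apache.spark.Logging: "
                + isOnClasspath("org.apache.spark.Logging"));
        // Spark 2.x location.
        System.out.println("org.apache.spark.internal.Logging: "
                + isOnClasspath("org.apache.spark.internal.Logging"));
    }
}
```

Run it with the same classpath as the Spark job. If the first line prints false, the hbase-spark classes will fail to load until a jar providing org.apache.spark.Logging is added.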

The read path looks like this:

package com.zjc.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.List;

/**
 * Created by zjc on 2019/1/2.
 */
public class sparkApplication11 {
    public static void main(String[] args) {
        SparkConf sconf = new SparkConf();
        sconf.setAppName("spark on hbase");
        // Result and ImmutableBytesWritable are not java.io.Serializable, so use Kryo.
        sconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        Configuration hconf = HBaseConfiguration.create();
        // Fill in the ZooKeeper quorum of your HBase cluster.
        hconf.set("hbase.zookeeper.quorum", "");
        JavaSparkContext spark = new JavaSparkContext(sconf);
        JavaHBaseContext jbc = new JavaHBaseContext(spark, hconf);
        // An unbounded Scan reads the whole table; set start/stop rows to narrow it.
        Scan scan = new Scan();
        // scan.setStartRow(Bytes.toBytes("1000"));
        // scan.setStopRow(Bytes.toBytes("10000"));
        JavaRDD<Tuple2<ImmutableBytesWritable, Result>> pairRDD = jbc.hbaseRDD(TableName.valueOf("person"), scan);
        List<Tuple2<ImmutableBytesWritable, Result>> rows = pairRDD.take(10);
        for (Tuple2<ImmutableBytesWritable, Result> tuple2 : rows) {
            // The tuple key holds the row key.
            System.out.println(Bytes.toString(tuple2._1().copyBytes()));
            Result result = tuple2._2();
            // Result.getRow() returns the same row key.
            System.out.println(Bytes.toString(result.getRow()));
            for (Cell cell : result.listCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        spark.stop();
    }
}

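Compared with the first approach, SparkOnHBase has the following advantages:

1> Seamless use of HBase connections

2> Seamless integration with Kerberos

3> RDDs can be created directly from a get or a scan

4> Any combination of HBase operations can be run against an RDD

5> Simple methods for common operations, while the API still allows unrestricted, unanticipated advanced operations

6> Support for both Java and Scala

7> Similar APIs for Spark and Spark Streaming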
