HBASE 知识整理HBASE 精选文集

HBase-Spark-Snapshot-Read-Demo

2019-12-26  本文已影响0人  步闲
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}


object SparkReadHBaseTest {

  //   主函数
  def main(args: Array[String]) {

    // 设置spark访问入口
    val conf = new SparkConf().setAppName("SparkReadDwAPPTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .setMaster("local")//调试
    val sc = new SparkContext(conf)
    // 获取HbaseRDD
    val job = Job.getInstance(getHbaseConf())
    TableSnapshotInputFormat.setInput(job, "test1_snap", new Path("/user/zhou.pengbo"))

    val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hbaseRDD.map(_._2).map(getRes(_)).count()
  }


  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
    println(rowkey+"---"+name)
    name
  }
  // 构造 Hbase 配置信息
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  // 获取扫描器
  def getScanStr(): String = {
    val scan = new Scan()
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}

注:上述代码需将core-site.xml&hdfs-site.xml&hbase-site.xml文件放在资源目录resources下。或者在代码中进行Conf配置,如下:

package com.xcar.etl


import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}


object SparkReadHBaseTest {

  val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"

  //   主函数
  def main(args: Array[String]) {

    // 设置spark访问入口
    val conf = new SparkConf().setAppName("SparkReadDwAPPTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .setMaster("local")//调试
    val sc = new SparkContext(conf)
    // 获取HbaseRDD
    val job = Job.getInstance(getHbaseConf())
    TableSnapshotInputFormat.setInput(job, "test1_snap", new Path("/user/zhou.pengbo"))

    val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hbaseRDD.map(_._2).map(getRes(_)).count()
  }

  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
    println(rowkey+"---"+name)
    name
  }

  // 构造 Hbase 配置信息
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
    conf.set("hbase.rootdir", "/apps/hbase/data")
    // 设置查询的表名
    conf.set(TableInputFormat.INPUT_TABLE, "test_snap")
    // conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "test1_snap")
    conf.set("fs.defaultFS","hdfs://xxxxxx:8020") 
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  // 获取扫描器
  def getScanStr(): String = {
    val scan = new Scan()
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}
上一篇下一篇

猜你喜欢

热点阅读