Spark读写HBase表数据

2021-05-24  本文已影响0人  扎西的德勒

一、Maven依赖

<repositories>
        <!-- spark on hbase是cloudera提供的,所以这个地方添加了cdh仓库地址 -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
</repositories>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.1.0-cdh6.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>
</dependencies>

二、Spark代码


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD


object SparkOnHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkOnHBase").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //创建HBase的环境变量参数
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum","node01,node02,node03")
    hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE,"spark_hbase")

    val hbaseContext: HBaseContext = new HBaseContext(sc, hbaseConf)

    val scan: Scan = new Scan()

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan)

    hbaseRDD.map(eachResult => {
      //      val rowkey1: String = Bytes.toString(eachResult._1.get())
      val result: Result = eachResult._2
      val rowKey: String = Bytes.toString(result.getRow)

      val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      //      println(rowKey+":"+name+":"+age)
      rowKey + ":" + name + ":" + age
    }).foreach(println)

    //向HBase写数据,提前创建HBase表:create 'spark_hbase_out','info'
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE,"spark_hbase_out")
    //通过job来设置输出的格式的类
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val initialRDD: RDD[(String, String, String)] = sc.parallelize(List(("100", "apple", "11"), ("200", "banana", "12"), ("300", "pear", "13")))

    val hbaseRDD2: RDD[(ImmutableBytesWritable, Put)] = initialRDD.map(x => {
      val put: Put = new Put(Bytes.toBytes(x._1))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._2))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(x._3))
      (new ImmutableBytesWritable(), put)
    })

    hbaseRDD2.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()

  }

}
上一篇下一篇

猜你喜欢

热点阅读