Reading and Writing HBase Table Data with Spark
2021-05-24
扎西的德勒
1. Maven Dependencies
<repositories>
    <!-- hbase-spark is provided by Cloudera, so the CDH repository is added here -->
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-spark</artifactId>
        <version>2.1.0-cdh6.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.3</version>
    </dependency>
</dependencies>
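The read example in the next section expects a source table spark_hbase with column family info (qualifiers name and age) to already exist and contain some rows. Below is a minimal setup sketch using the standard HBase 2.x client API; the object name CreateSourceTable and the sample row values are illustrative, and the ZooKeeper settings mirror the ones used in the Spark code.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.util.Bytes

object CreateSourceTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("spark_hbase")
    // Create the table with a single column family "info" if it does not exist yet
    if (!admin.tableExists(tableName)) {
      val desc = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
        .build()
      admin.createTable(desc)
    }
    // Insert a sample row so the read example has data to print (values are illustrative)
    val table = conn.getTable(tableName)
    val put = new Put(Bytes.toBytes("1"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"))
    table.put(put)
    table.close()
    conn.close()
  }
}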
2. Spark Code
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

object SparkOnHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkOnHBase").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Build the HBase configuration
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Only consumed by the TableInputFormat read path; HBaseContext.hbaseRDD takes the table name directly
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")

    // Read from HBase: each element is a (rowkey, Result) pair
    val hbaseContext: HBaseContext = new HBaseContext(sc, hbaseConf)
    val scan: Scan = new Scan()
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan)
    hbaseRDD.map(eachResult => {
      // val rowkey1: String = Bytes.toString(eachResult._1.get())
      val result: Result = eachResult._2
      val rowKey: String = Bytes.toString(result.getRow)
      val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      // println(rowKey + ":" + name + ":" + age)
      rowKey + ":" + name + ":" + age
    }).foreach(println)

    // Write to HBase; create the target table in advance: create 'spark_hbase_out','info'
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "spark_hbase_out")
    // Configure the output format classes through a Job instance
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val initialRDD: RDD[(String, String, String)] = sc.parallelize(List(("100", "apple", "11"), ("200", "banana", "12"), ("300", "pear", "13")))
    val hbaseRDD2: RDD[(ImmutableBytesWritable, Put)] = initialRDD.map(x => {
      val put: Put = new Put(Bytes.toBytes(x._1))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._2))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(x._3))
      // TableOutputFormat takes the row key from the Put itself, so the key can stay empty
      (new ImmutableBytesWritable(), put)
    })
    hbaseRDD2.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
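The read above goes through HBaseContext, which is what the hbase-spark module provides; the TableInputFormat.INPUT_TABLE setting is only used by Spark's generic Hadoop input path, not by hbaseRDD. For comparison, here is a minimal sketch of that alternative read path using SparkContext.newAPIHadoopRDD, with the same cluster settings and table as above (the object name SparkReadHBaseAlt is illustrative). It needs no HBaseContext, at the cost of the bulk helpers (bulkPut, bulkGet, etc.) that hbase-spark adds.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkReadHBaseAlt {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sparkReadHBaseAlt").setMaster("local[*]"))
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Here INPUT_TABLE is what actually selects the table to scan
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    hbaseRDD.map { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      rowKey + ":" + name
    }.foreach(println)
    sc.stop()
  }
}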