
Spark RDD (JSONObject): reading and writing HBase

2020-07-03  xiaogp

Read: using the newAPIHadoopRDD method

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}


object readHbaseTest {
  def main(args: Array[String]): Unit = {

    // run in local mode for easy testing
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("readHbaseTest")

    // create the HBase configuration
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", "192.168.61.97")  //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
    hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")       //设置zookeeper连接端口,默认2181
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "test:gp")

    // create the SparkContext and SQLContext
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // read the table as an RDD of (ImmutableBytesWritable, Result)
    val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    // map the Results into a DataFrame, i.e. give the RDD a schema
    val data = hbaseRDD.map(r => (
      Bytes.toString(r._2.getRow()),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
    )).toDF("rowkey", "category")

    data.show()

  }
}
+------+--------+
|rowkey|category|
+------+--------+
| rk001|      gp|
| rk002|      wf|
| rk003|     lqq|
+------+--------+
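If a row is missing a qualifier, Result.getValue returns null for it. Below is a minimal sketch of a more defensive mapping (my addition, assuming an extra info:age qualifier that is not part of the example table above), wrapping each cell in Option:

val dataWithAge = hbaseRDD.map(r => {
  val result = r._2
  val rowkey = Bytes.toString(result.getRow())
  // getValue returns null when the cell does not exist, so guard with Option
  val name = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))).map(v => Bytes.toString(v)).getOrElse("")
  val age = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))).map(v => Bytes.toString(v)).getOrElse("")
  (rowkey, name, age)
}).toDF("rowkey", "name", "age")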

Write: using the saveAsHadoopDataset method

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext


object writeHbaseTest {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("writeHbaseTest")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val data = Seq((1, 2), (2, 3)).toDF("a", "b")

    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", "192.168.61.97")
    hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")

    val jobConf = new JobConf(hBaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test:gp")

    data.rdd.map(row => {
      val rowkey = row(0).toString // convert to String before building the rowkey bytes
      val name = row(1).toString
      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)) // 列,列分割符,值
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)
  }
}
hbase(main):003:0> scan 'test:gp'
ROW                                 COLUMN+CELL                                                                                         
 1                                  column=info:name, timestamp=1593767839176, value=2                                                  
 2                                  column=info:name, timestamp=1593767839176, value=3                                                  
 rk001                              column=info:name, timestamp=1593763852691, value=gp                                                 
 rk002                              column=info:name, timestamp=1593763858131, value=wf                                                 
 rk003                              column=info:name, timestamp=1593765395612, value=lqq 
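A single Put can carry several columns at once. A minimal sketch (my addition, reusing data and jobConf from the example above) that persists both DataFrame columns into the info family:

val multiColRdd = data.rdd.map(row => {
  val rowkey = row(0).toString
  val put = new Put(Bytes.toBytes(rowkey))
  // one addColumn call per qualifier; all cells share the same rowkey
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("a"), Bytes.toBytes(row(0).toString))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("b"), Bytes.toBytes(row(1).toString))
  (new ImmutableBytesWritable, put)
})
multiColRdd.saveAsHadoopDataset(jobConf)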

Write: using the saveAsNewAPIHadoopDataset method

Use the map operator to turn each element of the RDD into a (new ImmutableBytesWritable, put) pair, then call saveAsNewAPIHadoopDataset on the resulting RDD to write it to HBase.

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes


object writeHbaseTest2 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("writeHbaseTest2")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val data = Seq((1, 2), (2, 3)).toDF("a", "b")

    val tablename = "test:gp"
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val save_rdd = data.rdd.map(x => {
        // use the first column as the rowkey
        val rowkey = x(0).toString
        // use the second column as the value, written to column info:name
        val name = x(1).toString
        val put = new Put(Bytes.toBytes(rowkey))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        (new ImmutableBytesWritable, put)
      })

    save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }

}
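Accessing the row positionally with x(0)/x(1) is easy to get wrong once the DataFrame grows. The same mapping can be written against column names with Row.getAs; a sketch (my variant, reusing data and job from above):

val save_rdd_by_name = data.rdd.map(row => {
  // look fields up by column name instead of position
  val rowkey = row.getAs[Int]("a").toString
  val name = row.getAs[Int]("b").toString
  val put = new Put(Bytes.toBytes(rowkey))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
  (new ImmutableBytesWritable, put)
})
save_rdd_by_name.saveAsNewAPIHadoopDataset(job.getConfiguration)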

Write: the RDD contains JSON objects; build a Put per record and use forEach with a java.util.function.BiConsumer to call addColumn for every field

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


object JsonSaveHbase {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonSaveHbase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val str = "{\"id\": \"101\", \"name\": \"2222\", \"age\": \"3333\"}"
    val jsonObject = JSON.parse(str).asInstanceOf[JSONObject]
    val data = sc.parallelize(Seq(jsonObject))

    val tablename = "test:gp"
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val save_rdd = data.map(x => {
      val id = x.getString("id")
      val put: Put = new Put(Bytes.toBytes(id))

      // copy every key/value pair of the JSONObject into the info column family
      x.forEach(new java.util.function.BiConsumer[String, Object]() {
        override def accept(k: String, v: Object): Unit = {
          if (k != null && v != null) {
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v.toString))
          }
        }
      })
      (new ImmutableBytesWritable(), put)
    })

    save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
// hbase(main):023:0> get 'test:gp', '101'
// COLUMN                                  CELL
//  info:age                               timestamp=1597288908139, value=3333
//  info:id                                timestamp=1597288908139, value=101
//  info:name                              timestamp=1597288908139, value=2222
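As the get output shows, the id field ends up stored twice: once as the rowkey and once as the info:id cell. If that duplication is unwanted, the BiConsumer can simply skip the rowkey field (my variation on the loop above):

x.forEach(new java.util.function.BiConsumer[String, Object]() {
  override def accept(k: String, v: Object): Unit = {
    // skip the field that is already used as the rowkey
    if (k != null && v != null && k != "id") {
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v.toString))
    }
  }
})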

Alternatively, build the Put and loop over the JSONObject's keySet with an iterator to call addColumn

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


object JsonSaveHbase {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonSaveHbase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val str = "{\"id\": \"102\", \"name\": \"2222\", \"age\": \"3333\"}"
    val jsonObject = JSON.parse(str).asInstanceOf[JSONObject]
    val data = sc.parallelize(Seq(jsonObject))

    val tablename = "test:gp"
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val save_rdd = data.map(x => {
      val id_code = x.getString("id")
      val put = new Put(Bytes.toBytes(id_code))
      insert_hbase(x, put)
      (new ImmutableBytesWritable, put)
    })

    save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

  }

  def insert_hbase(jsonObject: JSONObject, onePut: Put): Unit = {
    val keys = jsonObject.keySet
    val iterator = keys.iterator
    while (iterator.hasNext) {
      val col = iterator.next()
      val value = jsonObject.get(col).toString
      onePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(col), Bytes.toBytes(value))
    }
  }
}
// hbase(main):038:0> get 'test:gp', '102'
// COLUMN                                  CELL
//  info:age                               timestamp=1597290047490, value=3333
//  info:id                                timestamp=1597290047490, value=102
//  info:name                              timestamp=1597290047490, value=2222
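Since fastjson's JSONObject also implements java.util.Map[String, Object], the same loop can be written with the Scala collection converters instead of a hand-rolled iterator. A sketch of insert_hbase in that style (my variant, not from the original post):

import scala.collection.JavaConverters._

def insert_hbase_scala(jsonObject: JSONObject, onePut: Put): Unit = {
  // iterate the key/value pairs directly and skip null values
  for ((col, value) <- jsonObject.asScala if value != null) {
    onePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(col), Bytes.toBytes(value.toString))
  }
}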