Spark Example: Reading an HBase Table into a DataFrame

2019-05-05  __元昊__

The data in the HBase table is structured as follows:


[Screenshot: contents of the user table in HBase]
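
The original screenshot is not reproduced here, but from the code below the table is named user and has a single column family info with the qualifiers name and age. A minimal sketch of how such sample rows could be written with the HBase client API (the rowkey and values here are made-up assumptions):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object WriteSampleRows {
  def main(args: Array[String]): Unit = {
    // Same ZooKeeper quorum as in the read example below
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.199.128,192.168.199.131,192.168.199.132")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("user"))
    try {
      // One row: rowkey "0001", columns info:name and info:age (sample values)
      val put = new Put(Bytes.toBytes("0001"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"))
      table.put(put)
    } finally {
      table.close()
      conn.close()
    }
  }
}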

Code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object readHbase_dataframe {
  val zookeeper_ip="192.168.199.128,192.168.199.131,192.168.199.132"
  val tableName="user"

  def main(args: Array[String]): Unit = {
    val spark= SparkSession.builder()
      .appName("readHbase_dataframe")
      .master("local")
      .getOrCreate()

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", zookeeper_ip)
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    val sqlContext = spark.sqlContext
    val sc=spark.sparkContext
    import sqlContext.implicits._ // required so that .toDF can be called on the RDD
    // Fetch the data from HBase
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

    // Map the data to a table, i.e. convert the RDD into a DataFrame with a schema
    val hbaseTable=hbaseRDD.map(r=>{
      val hbase_result=r._2
      val rowkey=Bytes.toString(hbase_result.getRow)
      val name=Bytes.toString(hbase_result.getValue(Bytes.toBytes("info"),Bytes.toBytes("name")))
      val age=Bytes.toString(hbase_result.getValue(Bytes.toBytes("info"),Bytes.toBytes("age")))

      (rowkey,name,age)
    }).toDF("id","name","age")

    hbaseTable.createOrReplaceTempView("emp")
    val hbase_result=spark.sql("select * from emp")
    hbase_result.show()
  }
}
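
Before running this, the project needs Spark SQL plus the HBase client jars on the classpath (TableInputFormat is not part of Spark). A build.sbt sketch; the artifact names are standard, but the versions are assumptions and should be aligned with your cluster:

// build.sbt sketch (versions are assumptions; match them to your Spark/HBase installation)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "2.3.0",
  "org.apache.hbase"  % "hbase-client" % "1.2.6",
  // TableInputFormat ships in hbase-server on HBase 1.x (hbase-mapreduce on 2.x)
  "org.apache.hbase"  % "hbase-server" % "1.2.6"
)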

Result:


[Screenshot: output of hbase_result.show()]

The code above produces a DataFrame in standard relational form, but every column name has to be hand-written in the code, which is not very automated. Alternatively, you can pass just the HBase table name as the only parameter and convert the data into an HBase-style DataFrame (rowkey / family / column / value).

Code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes,StructField, StructType}
import test1.hdfsUtil
import test_TBDS.HbaseUtils

object ReadHbase_Dataframe {
  val zookeeper_ip="172.24.112.13,172.24.112.14,172.24.112.15"
  val tableName="testTable"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")

    val spark= SparkSession.builder()
      .appName("ReadHbase_Dataframe")
      .master("local")
      .getOrCreate()

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", zookeeper_ip)
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    val hbase_list=HbaseUtils.getAllData(tableName) // simple home-grown helper classes for HBase and HDFS (not shown here)
    if(hdfsUtil.isExists("hdfs://172.24.112.11:8020/input/hbase.csv")){
      hdfsUtil.deleteHDFSFile("hdfs://172.24.112.11:8020/input/hbase.csv")
    }
    for(context<-hbase_list){
      println(context)
      hdfsUtil.append("hdfs://172.24.112.11:8020/input/hbase.csv",context+"\n")
    }

    val myschema = StructType(List(StructField("rowkey", DataTypes.StringType)
      , StructField("family", DataTypes.StringType)
      ,StructField("column", DataTypes.StringType)
      ,StructField("value", DataTypes.StringType)
      ))
    val df=spark.read.schema(myschema).csv("hdfs://172.24.112.11:8020/input/hbase.csv")
    df.createOrReplaceTempView("emp")
    val hbase_result=spark.sql("select * from emp")
    hbase_result.show()
  }
}

Result:


[Screenshot: output of hbase_result.show()]
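
The CSV round-trip above relies on home-grown utility classes and an extra write to HDFS. If those are not available, the same rowkey / family / column / value DataFrame can be built directly from the scan RDD, one output row per HBase cell. A minimal sketch, assuming spark and hbaseConf are set up exactly as in the first example:

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val sc = spark.sparkContext
import spark.implicits._ // needed for .toDF

val cellDF = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  .flatMap { case (_, result) =>
    // emit one tuple per HBase cell: (rowkey, family, column, value)
    result.rawCells().map { cell =>
      (Bytes.toString(CellUtil.cloneRow(cell)),
       Bytes.toString(CellUtil.cloneFamily(cell)),
       Bytes.toString(CellUtil.cloneQualifier(cell)),
       Bytes.toString(CellUtil.cloneValue(cell)))
    }
  }
  .toDF("rowkey", "family", "column", "value")

cellDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show()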