Spark Example: Reading an HBase Table in Spark and Converting It to a DataFrame
2019-05-05
__元昊__
The data in the HBase table is structured as follows:
(Screenshot of the user table contents, not reproduced here.)
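Judging from the code below, the table is named user and has a single column family info with name and age columns, both stored as strings. For reference, equivalent sample rows could be seeded with the HBase client API; this is only a hypothetical sketch, and the rowkey and values are made up:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object WriteSampleData {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.199.128,192.168.199.131,192.168.199.132")
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("user"))

    // Hypothetical sample row: rowkey "1001", with info:name and info:age stored as strings
    val put = new Put(Bytes.toBytes("1001"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"))
    table.put(put)

    table.close()
    conn.close()
  }
}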
Code:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
object readHbase_dataframe {
  val zookeeper_ip = "192.168.199.128,192.168.199.131,192.168.199.132"
  val tableName = "user"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readHbase_dataframe")
      .master("local")
      .getOrCreate()

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", zookeeper_ip)
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    val sqlContext = spark.sqlContext
    val sc = spark.sparkContext
    import sqlContext.implicits._ // without this import the RDD cannot call .toDF

    // Read from the data source
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    // Map the data to a table, i.e. convert the RDD into a DataFrame with a schema
    val hbaseTable = hbaseRDD.map(r => {
      val hbase_result = r._2
      val rowkey = Bytes.toString(hbase_result.getRow)
      val name = Bytes.toString(hbase_result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age = Bytes.toString(hbase_result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      (rowkey, name, age)
    }).toDF("id", "name", "age")

    hbaseTable.createOrReplaceTempView("emp")
    val hbase_result = spark.sql("select * from emp")
    hbase_result.show()
  }
}
Result:
(Screenshot: output of hbase_result.show(), with columns id, name and age.)
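If only part of the table is needed, TableInputFormat also reads scan hints from the same configuration object, through constants such as TableInputFormat.SCAN_COLUMNS, TableInputFormat.SCAN_ROW_START and TableInputFormat.SCAN_ROW_STOP. A minimal sketch, reusing hbaseConf from the example above (the column list and rowkey range are illustrative values):

// Set these on hbaseConf before calling sc.newAPIHadoopRDD.
// The column list is a space-separated list of family:qualifier pairs.
hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "info:name info:age")
// Hypothetical rowkey range; replace with real start/stop keys.
hbaseConf.set(TableInputFormat.SCAN_ROW_START, "1001")
hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "2000")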
The approach above produces a standard relational-style DataFrame, but every column name has to be written out by hand, which is not very automated. Alternatively, you can pass only the HBase table name and convert the data into an HBase-style DataFrame (rowkey / family / column / value).
Code:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes,StructField, StructType}
import test1.hdfsUtil
import test_TBDS.HbaseUtils
object ReadHbase_Dataframe {
  val zookeeper_ip = "172.24.112.13,172.24.112.14,172.24.112.15"
  val tableName = "testTable"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val spark = SparkSession.builder()
      .appName("ReadHbase_Dataframe")
      .master("local")
      .getOrCreate()

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", zookeeper_ip)
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Simple utility classes for HBase and HDFS are used here
    val hbase_list = HbaseUtils.getAllData(tableName)

    // Rewrite the intermediate CSV file on HDFS
    if (hdfsUtil.isExists("hdfs://172.24.112.11:8020/input/hbase.csv")) {
      hdfsUtil.deleteHDFSFile("hdfs://172.24.112.11:8020/input/hbase.csv")
    }
    for (context <- hbase_list) {
      println(context)
      hdfsUtil.append("hdfs://172.24.112.11:8020/input/hbase.csv", context + "\n")
    }

    // Fixed schema: every HBase cell becomes one (rowkey, family, column, value) row
    val myschema = StructType(List(
      StructField("rowkey", DataTypes.StringType),
      StructField("family", DataTypes.StringType),
      StructField("column", DataTypes.StringType),
      StructField("value", DataTypes.StringType)
    ))

    val df = spark.read.schema(myschema).csv("hdfs://172.24.112.11:8020/input/hbase.csv")
    df.createOrReplaceTempView("emp")
    val hbase_result = spark.sql("select * from emp")
    hbase_result.show()
  }
}
Result:
(Screenshot: output of hbase_result.show(), one row per HBase cell with rowkey, family, column and value.)
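A variation on the second approach that avoids the utility classes and the intermediate CSV file on HDFS is to build the (rowkey, family, column, value) rows directly from the scanned RDD with Result.rawCells(), so only the table name has to change. This is a minimal sketch, assuming the spark, sc, sqlContext and hbaseConf values (and the imports) from the first example, plus org.apache.hadoop.hbase.CellUtil:

import org.apache.hadoop.hbase.CellUtil
import sqlContext.implicits._ // needed for .toDF on the RDD

// One output row per HBase cell, whatever columns the table happens to have
val cellDF = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  .flatMap { case (_, result) =>
    result.rawCells().map { cell =>
      (Bytes.toString(CellUtil.cloneRow(cell)),
       Bytes.toString(CellUtil.cloneFamily(cell)),
       Bytes.toString(CellUtil.cloneQualifier(cell)),
       Bytes.toString(CellUtil.cloneValue(cell)))
    }
  }
  .toDF("rowkey", "family", "column", "value")

cellDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show()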