flink sink hbase
2019-08-02
邵红晓
Test environment:
- flink 1.7.2
- hbase 1.3.1
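The code below assumes these versions are on the classpath. A minimal sbt sketch (Scala 2.11 is an assumption here, adjust to your own build):

// build.sbt sketch, versions taken from the test environment above
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.7.2" % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % "1.7.2" % "provided",
  "org.apache.hbase" %  "hbase-client"          % "1.3.1"
)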
hbase util
package com.xxx.etl
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation
// The connection factory function is passed in so that HbaseUtil itself stays serializable;
// the connection is created lazily, once per executor JVM.
class HbaseUtil(getConnect: () => Connection) extends Serializable {
  lazy val connect: Connection = getConnect()
}

object HbaseUtil {
  val conf: Configuration = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "pg-hadoop-xxx.xxxx.com.cn,pg-hadoop-xxx.xxx.com.cn,pg-hadoop-xxx.xxx.com.cn")
  conf.set("zookeeper.znode.parent", "/hbase-unsecure")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.setLong("hbase.rpc.timeout", 3000000L)
  conf.setInt("hbase.client.ipc.pool.size", 1)
  // avoid the "No FileSystem for scheme: hdfs" error
  conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")

  def apply(): HbaseUtil = {
    val f = () => {
      UserGroupInformation.setConfiguration(conf)
      val remoteUser = UserGroupInformation.createRemoteUser("hbase")
      UserGroupInformation.setLoginUser(remoteUser)
      val connection = ConnectionFactory.createConnection(conf)
      // release the connection before the executor JVM shuts down, do not forget this
      sys.addShutdownHook {
        connection.close()
      }
      connection
    }
    new HbaseUtil(f)
  }
}
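To sanity-check the ZooKeeper and connection settings outside of Flink, a quick read can go through the same helper. This is only an illustration; the table name "test" and row key "shx_2" are placeholders matching the example further down.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

object HbaseUtilSmokeTest {
  def main(args: Array[String]): Unit = {
    val table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
    val result = table.get(new Get(Bytes.toBytes("shx_2")))
    if (!result.isEmpty) {
      // the sink below stores the Int with Bytes.toBytes, so read it back with Bytes.toInt
      println(Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))))
    } else {
      println("row not found, but the connection itself works")
    }
    table.close()
  }
}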
hbase sink definition
import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.{Logger, LoggerFactory}
class HBaseRichOutputFormat extends RichOutputFormat[Array[(String, Int)]] {
  val logger: Logger = LoggerFactory.getLogger(getClass)
  var table: Table = _

  override def configure(parameters: Configuration): Unit = {
    logger.info("configure open")
  }

  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // one table handle per parallel task
    table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  }

  override def writeRecord(record: Array[(String, Int)]): Unit = {
    import scala.collection.JavaConverters._
    // write the whole window to HBase as one batch of Puts
    val list = record.map(d => {
      val put = new Put(Bytes.toBytes(d._1))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(d._2))
      put
    }).toList
    table.put(list.asJava)
  }

  override def close(): Unit = {
    // close the table handle when the job ends (an unbounded streaming job may never get here)
    if (table != null) table.close()
  }
}
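The sink writes to table "test" with column family "f", which has to exist before the job runs. A sketch using the HBase 1.x admin API (the table could just as well be created from the hbase shell):

import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    val admin = HbaseUtil().connect.getAdmin
    val name = TableName.valueOf("test")
    if (!admin.tableExists(name)) {
      // one column family "f", matching the Put in writeRecord
      val desc = new HTableDescriptor(name)
      desc.addFamily(new HColumnDescriptor("f"))
      admin.createTable(desc)
    }
    admin.close()
  }
}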
main program
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer

/**
 * hbase flink sink
 */
object HbaseSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val list = ArrayBuffer[String]()
    list.append("shx_2")
    list.append("shx_3")
    list.append("shx_4")
    list.append("shx_5")
    list.append("shx_6")
    env.fromCollection(list).map((_, 1))
      .keyBy(_._1).countWindow(1L)
      // collect each window into an array so the sink can write it as one batch
      .process(new ProcessWindowFunction[(String, Int), Array[(String, Int)], String, GlobalWindow] {
        override def process(key: String, context: Context, elements: Iterable[(String, Int)],
                             out: Collector[Array[(String, Int)]]): Unit = {
          println(elements.mkString(","))
          out.collect(elements.toArray)
        }
      })
      .writeUsingOutputFormat(new HBaseRichOutputFormat())
    env.execute("Streaming WordCount")
  }
}
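After the job has run, the rows can be checked with a full scan through the same connection helper; this is just a verification sketch, not part of the job:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

object VerifyHbaseSink {
  def main(args: Array[String]): Unit = {
    val table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
    val scanner = table.getScanner(new Scan())
    // the row key is the shx_* string, f:name holds the Int written by the sink
    scanner.asScala.foreach { r =>
      println(Bytes.toString(r.getRow) + " -> " + Bytes.toInt(r.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))))
    }
    scanner.close()
    table.close()
  }
}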