
flink sink hbase

2019-08-02  邵红晓

Test environment:

hbase util

package com.xxx.etl

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation

class HbaseUtil(getConnect:()=> Connection) extends Serializable {
  lazy val connect = getConnect()
}

object HbaseUtil {
  val conf: Configuration = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "pg-hadoop-xxx.xxxx.com.cn,pg-hadoop-xxx.xxx.com.cn,pg-hadoop-xxx.xxx.com.cn")
  conf.set("zookeeper.znode.parent", "/hbase-unsecure")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.setLong("hbase.rpc.timeout", 3000000L)
  conf.setInt("hbase.client.ipc.pool.size", 1)
  // Avoid "No FileSystem for scheme: hdfs" by pinning the HDFS FileSystem implementation
  conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")

  def apply(): HbaseUtil = {
    val f = ()=>{
      UserGroupInformation.setConfiguration(conf)
      val remoteUser = UserGroupInformation.createRemoteUser("hbase")
      UserGroupInformation.setLoginUser(remoteUser)
      val connection = ConnectionFactory.createConnection(conf)
      // Release the connection before the executor JVM shuts down; don't forget this
      sys.addShutdownHook {
        connection.close()
      }
      connection
    }
    new HbaseUtil(f)
  }
}
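Wrapping the connection in a function keeps HbaseUtil serializable: only the () => Connection closure is shipped to the TaskManager, and the lazy val builds the actual Connection on first access on the worker, where the shutdown hook later closes it. A minimal usage sketch of that behavior (illustrative only):

val util = HbaseUtil()   // cheap to construct and safe to serialize: no connection yet
val conn = util.connect  // the Connection is created here on first access, then cached by the lazy val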

hbase sink definition

import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.{Logger, LoggerFactory}

class HBaseRichOutputFormat extends RichOutputFormat[Array[(String,Int)]]{
  val logger: Logger = LoggerFactory.getLogger(getClass)
  var table: Table = _

  override def configure(parameters: Configuration): Unit = {
    logger.info("configure open")
  }
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  }
  override def writeRecord(record: Array[(String,Int)]): Unit = {
    import scala.collection.JavaConverters._
    // Batch-write all records of one window into HBase
    val list = record.map(d=>{
      val put = new Put(Bytes.toBytes(d._1))
      put.addColumn(Bytes.toBytes("f"),Bytes.toBytes("name"),Bytes.toBytes(d._2))
      put
    }).toList
    table.put(list.asJava)
  }
  override def close(): Unit = {
    // Close the table when the job ends (for an unbounded stream this is effectively never reached)
    table.close()
  }
}
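The sink writes to the test table with column family f, so that table has to exist before the job runs. A minimal one-off setup sketch (assuming the HBase 2.x Admin API; on 1.x you would use HTableDescriptor/HColumnDescriptor instead), reusing HbaseUtil from above:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    // Create the 'test' table with column family 'f' that HBaseRichOutputFormat writes to
    val admin = HbaseUtil().connect.getAdmin
    val tableName = TableName.valueOf("test")
    if (!admin.tableExists(tableName)) {
      val descriptor = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
        .build()
      admin.createTable(descriptor)
    }
    admin.close()
  }
}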

Main program

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
  * hbase flink sink
  */
object HbaseSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    import org.apache.flink.api.scala._
    val list = ArrayBuffer[String]()
    list.append("shx_2")
    list.append("shx_3")
    list.append("shx_4")
    list.append("shx_5")
    list.append("shx_6")
    env.fromCollection(list).map((_, 1))
      .keyBy(_._1).countWindow(1L)
        .process(new ProcessWindowFunction[(String,Int),Array[(String,Int)],String,GlobalWindow] {
          override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[Array[(String,Int)]]): Unit ={
            println(elements.mkString(","))
            out.collect(elements.toArray)
          }
        }).writeUsingOutputFormat(new HBaseRichOutputFormat())

    env.execute("Streaming WordCount")
  }
}
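After the job has run, a quick way to confirm the sink worked is to read one of the rows back with a Get; the row key "shx_2" and value 1 come from the example data above. A minimal verification sketch (illustrative, assumes the row exists):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

object VerifyHbaseSink {
  def main(args: Array[String]): Unit = {
    val table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
    // The sink stored the Int count with Bytes.toBytes(Int), so read it back with Bytes.toInt
    val result = table.get(new Get(Bytes.toBytes("shx_2")))
    val value = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
    println(s"shx_2 -> f:name = $value") // expected: 1
    table.close()
  }
}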