Interacting with HBase from Flink


I installed HBase with Docker inside a virtual machine, so the port mappings and hostname resolution were hard to get right. The read job failed, and when I tried the write job it failed too. Recording both here anyway~
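
Before blaming either job, it is worth confirming that the HBase client can reach ZooKeeper and the master at all. Below is a minimal standalone probe (the HBasePing name is just for illustration), assuming the same myhbase hostname and port 2181 as the jobs in this post; Admin.listTableNames() forces a real round trip, so if this hangs or throws, the Flink jobs will fail for the same networking reason.

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.ConnectionFactory

object HBasePing {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    // "myhbase" must resolve from the machine running this code,
    // e.g. via an /etc/hosts entry pointing at the Docker host.
    conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    try {
      // Listing table names requires talking to both ZooKeeper and the master.
      admin.listTableNames().foreach(t => println(t.getNameAsString))
    } finally {
      admin.close()
      conn.close()
    }
  }
}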

1. Read code

package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.java.tuple
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.flink.addons.hbase.TableInputFormat

object Demo {
  def main(args:Array[String]):Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // The legacy flink-hbase TableInputFormat exposes protected `table`
    // and `scan` fields that must be initialized in configure().
    val hbaseData:DataSet[tuple.Tuple2[String, String]] = env
      .createInput(new TableInputFormat[tuple.Tuple2[String, String]]{
        override def configure(parameters:Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          // "myhbase" is the Docker container's hostname; it must resolve
          // from wherever the Flink client and task managers run.
          conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
          conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
          val conn:Connection = ConnectionFactory.createConnection(conf)
          table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
          scan = new Scan()
          scan.addFamily(Bytes.toBytes("f1"))
        }
        override def getScanner: Scan = {
          scan
        }
        override def getTableName:String ={
          "hbasesource"
        }
        // Flatten every cell value in the row into one comma-separated string.
        override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
          val rowkey:String = Bytes.toString(result.getRow)
          // mkString handles the empty-row case without leaving a trailing comma.
          val valueString = result.rawCells().map { cell =>
            Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          }.mkString(",")
          new tuple.Tuple2[String, String](rowkey, valueString)
        }
      })
    // In the batch DataSet API, print() triggers execution itself; a
    // trailing env.execute() would fail with "no new data sinks defined".
    hbaseData.print()
  }
}
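
The read job assumes a table hbasesource with column family f1 already exists. Here is a sketch that creates and seeds it with the plain HBase 1.x client API, matching the table and family names used above (the seed row content is made up):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object CreateSourceTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    try {
      val name = TableName.valueOf("hbasesource")
      if (!admin.tableExists(name)) {
        // HTableDescriptor/HColumnDescriptor are the 1.x-era API,
        // consistent with the HTable cast in the read job.
        val desc = new HTableDescriptor(name)
        desc.addFamily(new HColumnDescriptor("f1"))
        admin.createTable(desc)
      }
      // Seed one row so the scan has something to return (values are made up).
      val table = conn.getTable(name)
      val put = new Put(Bytes.toBytes("rk001"))
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"))
      table.put(put)
      table.close()
    } finally {
      admin.close()
      conn.close()
    }
  }
}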



2. Write code

package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.flink.api.common.io.OutputFormat

object Demo {
  def main(args:Array[String]):Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Each element is "rowkey, name, age"; HBaseOutputFormat parses and writes it.
    val sourceDataSet:DataSet[String] = env.fromElements("01, zhangsan, 28", "02, lisi, 30")
    sourceDataSet.output(new HBaseOutputFormat)
    env.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "myhbase"
  val port = "2181"
  var conn:Connection = null
  var mutator:BufferedMutator = null

  override def configure(configuration: Configuration): Unit = {
    // Nothing to do here; all setup happens in open().
  }

  override def open(i: Int, i1: Int): Unit = {
    val config:org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
    // Create the mutator once per task rather than once per record,
    // so its write buffer actually amortizes the puts.
    val params:BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("hbasesource"))
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
  }

  override def writeRecord(it: String): Unit = {
    val cf1 = "f1"
    // trim() strips the spaces after the commas in the input records.
    val array: Array[String] = it.split(",").map(_.trim)
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    // Reuse the task-wide mutator; it is flushed and closed in close().
    mutator.mutate(put)
  }

  override def close(): Unit = {
    // Flush any buffered puts before releasing resources.
    if (null != mutator) {
      mutator.flush()
      mutator.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}
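
If the write job ever gets past the networking problem, a point read of one of the written rowkeys is a quick way to verify the result. A sketch with the plain client API, reusing the same connection settings (the VerifyWrite name is just for illustration):

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object VerifyWrite {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("hbasesource"))
      // Point read of the first row written by the Flink job above.
      val result = table.get(new Get(Bytes.toBytes("01")))
      val name = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")))
      println(s"name=$name, age=$age")
      table.close()
    } finally {
      conn.close()
    }
  }
}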


