Writing data from Flink to HBase

2019-07-31  邵红晓

The example below reads from and writes to an HBase table inside a Flink ProcessFunction, using the HBase client API directly. First, add the HBase dependencies:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <!-- the shaded client is used mainly to avoid jar conflicts -->
    <artifactId>hbase-shaded-client</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  import org.apache.flink.api.scala._
  val list = ArrayBuffer[String]()
  list.append("my-key-1")
  val text = env.fromCollection(list)
  text.map((_, 1)).process(new ProcessFunction[(String, Int), String] {
    // one Table handle per parallel task, opened in open() and released in close()
    var table: Table = _

    override def open(parameters: Configuration): Unit = {
      table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
    }

    override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), String]#Context, out: Collector[String]): Unit = {
      // read: fetch column a:key11 for the incoming row key
      val get = new Get(Bytes.toBytes(value._1))
      get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
      val result = table.get(get)
      val v = Bytes.toString(result.getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
      println(v)
      // write: put column f:name under a new row key
      val put = new Put(Bytes.toBytes("shx_" + value._2))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lisi"))
      table.put(put)
    }

    override def close(): Unit = {
      table.close()
    }
  })
  env.execute()
}
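The code above calls an HbaseUtil helper that is not shown in the post. A minimal sketch of what such a helper could look like is given below; the class name is only what the example assumes, and the ZooKeeper quorum and client port are placeholder assumptions that must be changed to match your cluster:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Hypothetical helper assumed by the example above: wraps a single HBase Connection.
class HbaseUtil {
  val connect: Connection = {
    val conf = HBaseConfiguration.create()
    // placeholder values, not from the original post
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    ConnectionFactory.createConnection(conf)
  }
}

object HbaseUtil {
  def apply(): HbaseUtil = new HbaseUtil
}

Because open() builds one Connection per parallel task, it is also worth closing that Connection (not only the Table) in close(), so that ZooKeeper sessions are not leaked when the job shuts down.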