flink

Flink 中使用 RichCoFlatMapFunction 算子的示例

2020-04-10  本文已影响0人  Jaming
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, RichCoFlatMapFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.util.Collector

object flink_connect {

  /**
   * Joins a Kafka word stream with a socket "switch" stream.
   *
   * Words read from Kafka topic `test` are emitted only for keys whose switch has been
   * set to `true` via the socket stream. Broker, topic, group id and socket address are
   * hard-coded for this demo.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "xiao100:9092")
    properties.setProperty("group.id", "flink_consumer")
    val consumer = new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema, properties)

    // Data stream: comma-separated words from Kafka.
    val kafka: DataStream[String] = env.addSource(consumer)

    // Control stream: lines expected to look like "key,flag", e.g. "hello,true".
    val socket: DataStream[String] = env.socketTextStream("192.168.252.100", 9999)

    // Each Kafka word becomes (word, 1).
    val k_wo: DataStream[(String, Int)] = kafka.flatMap(_.split(",")).map((_, 1))

    // Split each control line once. Lines with fewer than two fields (e.g. an empty line)
    // are dropped instead of throwing ArrayIndexOutOfBoundsException and failing the job.
    // Fields after the second are ignored, as before.
    val s_wo: DataStream[(String, String)] = socket.flatMap { line =>
      line.split(",") match {
        case Array(k, v, _*) => List((k, v))
        case _               => Nil
      }
    }

    val all: ConnectedStreams[(String, Int), (String, String)] = k_wo.connect(s_wo)

    // Key both inputs by their first field so that the switch state in
    // MyCoFlatmapFunction is shared between matching keys of the two streams.
    val key: ConnectedStreams[(String, Int), (String, String)] = all.keyBy(x => x._1, y => y._1)

    val result: DataStream[String] = key.flatMap(new MyCoFlatmapFunction())
    result.print()
    env.execute()
  }
}

/**
 * Gates a (word, count) stream with a per-key on/off switch.
 *
 * Input 1 carries data records `(key, count)`; input 2 carries control records
 * `(key, flag)`. The switch is kept in keyed `ValueState`, so this function must run on
 * keyed `ConnectedStreams` (as `flink_connect.main` does). A data record is emitted as
 * `"<key>============><count>"` only while the switch for its key is `true`.
 */
class MyCoFlatmapFunction extends RichCoFlatMapFunction[(String,Int),(String,String),String]() {

  // Per-key switch; unset until the first valid control record for that key arrives.
  var value: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit = {
    value = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isout", classOf[Boolean]))
  }

  /** Emits the data record if the switch for its key is on. */
  override def flatMap1(in1: (String, Int), collector: Collector[String]): Unit = {
    // An unset ValueState returns null. Reading it as a Scala Boolean unboxes null to
    // `false`, so keys that never received a switch message are treated as "off".
    // A `null == bool` check on a primitive Boolean could never be true.
    if (value.value()) {
      collector.collect(s"${in1._1}============>${in1._2}")
    }
  }

  /**
   * Updates the switch for the record's key.
   *
   * The flag is trimmed and matched case-insensitively against "true"/"false".
   * Any other value leaves the current state unchanged and is reported on stderr,
   * instead of throwing and failing the task.
   */
  override def flatMap2(in2: (String, String), collector: Collector[String]): Unit = {
    println(s"${in2._1}--->${in2._2}")
    val flag = in2._2.trim
    if (flag.equalsIgnoreCase("true")) {
      value.update(true)
    } else if (flag.equalsIgnoreCase("false")) {
      value.update(false)
    } else {
      System.err.println(s"Ignoring unrecognised switch value '${in2._2}' for key '${in2._1}'")
    }
  }
}

感觉这种场景（用一条控制流按 key 动态开关另一条数据流的输出）还是挺有用的，经在本地和 YARN 上测试，没有问题。

上一篇 下一篇

猜你喜欢

热点阅读