多流转换算子：Connect 和 CoMap
2019-08-25 本文已影响0人
yayooo
connect
DataStream,DataStream → ConnectedStreams
将两条流在形式上包装了一层，两条流内部依然保持各自的数据和形式，不发生任何变化，彼此相互独立。
CoMap/CoFlatMap
ConnectedStreams → DataStream
作用于 ConnectedStreams 上，功能与 map 和 flatMap 一样（CoMap 对应 map，CoFlatMap 对应 flatMap），对 ConnectedStreams 中的每一条 Stream 分别指定处理函数，进行 map 或 flatMap 处理。
DataStream,DataStream → ConnectedStreams
将两条流在形式上包装了一层，两条流内部依然保持各自的数据和形式，不发生任何变化，彼此相互独立。
CoMap/CoFlatMap
ConnectedStreams → DataStream
作用于 ConnectedStreams 上，功能与 map 和 flatMap 一样（CoMap 对应 map，CoFlatMap 对应 flatMap），对 ConnectedStreams 中的每一条 Stream 分别指定处理函数，进行 map 或 flatMap 处理。
package com.atguigu.apiTest
import org.apache.flink.streaming.api.scala._
/** Demo of Flink's `split`/`select` followed by `connect` + CoMap.
  *
  * Reads comma-separated sensor records, splits them into a "high" stream
  * (temperature strictly above the threshold) and a "low" stream, then connects
  * the two streams and maps each side with its own function before printing.
  *
  * Usage: `TestSplitSelect [inputPath]`. When no path is given, the original
  * hard-coded local file is used.
  */
object TestSplitSelect {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath: String =
    "C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\sensor"

  /** Readings strictly above this temperature are routed to the "high" stream. */
  private val HighTemperatureThreshold: Double = 30.0

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Let the caller supply the input file instead of relying only on a
    // machine-specific absolute path.
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val dataStream: DataStream[String] = env.readTextFile(inputPath)

    // Blank lines are skipped: "".split(",") yields a single field, so
    // parseReading would throw ArrayIndexOutOfBoundsException and fail the job.
    val sensorStream: DataStream[SensorReading] = dataStream
      .filter(line => line.trim.nonEmpty)
      .map(line => parseReading(line))

    // NOTE: split/select are deprecated in later Flink releases (removed in
    // Flink 1.12); side outputs are the replacement there.
    val splitStream: SplitStream[SensorReading] = sensorStream.split(reading =>
      if (reading.temperature > HighTemperatureThreshold) Seq("high") else Seq("low")
    )
    val high: DataStream[SensorReading] = splitStream.select("high")
    val low: DataStream[SensorReading] = splitStream.select("low")

    // Connect two streams with different element types; each side keeps its
    // own data and type inside the ConnectedStreams.
    val warning: DataStream[(String, Double)] = high.map(data => (data.id, data.temperature))
    val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warning.connect(low)

    // CoMap: map[R](fun1: IN1 => R, fun2: IN2 => R) takes one function per input.
    // The two functions return tuples of different arity, so the only common
    // result type is the widened `Product with Serializable`.
    val coMapDataStream: DataStream[Product with Serializable] = connectedStream.map(
      warningData => (warningData._1, warningData._2, "warning"),
      lowData => (lowData.id, "healthy")
    )

    coMapDataStream.print()
    env.execute()
  }

  /** Parses one `id,timestamp,temperature` line into a [[SensorReading]].
    *
    * Each field is trimmed before conversion.
    *
    * @param line a comma-separated record
    * @return the parsed reading
    * @throws ArrayIndexOutOfBoundsException if the line has fewer than three fields
    * @throws NumberFormatException if the timestamp or temperature is not numeric
    */
  private def parseReading(line: String): SensorReading = {
    val fields: Array[String] = line.split(",")
    SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
  }
}
/** A single sensor reading.
  *
  * @param id          sensor identifier
  * @param timestamp   reading timestamp (unit not stated in this file — TODO confirm)
  * @param temperature measured temperature value
  */
case class SensorReading(
    id: String,
    timestamp: Long,
    temperature: Double
)
输出结果（行首的 “n>” 是 print() 打印的并行子任务编号，多个子任务的输出顺序不固定）：
4> (sensor_1,healthy)
2> (sensor_10,38.1,warning)
2> (sensor_7,healthy)
3> (sensor_10,healthy)
1> (sensor_1,35.8,warning)
1> (sensor_6,healthy)