Sliding windows in Spark SQL and Flink

2019-05-16  王金松

Sliding window in Spark SQL

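// Needs org.apache.spark.sql.functions.window and the SparkSession implicits (for the $"..." syntax)
val ret = pre_data
      // Tolerate events that arrive up to 5 seconds late
      .withWatermark("time", "5 seconds")
      .groupBy(
        // window(timeColumn, windowDuration, slideDuration): both are 5 minutes here, so the windows do not overlap
        window($"time", "5 minutes", "5 minutes").alias("window"),
        $"flowKey.src_ip".alias("src_ip"),
        $"flowKey.src_port".alias("src_port"),
        $"flowKey.dest_ip".alias("dest_ip"),
        $"flowKey.proto".alias("proto"),
        $"flowKey.direction".alias("direction")
      )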
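
To see what the window duration and slide duration do to a single event, here is a minimal plain-Scala sketch (no Spark dependency) that lists the windows a timestamp falls into. `SlidingWindowSketch`, `windowStarts` and its parameter names are made up for illustration and are not part of the Spark API. The sketch only assumes that windows are half-open intervals `[start, start + size)` aligned to the epoch plus an optional offset, which is how Spark's `window` (with its optional `startTime`) behaves.

```scala
object SlidingWindowSketch {
  /** Start times (ms) of every window of length sizeMs, advancing by slideMs, that contains tsMs.
    * Hypothetical helper for illustration only; it is not a Spark or Flink function. */
  def windowStarts(tsMs: Long, sizeMs: Long, slideMs: Long, offsetMs: Long = 0L): List[Long] = {
    // Latest window start at or before tsMs, aligned to epoch + offset
    val lastStart = tsMs - java.lang.Math.floorMod(tsMs - offsetMs, slideMs)
    // Step back one slide at a time while the window [start, start + size) still covers tsMs
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > tsMs - sizeMs).toList
  }

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    // slide == size (as in the groupBy above): one window per event, starting at minute 5
    println(windowStarts(7 * minute, 5 * minute, 5 * minute)) // List(300000)
    // slide < size: the same event lands in 5 overlapping windows
    println(windowStarts(7 * minute, 5 * minute, 1 * minute).size) // 5
  }
}
```

With equal window and slide durations the result is effectively a tumbling window. Lowering the slide below the window duration is what makes the windows overlap.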

Sliding window in Flink

Here the sliding window takes three parameters: the window size, the slide interval, and an offset.

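 processedData
      .keyBy(_.flowKey)
      // of(size, slide, offset). Flink requires the offset to be smaller than the slide,
      // so the 5-minute offset here has to be reduced (or dropped) for the assigner to be accepted
      .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(5), Time.minutes(5)))
      .reduce((a, b) => {
        // Keep the earliest start and the latest end, and sum the counts
        val start = if (a.start_timestamp <= b.start_timestamp)  a.start_timestamp else b.start_timestamp
        val end = if (a.end_timestamp >= b.end_timestamp) a.end_timestamp else b.end_timestamp
        FlowLog(a.flowKey, start, end, a.count + b.count)
      }).map(new RichMapFunction[FlowLog, JSONObject] {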
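
The reduce step above can be tried without a Flink cluster. The following is a minimal sketch on a plain Scala list. The `FlowKey` and `FlowLog` case classes are hypothetical stand-ins: the field names are taken from the snippets in this post, but the field types are assumptions because the real definitions are not shown. `FlowLogMerge` and `merge` are also illustrative names.

```scala
// Hypothetical stand-ins for the post's types; the field types are assumptions
final case class FlowKey(src_ip: String, src_port: Int, dest_ip: String, proto: String, direction: String)
final case class FlowLog(flowKey: FlowKey, start_timestamp: Long, end_timestamp: Long, count: Long)

object FlowLogMerge {
  // Same rule as the reduce above: earliest start, latest end, summed count
  def merge(a: FlowLog, b: FlowLog): FlowLog =
    FlowLog(
      a.flowKey,
      math.min(a.start_timestamp, b.start_timestamp),
      math.max(a.end_timestamp, b.end_timestamp),
      a.count + b.count
    )

  def main(args: Array[String]): Unit = {
    val key = FlowKey("10.0.0.1", 443, "10.0.0.2", "tcp", "out")
    // Three records for the same key, as they might arrive inside one window
    val logs = List(
      FlowLog(key, 100L, 200L, 1L),
      FlowLog(key, 50L, 150L, 2L),
      FlowLog(key, 120L, 300L, 3L)
    )
    val merged = logs.reduce(merge)
    // start_timestamp = 50, end_timestamp = 300, count = 6
    println(merged)
  }
}
```

Because min, max and sum are associative and commutative, the merged record does not depend on the order in which the records are combined, which is what makes this rule suitable for an incremental reduce.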