数据分析实践 | flink | window函数处理篇

2019-11-17  本文已影响0人  Sevsea

0x01 从Aggregate开始

接着上一篇,这是一个普通的aggregate+windowfunction的处理过程。

    val Pv_Bolt =
    Pv_Bolt_Before_result
      .keyBy(k=>k._1)
      .timeWindow(Time.minutes(1))
      .aggregate(new DummyAggregator(),new MyProcessWindowFunction()).name("Aggregate")
 

这样的处理其实已经可以为没什么特殊需求的处理过程提供框架了,

但是会在实际过程中遇到需要将数据打散之类的过程,例如windowfunction中处理打散数据之后如何再聚合回来?

所以需要后续处理的过程。

0x02 windowfunction后续处理

接着上一个datastream做后续的处理,

例如:这里举例统计某字段频率,每个Pv_Bolt的流的类型为Accumulator_Fre(需提前定义,可根据实际情况更换)


case class Accumulator_Fre(
                            var url:String,
                            var str_Fre:Map[String,Int]//Map构成为Map[单词,单词出现数量]()
                           )
                              
...                  
//Pv_Bolt为已经做完数据打散处理的datastream
 val warning = Pv_Bolt
      .keyBy(k=>k.url)
      .timeWindow(Time.minutes(15))
      .reduce(
      (F1:Accumulator_Fre,F2:Accumulator_Fre)=> {
        var new_Fre = F1.str_Fre
        F2.str_Fre.foreach ( str_num => {
          if (F1.str_Fre.contains ( str_num._1 )) {
            new_Fre = new_Fre.updated ( str_num._1, F1.str_Fre.get ( str_num._1 ).get + str_num._2 )
          }
        } )
        new Accumulator_Fre ( F1.url, newip_Fre )

      },
      ( key: String,
        window:TimeWindow,
        #Context,
        input: Iterable[Accumulator_Fre],
        out: Collector[String])=>
      {

        var in = input.iterator.next ()
        val starttime = tranTimeToString(window.getStart.toString)
        val endtime = tranTimeToString(window.getEnd.toString)
        var str_Fre = in.str_Fre.filter(_._2>50).toSeq.sortBy(_._2).toMap //筛选规则
        if(str_Fre.size>0){
          content = starttime+"-"+endtime+" | " +in.url+"-> str_Fre : "+str_Fre+"\n\n"
          log.info(content)
          out.collect(content)

        }
        
      }
    ).name("reduce_window")
    .addSink(bk)//窗口内out结果外联处理,如kafka/redis等。
...
上一篇下一篇

猜你喜欢

热点阅读