Flink 的window机制(二) 窗口函数

2021-09-03  本文已影响0人  万事万物

Window Function

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.

window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.

ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.

ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素

ReduceFunction(增量聚合函数)

需求,统计一个班级的学生成绩
为了统计方便,定义一个javabean

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Student {
    /**
     * 班级id
     */
    private Integer classId;
    /**
     * 学习id
     */
    private Integer stuId;
    /**
     * 成绩
     */
    private Double score;


    public Student(String[] args){
        this.classId=Integer.valueOf(args[0]);
        this.stuId=Integer.valueOf(args[1]);
        this.score=Double.valueOf(args[2]);

    }

}
    @Test
    public void test1() throws Exception {

        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 转换成对象
        SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));

        //设置窗口
        KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);

        //使用 reduce进行聚合
        keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce((ReduceFunction<Student>) (value1, value2) -> {
                    //聚合
                    value1.setScore(value1.getScore()+value2.getScore());
                    return value1;
                }).print();

        env.execute();
    }
1,1,90
1,2,60
11> Student(classId=1, stuId=1, score=150.0)
    @Test
    public void test2() throws Exception {

        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 转换成对象
        SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));

        //设置窗口
        KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);

        //使用 reduce进行聚合
        keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                /*.reduce((ReduceFunction<Student>) (value1, value2) -> {
                    //聚合
                    value1.setScore(value1.getScore()+value2.getScore());
                    return value1;
                }).print();*/
            .sum("score").print();

        env.execute();
    }

结果

11> Student(classId=1, stuId=1, score=150.0)

使用sum也可以达到同样的目的,但是对比于reduce来说,它更加灵活。

    @Test
    public void test3() throws Exception {

        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 转换成对象
        SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));

        //设置窗口
        KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);

        //使用 reduce进行聚合
        keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce((ReduceFunction<Student>) (value1, value2) -> {
                    //聚合
                    value1.setScore(value1.getScore() + value2.getScore());
                    return value1;
                }, new WindowFunction<Student, String, Integer, TimeWindow>() {

                    /**
                     *
                     * @param key class id
                     * @param window 窗口
                     * @param input 上一个窗口聚合的结果
                     * @param out 收集器
                     * @throws Exception
                     */
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Student> input, Collector<String> out) throws Exception {

                        Student next = input.iterator().next();

                        String msg=String.format("key=%s,window[%s,%s),input=%s",key, window.getStart(),window.getEnd(),next.toString());
                        out.collect(msg);

                    }
                }).print();

        env.execute();
    }

输出

11> key=1,window[1628997085000,1628997090000),input=Student(classId=1, stuId=1, score=150.0)

这样不仅可以获取聚合信息,也可以获取窗口信息。

AggregateFunction(增量聚合函数)

 * @param <IN> 输入元素类型
 * @param <ACC> 累加器类型
 * @param <OUT> 输出类型
 */
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

统计学生平均分

  @Test
    public void test3() throws Exception {

        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 转换成对象
        SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));

        //设置窗口
        KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);

        //使用 reduce进行聚合
        keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

                .aggregate(new org.apache.flink.api.common.functions.AggregateFunction<Student, Tuple2<Integer,Double>, Double>() {

                    /**
                     * 初始化累加器,只会调用一次。
                     * 这里计算学生个数,所以从零开始。
                     * @return
                     */
                    @Override
                    public Tuple2<Integer,Double> createAccumulator() {
                        // 初始化 学生个数,成绩
                        return Tuple2.of(0,0.0);
                    }

                    /**
                     * 元素个数累加
                     * (来一个学生就累计次数+1)
                     * @param value
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public Tuple2<Integer,Double> add(Student value, Tuple2<Integer,Double> accumulator) {
                        // 学生个数+1
                        return Tuple2.of(accumulator.f0+1,value.getScore()+accumulator.f1);
                    }

                    /**
                     * 结果返回,计算平均分,所以是double类型
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public Double getResult(Tuple2<Integer,Double> accumulator) {
                        return accumulator.f1/accumulator.f0;
                    }

                    /**
                     * 两个累加器合并,
                     * 当窗口是会话窗口才会生效
                     * @param a
                     * @param b
                     * @return
                     */
                    @Override
                    public Tuple2<Integer,Double> merge(Tuple2<Integer,Double> a, Tuple2<Integer,Double> b) {
                        return null;
                    }
                }).print();


        env.execute();
    }
1,1,90
1,2,60
11> 75.0
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
      AggregateFunction<T, ACC, V> aggFunction, 
      WindowFunction<V, R, K, W> windowFunction
) {

ProcessWindowFunction(全窗口函数)

会将窗口中所有的元素都攒下来,等到窗口关闭的时候,一起来进行处理。这样就会造成内存占用过高,但是有些时候又不得不用,比如全局排序

.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
    // 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
    @Override
    public void process(String key,
                        Context context,
                        Iterable<Tuple2<String, Long>> elements,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        System.out.println(context.window().getStart());
        long sum = 0L;
        for (Tuple2<String, Long> t : elements) {
            sum += t.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }
})

Iterable<Tuple2<String, Long>> elements, 存储着所有的元素。

上一篇下一篇

猜你喜欢

热点阅读