Flink-ValueState实例

2021-12-19  本文已影响0人  卡门001

功能描述

按 key 分组计数:每当某个 key 累计到 3 条记录时,输出这 3 条记录的平均值,并清空该 key 已参与计算的状态

知识
ValueStateDescriptor
ValueState

package com.flink.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Demonstrates Flink keyed {@link ValueState}: for every key, emit the average of
 * each group of three consecutive values and then reset that key's state.
 */
public class StateApp {

    /**
     * Builds the demo pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        test01(env);
        env.execute("StateApp");
    }

    /**
     * Wires a small in-memory source of {@code (key, value)} pairs into
     * {@link AvgWithValueState} and prints the results.
     *
     * <p>With the sample data below, key 1 receives 3, 7, 5, 2 and key 2 receives
     * 4, 3, 5, so the job emits {@code (1, 5.0)} and {@code (2, 4.0)}; the trailing
     * value 2 for key 1 stays in state because its group never reaches three records.
     *
     * @param env the execution environment the pipeline is registered on
     */
    private static void test01(StreamExecutionEnvironment env) {
        List<Tuple2<Long, Long>> list = new ArrayList<>();
        list.add(Tuple2.of(1L, 3L));
        list.add(Tuple2.of(1L, 7L));
        list.add(Tuple2.of(2L, 4L));
        list.add(Tuple2.of(1L, 5L));
        list.add(Tuple2.of(1L, 2L));
        list.add(Tuple2.of(2L, 3L));
        list.add(Tuple2.of(2L, 5L));

        env.fromCollection(list)
                .keyBy(x -> x.f0)
                .flatMap(new AvgWithValueState())
                .print();
    }

    /**
     * Accumulates a running {@code (count, sum)} per key and, once the count reaches
     * {@link #WINDOW_SIZE}, emits {@code (key, average)} and clears the state.
     */
    static class AvgWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

        /** Number of records per key that triggers an average to be emitted. */
        private static final long WINDOW_SIZE = 3L;

        // Running (record count, sum of values); average = sum / count.
        // transient: the state handle is not serialized with the function object;
        // it is obtained from the runtime context in open().
        private transient ValueState<Tuple2<Long, Long>> sum;

        /**
         * Registers the {@code "avg"} value state holding the {@code (count, sum)} pair.
         *
         * @param parameters configuration passed by the runtime (unused)
         * @throws Exception if the state cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<Tuple2<Long, Long>>("avg", Types.TUPLE(Types.LONG, Types.LONG));
            sum = getRuntimeContext().getState(descriptor);
        }

        /**
         * Folds one record into the state and emits the average when the group is full.
         *
         * @param value incoming {@code (key, value)} record
         * @param out   collector receiving {@code (key, average)} results
         * @throws Exception if state access fails
         */
        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
            Tuple2<Long, Long> currentState = sum.value();

            // No state yet (first record, or state was just cleared): start from zero.
            long previousCount = currentState == null ? 0L : currentState.f0;
            long previousTotal = currentState == null ? 0L : currentState.f1;

            long count = previousCount + 1;
            long total = previousTotal + value.f1;

            if (count >= WINDOW_SIZE) {
                // Group complete: emit the average and reset instead of writing
                // state that would be cleared immediately afterwards.
                out.collect(Tuple2.of(value.f0, total / (double) count));
                sum.clear();
            } else {
                sum.update(Tuple2.of(count, total));
            }
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读