Flink-ValueState实例
2021-12-19 本文已影响0人
卡门001
功能描述
当计数到达3时求平均数,并清空已计算过的数值
知识
ValueStateDescriptor
ValueState
package com.flink.state;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
public class StateApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
test01(env);
env.execute("StateApp");
}
/**
* 使用ValueState
* 统计窗口平均数
*
* 需求:入元素为Tuple2
* 如果输入的元素>2,则求平均数,并半结果
* @param env
*/
private static void test01(StreamExecutionEnvironment env) {
List<Tuple2<Long,Long>> list = new ArrayList<>();
list.add(Tuple2.of(1l,3l));
list.add(Tuple2.of(1l,7l));
list.add(Tuple2.of(2l,4l));
list.add(Tuple2.of(1l,5l));
list.add(Tuple2.of(1l,2l));
list.add(Tuple2.of(2l,3l));
list.add(Tuple2.of(2l,5l));
env.fromCollection(list).keyBy(x->x.f0).flatMap(new AvgWithValueState()).print();
}
static class AvgWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
//求平均数:总和/记录条数
//关键字transient:,序列化对象的时候,这个属性就不会被序列化。
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>("avg", Types.TUPLE(Types.LONG,Types.LONG));
sum = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
//in.
Tuple2<Long,Long> currentState = sum.value();
if (currentState==null){
currentState = Tuple2.of(0L,0L);
}
currentState.f0 += 1; //求数总
currentState.f1 += value.f1; // 求和
sum.update(currentState);
//达到3条数据,就算出平均数
if (sum.value().f0>=3){
out.collect(Tuple2.of(value.f0,currentState.f1/currentState.f0.doubleValue()));
sum.clear();//清理
}
}
}
}