09-flink-Accumulator(累加器)

2019-11-04  本文已影响0人  蜗牛写java

09-flink-Accumulator(累加器)

概念

Accumulator(累加器):累加器主要作用在用户操作(operate)中收集分布式统计信息或聚合信息。每个并行实例创建并更新其自己的累加器对象,不同并行实例的累加器由系统作业结束后合并。结果可以从作业执行的结果中获得,也可以从web运行时监视器中获得。

分类

  1. IntCounter
  2. LongCounter
  3. DoubleCounter
  4. Histogram
  5. 自定义(实现SimpleAccumulator接口)

用法

  1. 创建累加器:private IntCounter numLines = new IntCounter();

    备注:在operate中使用

  2. 注册累加器:getRuntimeContext().addAccumulator("num-lines", this.numLines);

    备注:operate 实现 Rich...接口

  3. 使用累加器:this.numLines.add(1);

  4. 获取结果:

    JobExecutionResult myJobExecutionResult = env.execute("accumulators Test");
    myJobExecutionResult.getAccumulatorResult("num-lines")
    

例子

public class CounterTest {

    public static void main(String[] args) throws Exception {
        //获取执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //--hostname 10.24.14.193  --port 9000
        final ParameterTool params = ParameterTool.fromArgs(args);
        String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
        int port = params.has("port") ? params.getInt("port") : 9000;

        System.out.println("hostName=" + hostname + " port=" + port);

        //数据来源
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        //operate
        text.map(new RichMapFunction<String, String>() {

            //第一步:定义累加器
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

                //第二步:注册累加器
                getRuntimeContext().addAccumulator("num-lines", this.numLines);
            }

            @Override
            public String map(String s) throws Exception {
                //第三步:累加
                this.numLines.add(1);
                return s;
            }
        });

        //数据去向
        text.print();

        //执行
        JobExecutionResult socketTest = env.execute("socketTest");

        //第四步:结束后输出总量;如果不需要结束后持久化,可以省去,因为在flinkUI中可以看到
        //String total = socketTest.getAccumulatorResult("num-lines").toString();
    }

sparkUI 查看

flinkUI-Accumulators.png
上一篇 下一篇

猜你喜欢

热点阅读