flink简单使用教程

flink使用10-通过Bulk iterator计算圆周率

2019-11-09  本文已影响0人  CheckChe

迭代处理是批量处理处理中的常见操作, Flink 的 迭代计算支持两种模式, 分别是 Bulk Iteration (全量迭代计算) 和 Delt Iteration (增量迭代计算). 下面就一个计算圆周率的例子 来说一下使用 Bulk Iteration 都有哪几个步骤.

在 Bulk Iteration 中, 主要的步骤其实是分为3步, 第一步是指定最大循环次数, 第二步是指定在循环时的一个计算处理的过程, 最后一步就是调用计算过程, 指定结束条件. 具体代码如下所示

public class BulkIteration {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 构建输出数据
        DataSource<Integer> data = env.fromElements(0);
        // 1. 指定循环次数
        IterativeDataSet<Integer> loop = data.iterate(1000);
        // 2. 指循环计算过程
        MapOperator<Integer, Integer> process = loop.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer i) throws Exception {
                double x = Math.random();
                double y = Math.random();
                int result = (x * x + y * y) < 1 ? 1 : 0;
                return i + result;
            }
        });
        // 3. 使用 closeWith 调用计算过程
        List<Integer> collect = loop.closeWith(process).collect();
        // 输出最终结果
        for (Integer i : collect) {
            System.out.println( i / 1000.0 * 4);
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读