flink DataSet

2020-08-23  本文已影响0人  程序男保姆

DataSet 开发概述

DataSource 数据来源

 
/**
 * Reads a text file into a DataSet and emits one {@link S} for every piece obtained by
 * splitting each line on the empty string, then prints the result.
 */
public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Local-file alternative: env.readTextFile("file:///Users/baidu/Desktop/input")
        DataSource<String> lines = env.readTextFile("hdfs://master:9000/user/yuan/input/wc.count");
        lines.flatMap(new FlatMapFunction<String, S>() {
            @Override
            public void flatMap(String value, Collector<S> out) throws Exception {
                // One output record per split piece of the input line.
                for (String piece : value.split("")) {
                    S wrapped = new S();
                    wrapped.setStr(piece);
                    out.collect(wrapped);
                }
            }
        }).print();

    }


    /** Simple holder for a single string value. */
    public static class S {
        String str;

        public String getStr() {
            return str;
        }

        public void setStr(String str) {
            this.str = str;
        }

        @Override
        public String toString() {
            return "S{str='" + str + "'}";
        }
    }
}

计数器

    /**
     * Splits two sample sentences into (word, 1) pairs while counting every emitted word in an
     * accumulator named "c", writes the pairs to a text file, and prints the accumulated count
     * once the job has finished.
     */
    public static void main(String[] args) throws Exception {

        // Execution environment for the job.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        text
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

                    private final IntCounter intCounter = new IntCounter(0);

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Registering the counter as an accumulator fixes the wrong total seen
                        // with parallelism > 1: each parallel instance runs in its own thread
                        // and the threads do not share plain fields.
                        getRuntimeContext().addAccumulator("c", intCounter);
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            intCounter.add(1);
                            out.collect(new Tuple2<>(s, 1));
                        }
                    }
                })
                // Original author's note: one output file is written per parallel instance.
                .setParallelism(4)
                .writeAsText("file:///Users/baidu/Desktop/222", FileSystem.WriteMode.OVERWRITE);


        JobExecutionResult execute = env.execute("abcd");

        // The accumulator was registered with an IntCounter, so its result is an Integer.
        Integer c = execute.getAccumulatorResult("c");

        System.out.println("c = " + c);

    }

Sink

分布式缓存

Transformation算子

public class MapDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("1", "2", "3", "4", "5");
        // map 一对一转换 字符串数值
        text.map(new MapFunction<String, Integer>() {
            public Integer map(String value) throws Exception {
                return Integer.valueOf(value);
            }
        }).print();
    }

}
    /** Demonstrates flatMap: each input string expands into one (piece, 1) tuple per split piece. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> sentences = env.fromElements("我是一个大傻瓜", "我是一个小朋友");

        sentences.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            // One-to-many: a single input line is turned into several output records.
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] pieces = value.split("");
                for (int i = 0; i < pieces.length; i++) {
                    out.collect(new Tuple2<>(pieces[i], 1));
                }
            }
        }).print();
    }
    /**
     * Demonstrates three ways of repartitioning a tuple DataSet on field 0 (range, hash and a
     * custom partitioner) and logs which thread processes each record.
     *
     * <p>None of the mapPartition functions forwards records to its collector, so the trailing
     * {@code print()} calls have no records of their own to show.
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));


        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        // Range partitioning on the numeric key: records with the same key land in the same partition.
        collection
                .partitionByRange(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        // Blank line separates the output of one partition from the next.
                        System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionByRange");

        // Hash partitioning on field 0.
        collection
                .partitionByHash(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionByHash");

        // Custom partitioning on field 0.
        collection
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        System.out.println("key=" + key + "  numPartitions=" + numPartitions);
                        // floorMod keeps the index in [0, numPartitions) even for negative keys,
                        // where the % operator would produce a negative partition index.
                        return Math.floorMod(key, numPartitions);
                    }
                }, 0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionCustom");

    }

    /** Demonstrates filter: keeps only the even numbers of a small integer DataSet. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        numbers.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                // Odd values are rejected; everything else passes.
                if (value % 2 != 0) {
                    return false;
                }
                return true;
            }
        }).print();

    }
    /** Demonstrates project: selects fields 2 and 0 (in that order) from a Tuple3 DataSet. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Builds (1,2,3), (4,5,6), (7,8,9).
        List<Tuple3<Integer, Integer, Integer>> rows = new ArrayList<>();
        for (int first = 1; first <= 7; first += 3) {
            rows.add(new Tuple3<>(first, first + 1, first + 2));
        }

        // Converts Tuple3<Integer, Integer, Integer> into Tuple2<Integer, Integer>:
        // picks the wanted fields and may reorder them.
        env
                .fromCollection(rows)
                .project(2, 0)
                .print();

    }

执行后的结果
(3,1)
(6,4)
(9,7)

分组键可以通过以下几种方式指定:
键表达式(Key Expression),如 groupBy("key")
键选择器函数(KeySelector Function),即实现 KeySelector 接口
一个或多个字段位置键(仅限元组数据集),如 groupBy(0, 1)
样例类(Case Class)字段(仅限 Scala 样例类),如 groupBy("a", "b")
    /**
     * Demonstrates reduce on a grouped DataSet: records are grouped by field 0 and the string
     * fields of each group are concatenated with " + ".
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys for demo1..demo21, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6};
        ArrayList<Tuple2<Integer, String>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], "demo" + (i + 1)));
        }

        DataSource<Tuple2<Integer, String>> source = env.fromCollection(records);

        source.groupBy(0).reduce(new ReduceFunction<Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> reduce(Tuple2<Integer, String> left, Tuple2<Integer, String> right) throws Exception {
                String joined = left.f1 + " + " + right.f1;
                return new Tuple2<>(left.f0, joined);
            }
        }).print();


    }

  /**
   * Demonstrates reduceGroup: records are grouped by field 0 and only those whose key is even
   * are forwarded.
   */
  public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys for demo1..demo21, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6};
        ArrayList<Tuple2<Integer, String>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], "demo" + (i + 1)));
        }

        DataSource<Tuple2<Integer, String>> source = env.fromCollection(records);

        source.groupBy(0)
                .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        for (Tuple2<Integer, String> record : values) {
                            // Skip records with an odd key.
                            if (record.f0 % 2 != 0) {
                                continue;
                            }
                            out.collect(record);
                        }
                    }
                })
                .print();
    }


    /**
     * Demonstrates reduceGroup with a combinable function: records are grouped by field 0 and
     * handed to {@code GroupReduce}.
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys for demo1..demo21, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6};
        ArrayList<Tuple2<Integer, String>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], "demo" + (i + 1)));
        }

        DataSource<Tuple2<Integer, String>> source = env.fromCollection(records);

        // An earlier variant of this demo used an inline RichGroupReduceFunction that kept only
        // records with an even key; it is replaced here by the named GroupReduce class.
        source.groupBy(0)
                .reduceGroup(new GroupReduce())
                .print();
    }

    /**
     * Group-reduce function that is also combinable.
     *
     * <p>Unlike a reduce function, a group-reduce function is not implicitly combinable. To make
     * it combinable it must also implement the {@code GroupCombineFunction} interface.
     *
     * <p>Key point: the generic input and output types of {@code GroupCombineFunction} must equal
     * the generic input type of {@code GroupReduceFunction}, as they do in this class.
     */
    private static class GroupReduce implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>, GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

        /**
         * Combine step: folds the given records into a single tuple that keeps the first record's
         * key and joins the string fields with " + ".
         *
         * @param values records to pre-aggregate
         * @param out    collector receiving the combined tuple, if any
         */
        @Override
        public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            // ifPresent emits nothing for an empty input instead of failing like Optional.get().
            StreamSupport.stream(values.spliterator(), false)
                    .reduce((o1, o2) -> new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1))
                    .ifPresent(out::collect);
        }

        /**
         * Reduce step: forwards every record it receives unchanged.
         *
         * @param values records of one group
         * @param out    collector receiving the records
         */
        @Override
        public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            for (Tuple2<Integer, String> record : values) {
                out.collect(record);
            }
        }
    }

聚合转换只能应用于元组数据集,并且仅支持用于分组的字段位置键

    /** Demonstrates aggregate: groups by field 0 and sums field 1 within each group. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys paired with the values 1..21, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6};
        ArrayList<Tuple2<Integer, Integer>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], i + 1));
        }

        DataSource<Tuple2<Integer, Integer>> source = env.fromCollection(records);

        source.groupBy(0)
                .aggregate(Aggregations.SUM, 1)
                .print();
    }

    /** Demonstrates maxBy: groups by field 0 and selects per group the tuple with the largest field 1. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys paired with the values 1..21, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6};
        ArrayList<Tuple2<Integer, Integer>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], i + 1));
        }

        DataSource<Tuple2<Integer, Integer>> source = env.fromCollection(records);

        source.groupBy(0)
                .maxBy(1)
                .print();
    }

    /**
     * Demonstrates reduce on an ungrouped DataSet: all records are folded into one tuple whose
     * key is the sum of the keys and whose string is the " + "-joined strings.
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys for demo1..demo21, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6};
        ArrayList<Tuple2<Integer, String>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], "demo" + (i + 1)));
        }

        DataSource<Tuple2<Integer, String>> source = env.fromCollection(records);

        source
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> left, Tuple2<Integer, String> right) throws Exception {
                        int keySum = left.f0 + right.f0;
                        String joined = left.f1 + " + " + right.f1;
                        return new Tuple2<>(keySum, joined);
                    }
                }).print();
    }

    /**
     * Demonstrates reduce on an ungrouped DataSet of eight records: keys are summed and the
     * strings are joined with " + ".
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Keys for demo1..demo8, in insertion order.
        int[] keys = {1, 1, 1, 2, 2, 2, 3, 3};
        ArrayList<Tuple2<Integer, String>> records = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<>(keys[i], "demo" + (i + 1)));
        }


        DataSource<Tuple2<Integer, String>> source = env.fromCollection(records);

        // A .groupBy(0) call is deliberately left out here, so the reduce runs over the whole DataSet.
        source
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> left, Tuple2<Integer, String> right) throws Exception {
                        int keySum = left.f0 + right.f0;
                        String joined = left.f1 + " + " + right.f1;
                        return new Tuple2<>(keySum, joined);
                    }
                }).print();


    }

    /**
     * Demonstrates join: two (id, text) DataSets are joined on field 0 and each match is
     * emitted as (id, name, city).
     */
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Diamond instead of raw Tuple2 so the element types are checked at compile time.
        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2<>(1, "小王"),
                        new Tuple2<>(2, "小里"),
                        new Tuple2<>(3, "小张"),
                        new Tuple2<>(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2<>(1, "北京"),
                        new Tuple2<>(2, "上海"),
                        new Tuple2<>(3, "成都"),
                        new Tuple2<>(5, "重庆")
                );


        stringDataSource1
                // Alternatives to try in place of join(...):
                //.leftOuterJoin(stringDataSource2)
                //.rightOuterJoin(stringDataSource2)
                //.fullOuterJoin(stringDataSource2)
                .join(stringDataSource2)
                // Field 0 of the first DataSet
                .where(0)
                // Field 0 of the second DataSet
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        // A missing side is rendered as "-"; this matters for the outer-join
                        // variants listed above.
                        if (first == null) {
                            return new Tuple3<>(second.f0, "-", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "-");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                })
                .print();
    }

    /**
     * Demonstrates cross: pairs every element of the first DataSet with every element of the
     * second and emits each pair as (number, city).
     */
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Diamond instead of raw Tuple1 so the element types are checked at compile time.
        DataSource<Tuple1<Integer>> stringDataSource1 =
                env.fromElements(
                        new Tuple1<>(1),
                        new Tuple1<>(2)
                );

        DataSource<Tuple1<String>> stringDataSource2 =
                env.fromElements(
                        new Tuple1<>("北京"),
                        new Tuple1<>("上海"),
                        new Tuple1<>("重庆")
                );

        stringDataSource1
                .cross(stringDataSource2)
                .with(new CrossFunction<Tuple1<Integer>, Tuple1<String>, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> cross(Tuple1<Integer> val1, Tuple1<String> val2) throws Exception {
                        return new Tuple2<>(val1.f0, val2.f0);
                    }
                }).print();
    }

(1,北京)
(1,上海)
(1,重庆)
(2,北京)
(2,上海)
(2,重庆)

与 Reduce,GroupReduce 和 Join 相似,可以使用不同的键选择方法来定义键。

    /**
     * Demonstrates coGroup: two (id, text) DataSets are co-grouped on field 0. For every key
     * present on both sides, the texts of both groups are joined with " + " into one record.
     */
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Diamond instead of raw Tuple2 so the element types are checked at compile time.
        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2<>(1, "小王"),
                        new Tuple2<>(1, "小周"),
                        new Tuple2<>(2, "小里"),
                        new Tuple2<>(3, "小张"),
                        new Tuple2<>(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2<>(1, "北京"),
                        new Tuple2<>(1, "邯郸"),
                        new Tuple2<>(2, "上海"),
                        new Tuple2<>(3, "成都"),
                        new Tuple2<>(5, "重庆")
                );


        stringDataSource1
                .coGroup(stringDataSource2)
                // Field 0 of the first DataSet
                .where(0)
                // Field 0 of the second DataSet
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, String>> second, Collector<Tuple2<Integer, String>> out) throws Exception {
                        // Collapse each side to a single tuple; null marks an empty side.
                        Tuple2<Integer, String> t1 = StreamSupport.stream(first.spliterator(), false).reduce((o1, o2) -> {
                            return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                        }).orElse(null);
                        Tuple2<Integer, String> t2 = StreamSupport.stream(second.spliterator(), false).reduce((o1, o2) -> {
                            return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                        }).orElse(null);
                        // Keys that appear on only one side produce no output.
                        if (t1 != null && t2 != null) {
                            out.collect(new Tuple2<>(t1.f0, t1.f1 + " + " + t2.f1));
                        }
                    }
                })
                .print();
    }
(3,小张 + 成都)
(1,小王 + 小周 + 北京 + 邯郸)
(2,小里 + 上海)

    /** Demonstrates union: prints the elements of two (id, text) DataSets as one DataSet. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Diamond instead of raw Tuple2 so the element types are checked at compile time.
        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2<>(1, "小王"),
                        new Tuple2<>(1, "小周"),
                        new Tuple2<>(2, "小里"),
                        new Tuple2<>(3, "小张"),
                        new Tuple2<>(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2<>(1, "北京"),
                        new Tuple2<>(2, "上海"),
                        new Tuple2<>(3, "成都"),
                        new Tuple2<>(5, "重庆")
                );

        stringDataSource1.union(stringDataSource2)
                .print();
    }
(1,小王)
(4,小四)
(1,北京)
(1,小周)
(2,上海)
(2,小里)
(3,成都)
(3,小张)
(5,重庆)
上一篇下一篇

猜你喜欢

热点阅读