Flink常用的DataSet和DataStream API

2021-09-03  作者:一生逍遥一生

DataSet和DataStream的区别和联系

DataSet的数据一般来源于文件、表或者Java集合;DataStream的数据源(Source)则一般是消息中间件,例如Kafka。

API介绍

@Data
@Getter
@Setter
@ToString
public class Item implements Serializable {
    private String name;
    private Integer id;
}
/**
 * Custom Flink source that emits one randomly generated {@link Item}
 * roughly every second until it is cancelled.
 */
public class CustomDataSource implements SourceFunction<Item> {
    private static final long serialVersionUID = 1L;

    // volatile is required: Flink invokes cancel() from a different thread
    // than the one executing run(), so without it the emit loop might never
    // observe the flag change and could spin forever.
    private volatile boolean isRunning = true;

    // Reuse one Random instance instead of allocating a new one per element.
    private final Random random = new Random();

    /**
     * Emit loop: produces one Item per second until {@link #cancel()} flips
     * the running flag.
     *
     * @param ctx Flink-provided context used to emit elements downstream
     * @throws Exception if interrupted while sleeping
     */
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(generateItem());
            Thread.sleep(1000);
        }
    }

    /** Signals {@link #run(SourceContext)} to exit its emit loop. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /** Builds an Item with a random id in [0, 100) and a matching name. */
    private Item generateItem() {
        int i = random.nextInt(100);
        Item item = new Item();
        item.setName("name " + i);
        item.setId(i);
        return item;
    }
}
/**
 * Demo entry point: builds a bounded Tuple3 stream, groups it by the first
 * field and prints the running maximum of the third field.
 */
public class StreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Obtain a data source (example kept for reference):
        /**
         *         DataStreamSource<Item> text = env.addSource(new CustomDataSource()).setParallelism(1);
         *         DataStream<Item> item = text.map((MapFunction<Item, Item>) value -> value);
         *         item.print().setParallelism(1);
         *         String jobName = "user define streaming source";
         *         env.execute(jobName);
         */
        // Map: takes one element as input and produces exactly one element as output.
        /**
         *         DataStreamSource<Item> items = env.addSource(new CustomDataSource()).setParallelism(1);
         *         SingleOutputStreamOperator<Object> mapItems = items.map((MapFunction<Item, Object>) item -> item.getName());
         *         mapItems.print().setParallelism(1);
         *         String jobName = "user define streaming source";
         *         env.execute(jobName);
         */
        // FlatMap: takes one element and returns zero or more elements.
        // Filter: drops elements that do not match a predicate.
        // KeyedStream: partitions the stream by a key so each group can be
        //   processed independently.
        // KeyBy: groups the data; salting keys with a random number is a
        //   common trick to mitigate data skew.
        // Aggregations: built-in aggregation functions.
        // min vs minBy: min returns only the minimum value of the selected
        //   field; minBy returns the whole element containing that minimum.
        //
        // NOTE(fix): the original declared a raw `List` and typed the stream
        // as DataStreamSource<Item> even though the elements are Tuple3 —
        // that only compiled through the raw-type loophole. The generics
        // below now match the actual element type.
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 1, 0));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 3));
        data.add(new Tuple3<>(1, 2, 5));
        data.add(new Tuple3<>(1, 2, 9));
        data.add(new Tuple3<>(1, 2, 11));
        data.add(new Tuple3<>(1, 2, 13));
        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);
        // Group by field 0, keep the running max of field 2, print to stderr.
        items.keyBy(0).max(2).printToErr();
        env.execute();
        // Reduce operates on each group of a KeyedStream, folding elements
        // together with a user-defined aggregation function.
    }
}
上一篇下一篇

猜你喜欢

热点阅读