Flink Study Guide

Flink: DataStream Type Transformations and Common Operators

2020-04-29  傻子般白痴

1. Overview of DataStream operator transformations

[Figure: DataStreamFormations]

2. DataStream transformation operators

(1)Map [DataStream->DataStream]

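Description: calls a user-defined MapFunction once per element and emits exactly one output element for each input element.

Example: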

DataStream<String> stream = env.addSource(new SimpleStringGenerator()).setParallelism(1);
stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        Thread.sleep(1000);  // pause 1 s per record (demo only)
        String tempStr = "tmp_" + s;  // per-record computation
        return tempStr;
    }
}).print();
(2)FlatMap [DataStream->DataStream]

Description: takes one element and emits zero, one, or more elements through the Collector.

Example:

DataStream<String> textTemp = stream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // Sample record: "1;50001;0;202001;\"53180093<40100017:53180093:40006212;"
        // Emit every ';'-separated field as its own element
        for (String str : s.split(";")) {
            collector.collect(str);
        }
    }
});
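To make the one-to-many behavior of the FlatMap example easier to see, here is a minimal plain-Java sketch of the same splitting logic. It does not use Flink at all: `FlatMapSketch`, `splitRecord`, and `flatMapAll` are made-up names for illustration, and an ordinary list stands in for the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FlatMapSketch {
    // Stand-in for the flatMap body: one input record, several output fields.
    // String.split drops trailing empty strings, so a record that ends in ";"
    // does not produce an empty last field.
    public static List<String> splitRecord(String record) {
        return Arrays.asList(record.split(";"));
    }

    // Applies splitRecord to every record and concatenates the results,
    // which is roughly what flatMap does across a stream.
    public static List<String> flatMapAll(List<String> records) {
        List<String> out = new ArrayList<>();
        for (String record : records) {
            out.addAll(splitRecord(record));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [1, 50001, 0, a, b]
        System.out.println(flatMapAll(Arrays.asList("1;50001;0;", "a;b")));
    }
}
```

Two input records become five output elements, which is the difference from Map, where the counts always match.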
(3)Filter [DataStream->DataStream]

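Description: evaluates a boolean function for each element and keeps only the elements for which it returns true.

Example: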

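DataStream<String> filterStream = stream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String s) throws Exception {
        // Elements for which this returns true are kept, so this keeps only
        // empty strings; use !s.isEmpty() to drop empty strings instead.
        return s.isEmpty();
    }
});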
(4)KeyBy [DataStream->KeyedStream]

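Description: logically partitions the stream by key, so that all elements with the same key are processed by the same parallel instance of the downstream operator.

Example: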

DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

// Use the user name as the key; both "张三" records end up in the same partition
KeyedStream<UserInfo, String> result = source.keyBy(new KeySelector<UserInfo, String>() {
    @Override
    public String getKey(UserInfo user) throws Exception {
        return user.getName();
    }
});
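What "same key, same partition" means can be sketched in plain Java. This is a deliberate simplification: it takes `hashCode` modulo the parallelism, whereas Flink itself assigns keys to key groups using a murmur hash, so the actual partition numbers will differ. `KeyBySketch`, `partitionFor`, and `partition` are hypothetical names, and ASCII names replace the user names from the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeyBySketch {
    // Simplified routing rule (an assumption for illustration, not Flink's
    // real algorithm): hash of the key modulo the number of partitions.
    public static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups the incoming keys by the partition they are routed to.
    public static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> parts = new TreeMap<>();
        for (String key : keys) {
            parts.computeIfAbsent(partitionFor(key, parallelism), p -> new ArrayList<>()).add(key);
        }
        return parts;
    }

    public static void main(String[] args) {
        // Both "zhangsan" entries always land in the same list.
        System.out.println(partition(Arrays.asList("zhangsan", "lisi", "zhangsan"), 4));
    }
}
```

The point is only that routing is a pure function of the key, which is what lets keyed operators such as Reduce keep per-key state locally.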
(5)Reduce [KeyedStream->DataStream]

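Description: a rolling reduce on a keyed stream. It combines the current element with the last reduced value for the same key and emits the new value.

Example: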

KeyedStream<UserInfo, String> result = source.keyBy(new KeySelector<UserInfo, String>() {
    @Override
    public String getKey(UserInfo user) throws Exception {
        return user.getName();
    }
});

DataStream<UserInfo> text = result.reduce(new ReduceFunction<UserInfo>() {
    @Override
    public UserInfo reduce(UserInfo userInfo, UserInfo t1) throws Exception {
        // userInfo is the value reduced so far for this key, t1 is the new element.
        // Returning userInfo keeps the first element seen for each key.
        return userInfo;
    }
});
(6)Aggregations [KeyedStream->DataStream]

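Aggregations are the aggregation operators provided by KeyedStream. They aggregate on a specified field and emit a rolling series of aggregated results. Under the hood they wrap the function of the Reduce operator; the built-in aggregations are sum, min, minBy, max, and maxBy, so users do not have to define a Reduce function themselves.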
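The idea that an aggregation is just a pre-packaged Reduce function can be sketched without Flink. In the plain-Java sketch below, `rollingReduce` keeps one running value per key and emits it after every element, and `rollingSum` / `rollingMin` only plug in a ready-made function. All names are hypothetical, and the ages are assumed to be integers here (the examples in this post store them as strings).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class RollingAggregationSketch {
    // keys.get(i) is the key of values.get(i). For each element, the running
    // value of its key is updated with fn and the updated value is emitted.
    public static List<Integer> rollingReduce(List<String> keys, List<Integer> values,
                                              BinaryOperator<Integer> fn) {
        Map<String, Integer> state = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            // Map.merge stores the first value of a key as-is and applies
            // fn(previous, current) for every later value of that key.
            out.add(state.merge(keys.get(i), values.get(i), fn));
        }
        return out;
    }

    // "Aggregations" in this sketch: rollingReduce with a built-in function.
    public static List<Integer> rollingSum(List<String> keys, List<Integer> values) {
        return rollingReduce(keys, values, Integer::sum);
    }

    public static List<Integer> rollingMin(List<String> keys, List<Integer> values) {
        return rollingReduce(keys, values, Math::min);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("zhangsan", "lisi", "zhangsan");
        List<Integer> ages = Arrays.asList(21, 23, 31);
        System.out.println(rollingSum(keys, ages)); // prints [21, 23, 52]
        System.out.println(rollingMin(keys, ages)); // prints [21, 23, 21]
    }
}
```

Note that one result is emitted per input element, not one per key: that is what "rolling" means here.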

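(7)Union [DataStream->DataStream]

Description: merges two or more streams of the same element type into a single stream that contains all of their elements.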

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

DataStreamSource<UserInfo> sourceTwo = env.fromCollection(Arrays.asList(
    new UserInfo("Ton", "21", "click"),
    new UserInfo("java", "23", "browse"),
    new UserInfo("flink", "31", "click")
));

// Both inputs must have the same element type (UserInfo here)
DataStream<UserInfo> unionStream = source.union(sourceTwo);
unionStream.print();
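(8)Connect, CoMap, CoFlatMap [DataStream->ConnectedStreams->DataStream]

Description: connect joins two streams that may have different element types while keeping each stream's type. CoMap and CoFlatMap then apply one function per input (map1/map2 or flatMap1/flatMap2) and produce a single output type.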

DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

DataStreamSource<String> sourceTwo = env.fromCollection(Arrays.asList("1,2,3,4,5,6".split(",")));
// connect combines the two streams while keeping each stream's element type
ConnectedStreams<UserInfo, String> connectStream=source.connect(sourceTwo);

DataStream<String> text=connectStream.map(new CoMapFunction<UserInfo, String, String>() {
    @Override
    public String map1(UserInfo userInfo) throws Exception {
        return userInfo.toString();
    }

    @Override
    public String map2(String s) throws Exception {
        return s;
    }
});
text.print();

Output:
10> 2
2> 6
11> 3
1> 5
12> 4
9> 1
12> UserInfo{name='张三', age='31', desc='click'}
11> UserInfo{name='李四', age='23', desc='browse'}
10> UserInfo{name='张三', age='21', desc='click'}

Stream transformation flow:

[Figure: ConnectedStreams transformation flow]
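The Connect plus CoMap step above boils down to "two inputs of different types, one function per input, one output type". Here is a minimal plain-Java sketch of that shape. `CoMapSketch`, its nested `CoMap` interface, and `connectAndMap` are hypothetical stand-ins, not Flink classes. The sketch also drains the first input before the second, while the real job interleaves the two inputs in no particular order (as the output above shows).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CoMapSketch {
    // Hypothetical stand-in for a co-map function: one method per input type.
    public interface CoMap<A, B, R> {
        R map1(A first);
        R map2(B second);
    }

    // Maps both inputs to the common output type R and collects the results.
    public static <A, B, R> List<R> connectAndMap(List<A> first, List<B> second, CoMap<A, B, R> fn) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            out.add(fn.map1(a));
        }
        for (B b : second) {
            out.add(fn.map2(b));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = connectAndMap(
            Arrays.asList(21, 23),        // first input: Integer
            Arrays.asList("1", "2"),      // second input: String
            new CoMap<Integer, String, String>() {
                @Override
                public String map1(Integer age) {
                    return "age=" + age;
                }

                @Override
                public String map2(String s) {
                    return s;
                }
            });
        System.out.println(out); // prints [age=21, age=23, 1, 2]
    }
}
```

Compared with Union, the two inputs do not need to share a type; only the two map methods have to agree on what they return.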
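(9)Split and select [DataStream->SplitStream->DataStream]

Description: split tags each element with one or more output names through an OutputSelector, and select returns the stream of elements that carry the given name. Note that split/select was deprecated and, as far as I know, removed in later Flink releases in favor of side outputs, so the example below only applies to older versions.

Example: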

DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

// Tag every record: "success" for 张三, "error" for everyone else
SplitStream<UserInfo> splitStream = source.split(new OutputSelector<UserInfo>() {
    @Override
    public Iterable<String> select(UserInfo userInfo) {
        List<String> list = new ArrayList<>();
        if ("张三".equals(userInfo.getName())) {
            list.add("success");
        } else {
            list.add("error");
        }
        return list;
    }
});
// Pick out each tagged sub-stream by name
DataStream<UserInfo> successStream = splitStream.select("success");
successStream.print("success-");
DataStream<UserInfo> errorStream = splitStream.select("error");
errorStream.print("error-");

Output:
success-:3> UserInfo{name='张三', age='31', desc='click'}
success-:2> UserInfo{name='张三', age='21', desc='click'}
error-:9> UserInfo{name='李四', age='23', desc='browse'}

Stream transformation flow:

[Figure: split() & select()]
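The split and select steps above are really tag-based routing: every element is labeled with one or more names, and selecting a name returns the elements that carry it. A minimal plain-Java sketch of that idea follows. `SplitSelectSketch` and its two methods are hypothetical names, a list stands in for the stream, and ASCII names replace the user names from the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SplitSelectSketch {
    // "split": the selector returns the tags of an element, and the element
    // is stored under each of those tags.
    public static <T> Map<String, List<T>> split(List<T> input, Function<T, List<String>> selector) {
        Map<String, List<T>> tagged = new HashMap<>();
        for (T element : input) {
            for (String tag : selector.apply(element)) {
                tagged.computeIfAbsent(tag, t -> new ArrayList<>()).add(element);
            }
        }
        return tagged;
    }

    // "select": all elements carrying the tag, or an empty list if none do.
    public static <T> List<T> select(Map<String, List<T>> tagged, String tag) {
        return tagged.getOrDefault(tag, Collections.emptyList());
    }

    public static void main(String[] args) {
        Map<String, List<String>> tagged = split(
            Arrays.asList("zhangsan", "lisi", "zhangsan"),
            name -> Collections.singletonList("zhangsan".equals(name) ? "success" : "error"));
        System.out.println(select(tagged, "success")); // prints [zhangsan, zhangsan]
        System.out.println(select(tagged, "error"));   // prints [lisi]
    }
}
```

Because the selector returns a list, one element may carry several tags and show up in several selected outputs.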