Flink-Streaming-Operators-Learning-01

2019-07-14  李小李的路

Overview

Data Preparation

Streaming Operators

Map

DataStream → DataStream 
Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
/**
 * Maps over the whole POJO.
 * Sample output: MockUpModel(name=yahui, gender=female, timestamp=1563090399305, age=34)
 * @param kafkaData the stream read from Kafka
 * @return a stream of MockUpModel POJOs with the age field increased by 5
 */
private static SingleOutputStreamOperator<MockUpModel> getAddAgePojo(SingleOutputStreamOperator<MockUpModel> kafkaData) {
    // Returns MockUpModel POJOs whose age field is increased by 5
    return kafkaData.map(new MapFunction<MockUpModel, MockUpModel>() {
        @Override
        public MockUpModel map(MockUpModel value) throws Exception {
            MockUpModel mockUpModel = new MockUpModel();
            mockUpModel.name = value.name;
            mockUpModel.gender = value.gender;
            mockUpModel.age = value.age + 5;
            mockUpModel.timestamp = value.timestamp;
            return mockUpModel;
        }
    });
}

FlatMap

DataStream → DataStream 
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SingleOutputStreamOperator<MockUpModel> kafkaData = getKafka010Data(env);

    // This usage does not really show what flatMap is for: emitting zero elements
    // makes it behave like a filter. Sample output:
    // MockUpModel(name=liyahui-0, gender=male, timestamp=1561516105296, age=0)
    kafkaData.flatMap(new FlatMapFunction<MockUpModel, MockUpModel>() {
        @Override
        public void flatMap(MockUpModel value, Collector<MockUpModel> out) throws Exception {
            if (value.age % 2 == 0) {
                out.collect(value);
            }
        }
    }).print().setParallelism(1);
    // flatMap flattens nested collections into a single flat collection; the official
    // sentence-splitting example above is the clearest illustration.

    env.execute("flink kafka010 demo");
}

Filter

DataStream → DataStream 
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
// method 1
/**
 * Keeps only the people whose age is even.
 *
 * @param kafkaData the stream read from Kafka
 * @return the filtered stream
 */
private static SingleOutputStreamOperator<MockUpModel> getFilterDS2(SingleOutputStreamOperator<MockUpModel> kafkaData) {
    return kafkaData.filter(new FilterFunction<MockUpModel>() {
        @Override
        public boolean filter(MockUpModel value) throws Exception {
            return value.age % 2 == 0;
        }
    });
}
// method 2
/**
 * The same filter written as a lambda.
 *
 * @param kafkaData the stream read from Kafka
 * @return the filtered stream
 */
private static SingleOutputStreamOperator<MockUpModel> getFilterDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
    return kafkaData.filter(line -> line.age % 2 == 0);
}

KeyBy

DataStream → KeyedStream 

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, *keyBy()* is implemented with hash partitioning. There are different ways to [specify keys](https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys).

This transformation returns a *KeyedStream*, which is, among other things, required to use [keyed state](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#keyed-state).


dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple


Attention A type **cannot be a key** if:

1.  it is a POJO type but does not override the *hashCode()* method and relies on the *Object.hashCode()* implementation.
2.  it is an array of any type.
/**
 * Keys the stream by age.
 *
 * @param kafkaData the stream read from Kafka
 * @return the keyed stream
 */
private static KeyedStream<MockUpModel, Integer> getKeyedDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
    // Lambda alternative:
    // kafkaData.keyBy(line -> line.age).print().setParallelism(1);
    return kafkaData.keyBy(new KeySelector<MockUpModel, Integer>() {
        @Override
        public Integer getKey(MockUpModel value) throws Exception {
            return value.age;
        }
    });
}
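The hash-partitioning behaviour behind keyBy can be sketched in plain Java, no Flink required: each record's key is hashed, and the hash modulo the parallelism picks the partition, so records with equal keys always land together. The parallelism of 4 and the word data are arbitrary choices for illustration; Flink internally uses murmur-hashed key groups rather than raw `hashCode()`, but the sketch shows why a key type must have a stable `hashCode()`.

```java
import java.util.*;

public class KeyBySketch {
    // Mimics keyBy(): records with the same key map to the same partition index.
    static int partitionFor(Object key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (String word : List.of("apple", "pear", "apple", "plum", "pear")) {
            partitions.computeIfAbsent(partitionFor(word, parallelism), p -> new ArrayList<>())
                      .add(word);
        }
        // Every copy of "apple" sits in one partition, every "pear" in one partition.
        System.out.println(partitions);
    }
}
```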

Reduce

KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. 

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SingleOutputStreamOperator<MockUpModel> kafka010Data = getKafka010Data(env);
    // keyBy via a lambda, reduce via an anonymous class
    kafka010Data.keyBy(line -> line.gender).reduce(new ReduceFunction<MockUpModel>() {
        @Override
        public MockUpModel reduce(MockUpModel value1, MockUpModel value2) throws Exception {
            MockUpModel mockUpModel = new MockUpModel();
            mockUpModel.name = value1.name + "--" + value2.name;
            mockUpModel.gender = value1.gender;
            mockUpModel.age = (value1.age + value2.age) / 2;
            return mockUpModel;
        }
    }).print().setParallelism(1);

    env.execute("flink kafka010 demo");
}

Fold

KeyedStream → DataStream    
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. 

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
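The rolling semantics can be reproduced in plain Java without Flink: starting from the initial value, each element is folded into the running accumulator and every intermediate result is emitted, which is exactly the "start-1", "start-1-2", ... sequence from the official example. (Note that fold was later deprecated in newer Flink releases in favour of aggregate.)

```java
import java.util.*;
import java.util.function.BiFunction;

public class FoldSketch {
    // Emits every intermediate fold result, like the rolling fold on a KeyedStream.
    static List<String> rollingFold(String initial, List<Integer> input,
                                    BiFunction<String, Integer, String> folder) {
        List<String> emitted = new ArrayList<>();
        String current = initial;
        for (Integer value : input) {
            current = folder.apply(current, value);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = rollingFold("start", List.of(1, 2, 3, 4, 5),
                (current, value) -> current + "-" + value);
        System.out.println(out); // [start-1, start-1-2, start-1-2-3, ...]
    }
}
```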

Aggregations

KeyedStream → DataStream    
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
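The min/minBy distinction is easy to miss, so here is a plain-Java sketch of the two semantics (the `Person` class and its data are mine, just for illustration): minBy emits the whole element that carries the minimum value, while min emits only the minimum value of the field, and in Flink the other fields of the emitted record are not guaranteed to belong to the same element.

```java
import java.util.*;

public class MinVsMinBy {
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // Like minBy("age"): returns the element that holds the minimum age.
    static Person minBy(List<Person> input) {
        return input.stream().min(Comparator.comparingInt(p -> p.age)).orElseThrow();
    }

    // Like min("age"): returns only the minimum value of the age field itself.
    static int min(List<Person> input) {
        return input.stream().mapToInt(p -> p.age).min().orElseThrow();
    }

    public static void main(String[] args) {
        List<Person> people = List.of(new Person("li", 34), new Person("wang", 21));
        System.out.println(min(people));        // 21
        System.out.println(minBy(people).name); // wang
    }
}
```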

union

DataStream* → DataStream    
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...);

connect

DataStream,DataStream → ConnectedStreams
"Connects" two data streams retaining their types. Connect allows for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
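In Flink the shared state typically lives in a CoMapFunction or CoFlatMapFunction applied to the ConnectedStreams. The following plain-Java sketch (the `Counter` class and its method names are mine, not Flink API) only illustrates the idea: one piece of state updated by two differently-typed inputs, each keeping its own type.

```java
public class ConnectSketch {
    // One object sees both "streams"; both handlers update the same state,
    // which is what ConnectedStreams plus a CoMapFunction enable in Flink.
    static class Counter {
        private long seen = 0;

        String onInt(Integer value) {   // handler for the Integer stream
            seen++;
            return "int#" + seen + "=" + value;
        }

        String onString(String value) { // handler for the String stream
            seen++;
            return "str#" + seen + "=" + value;
        }

        long seen() { return seen; }
    }

    public static void main(String[] args) {
        Counter c = new Counter();
        System.out.println(c.onInt(7));
        System.out.println(c.onString("hello"));
        System.out.println(c.seen()); // both inputs updated the shared counter
    }
}
```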

split

DataStream → SplitStream    
Split the stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

select

SplitStream → DataStream    
Select one or more streams from a split stream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
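The split/select pair can be mimicked with a map from output names to element lists, as in this plain-Java sketch of the even/odd routing above (split/select were deprecated in later Flink releases in favour of side outputs):

```java
import java.util.*;

public class SplitSelectSketch {
    // Mimics split(): each element is routed to an output chosen by its tag.
    static Map<String, List<Integer>> split(List<Integer> input) {
        Map<String, List<Integer>> outputs = new HashMap<>();
        for (Integer value : input) {
            String tag = (value % 2 == 0) ? "even" : "odd";
            outputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> outputs = split(List.of(1, 2, 3, 4));
        List<Integer> even = outputs.get("even"); // like select("even")
        List<Integer> odd = outputs.get("odd");   // like select("odd")
        System.out.println(even); // [2, 4]
        System.out.println(odd);  // [1, 3]
    }
}
```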

iterate

DataStream → IterativeStream → DataStream 

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html#iterations) for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
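The feedback semantics can be traced in plain Java: elements produced by the iteration body that are still > 0 re-enter the loop, the rest leave as output. Here the body simply subtracts 1; it is an arbitrary stand-in for the map step in the snippet above, not taken from the original.

```java
import java.util.*;

public class IterateSketch {
    // Elements > 0 are fed back into the body; elements <= 0 go downstream.
    static List<Long> iterate(List<Long> initial) {
        Deque<Long> feedback = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!feedback.isEmpty()) {
            long value = feedback.poll() - 1; // the iteration body (stand-in map step)
            if (value > 0) {
                feedback.add(value);          // like iteration.closeWith(feedback)
            } else {
                output.add(value);            // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Every element is decremented until it reaches 0, then emitted.
        System.out.println(iterate(List.of(3L, 1L)));
    }
}
```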

Summary


Li Xiaoli can't afford to fall behind!
