
Operators

2018-01-20  小C菜鸟

Link to the original article

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and gives insight into Flink's operator chaining.

DataStream Transformations

Transformation | Description
Map
DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:
<pre> <code> DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
}); </code> </pre>
FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:
<pre> <code> dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}); </code> </pre>
Filter
DataStream → DataStream
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
<pre> <code> dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
}); </code> </pre>
KeyBy
DataStream → KeyedStream
Logically partitions a stream into disjoint partitions, each containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.
<pre> <code>dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple</code></pre>
Attention: A type cannot be a key if:
1. it is a POJO type but does not override the hashCode() method, relying on the Object.hashCode() implementation instead;
2. it is an array of any type.
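To satisfy rule 1 above, a POJO used as a key needs a content-based hashCode(). A minimal sketch of such a class (the name WordKey is illustrative, not part of the Flink API):

```java
import java.util.Objects;

// A POJO that is safe to use as a keyBy() key: hashCode() and equals()
// are derived from the field values, not from object identity.
public class WordKey {
    public String word; // public field + no-arg constructor, per Flink's POJO rules

    public WordKey() {}

    public WordKey(String word) {
        this.word = word;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WordKey)) return false;
        return Objects.equals(word, ((WordKey) o).word);
    }

    @Override
    public int hashCode() {
        return Objects.hash(word); // deterministic and content-based
    }
}
```

Two instances holding the same value now hash to the same partition, which is exactly what hash partitioning by key requires.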
Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
<pre> <code> keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
}); </code></pre>
Fold
KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
When applied on the sequence (1,2,3,4,5), the fold function below emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
<pre> <code> DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
}); </code></pre>
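The rolling behavior described above (one emitted value per input element) can be illustrated in plain Java, independent of the Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of a rolling fold: for the input (1, 2, 3)
// with initial value "start", it produces "start-1", "start-1-2", "start-1-2-3".
public class RollingFold {
    public static List<String> rollingFold(String initial, int[] values) {
        List<String> emitted = new ArrayList<>();
        String current = initial;
        for (int v : values) {
            current = current + "-" + v; // the same combine step as the fold function
            emitted.add(current);        // a new value is emitted for every element
        }
        return emitted;
    }
}
```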
Aggregations
KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).
<pre> <code> keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key"); </code></pre>
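The min/minBy distinction can be illustrated in plain Java (this sketch is not the Flink API; the Rec record type is hypothetical):

```java
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of min vs. minBy on (name, score) records:
// min returns only the minimum field value, while minBy returns the
// whole element that carries the minimum value.
public class MinVsMinBy {
    public record Rec(String name, int score) {}

    // Analogue of min(field): only the minimum field value.
    public static int minScore(List<Rec> recs) {
        return recs.stream().mapToInt(Rec::score).min().getAsInt();
    }

    // Analogue of minBy(field): the element holding the minimum value.
    public static Rec minByScore(List<Rec> recs) {
        return recs.stream().min(Comparator.comparingInt(Rec::score)).get();
    }
}
```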
Window
KeyedStream → WindowedStream
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows.
<pre> <code> dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data </code></pre>
WindowAll
DataStream → AllWindowedStream
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows.
Warning: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
<pre> <code> dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data</code></pre>
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
<pre> <code> windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
}); </code></pre>
Window Reduce
WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.
<pre> <code> windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
}); </code></pre>
Window Fold
WindowedStream → DataStream
Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
<pre> <code> windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
}); </code></pre>
Aggregations on windows
WindowedStream → DataStream
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).
<pre> <code> windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key"); </code></pre>
Union
DataStream* → DataStream
Union of two or more data streams, creating a new stream containing all the elements from all of the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream.
<pre> <code> dataStream.union(otherStream1, otherStream2, ...); </code></pre>
Window Join
DataStream,DataStream → DataStream
Joins two data streams on a given key and a common window.
<pre> <code> dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...}); </code></pre>
Window CoGroup
DataStream,DataStream → DataStream
Cogroups two data streams on a given key and a common window.
<pre> <code> dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...}); </code></pre>
Connect
DataStream,DataStream → ConnectedStreams
"Connects" two data streams, retaining their types. Connect allows for shared state between the two streams.
<pre> <code> DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);</code></pre>
CoMap, CoFlatMap
ConnectedStreams → DataStream
Similar to map and flatMap on a connected data stream.
<pre> <code> connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});</code></pre>
Split
DataStream → SplitStream
Splits the stream into two or more streams according to some criterion.
<pre> <code> SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
}); </code></pre>
Select
SplitStream → DataStream
Selects one or more streams from a split stream.
<pre> <code> SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd"); </code></pre>
Iterate
DataStream → IterativeStream → DataStream
Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
<pre> <code> IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
}); </code></pre>
Extract Timestamps
DataStream → DataStream
Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.
<pre> <code> stream.assignTimestamps(new TimeStampExtractor() {...}); </code></pre>

The following transformations are available on data streams of Tuples:

Transformation | Description
Project
DataStream → DataStream
Selects a subset of fields from the tuples.
<pre> <code> DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0); </code></pre>

Physical Partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

Transformation | Description
Custom partitioning
DataStream → DataStream
Uses a user-defined partitioner to select the target task for each element.
<pre> <code> dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0); </code></pre>
Random partitioning
DataStream → DataStream
Partitions elements randomly according to a uniform distribution.
<pre> <code> dataStream.shuffle(); </code></pre>
Rebalancing (Round-robin partitioning)
DataStream → DataStream
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.
<pre> <code> dataStream.rebalance(); </code></pre>
Rescaling
DataStream → DataStream
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

Please see the figure in the original Flink documentation for a visualization of the connection pattern in the above example (image not reproduced here).
<pre> <code> dataStream.rescale(); </code></pre>
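The connection pattern from the example above (upstream parallelism 2, downstream parallelism 6) can be sketched as a small helper. This is only an illustration of the described behavior for the simple case where one parallelism is a multiple of the other, not Flink's actual internal code:

```java
// Illustrative only: computes which downstream subtasks an upstream
// subtask feeds under rescale(), assuming downstream parallelism is a
// multiple of upstream parallelism (the simple case from the text).
public class RescaleSketch {
    public static int[] downstreamSubset(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        int factor = downstreamParallelism / upstreamParallelism;
        int[] subset = new int[factor];
        for (int i = 0; i < factor; i++) {
            subset[i] = upstreamIndex * factor + i; // contiguous local subset
        }
        return subset;
    }
}
```

For upstream index 0 this yields subtasks {0, 1, 2}, and for index 1 it yields {3, 4, 5}, matching the 2-to-6 example in the text.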
Broadcasting
DataStream → DataStream
Broadcasts elements to every partition. <pre> <code> dataStream.broadcast(); </code></pre>

Task Chaining and Resource Groups

Chaining two subsequent transformations means co-locating them in the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired.
Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the functions below are available. Note that these functions can only be used right after a DataStream transformation, because they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().
A resource group in Flink is a slot; see slots. You can manually isolate operators in separate slots if desired.

Transformation | Description
Start new chain | Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. <pre> <code> someStream.filter(...).map(...).startNewChain().map(...); </code></pre>
Disable chaining | Do not chain the map operator. <pre> <code> someStream.map(...).disableChaining(); </code></pre>
Set slot sharing group | Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot, while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default"). <pre> <code> someStream.filter(...).slotSharingGroup("name");</code></pre>