Flink专题

Flink Streaming:Window Join机制

2019-02-15  本文已影响64人  尼小摩

window join连接两个流的元素,它们共享一个公共key并位于同一个窗口中。可以使用窗口分配器定义这些窗口,并对来自这两个流的元素求值。

然后将两边的元素传递给用户定义的JoinFunction或FlatJoinFunction,用户可以在其中发出满足连接条件的结果。

一般用法概括如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

语义注意事项:

在下一节中,我们将根据一些示例场景,来讲述不同类型的窗口连接的行为。

翻滚窗口Join(Tumbling Window Join)

执行滚动窗口连接(Tumbling Window Join)时,具有公共Key和公共tumbling window的所有元素都以成对组合的形式进行连接,并传递给JoinFunctionFlatJoinFunction。因为这就像一个内连接,在滚动窗口中没有来自另一个流的元素的流的元素不会被输出!

如图所示,我们定义了一个大小为2毫秒的滚动窗口,其结果为[0,1],[2,3], ...。该图像显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发出任何内容,因为在绿色流中没有元素与橙色元素⑥、⑦连接。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }

滑动窗口Join(Sliding Window Join)

在执行滑动窗口连接(Sliding Window Join)时,具有公共Key和公共滑动窗口(Sliding Window )的所有元素都作为成对组合进行连接,并传递给JoinFunctionFlatJoinFunction。当前滑动窗口中没有来自另一个流的元素的流的元素不会被发出!请注意,有些元素可能会在一个滑动窗口中连接,但不会在另一个窗口中连接!

在本例中,我们使用的滑动窗口大小为2毫秒,滑动1毫秒,滑动窗口结果[1,0],[0,1],[1,2],[2、3],.... x轴以下是每个滑动窗口的Join结果将被传递给JoinFunction的元素。在这里你还可以看到橙②与绿色③窗口Join(2、3),但不与任何窗口Join[1,2]。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply { (e1, e2) => e1 + "," + e2 }
    

会话窗口Join(Session Window Join)

在执行会话窗口连接时,具有相同键的所有元素(当“组合”时满足会话条件)都以成对的组合进行连接,并传递给JoinFunctionFlatJoinFunction。再次执行内部连接,因此如果会话窗口只包含来自一个流的元素,则不会发出任何输出!

在这里,定义一个会话窗口连接,其中每个会话被至少1ms的间隔所分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三次会话中绿色流没有元素,所以⑧⑨不会Join。

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => e1 + "," + e2 }

间隔Join(Interval Join)

interval join用一个公共Key连接两个流的元素(将它们称为A & B),其中流B的元素的时间戳具有相对于流A中的元素的时间戳。 这也可以更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B中共享一个公钥的元素。下界和上界都可以是负的或正的,只要下界小于或等于上界。interval连接目前只执行内部连接。

当将一对元素传递给ProcessJoinFunction时,它们将给两个元素分配更大的时间戳(可以通过ProcessJoinFunction.Context访问)。
注意:间隔连接目前只支持事件时间。

在上面的示例中,我们将“橙色”和“绿色”两个流连接起来,它们的下界为-2毫秒,上界为+1毫秒。默认情况下,这些是包含边界的,但是可以通过.lowerboundexclusive(). upperboundexclusive()进行设置。

再用更正式的符号来表示angeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
         out.collect(left + "," + right); 
        }
      });
    });
上一篇 下一篇

猜你喜欢

热点阅读