Flink Joining

2020-09-08  本文已影响0人  Rex_2013

前言

数据流操作的一个常见需求是对两条数据流中的事件进行联结(connect)或Join。connect在前面Flink API
文章中。
Flink DataStream API中内置有两个可以根据时间条件对数据流进行Join的算子:Window JoinInterval Join

如果Flink内置的Join算子无法表达所需的Join语义,那么你可以通过CoProcessFunction、BroadcastProcessFunction或KeyedBroadcastProcessFunction实现自定义的Join逻辑。

注意,你要设计的Join算子需要具备高效的状态访问模式及有效的状态清理策略。

Window Join

顾名思义,Window Join需要用到Flink中的窗口机制。其原理是将两条输入流中的元素分配到公共窗口中并在窗口完成时进行Join(或Cogroup)。

下面的例子展示了如何定义基于窗口的Join。

input1.join(input2)
  .where(<KeySelector>)       // 为input1指定键值属性
  .equalTo(<KeySelector>)     // 为input2指定键值属性
  .window(<WindowAssigner>)      // 指定WindowAssigner
  [.trigger(...)]   // 选择性的指定Trigger
  [.evictor(...)]   // 选择性的指定Evictor
  .apply(<JoinFunction>)       // 指定JoinFunction

有关语义的一些注意事项:

Window Join

两条输入流都会根据各自的键值属性进行分区,公共窗口分配器会将二者的事件映射到公共窗口内(其中同时存储了两条流中的数据)。当窗口的计时器触发时,算子会遍历两个输入中元素的每个组合(叉乘积)去调用JoinFunction。同时你也可以自定义触发器或移除器。由于两条流中的事件会被映射到同一个窗口中,因此该过程中的触发器和移除器与常规窗口算子中的完全相同。

除了对窗口中的两条流进行inner join,如果要实现两流的实现left/right join。你可以对它们进行Cogroup,只需将算子定义开始位置的join改为coGroup()即可。Join和Cogroup的总体逻辑相同,区别是:Join会为两侧输入中的每个事件对调用JoinFunction;而Cogroup中用到的CoGroupFunction会以两个输入的元素遍历器为参数,只在每个窗口中被调用一次。

注意,对划分窗口后的数据流进行Join可能会产生意想不到的语义。例如,假设你为执行Join操作的算子配置了1小时的滚动窗口,那么一旦来自两个输入的元素没有被划分到同一窗口,它们就无法Join在一起,即使二者彼此仅相差1秒钟。

Tumbling Window Join

Tumbling Window Join

如图所示,我们定义了一个大小为2毫秒的滚动窗口,该窗口的形式为[0,1], [2,3], ...。该图显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻转窗口中[6,7]什么也不发射,因为在绿色流中不存在要与橙色元素⑥和joined连接的元素。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }

Sliding Window Join

Sliding Window Join

在此示例中,我们使用大小为2毫秒的滑动窗口,并将它们滑动1毫秒,从而形成滑动窗口[-1, 0],[0,1],[1,2],[2,3], …。x轴下方的连接元素是传递给JoinFunction每个滑动窗口的元素。在这里,您还可以看到例如橙色②与绿色③在窗口中如何结合[2,3],而没有与窗口中的任何事物结合[1,2]。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply { (e1, e2) => e1 + "," + e2 }

Session Window Join

Session Window Join

在这里,我们定义了一个会话窗口连接,其中每个会话之间的间隔至少为1ms。共有三个会话,在前两个会话中,两个流中的联接元素都传递给JoinFunction。在第三个会话中,绿色流中没有元素,因此⑧和⑨不连接!

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => e1 + "," + e2 }

Interval Join

window Join 是基于窗口来做join。很多实际业务场景,一些动作是有先后顺序的。比如浏览和点击的行为,不可能是同时进行的。
Interval Join的Join会对两条流中拥有相同键值以及彼此之间时间戳不超过某一指定间隔的事件进行Join。

Interval Join

在上面的示例中,我们将两个流“橙色”和“绿色”连接在一起,下限为-2毫秒,上限为+1毫秒。缺省情况下,这些界限是包容性的,但.lowerBoundExclusive()并.upperBoundExclusive可以应用到改变行为。

使用更正式的符号,这将转化为
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
         out.collect(left + "," + right); 
        }
      });
    });

参考官网 interval-join

上一篇下一篇

猜你喜欢

热点阅读