Flink

Implementation of Two-Stream Joins in Flink Table

2019-04-21  分裂四人组

Regular Join

Regular joins are the most generic type of join: any new record or change on either side of the join input is visible and affects the whole join result. For example, if there is a new record on the left side, it will be joined with all of the previous and future records on the right side.

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
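
The semantics described above can be illustrated with a minimal sketch in plain Scala, without any Flink dependency. The names `OrderRow`, `ProductRow` and `RegularJoinSketch` are hypothetical and only model the idea: each side keeps every row it has seen, and a new row is joined against everything stored on the other side.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical row types standing in for the Orders and Product tables.
final case class OrderRow(orderId: Int, productId: Int)
final case class ProductRow(id: Int, name: String)

// Illustration only: a regular inner join that remembers all rows of both inputs.
final class RegularJoinSketch {
  private val orders = ArrayBuffer.empty[OrderRow]
  private val products = ArrayBuffer.empty[ProductRow]

  /** A new order is stored, then joined with every product seen so far. */
  def onOrder(o: OrderRow): List[(OrderRow, ProductRow)] = {
    orders += o
    products.filter(_.id == o.productId).map(p => (o, p)).toList
  }

  /** A new product is stored, then joined with every order seen so far. */
  def onProduct(p: ProductRow): List[(OrderRow, ProductRow)] = {
    products += p
    orders.filter(_.productId == p.id).map(o => (o, p)).toList
  }
}
```

Because neither buffer is ever trimmed, the state grows without bound, which is why the real operator attaches an expiration time to each stored row.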

NonWindowJoin

NonWindowInnerJoin
  override def processElement(
      value: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow],
      timerState: ValueState[Long],
      currentSideState: MapState[Row, JTuple2[Long, Long]],
      otherSideState: MapState[Row, JTuple2[Long, Long]],
      isLeft: Boolean): Unit = {

    val inputRow = value.row
    // Step 1: update the state:
    // - write the input row into the state, or retract it from the state;
    // - set the expiration time (expiredTime) of this row.
    updateCurrentSide(value, ctx, timerState, currentSideState)

    cRowWrapper.setCollector(out)
    cRowWrapper.setChange(value.change)

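    // Step 2: iterate over the state of the other side.
    // Open question: for an inner join, is it really necessary to scan the whole other-side state
    // for every input row? Performance could drop sharply once that state grows large.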
    val otherSideIterator = otherSideState.iterator()
    // join other side data
    while (otherSideIterator.hasNext) {
      val otherSideEntry = otherSideIterator.next()
      val otherSideRow = otherSideEntry.getKey
      val otherSideCntAndExpiredTime = otherSideEntry.getValue
      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
      // Step 3: call the JoinFunction to perform the actual join
      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)
      // clear expired data. Note: clear after join to keep closer to the original semantics
      // Step 4: clean up expired state on the other side
      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
        otherSideIterator.remove()
      }
    }
  }
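
The state layout used above, `MapState[Row, JTuple2[Long, Long]]`, can be sketched in plain Scala as follows. This is a simplified model, not the Flink code: `SideStateSketch` is a hypothetical name, rows are modeled as `String`, and the assumption (inferred from the value type and from `setTimes`) is that the first field counts how many times a row is currently present and the second field is its expiration time.

```scala
import scala.collection.mutable

// Illustration only: one side of the join, mapping row -> (count, expiredTime).
final class SideStateSketch(retentionMs: Long) {
  val rows = mutable.Map.empty[String, (Long, Long)]

  /** Step 1: accumulate or retract the input row and refresh its expiration time. */
  def update(row: String, isAccumulate: Boolean, nowMs: Long): Unit = {
    val oldCount = rows.get(row).map(_._1).getOrElse(0L)
    val newCount = if (isAccumulate) oldCount + 1 else oldCount - 1
    if (newCount <= 0) {
      rows -= row // fully retracted, nothing left to join against
    } else {
      rows(row) = (newCount, nowMs + retentionMs)
    }
  }

  /** Steps 2 to 4: return (row, count) pairs to join with, then drop expired entries. */
  def joinAndClean(nowMs: Long): List[(String, Long)] = {
    val joined = rows.toList.map { case (row, (count, _)) => (row, count) }
    val expired = rows.toList.collect {
      case (row, (_, expiredTime)) if nowMs >= expiredTime => row
    }
    rows --= expired // cleaned only after the join, as in the original comment
    joined
  }
}
```

As in the operator, an expired entry still takes part in the join that discovers it and is removed afterwards.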

Time-windowed Joins

A time-windowed join is defined by a join predicate that checks whether the time attributes of the input records are within certain time constraints, i.e., a time window.

SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

DataStreamWindowJoin

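Computing the WindowBound

The bounds are derived as follows; the comment above each query gives the resulting [lower, upper] bound of the left time relative to the right time, in milliseconds: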

--sql1: [-3600000, 3600000]
SELECT t1.a, t2.b 
FROM MyTable as t1 
join MyTable2 as t2 
on t1.a = t2.a 
and  t1.proctime between t2.proctime - interval '1' hour  and t2.proctime + interval '1' hour;

--sql2: [-999, 999]
t1.proctime > t2.proctime - interval '1' second and t1.proctime < t2.proctime + interval '1' second

--sql3: [-1999, 1999]
t2.c > t1.c - interval '2' second and t2.c < t1.c + interval '2' second
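
The three results above can be reproduced with a small sketch. It is not the Flink implementation: `WindowBoundSketch`, `TimePredicate`, `flip` and `bounds` are hypothetical names, and the rule it encodes (a strict comparison tightens the inclusive bound by 1 ms) is inferred from the numbers listed in the comments.

```scala
object WindowBoundSketch {
  sealed trait Op
  case object Gt extends Op
  case object Ge extends Op
  case object Lt extends Op
  case object Le extends Op

  /** Models the predicate `t1.time <op> t2.time + offsetMs`. */
  final case class TimePredicate(op: Op, offsetMs: Long)

  /** Rewrites `t2.time <op> t1.time + offsetMs` so that t1.time is on the left. */
  def flip(p: TimePredicate): TimePredicate = {
    val mirrored = p.op match {
      case Gt => Lt
      case Ge => Le
      case Lt => Gt
      case Le => Ge
    }
    TimePredicate(mirrored, -p.offsetMs)
  }

  /** Inclusive [lower, upper] bound of `t1.time - t2.time` in milliseconds. */
  def bounds(a: TimePredicate, b: TimePredicate): (Long, Long) = {
    // Left = lower bound, Right = upper bound, both made inclusive.
    def inclusive(p: TimePredicate): Either[Long, Long] = p.op match {
      case Ge => Left(p.offsetMs)
      case Gt => Left(p.offsetMs + 1)
      case Le => Right(p.offsetMs)
      case Lt => Right(p.offsetMs - 1)
    }
    (inclusive(a), inclusive(b)) match {
      case (Left(lo), Right(hi)) => (lo, hi)
      case (Right(hi), Left(lo)) => (lo, hi)
      case _ =>
        throw new IllegalArgumentException("need exactly one lower and one upper bound")
    }
  }
}
```

For sql3 the predicates are written from the point of view of `t2`, so both have to be passed through `flip` before calling `bounds`.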

RowTimeBoundedStreamJoin (Join Implementation Logic)

The join implementation is analyzed below, based on the logic that processes the left stream.

State management involved
Time variables involved
Step-by-step walkthrough
/**
    * Process rows from the left stream.
    */
  override def processElement1(
      cRowValue: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = {

    joinCollector.innerCollector = out
    // Update the operator time, i.e. the values of leftOperatorTime / rightOperatorTime.
    updateOperatorTime(ctx)
    val leftRow = cRowValue.row
    val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
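    // Derive the lower bound of qualifying right-stream timestamps from the left row's time
    // and the window bound (the structure is symmetric for the right stream).
    val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
    // Derive the upper bound of qualifying right-stream timestamps in the same way.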
    val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
    var emitted: Boolean = false

    // Check if we need to join the current row against cached rows of the right input.
    // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
    // We use rightExpirationTime as an approximation of the rightMinimumTime here,
    // since rightExpirationTime <= rightMinimumTime is always true.
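    // Use the right-stream expiration time computed last time to decide whether the left row
    // needs to be joined against the right cache at all.
    // - Seen from the opposite direction: rightExpirationTime is the previously computed value,
    //   so the real value can only be larger. If even this value is not smaller than
    //   rightQualifiedUpperBound, the right stream is already far ahead and the join window of
    //   this left row has long passed, so no join is needed.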
    if (rightExpirationTime < rightQualifiedUpperBound) {
      // Upper bound of current join window has not passed the cache expiration time yet.
      // There might be qualifying rows in the cache that the current row needs to be joined with.
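      // leftOperatorTime: the operator time of the left stream.
      // - In ProcTimeBoundedStreamJoin it is the processing time, i.e. the time at which the row is processed;
      // - In RowTimeBoundedStreamJoin it is the event time; it appears to be the current watermark (?).
      // calExpirationTime derives the new expiration time from leftOperatorTime and rightRelativeSize.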
      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
      // Join the leftRow with rows from the right cache.
      // Join the left row against the cached state of the right stream.
      val rightIterator = rightCache.iterator()
      while (rightIterator.hasNext) {
        val rightEntry = rightIterator.next
        val rightTime = rightEntry.getKey

        // Make sure the cached right-stream timestamp falls within the expected window bounds.
        if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
          val rightRows = rightEntry.getValue
          var i = 0
          var entryUpdated = false
          while (i < rightRows.size) {
            joinCollector.reset()
            val tuple = rightRows.get(i)
            // Run the join logic.
            joinFunction.join(leftRow, tuple.f0, joinCollector)
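            // Record whether this row has been emitted. The flag is used at the end to decide
            // whether a left/full outer join still has to emit a null-padded result:
            // if the row was already emitted, no null padding is emitted.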
            emitted ||= joinCollector.emitted
            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
              if (!tuple.f1 && joinCollector.emitted) {
                // Mark the right row as being successfully joined and emitted.
                tuple.f1 = true
                entryUpdated = true
              }
            }
            i += 1
          }
          if (entryUpdated) {
            // Write back the edited entry (mark emitted) for the right cache.
            rightEntry.setValue(rightRows)
          }
        }
        // Handle right-stream cache keys that have already expired.
        if (rightTime <= rightExpirationTime) {
          if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
            val rightRows = rightEntry.getValue
            var i = 0
            while (i < rightRows.size) {
              val tuple = rightRows.get(i)
              if (!tuple.f1) {
                // Emit a null padding result if the right row has never been successfully joined.
                joinCollector.collect(paddingUtil.padRight(tuple.f0))
              }
              i += 1
            }
          }
          // eager remove
          rightIterator.remove()
        } // We could do the short-cutting optimization here once we get a state with ordered keys.
      }
    }

    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
      if (rightTimerState.value == 0) {
        // Register a timer on the RIGHT stream to remove rows.
        registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
      }
    } else if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
      if (!emitted) {
        // Emit a null padding result if the left row is not cached and successfully joined.
        joinCollector.collect(paddingUtil.padLeft(leftRow))
      }
    }
  }
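
The two decisions made for each left row, which cached right rows qualify and whether the left row itself must be cached, can be condensed into a small sketch. It is a simplification that ignores allowed lateness, outer-join padding and timers. `BoundedJoinSketch` and its members are hypothetical names, and the mapping of the window bounds onto `leftRelativeSize` and `rightRelativeSize` is an assumption read off the code above.

```scala
object BoundedJoinSketch {
  /**
    * Assumed meaning of the two sizes, in milliseconds:
    *   right.time <= left.time + leftRelativeSize
    *   right.time >= left.time - rightRelativeSize
    */
  final case class Window(leftRelativeSize: Long, rightRelativeSize: Long)

  /** Inclusive range of right-stream timestamps that can join a left row at `leftTime`. */
  def rightQualifiedRange(w: Window, leftTime: Long): (Long, Long) =
    (leftTime - w.rightRelativeSize, leftTime + w.leftRelativeSize)

  /** True if a cached right row with timestamp `rightTime` joins the left row. */
  def qualifies(w: Window, leftTime: Long, rightTime: Long): Boolean = {
    val (lower, upper) = rightQualifiedRange(w, leftTime)
    rightTime >= lower && rightTime <= upper
  }

  /** The left row is cached only while the right stream may still deliver qualifying rows. */
  def shouldCacheLeftRow(w: Window, leftTime: Long, rightOperatorTime: Long): Boolean =
    rightOperatorTime < leftTime + w.leftRelativeSize
}
```

For the Orders/Shipments query shown earlier, with Orders as the left stream, this would correspond to `Window(leftRelativeSize = 4 hours, rightRelativeSize = 0)`: a shipment qualifies if its time lies between the order time and four hours after it.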
  

Open Questions
