从Flink SQL "doesn't support cons

2022-01-07  本文已影响0人  LittleMagic

前言

相信我们在初学Flink SQL时,多少遇到过像这样的错误信息:

org.apache.flink.table.api.TableException: 
X[算子名] doesn't support consuming update and delete changes which is produced by node Y[算子名]

为什么有些下游算子不能接受上游算子发来的UPDATE和DELETE消息呢?本文以1.13版本为准来简单地捋一下。

回顾ChangelogMode

笔者之前写过一篇自定义Flink SQL Connector的简明教程,其中提到在定义DynamicTableSink(以及ScanTableSource)的时候,都需要覆写getChangelogMode()方法,告诉Planner这个Connector可以接受或产生的数据变化类型。变化的标记由四种RowKind表示,即INSERT(+I)、UPDATE_BEFORE(-U)、UPDATE_AFTER(+U)和DELETE(-D):

    /** Insertion operation. */
    INSERT("+I", (byte) 0),

    /**
     * Update operation with the previous content of the updated row.
     *
     * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
     * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
     * i.e., an update of a row that is not uniquely identifiable by a key.
     */
    UPDATE_BEFORE("-U", (byte) 1),

    /**
     * Update operation with new content of the updated row.
     *
     * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
     * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
     * update of a row that is uniquely identifiable by a key.
     */
    UPDATE_AFTER("+U", (byte) 2),

    /** Deletion operation. */
    DELETE("-D", (byte) 3);

但是,ChangelogMode只能作用于Connector,中间算子(其实也包含Source/Sink)产生和接受哪些类型的变化在Planner内部是如何定义的?下面了解一下相关的两种RelTraits。

ModifyKindSet / UpdateKind Trait

复习一下之前讲过的RelTraitRelTraitDef的含义:

A set of physical properties & their definitions carried by a relational expression.

站在RelNode的角度上讲,ChangelogMode确实可以作为附加在其上的物理属性。Blink Planner的物理计划层使用了两个RelTrait来承载数据变化的语义。第一个是ModifyKindSetTrait,表示INSERT(I)、UPDATE(U)和DELETE(D)三者组成的集合,部分代码如下,比较容易理解:

object ModifyKindSetTrait {
  /**
   * An empty [[ModifyKindSetTrait]] which doesn't contain any [[ModifyKind]].
   */
  val EMPTY = new ModifyKindSetTrait(ModifyKindSet.newBuilder().build())

  /**
   * Insert-only [[ModifyKindSetTrait]].
   */
  val INSERT_ONLY = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)

  /**
   * A modify [[ModifyKindSetTrait]] that contains all change operations.
   */
  val ALL_CHANGES = new ModifyKindSetTrait(ModifyKindSet.ALL_CHANGES)

  /**
   * Creates an instance of [[ModifyKindSetTrait]] from th given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): ModifyKindSetTrait = {
    val builder = ModifyKindSet.newBuilder
    changelogMode.getContainedKinds.foreach {
      case RowKind.INSERT => builder.addContainedKind(ModifyKind.INSERT)
      case RowKind.DELETE => builder.addContainedKind(ModifyKind.DELETE)
      case _ => builder.addContainedKind(ModifyKind.UPDATE) // otherwise updates
    }
    new ModifyKindSetTrait(builder.build)
  }
}

第二个则是UpdateKindTrait,表示UPDATE_BEFORE(-U)和UPDATE_AFTER(+U),也不难。注意它除了可以从ChangelogMode转换而来,还可以从ModifyKindSet转换而来:

object UpdateKindTrait {

  /**
   * An [[UpdateKindTrait]] that describes the node doesn't provide any kind of updates
   * as a provided trait, or requires nothing about kind of updates as a required trait.
   *
   * <p>It also indicates that the [[ModifyKindSetTrait]] of current node doesn't contain
   * [[ModifyKind#UPDATE]] operation.
   */
  val NONE = new UpdateKindTrait(UpdateKind.NONE)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes just as a
   * single row of [[org.apache.flink.types.RowKind#UPDATE_AFTER]]
   */
  val ONLY_UPDATE_AFTER = new UpdateKindTrait(UpdateKind.ONLY_UPDATE_AFTER)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes consists of
   * a row of [[org.apache.flink.types.RowKind#UPDATE_BEFORE]] and
   * [[org.apache.flink.types.RowKind#UPDATE_AFTER]].
   */
  val BEFORE_AND_AFTER = new UpdateKindTrait(UpdateKind.BEFORE_AND_AFTER)

  /**
   * Returns ONLY_UPDATE_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def onlyAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.ONLY_UPDATE_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Returns BEFORE_AND_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def beforeAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.BEFORE_AND_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Creates an instance of [[UpdateKindTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): UpdateKindTrait = {
    val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
    val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
    (hasUpdateBefore, hasUpdateAfter) match {
      case (true, true) => BEFORE_AND_AFTER
      case (false, true) => ONLY_UPDATE_AFTER
      case (true, false) =>
        throw new IllegalArgumentException("Unsupported changelog mode: " +
          ChangelogPlanUtils.stringifyChangelogMode(Some(changelogMode)))
      case (false, false) => NONE
    }
  }
}

RelTrait相容性

补充一个之前的Calcite入门讲义里略去的点,就是RelTrait的核心方法:

boolean satisfies(RelTrait trait);

它用于判断此RelTrait与另外一个RelTrait的相容性,亦即T1是否满足T2的约束。显然,如果T1与T2相同,或者T1比T2更严格,那么此方法返回true,否则返回false。举个栗子,对于RelCollation而言,(ORDER BY a, b) satisfies (ORDER BY a)就是成立的,反过来则不成立。

ModifyKindSetTrait#satisfies()方法的定义如下,注释写得很清楚,即T1是T2的子集:

  override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
    case other: ModifyKindSetTrait =>
      // it’s satisfied when modify kinds are included in the required set,
      // e.g. [I,U] satisfy [I,U,D]
      //      [I,U,D] not satisfy [I,D]
      this.modifyKindSet.getContainedKinds.forall(other.modifyKindSet.contains)
    case _ => false
  }

UpdateKindTrait#satisfies()则要求两者完全相同:

  override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
    case other: UpdateKindTrait =>
      // should totally match
      other.updateKind == this.updateKind
    case _ => false
  }

接下来就可以进入Blink Planner的相关逻辑了。

物理计划阶段的ChangelogMode推断

Blink Planner通过名为FlinkChangelogModeInferenceProgram的优化程序来为每个StreamPhysicalRel推断出ChangelogMode信息,并检查产生的ModifyKindSetTraitUpdateKindTrait的上下游相容性。主要的逻辑分为两步:

    // step1: satisfy ModifyKindSet trait
    val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
    val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(
      physicalRoot,
      // we do not propagate the ModifyKindSet requirement and requester among blocks
      // set default ModifyKindSet requirement and requester for root
      ModifyKindSetTrait.ALL_CHANGES,
      "ROOT")

    // step2: satisfy UpdateKind trait
    val rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet)
    // use the required UpdateKindTrait from parent blocks
    val requiredUpdateKindTraits = if (rootModifyKindSet.contains(ModifyKind.UPDATE)) {
      if (context.isUpdateBeforeRequired) {
        Seq(UpdateKindTrait.BEFORE_AND_AFTER)
      } else {
        // update_before is not required, and input contains updates
        // try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTER
        Seq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER)
      }
    } else {
      // there is no updates
      Seq(UpdateKindTrait.NONE)
    }

可见是通过两个特殊定义的Visitor(参见访问者模式)对物理计划树进行遍历与转换。以SatisfyModifyKindSetTraitVisitor为例,它的visit()方法代码框架如下,也体现了Scala模式匹配的强大之处。

    def visit(
        rel: StreamPhysicalRel,
        requiredTrait: ModifyKindSetTrait,
        requester: String): StreamPhysicalRel = rel match {
      case sink: StreamPhysicalSink =>
        val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"
        val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
        val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(
          sink.tableSink.getChangelogMode(queryModifyKindSet))
        val children = visitChildren(sink, sinkRequiredTrait, name)
        val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
        // ignore required trait from context, because sink is the true root
        sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

      case sink: StreamPhysicalLegacySink[_] => // ......

      case deduplicate: StreamPhysicalDeduplicate =>
        // deduplicate only support insert only as input
        val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
        val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {
          // only proctime first row deduplicate does not produce UPDATE changes
          ModifyKindSetTrait.INSERT_ONLY
        } else {
          // other deduplicate produce update changes
          ModifyKindSetTrait.ALL_CHANGES
        }
        createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)

      case agg: StreamPhysicalGroupAggregate =>
        // agg support all changes in input
        val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
        val inputModifyKindSet = getModifyKindSet(children.head)
        val builder = ModifyKindSet.newBuilder()
          .addContainedKind(ModifyKind.INSERT)
          .addContainedKind(ModifyKind.UPDATE)
        if (inputModifyKindSet.contains(ModifyKind.UPDATE) ||
            inputModifyKindSet.contains(ModifyKind.DELETE)) {
          builder.addContainedKind(ModifyKind.DELETE)
        }
        val providedTrait = new ModifyKindSetTrait(builder.build())
        createNewNode(agg, children, providedTrait, requiredTrait, requester)

      case tagg: StreamPhysicalGroupTableAggregateBase => // ......
      case agg: StreamPhysicalPythonGroupAggregate => // ......

      case window: StreamPhysicalGroupWindowAggregateBase =>
        // WindowAggregate and WindowTableAggregate support insert-only in input
        val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
        val builder = ModifyKindSet.newBuilder()
          .addContainedKind(ModifyKind.INSERT)
        if (window.emitStrategy.produceUpdates) {
          builder.addContainedKind(ModifyKind.UPDATE)
        }
        val providedTrait = new ModifyKindSetTrait(builder.build())
        createNewNode(window, children, providedTrait, requiredTrait, requester)

      case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank =>
        // WindowAggregate and WindowRank support insert-only in input
        val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
        val providedTrait = ModifyKindSetTrait.INSERT_ONLY
        createNewNode(rel, children, providedTrait, requiredTrait, requester)

      case limit: StreamPhysicalLimit => // ......
      case _: StreamPhysicalRank | _: StreamPhysicalSortLimit => // ......
      case sort: StreamPhysicalSort => // ......
      case cep: StreamPhysicalMatch => // ......
      case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
           _: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate => // ......
      case join: StreamPhysicalJoin => // ......
      case windowJoin: StreamPhysicalWindowJoin => // ......
      case temporalJoin: StreamPhysicalTemporalJoin => // ......

      case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
           _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange |
           _: StreamPhysicalExpand | _: StreamPhysicalMiniBatchAssigner |
           _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction =>
        // transparent forward requiredTrait to children
        val children = visitChildren(rel, requiredTrait, requester)
        val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
        // forward children mode
        createNewNode(rel, children, childrenTrait, requiredTrait, requester)

      case union: StreamPhysicalUnion => // ......
      case normalize: StreamPhysicalChangelogNormalize => // ......

      case ts: StreamPhysicalTableSourceScan =>
        // ScanTableSource supports produces updates and deletions
        val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
        createNewNode(ts, List(), providedTrait, requiredTrait, requester)

      case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
           _: StreamPhysicalValues => // ......
      case scan: StreamPhysicalIntermediateTableScan => // ......

      case _ =>
        throw new UnsupportedOperationException(
          s"Unsupported visit for ${rel.getClass.getSimpleName}")
    }

可见,这个访问者以Sink为根开始推断ChangelogModerequiredTrait参数就是父节点需要子节点满足的ModifyKindSetTrait,一直传播到Source为止。对于那些不会对变化语义产生影响的节点(如CalcExchange等),则会直接将requiredTrait转发到子节点。

在这过程中,若有父节点和子节点Trait不相容的情况出现,就会抛出文章开头所述的错误信息,见createNewNode()方法:

    private def createNewNode(
        node: StreamPhysicalRel,
        children: List[StreamPhysicalRel],
        providedTrait: ModifyKindSetTrait,
        requiredTrait: ModifyKindSetTrait,
        requestedOwner: String): StreamPhysicalRel = {
      if (!providedTrait.satisfies(requiredTrait)) {
        val diff = providedTrait.modifyKindSet.minus(requiredTrait.modifyKindSet)
        val diffString = diff.getContainedKinds
          .toList.sorted // for deterministic error message
          .map(_.toString.toLowerCase)
          .mkString(" and ")
        // creates a new node based on the new children, to have a more correct node description
        // e.g. description of GroupAggregate is based on the ModifyKindSetTrait of children
        val tempNode = node.copy(node.getTraitSet, children).asInstanceOf[StreamPhysicalRel]
        val nodeString = tempNode.getRelDetailedDescription
        throw new TableException(
          s"$requestedOwner doesn't support consuming $diffString changes " +
          s"which is produced by node $nodeString")
      }
      val newTraitSet = node.getTraitSet.plus(providedTrait)
      node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]
    }

为了方便理解,来用一条包含去重+窗口聚合逻辑的SQL语句做说明:

SELECT userId, COUNT(DISTINCT orderId)
FROM (
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime [ASC | DESC]) AS rn
    FROM rtdw_ods.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset') */
  ) WHERE rn = 1
) GROUP BY userId, TUMBLE(procTime, INTERVAL '5' SECOND);

经过试验可以发现,如果去重保留第一条数据(即ORDER BY procTime ASC),那么这条语句可以正常执行。但若是保留最后一条数据(即ORDER BY procTime DESC),就会抛出如下的异常:

Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[LastRow], key=[suborderid], order=[PROCTIME])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:166)
......

再次参考代码可得知,GroupWindowAggregate只能接受子节点的INSERT语义,但是Deduplicate在保留最后一条的情况下会产生回撤语义,故无法执行。

SatisfyUpdateKindTraitVisitor的处理方式类似,不再赘述。

Hack一下

通过查看执行层的GroupWindowAggregate代码,可知它其实是能够支持回撤流输入的。我们只需要对FlinkChangelogModeInferenceProgram做三处简单的改动就能达到目的:

重新构建flink-table-planner-blink模块,再提交上一节的保留最后一条去重+窗口聚合的SQL,发现可以正常执行,且结果正确。

The End

明后两天就是Flink Forward Asia 2021 Online咯~

晚安晚安。

上一篇 下一篇

猜你喜欢

热点阅读