数客联盟

Watermarks in Parallel Streams

2020-01-16  Woople

Reading the documentation

Documentation path

/Application Development/Streaming (DataStream API)/Event Time/Watermarks in Parallel Streams

Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.

Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.

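The passage above says that each subtask computes its own watermark independently and then propagates it downstream. When the watermark reaches an operator that requires a shuffle, such as the window operator in the documentation's figure, that operator takes the minimum of the watermarks sent by its upstream subtasks as its own current watermark.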

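Take Kafka as the source. The "multiple" sources in the figure can be read as different partitions of the same topic, or as different topics; the latter is the so-called multi-stream case. In both cases, whenever watermarks have to be aligned, the minimum is taken. Only the source-code implementation differs.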

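A single stream

When the inputs are different partitions of one stream, each upstream subtask is treated as a separate channel. The code path goes through StatusWatermarkValve#inputWatermark, which calls findAndOutputNewMinWatermarkAcrossAlignedChannels to take the minimum watermark across all watermark-aligned channels as the new watermark: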

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
  long newMinWatermark = Long.MAX_VALUE;
  boolean hasAlignedChannels = false;

  // determine new overall watermark by considering only watermark-aligned channels across all channels
  for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
      hasAlignedChannels = true;
      newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
  }

  // we acknowledge and output the new overall watermark if it really is aggregated
  // from some remaining aligned channel, and is also larger than the last output watermark
  if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
  }
}
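
To make the behaviour concrete, here is a minimal standalone sketch of the same minimum-across-channels rule. It does not depend on Flink; the class MinWatermarkValveSketch and its methods are hypothetical names chosen for illustration, and marking a channel idle is a simplification of how the real valve handles stream status.

```java
// Standalone sketch; names are hypothetical and this is not Flink's implementation.
class MinWatermarkValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] aligned;
    private long lastOutputWatermark = Long.MIN_VALUE;

    MinWatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        aligned = new boolean[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelWatermarks[i] = Long.MIN_VALUE; // nothing received yet
            aligned[i] = true;
        }
    }

    // Records a watermark on one channel and returns the last emitted overall watermark.
    long inputWatermark(int channel, long timestamp) {
        if (timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
        }
        return recomputeMin();
    }

    // Simplified idleness: an idle channel no longer holds back the minimum.
    long markIdle(int channel) {
        aligned[channel] = false;
        return recomputeMin();
    }

    private long recomputeMin() {
        long newMin = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (aligned[i]) {
                hasAligned = true;
                newMin = Math.min(newMin, channelWatermarks[i]);
            }
        }
        // Only advance; the emitted watermark never moves backwards.
        if (hasAligned && newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
        }
        return lastOutputWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkValveSketch valve = new MinWatermarkValveSketch(3);
        System.out.println(valve.inputWatermark(0, 10)); // channels 1 and 2 are still silent
        System.out.println(valve.inputWatermark(1, 7));  // channel 2 is still silent
        System.out.println(valve.inputWatermark(2, 12)); // min(10, 7, 12) = 7
        System.out.println(valve.inputWatermark(1, 15)); // min(10, 15, 12) = 10
        System.out.println(valve.markIdle(0));           // min(15, 12) = 12
    }
}
```

The first two calls return Long.MIN_VALUE: until every aligned channel has reported a watermark, the slowest channel pins the overall watermark, which is why one lagging partition can stall downstream windows.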

Two streams

Flink currently supports only two input streams here. The rule for generating the new watermark is spelled out very clearly in AbstractStreamOperator.java:

// ---------------- two-input operator watermarks ------------------

// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;

public void processWatermark1(Watermark mark) throws Exception {
  input1Watermark = mark.getTimestamp();
  long newMin = Math.min(input1Watermark, input2Watermark);
  if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
  }
}

public void processWatermark2(Watermark mark) throws Exception {
  input2Watermark = mark.getTimestamp();
  long newMin = Math.min(input1Watermark, input2Watermark);
  if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
  }
}
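
The two-input rule can be traced with a small standalone sketch. The class TwoInputWatermarkSketch is a hypothetical name, it has no Flink dependency, and it returns the combined watermark instead of forwarding it to downstream operators.

```java
// Standalone sketch of the two-input rule; names are hypothetical, not Flink APIs.
class TwoInputWatermarkSketch {
    private long combinedWatermark = Long.MIN_VALUE;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;

    long processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    long processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    // The combined watermark is the minimum of both inputs and only ever increases.
    private long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        System.out.println(op.processWatermark1(5)); // input 2 is silent, so no progress yet
        System.out.println(op.processWatermark2(3)); // min(5, 3) = 3
        System.out.println(op.processWatermark2(9)); // min(5, 9) = 5
    }
}
```

As in the single-stream case, the operator makes no event-time progress until both inputs have delivered a watermark.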

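Further reading

Everything above concerns "multiple inputs". The inputs may be different partitions of one data source or several distinct data sources; either way, the minimum watermark is taken. Where there are multiple inputs there is also a "single input", which can be understood as what the official documentation calls "Each parallel subtask of a source function". A brief note on that case: look at how BoundedOutOfOrdernessTimestampExtractor#getCurrentWatermark generates its watermark. On each call it emits the larger of potentialWM and the last emitted watermark, so the watermark is effectively the running maximum of potentialWM.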

public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}
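
The single-input rule can be traced the same way. The following standalone sketch assumes a hypothetical class BoundedOutOfOrdernessSketch that mirrors the logic above without depending on Flink. The initial value of currentMaxTimestamp is chosen so that the first subtraction does not underflow.

```java
// Standalone sketch of a bounded-out-of-orderness watermark; names are hypothetical.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start high enough that currentMaxTimestamp - maxOutOfOrderness cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Tracks the largest event timestamp seen so far.
    void onEvent(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
    }

    // Returns the running maximum of (currentMaxTimestamp - maxOutOfOrderness).
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(3);
        long[] events = {10, 8, 15, 11};
        for (long ts : events) {
            gen.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + gen.currentWatermark());
        }
    }
}
```

With an allowed lateness of 3, the events 10, 8, 15, 11 yield watermarks 7, 7, 12, 12: the late events 8 and 11 do not pull the watermark back.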

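So the whole thing can be summed up in one sentence: for watermarks, a single input takes the maximum and multiple inputs take the minimum.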
