数客联盟

Flink Async-IO 源码分析

2019-11-11  本文已影响0人  WestC

Async IO的设计

Flink 基于事件的消息驱动流处理引擎,对于每条消息都会触发一次全流程的处理,因此在与外部存储系统交互时,对于每条消息都需要一次外部请求,对于性能的损耗较大,严重制约了flink的吞吐量。 Flink 1.2中引入了Async IO(异步IO)来加快flink与外部系统的交互性能,提升吞吐量。[FLIP-12: Asynchronous I/O Design and Implementation]。 其设计的核心是对原有的每条处理后的消息发送至下游operator的执行流程进行改进。其核心实现是引入了一个AsyncWaitOperator,在其processElement/processWatermark方法中完成对消息的处理。其执行流程是:

  1. 将每条消息封装成一个StreamRecordQueueEntry(其实现了ResultFuture),放入StreamElementQueue
  2. 消息与外部系统交互的逻辑放入AsynInvoke方法中,将交互执行结果放入StreamRecordQueueEntry
  3. 启动一个emitter线程,从StreamElementQueue中读取已经完成的StreamRecordQueueEntry,将其结果发送至下游operator算子

顺序/无序的消息模式

在异步处理消息阶段,由于网络延迟,服务器响应等因素可能导致先发出的请求返回比后发出的请求更晚的情况,如果要严格做到消息发送至下游是有序的,则可能需要更多的存储空间,也会引发更高的消息处理时延,而不同的业务场景对于消息的顺序有不一样的要求(如在基于Eventtime的消息统计时watermark前的消息必须保证在watermark后发送至下游operator),因此在实现AsyncWaitOperator时,同时支持有序(Order)和无序(Unorder)的消息处理场景。

Ordered/有序

有序处理指的是消息流入operator的顺序与经过处理后流入下一级operator的顺序一致。

Flink基于OrderedStreamElementQueue实现了有序消息处理。 在顺序消息处理的场景中,先到的消息先发出。因此对于ProcessingTime和EventTime模式下的实现是一致的。其实现较为简单,使用简单的java的队列即可。如下:

  1. 每条消息在处理完之后,其onCompleteHandler方法会调用检查位于队列头部的StreamElementQueueEntry是否已经完成,如果完成则会调用headIsComplete的signalAll方法
private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
   lock.lockInterruptibly();

   try {
      if (!queue.isEmpty() && queue.peek().isDone()) {
         LOG.debug("Signal ordered stream element queue has completed head element.");
         headIsCompleted.signalAll();
      }
   } finally {
      lock.unlock();
   }
}
  1. 通过emmiter线程循环从ArrayDeque中循环读取消息处理结果并发送至下游operator
public AsyncResult peekBlockingly() throws InterruptedException {
lock.lockInterruptibly();
try {
   while (queue.isEmpty() || !queue.peek().isDone()) {
     
      headIsCompleted.await();
   }
        ...
   return queue.peek();
} finally {
   lock.unlock();
}
}

Unordered

无序处理指的是消息流入operator的顺序与经过处理后流入下一级operator的顺序无必然关联。

Flink基于UnorderedStreamElementQueue实现了无序消息处理,由于在该queue中实现了两种不同时间模式下的无序处理,其实现较Order模式更为复杂。查看源码发现其实现也比较精妙,主要数据结构如下:

/** Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

/** Queue of completed stream element queue entries. */
private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

/** First (chronologically oldest) uncompleted set of stream element queue entries. */
private Set<StreamElementQueueEntry<?>> firstSet;

// Last (chronologically youngest) uncompleted set of stream element queue entries. New
// stream element queue entries are inserted into this set.
// 在类初始化方法中,将lastSet = firstSet
private Set<StreamElementQueueEntry<?>> lastSet;

其核心逻辑如下:

  1. 消息的处理

    1. 接收到的消息封装成StreamElementQueueEntry

    2. 通过调用addEntry方法将StreamElementQueueEntry放入对应的queue中

      private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
            assert(lock.isHeldByCurrentThread());
        
            if (streamElementQueueEntry.isWatermark()) {
              //只有EventTime模式下接收到watermark类型的消息才会走入此分支
               lastSet = new HashSet<>(capacity);
      
               if (firstSet.isEmpty()) {
                 // 只有在所有的queue中所有消息均发送至下游operator或者第一条消息就是watermark消息才会走进此分支
                  firstSet.add(streamElementQueueEntry);
               } else {
                 // 每次进入此分支,会生成一个只包含watermark消息的entry放入uncompleteQueue,同时生成一个lasteSet并放入uncomplteQueue,用于存放后续接收到的消息的entry
                  Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                  watermarkSet.add(streamElementQueueEntry);
                  uncompletedQueue.offer(watermarkSet);
               }
               uncompletedQueue.offer(lastSet);
            } else {
               lastSet.add(streamElementQueueEntry);
            }
                  ...
            numberEntries++;
         }
      }
      
  2. 消息的发送

    1. 消息处理完毕后,其onCompleteHandler方法会试图将该消息的entry放入completedQueue,同时会遍历所有可以放入completeQueue的消息

      public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
         lock.lockInterruptibly();
         try {
           // 从firstSet中移除该entry,如果该entry不在firsetSeq中,则跳过
            if (firstSet.remove(streamElementQueueEntry)) {
              //将该entry放入completeQueue中
               completedQueue.offer(streamElementQueueEntry);
              //rstSet为空,且firset != lastSet 说明此时后续至少还有一些set中可能包含已经处理完的消息待放入completeQueue中
               while (firstSet.isEmpty() && firstSet != lastSet) {
                  firstSet = uncompletedQueue.poll();
                  Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
                  while (it.hasNext()) {
                     StreamElementQueueEntry<?> bufferEntry = it.next();
                     if (bufferEntry.isDone()) {
                        completedQueue.offer(bufferEntry);
                        it.remove();
                     }
                  }
               }
               LOG.debug("Signal unordered stream element queue has completed entries.");
               hasCompletedEntries.signalAll();
            }
         } finally {
            lock.unlock();
         }
      }
      
    2. 通过emmiter线程循环从completeQueue中循环读取消息处理结果并发送至下游operator

      //每次顺序从completedQueue取出消息发送至下游
      public AsyncResult poll() throws InterruptedException {
         lock.lockInterruptibly();
         try {
            while (completedQueue.isEmpty()) {
               hasCompletedEntries.await();
            }
            numberEntries--;
            notFull.signalAll();
                  ...
            return completedQueue.poll();
         } finally {
            lock.unlock();
         }
      }
      
基于ProcessingTime的Unorder模式

该模式下,不存在watermark类型的消息,因此所有消息的entry都是放入lastSeq(此场景下lastSet和firstSet是同一个),且此时incompleteQueue并没有被使用到;在消息entry的onCompleteHandler方法中,直接将该消息的entry放入completeQueue中,通过emmitter线程发送至下游operator,因此该场景下实现的是完全无序的处理模式。

基于EventTime的Unorder模式

该模式下,实现较为复杂。如图:

Unordered EventTime mode
上一篇 下一篇

猜你喜欢

热点阅读