flink异步io

2022-12-23  本文已影响0人  熊云昆

最近在项目中用到了flink异步io模式去查询redis,相比于之前的同步访问模式,性能提升了好几倍,感叹异步io模式的强大,趁着这段时间有空好好看了一下异步io模式的实现源码。
flink源码中异步io的实现逻辑都在AsyncWaitOperator这个类中:

@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // add element first to the queue
        final ResultFuture<OUT> entry = addToWorkQueue(element);

        final ResultHandler resultHandler = new ResultHandler(element, entry);

        // register a timeout for the entry if timeout is configured
        if (timeout > 0L) {
            resultHandler.registerTimeout(getProcessingTimeService(), timeout);
        }

        userFunction.asyncInvoke(element.getValue(), resultHandler);
    }

主要流程是:
1.先把输入数据放入到队列中
2.将输入数据封装为ResultHandler对象,这个对象在异步回调时会传入进去
3.调用asyncInvoke方法
异步io分为两种模式:

有序模式

实现方式是采用ArrayDeque来实现,当resultHandler有回调返回的时候,会同时更新ArrayDeque中的元素状态,然后会判断ArrayDeque队列第一个元素是否有结果,如果有则弹出执行

@Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (hasCompletedElements()) {
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(output);
        }
    }

无序模式

无序模式实现的方式也比较简单,通过定义的Segment来实现:

Segment(int initialCapacity) {
   incompleteElements = new HashSet<>(initialCapacity);
   completedElements = new ArrayDeque<>(initialCapacity);
}
/** Signals that an entry finished computation. */
void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
   // adding only to completed queue if not completed before
   // there may be a real result coming after a timeout result, which is updated in the
   // queue entry but
   // the entry is not re-added to the complete queue
   if (incompleteElements.remove(elementQueueEntry)) {
       completedElements.add(elementQueueEntry);
    }
}
int emitCompleted(TimestampedCollector<OUT> output) {
     final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
     if (completedEntry == null) {
         return 0;
     }
     completedEntry.emitResult(output);
     return 1;
}

可以看到,通过定义了一个HashSet来存放未完成数据回调的incompleteElements对象,因为HashSet也是无序的,而在完成回调后,调用completed方法,将数据从HashSet移除,放入到completedElements再调用emitCompleted方法将数据发给下游

一点总结

1.flink异步io接口必须封装在外部存储io异步访问接口中使用才有效,如果外部存储io接口只支持同步,那其实实现效果跟同步访问一样,性能并没有提升
2.使用flink异步io后,在运行过程中默认会断开算子链,不过目前看性能并没有损失多少,可以接受

上一篇下一篇

猜你喜欢

热点阅读