flink异步io
2022-12-23 本文已影响0人
熊云昆
最近在项目中用到了flink异步io模式去查询redis,相比于之前的同步访问模式,性能提升了好几倍,感叹异步io模式的强大,趁着这段时间有空好好看了一下异步io模式的实现源码。
flink源码中异步io的实现逻辑都在AsyncWaitOperator这个类中:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// add element first to the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
final ResultHandler resultHandler = new ResultHandler(element, entry);
// register a timeout for the entry if timeout is configured
if (timeout > 0L) {
resultHandler.registerTimeout(getProcessingTimeService(), timeout);
}
userFunction.asyncInvoke(element.getValue(), resultHandler);
}
主要流程是:
1.先把输入数据放入到队列中
2.将输入数据封装为ResultHandler对象,这个对象在异步回调时会传入进去
3.调用asyncInvoke方法
异步io分为两种模式:
有序模式
实现方式是采用ArrayDeque来实现,当resultHandler有回调返回的时候,会同时更新ArrayDeque中的元素状态,然后会判断ArrayDeque队列第一个元素是否有结果,如果有则弹出执行
@Override
public boolean hasCompletedElements() {
return !queue.isEmpty() && queue.peek().isDone();
}
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (hasCompletedElements()) {
final StreamElementQueueEntry<OUT> head = queue.poll();
head.emitResult(output);
}
}
无序模式
无序模式实现的方式也比较简单,通过定义的Segment来实现:
Segment(int initialCapacity) {
incompleteElements = new HashSet<>(initialCapacity);
completedElements = new ArrayDeque<>(initialCapacity);
}
/** Signals that an entry finished computation. */
void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
// adding only to completed queue if not completed before
// there may be a real result coming after a timeout result, which is updated in the
// queue entry but
// the entry is not re-added to the complete queue
if (incompleteElements.remove(elementQueueEntry)) {
completedElements.add(elementQueueEntry);
}
}
int emitCompleted(TimestampedCollector<OUT> output) {
final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
if (completedEntry == null) {
return 0;
}
completedEntry.emitResult(output);
return 1;
}
可以看到,通过定义了一个HashSet来存放未完成数据回调的incompleteElements对象,因为HashSet也是无序的,而在完成回调后,调用completed方法,将数据从HashSet移除,放入到completedElements再调用emitCompleted方法将数据发给下游
一点总结
1.flink异步io接口必须封装在外部存储io异步访问接口中使用才有效,如果外部存储io接口只支持同步,那其实实现效果跟同步访问一样,性能并没有提升
2.使用flink异步io后,在运行过程中默认会断开算子链,不过目前看性能并没有损失多少,可以接受