Guava Asynchronous Callbacks

2022-07-14  M_lear

This is written rather roughly, so it may be confusing to read. The post simply records the logic I traced while debugging.

Main text

We start from the submit method of ListeningExecutorService.

AbstractListeningExecutorService overrides the newTaskFor method.
newTaskFor returns a TrustedListenableFutureTask object.

The submit logic in AbstractExecutorService:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

So submit passes a TrustedListenableFutureTask to execute, and what eventually runs is the run logic of TrustedListenableFutureTask.
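To see why submit ends up in the task's own run method, here is a minimal JDK-only sketch. It is not Guava code: DirectExecutorService and TracingTask are hypothetical names standing in for AbstractListeningExecutorService and TrustedListenableFutureTask, and execute is assumed to run the task on the calling thread to keep things simple.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

class NewTaskForSketch {

    // Hypothetical task type: a RunnableFuture whose run() we can observe.
    static final class TracingTask<T> extends FutureTask<T> {
        volatile boolean ranByExecutor = false;

        TracingTask(Callable<T> callable) {
            super(callable);
        }

        @Override
        public void run() {
            ranByExecutor = true; // reached through execute(ftask) inside submit
            super.run();
        }
    }

    // Hypothetical executor: only newTaskFor and execute matter for this sketch.
    static final class DirectExecutorService extends AbstractExecutorService {
        private volatile boolean shutdown;

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new TracingTask<>(callable); // submit wraps the Callable with this
        }

        @Override
        public void execute(Runnable command) {
            command.run(); // run on the calling thread
        }

        @Override public void shutdown() { shutdown = true; }
        @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
        @Override public boolean isShutdown() { return shutdown; }
        @Override public boolean isTerminated() { return shutdown; }
        @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }
    }

    static String demo() {
        DirectExecutorService executor = new DirectExecutorService();
        Future<String> future = executor.submit(() -> "hello");
        // The Future handed back by submit is the very object newTaskFor created.
        TracingTask<String> task = (TracingTask<String>) future;
        try {
            return task.ranByExecutor + ":" + future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The inherited submit never needs to know the concrete task type; overriding newTaskFor is enough to change what gets executed and returned.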

The run logic of TrustedListenableFutureTask delegates to the run logic of TrustedFutureInterruptibleTask.

TrustedFutureInterruptibleTask inherits its run logic from InterruptibleTask.

The run logic of InterruptibleTask:

    public final void run() {
        Thread currentThread = Thread.currentThread();
        // Only the thread that wins this CAS runs the task; any other caller falls through and returns.
        if (this.compareAndSet(null, currentThread)) {
            boolean run = !this.isDone();
            T result = null;
            Throwable error = null;

            try {
                if (run) {
                    result = this.runInterruptibly();
                }
            } catch (Throwable t) {
                error = t;
            } finally {
                // Try to mark the task as DONE; if that fails, wait for the pending interrupt to finish.
                if (!this.compareAndSet(currentThread, DONE)) {
                    this.waitForInterrupt(currentThread);
                }

                // Report the outcome through the hooks implemented by the subclass.
                if (run) {
                    if (error == null) {
                        this.afterRanInterruptiblySuccess(NullnessCasts.uncheckedCastNullableTToT(result));
                    } else {
                        this.afterRanInterruptiblyFailure(error);
                    }
                }
            }
        }
    }

This is the template method pattern.
The runInterruptibly, afterRanInterruptiblySuccess and afterRanInterruptiblyFailure methods called here are all abstract and are implemented in subclasses.

    @ParametricNullness
    abstract T runInterruptibly() throws Exception;

    abstract void afterRanInterruptiblySuccess(@ParametricNullness T result);

    abstract void afterRanInterruptiblyFailure(Throwable error);

These three methods as implemented in the subclass TrustedFutureInterruptibleTask:

        @ParametricNullness
        V runInterruptibly() throws Exception {
            return this.callable.call();
        }

        void afterRanInterruptiblySuccess(@ParametricNullness V result) {
            TrustedListenableFutureTask.this.set(result);
        }

        void afterRanInterruptiblyFailure(Throwable error) {
            TrustedListenableFutureTask.this.setException(error);
        }

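runInterruptibly simply invokes the call method of the Callable.

With the subclass implementation inlined, the run logic of InterruptibleTask is, on the whole, similar to the run logic of the JDK's FutureTask.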
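The template structure described above can be condensed into a small JDK-only sketch. SketchTask and its hooks compute, onSuccess and onFailure are hypothetical names that mirror InterruptibleTask and its three abstract methods; the interrupt handling of the real class is left out on purpose.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

class TemplateRunSketch {

    // Simplified stand-in for InterruptibleTask: run() fixes the skeleton,
    // subclasses only supply the three hooks.
    abstract static class SketchTask<T> implements Runnable {
        private final AtomicReference<Thread> runner = new AtomicReference<>();

        @Override
        public final void run() {
            // Only the first caller gets to run the task.
            if (!runner.compareAndSet(null, Thread.currentThread())) {
                return;
            }
            T result = null;
            Throwable error = null;
            try {
                result = compute();
            } catch (Throwable t) {
                error = t;
            }
            if (error == null) {
                onSuccess(result);
            } else {
                onFailure(error);
            }
        }

        abstract T compute() throws Exception;

        abstract void onSuccess(T result);

        abstract void onFailure(Throwable error);
    }

    // Plays the role of TrustedFutureInterruptibleTask: the hooks wrap a Callable.
    static <V> String runOnce(Callable<V> callable) {
        StringBuilder outcome = new StringBuilder();
        SketchTask<V> task = new SketchTask<V>() {
            @Override
            V compute() throws Exception {
                return callable.call();
            }

            @Override
            void onSuccess(V result) {
                outcome.append("success:").append(result);
            }

            @Override
            void onFailure(Throwable error) {
                outcome.append("failure:").append(error.getMessage());
            }
        };
        task.run();
        task.run(); // no-op: the task has already been claimed
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnce(() -> 42));
        System.out.println(runOnce(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

In the real classes the success and failure hooks call set and setException on the enclosing future instead of appending to a string.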

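The set and setException methods of the outer class TrustedListenableFutureTask (defined in AbstractFuture and inherited through TrustedFuture) both call the complete method.

The complete method calls executeListener for each registered listener, handing it to the executor it was registered with; this is what runs all the callback logic.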
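A rough way to picture "completion triggers the listeners" is the JDK-only sketch below. SketchFuture is a hypothetical, heavily simplified stand-in, not Guava's AbstractFuture: it uses a synchronized list where the real class uses lock-free structures, and the listener order shown here is a property of the sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class ListenerSketch {

    // Simplified stand-in for a listenable future; NOT Guava's AbstractFuture.
    static final class SketchFuture<V> {
        private final List<Runnable> pending = new ArrayList<>();
        private boolean done;
        private Object outcome; // the value, or the Throwable on failure

        // Registers a listener; if the future is already complete, dispatch it right away.
        void addListener(Runnable listener, Executor executor) {
            synchronized (this) {
                if (!done) {
                    pending.add(() -> executor.execute(listener));
                    return;
                }
            }
            executor.execute(listener);
        }

        boolean set(V value) {
            return complete(value);
        }

        boolean setException(Throwable failure) {
            return complete(failure);
        }

        synchronized Object outcome() {
            return outcome;
        }

        // Only the first completion wins; it then dispatches every pending listener.
        private boolean complete(Object result) {
            List<Runnable> toRun;
            synchronized (this) {
                if (done) {
                    return false;
                }
                done = true;
                outcome = result;
                toRun = new ArrayList<>(pending);
                pending.clear();
            }
            for (Runnable dispatch : toRun) {
                dispatch.run();
            }
            return true;
        }
    }

    static String demo() {
        List<String> log = new ArrayList<>();
        Executor direct = Runnable::run; // runs listeners on the completing thread
        SketchFuture<String> future = new SketchFuture<>();
        future.addListener(() -> log.add("first:" + future.outcome()), direct);
        future.addListener(() -> log.add("second:" + future.outcome()), direct);
        boolean firstSet = future.set("v");
        boolean secondSet = future.setException(new IllegalStateException("ignored"));
        future.addListener(() -> log.add("late:" + future.outcome()), direct);
        return String.join(",", log) + "|" + firstSet + "|" + secondSet;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The second completion attempt returns false and triggers nothing, and a listener added after completion is dispatched immediately.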

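The callback logic is wrapped in the run method of CallbackListener (the listener that Futures.addCallback registers on the future):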

        public void run() {
            // Fast path: read the failure directly, without going through get().
            if (this.future instanceof InternalFutureFailureAccess) {
                Throwable failure = InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess)this.future);
                if (failure != null) {
                    this.callback.onFailure(failure);
                    return;
                }
            }

            V value;
            try {
                value = Futures.getDone(this.future);
            } catch (ExecutionException e) {
                // The task threw: hand the original cause to the callback.
                this.callback.onFailure(e.getCause());
                return;
            } catch (Error | RuntimeException e) {
                this.callback.onFailure(e);
                return;
            }

            this.callback.onSuccess(value);
        }

Futures.getDone

    @ParametricNullness
    @CanIgnoreReturnValue
    public static <V> V getDone(Future<V> future) throws ExecutionException {
        Preconditions.checkState(future.isDone(), "Future was expected to be done: %s", future);
        return Uninterruptibles.getUninterruptibly(future);
    }

Uninterruptibles.getUninterruptibly

    @ParametricNullness
    @CanIgnoreReturnValue
    public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
        boolean interrupted = false;

        try {
            while(true) {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    // Remember the interrupt and retry instead of propagating it.
                    interrupted = true;
                }
            }
        } finally {
            // Restore the interrupt flag for the caller.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

        }
    }

The get call here does not block: the Callable has already finished (getDone checks isDone first), so it merely fetches the result.
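The retry loop only matters when get can actually wait. The JDK-only sketch below copies the loop into a local helper and exercises both situations; CompletableFuture is used as an assumed stand-in for the Guava future, and the 100 ms delay is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class UninterruptibleGetSketch {

    // Same shape as the loop above: retry on interrupt, restore the flag at the end.
    static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static String demo() {
        try {
            // Case 1: the future is already complete, so get() returns at once.
            String ready = getUninterruptibly(CompletableFuture.completedFuture("ready"));

            // Case 2: the future completes later while the caller is interrupted.
            CompletableFuture<String> pending = new CompletableFuture<>();
            Thread completer = new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ignored) {
                    // fall through and complete anyway
                }
                pending.complete("late");
            });
            completer.start();
            Thread.currentThread().interrupt();
            String late = getUninterruptibly(pending); // the interrupt does not abort the wait
            boolean flagRestored = Thread.interrupted(); // reads and clears the flag
            return ready + "|" + late + "|" + flagRestored;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the callback path only the first case occurs, since getDone has already verified that the future is done.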

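Back in the run logic above, what remains is to invoke the matching callback, onSuccess or onFailure, depending on the outcome of the future's get.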
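The dispatch step can be sketched with the JDK alone. The Callback interface and the dispatch helper below are hypothetical stand-ins for Guava's FutureCallback and CallbackListener.run; the fast path through InternalFutureFailureAccess is omitted, and CompletableFuture is only used to produce already-completed futures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class CallbackDispatchSketch {

    // Hypothetical stand-in for Guava's FutureCallback.
    interface Callback<V> {
        void onSuccess(V result);

        void onFailure(Throwable t);
    }

    // Reads the result of an already-completed future and routes it to the callback.
    static <V> void dispatch(Future<V> future, Callback<? super V> callback) {
        if (!future.isDone()) {
            throw new IllegalStateException("Future was expected to be done: " + future);
        }
        V value;
        try {
            value = future.get(); // does not wait: the future is done
        } catch (ExecutionException e) {
            callback.onFailure(e.getCause()); // unwrap to the task's own exception
            return;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // not expected for a done future
            callback.onFailure(e);
            return;
        } catch (RuntimeException | Error e) {
            callback.onFailure(e); // e.g. CancellationException
            return;
        }
        callback.onSuccess(value);
    }

    static String describe(Future<String> future) {
        StringBuilder outcome = new StringBuilder();
        dispatch(future, new Callback<String>() {
            @Override
            public void onSuccess(String result) {
                outcome.append("success:").append(result);
            }

            @Override
            public void onFailure(Throwable t) {
                outcome.append("failure:").append(t.getMessage());
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(CompletableFuture.completedFuture("ok")));
        System.out.println(describe(failed));
    }
}
```

Note that the callback receives the cause of the ExecutionException, so onFailure sees the exception the task itself threw.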
