RxJava timeout 的使用

2021-07-13  本文已影响0人  蓝笔头

例子

1)引入 Maven 依赖

        <!-- https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.0.13</version>
        </dependency>

2)实验代码:

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

public class TestTimeout {

    public static void main(String[] args) {
        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("A");

            Thread.sleep(800);
            emitter.onNext("B");

            Thread.sleep(400);
            emitter.onNext("C");

            Thread.sleep(1200);
            emitter.onNext("D");
            emitter.onComplete();
        });

        source.timeout(1, TimeUnit.SECONDS)
            .subscribe(
                item -> System.out.println("onNext: " + item),
                error -> System.out.println("onError: " + error),
                () -> System.out.println("onComplete will not be printed!"));
    }
}

3)控制台输出:

onNext: A
onNext: B
onNext: C
onError: java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

源码解析

ObservableTimeoutTimed

1)创建 ObservableTimeoutTimed

package io.reactivex.rxjava3.core;

public abstract class Observable<@NonNull T> implements ObservableSource<T> {
    // 返回一个当前 Observable 的包装 Observable,但对每个发出的项目应用超时策略。
    // 如果从其前任开始的指定超时时间内未发出下一项,则生成的 Observable 将终止并通知观察者 TimeoutException。
    public final Observable<T> timeout(long timeout, @NonNull TimeUnit unit) {
        return timeout0(timeout, unit, null, Schedulers.computation());
    }
    
    private Observable<T> timeout0(long timeout, @NonNull TimeUnit unit,
            @Nullable ObservableSource<? extends T> fallback,
            @NonNull Scheduler scheduler) {
        Objects.requireNonNull(unit, "unit is null");
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableTimeoutTimed<>(this, timeout, unit, scheduler, fallback));
    }
}

2)超时处理机制

package io.reactivex.rxjava3.internal.operators.observable;

public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        TimeoutObserver<T> parent = new TimeoutObserver<>(observer, timeout, unit, scheduler.createWorker());
        observer.onSubscribe(parent);
        // 1. 初始化延迟任务,idx=0
        parent.startTimeout(0L);
        source.subscribe(parent);
    }

    static final class TimeoutObserver<T> extends AtomicLong
    implements Observer<T>, Disposable, TimeoutSupport {

        @Override
        public void onNext(T t) {
            long idx = get();
            if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) {
                return;
            }

            task.get().dispose();

            downstream.onNext(t);
            
            // 3. 重新触发延迟任务处理,idx = idx + 1
            startTimeout(idx + 1);
        }

        void startTimeout(long nextIndex) {
            // 2. 执行延迟任务
            // 实验代码中配置延迟时间就是 (1, TimeUnit.SECONDS)
            task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
        }

        @Override
        public void onTimeout(long idx) {
            // 5. 判断初始化 TimeoutTask 实例时的 idx 是否和当前的一致,
            // 如果一致,说明在这个区间没有执行新的 onNext() 方法,因此:调用下游的 onError() 方法,并传入 TimeoutException 异常
            // 如果不一致,说明在这个时间区间执行了新的 onNext() 方法,因此忽略。
            if (compareAndSet(idx, Long.MAX_VALUE)) {
                DisposableHelper.dispose(upstream);

                downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));

                worker.dispose();
            }
        }
    }

    static final class TimeoutTask implements Runnable {

        final TimeoutSupport parent;

        final long idx;

        TimeoutTask(long idx, TimeoutSupport parent) {
            this.idx = idx;
            this.parent = parent;
        }

        @Override
        public void run() {
            // 4. TimeoutObserver 中的 startTimeout() 传入的超时时间到了后执行当前方法
            // 调用 TimeoutObserver 中的 onTimeout 方法
            // 并传入初始化 TimeoutTask 实例时的 idx 参数
            parent.onTimeout(idx);
        }
    }

    interface TimeoutSupport {

        void onTimeout(long idx);

    }
}

LambdaObserver

1)创建 LambdaObserver

package io.reactivex.rxjava3.core;

public abstract class Observable<@NonNull T> implements ObservableSource<T> {
    // 订阅当前的 Observable 并提供回调来处理它发出的项目以及它发出的任何错误或完成通知。
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete) {
        Objects.requireNonNull(onNext, "onNext is null");
        Objects.requireNonNull(onError, "onError is null");
        Objects.requireNonNull(onComplete, "onComplete is null");

        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());

        subscribe(ls);

        return ls;
    }
}

2)订阅方法执行逻辑

package io.reactivex.rxjava3.internal.observers;

public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                // 调用实际的 onNext() 方法
                // 也就是 item -> System.out.println("onNext: " + item)
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            // 设置 DISPOSED
            lazySet(DisposableHelper.DISPOSED);
            try {
                // 调用实际的 onError() 方法
                // 也就是 error -> System.out.println("onError: " + error)
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) { // 如果已经设置过 DISPOSED,则直接忽略
            lazySet(DisposableHelper.DISPOSED);
            try {
                // 否则调用实际的 onComplete() 方法
                // 也就是  () -> System.out.println("onComplete will not be printed!")
                onComplete.run();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
            }
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读