RxLifecycle 的使用以及相关源码

2021-12-19  本文已影响0人  JeffreyWorld

我们正在开发的几个项目,都用到了这个开源库:https://github.com/trello/RxLifecycle
官方对这个库的原话解释:Lifecycle handling APIs for Android apps using RxJava

This library allows one to automatically complete sequences based on a second lifecycle stream.
This capability is useful in Android, where incomplete subscriptions can cause memory leaks.
该库允许基于第二个生命周期流自动完成序列。
此功能在 Android 中很有用,因为不完整的订阅会导致内存泄漏。

如何使用呢:

您必须从代表生命周期流的 Observable<T> 开始。 然后使用 RxLifecycle 将序列绑定到该生命周期。
您可以在生命周期发出任何内容时进行绑定:

myObservable
    .compose(RxLifecycle.bind(lifecycle))
    .subscribe();

或者您可以绑定到特定生命周期事件发生时:

myObservable
    .compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
    .subscribe();

或者,您可以让 RxLifecycle 确定结束序列的适当时间:

myObservable
    .compose(RxLifecycleAndroid.bindActivity(lifecycle))
    .subscribe();

它假设您想在相反的生命周期事件中结束序列——例如,如果在 START 期间订阅,它将在 STOP 时终止。 如果您在 PAUSE 之后订阅,它将在下一个销毁事件中终止(例如,PAUSE 将在 STOP 中终止)。

Providers

生命周期从何而来? 通常,它们由适当的 LifecycleProvider<T> 提供。 但那些实施在哪里?

public interface LifecycleProvider<E> {
/**
* 返回一系列生命周期事件
*/
Observable<E> lifecycle();

/**
* 绑定源直到特定事件发生。
*
* @param event 触发退订的事件
* @return a reusable {@link LifecycleTransformer} which unsubscribes when the event triggers.
*/
<T> LifecycleTransformer<T> bindUntilEvent(@Nonnull E event);

/**
* 绑定一个源,直到下一个合理的事件发生。
*
* @return a reusable {@link LifecycleTransformer} which unsubscribes at the correct time.
*/
<T> LifecycleTransformer<T> bindToLifecycle();
}

你有几个选择:

  1. 使用 rxlifecycle-components 并对提供的 RxActivityRxFragment 等类进行子类化。
  2. 使用 Android's lifecycle + rxlifecycle-android-lifecycle 生成提供者。
  3. 自己编写实现。

如果您使用 rxlifecycle-components,只需扩展相应的类,然后使用内置的 bindToLifecycle()(或 bindUntilEvent())方法:

public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .compose(bindToLifecycle())
            .subscribe();
    }
}

如果您使用 rxlifecycle-android-lifecycle,那么您只需将 LifecycleOwner 传递给 AndroidLifecycle 即可生成提供程序:

public class MyActivity extends LifecycleActivity {
    private final LifecycleProvider<Lifecycle.Event> provider
        = AndroidLifecycle.createLifecycleProvider(this);

    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .compose(provider.bindToLifecycle())
            .subscribe();
    }
}

退订

RxLifecycle 实际上并没有取消订阅该序列。 相反,它终止了序列。 它这样做的方式因类型而异:

如果一个序列需要 Subscription.unsubscribe() 行为,那么建议您自己手动处理 Subscription 并在适当的时候调用 unsubscribe()

以下是 RxActivity 的源码 :

public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {

    //释放订阅前最后一个数据和订阅后接收到的所有数据
    private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();

    @Override
    @NonNull
    @CheckResult
    public final Observable<ActivityEvent> lifecycle() {
        return lifecycleSubject.hide();
    }

    @Override
    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
    }

    //实际上返回了一个LifecycleTransformer
    @Override
    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindToLifecycle() {
        //bindToLifecycle,本质上RxActivity 中的BehaviorSubject成员变量(它本身就是一个Observable)!
        return RxLifecycleAndroid.bindActivity(lifecycleSubject);
    }

    //Activity不同的生命周期,BehaviorSubject对象会发射对应的ActivityEvent
    @Override
    @CallSuper
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifecycleSubject.onNext(ActivityEvent.CREATE);
    }

    @Override
    @CallSuper
    protected void onStart() {
        super.onStart();
        lifecycleSubject.onNext(ActivityEvent.START);
    }

    @Override
    @CallSuper
    protected void onResume() {
        super.onResume();
        lifecycleSubject.onNext(ActivityEvent.RESUME);
    }

    @Override
    @CallSuper
    protected void onPause() {
        lifecycleSubject.onNext(ActivityEvent.PAUSE);
        super.onPause();
    }

    @Override
    @CallSuper
    protected void onStop() {
        lifecycleSubject.onNext(ActivityEvent.STOP);
        super.onStop();
    }

    @Override
    @CallSuper
    protected void onDestroy() {
        lifecycleSubject.onNext(ActivityEvent.DESTROY);
        super.onDestroy();
    }
}

这个 BehaviorSubject 会在不同的生命周期发射不同的 ActivityEvent ,比如在 onCreate()生命周期发射 ActivityEvent.CREATE ,在 onStop() 发射 ActivityEvent.STOP

public final class BehaviorSubject<T> extends Subject<T> {

    final AtomicReference<Object> value; //原子操作类,当前接收到的最后一个数据

    final AtomicReference<BehaviorDisposable<T>[]> observers;//原子操作类,BehaviorDisposable内部存储了所有接受到的数据

    @SuppressWarnings("rawtypes")
    static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];//标记,意味着一个空的BehaviorDisposable

    @SuppressWarnings("rawtypes")
    static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0]; //标记,意味着已经达到了TERMINATED,终止数据的发射
    final ReadWriteLock lock;
    final Lock readLock;
    final Lock writeLock;

    final AtomicReference<Throwable> terminalEvent;

    ...省略

}

/**
 * Transformer that continues a subscription until a second Observable emits an event.
 */
@ParametersAreNonnullByDefault
public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
                                                      FlowableTransformer<T, T>,
                                                      SingleTransformer<T, T>,
                                                      MaybeTransformer<T, T>,
                                                      CompletableTransformer
{
    final Observable<?> observable;

    LifecycleTransformer(Observable<?> observable) {
        checkNotNull(observable, "observable == null");
        this.observable = observable;
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        //observable为 true 为终止
        return upstream.takeUntil(observable);
    }

    @Override
    public Publisher<T> apply(Flowable<T> upstream) {
        return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(observable.firstOrError());
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(observable.firstElement());
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if (o == null || getClass() != o.getClass()) { return false; }

        LifecycleTransformer<?> that = (LifecycleTransformer<?>) o;

        return observable.equals(that.observable);
    }

    @Override
    public int hashCode() {
        return observable.hashCode();
    }

    @Override
    public String toString() {
        return "LifecycleTransformer{" +
            "observable=" + observable +
            '}';
    }
}

public class RxLifecycle {

    private RxLifecycle() {
        throw new AssertionError("No instances");
    }

    /**
     * Binds the given source to a lifecycle.
     * <p>
     * When the lifecycle event occurs, the source will cease to emit any notifications.
     *
     * @param lifecycle the lifecycle sequence
     * @param event the event which should conclude notifications from the source
     * @return a reusable {@link LifecycleTransformer} that unsubscribes the source at the specified event
     */
    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                                @Nonnull final R event) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(event, "event == null");
        return bind(takeUntilEvent(lifecycle, event));
    }

    private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
        return lifecycle.filter(new Predicate<R>() {
            @Override
            public boolean test(R lifecycleEvent) throws Exception {
                return lifecycleEvent.equals(event);
            }
        });
    }

    /**
     * Binds the given source to a lifecycle.
     * <p>
     * This helper automatically determines (based on the lifecycle sequence itself) when the source
     * should stop emitting items. Note that for this method, it assumes <em>any</em> event
     * emitted by the given lifecycle indicates that the lifecycle is over.
     *
     * @param lifecycle the lifecycle sequence
     * @return a reusable {@link LifecycleTransformer} that unsubscribes the source whenever the lifecycle emits
     */
    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
        return new LifecycleTransformer<>(lifecycle);
    }

    /**
     * Binds the given source to a lifecycle.
     * <p>
     * This method determines (based on the lifecycle sequence itself) when the source
     * should stop emitting items. It uses the provided correspondingEvents function to determine
     * when to unsubscribe.
     * <p>
     * Note that this is an advanced usage of the library and should generally be used only if you
     * really know what you're doing with a given lifecycle.
     *
     * @param lifecycle the lifecycle sequence
     * @param correspondingEvents a function which tells the source when to unsubscribe
     * @return a reusable {@link LifecycleTransformer} that unsubscribes the source during the Fragment lifecycle
     */
    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                      @Nonnull final Function<R, R> correspondingEvents) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(correspondingEvents, "correspondingEvents == null");
        return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
    }

    private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                       final Function<R, R> correspondingEvents) {
        //combineLatest方法,是将两个数据源的释放item放到一起通过一个func(某种方法)返回你想要的东西        
        return Observable.combineLatest(
            lifecycle.take(1).map(correspondingEvents),
            lifecycle.skip(1),
            new BiFunction<R, R, Boolean>() {
                @Override
                public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(bindUntilEvent);
                }
            })
            .onErrorReturn(Functions.RESUME_FUNCTION)
            .filter(Functions.SHOULD_COMPLETE);
    }
}

最终发现是调用 combineLatest 方法,逐步分析一下:

  1. lifecycle.take(1) 指的是最近发射的事件,比如说我们在 onCreate() 中执行了 bindToLifecycle,那么lifecycle.take(1) 指的就是 ActivityEvent.CREATE,经过 map(correspondingEvents),这个 map 中传的函数就是 RxLifecycleAndroid 中的 ACTIVITY_LIFECYCLE
    也就是说,lifecycle.take(1).map(correspondingEvents)实际上是返回了 CREATE 对应的事件 DESTROY, 它意味着本次订阅将在 ActivityonDestory 进行取消。

  2. lifecycle.skip(1) 就简单了,除去第一个保留剩下的。

  3. 第三个参数 意味着,lifecycle.take(1).map(correspondingEvents) 的序列和 lifecycle.skip(1) 进行combine,形成一个新的序列。即是说,当 Activity 走到 onStart 生命周期时,为 false ,这次订阅不会取消,直到 onDestroy,为 true 则订阅取消。

  4. 而后的 onErrorReturnfilter 是对异常的处理和判断是否应该结束订阅。

上一篇下一篇

猜你喜欢

热点阅读