Android-RxJava

Rxjava解除订阅①:自带方式和RxLifeCycle

2020-05-04  本文已影响0人  海阔sun天空

Rxjava解除订阅三部曲:

前言

最近在维护老旧网络库的时候,发现网络库底层运用到了Rxjava,而最近凑巧又给app加上了leakcanary检测内存泄漏,发现除了网络库的Rxjava泄漏之外,还有些业务上滥用的Rxjava也存在泄漏的情况。有问题咱就得想办法解决,这个老旧的网络库有点年岁了,写的是真不咋样,奈何app里还有大量引用,该维护的还是得维护。Rxjava作为近几年非常流行的一个三方库,功能就不用多说了,谁用谁知道。

正文

Rxjava是好用,可用不好很容易造成内存泄漏。而且理论上Rxjava的每个操作符都可能会造成内存泄漏。举个例子,我们用Rx进行网络请求,然后订阅在主线程进行ui更新。网络请求是在分线程执行,而且有延迟。当请求没有返回时,我们将这个页面关闭了,后续分线程数据回来执行ui更新,而Rx还持有外部类的引用,这就造成了内存泄漏。
解决的办法Rx本身就给提供,这就是我们要讲的第一种解除订阅的方法:

1.Dispose.dispose()

这是Rxjava本身提供的一种接触订阅的方式,使用很简单,在页面关闭的时候,或者在需要的时候调用下dispose()方法就可以了。
如果最后的订阅者是Consumer,那么会有一个返回值Dispose。那么在需要的时候,就可以调用Dispose.dispose()。但往往我们使用Rxjava的时候,都需要对正常返回和异常返回做些通用处理,使用的往往是Observer,这样做的结果就是没有返回值。
其实Observer也已经给我们准备好了解除订阅的方式,我们不妨看下Observer的源码:

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

有个方法onSubscribe(@NonNull Disposable d),有一个Dispose的参数,那么我们就可以接收这个变量,在在需要的时候调用Dispose.dispose()
这个方法onSubscribe意味着当subscribe方法被调用之前,就会拿到Dispose句柄,此时Rxjava任何一个相关的操作符处理都还未执行,调用dispose()方法后,完成解除订阅。

扩展: CompositeDisposable

上面讲的是针对单个Dispose进行订阅解除,可往往实际使用中,我们可希望看到一堆Dispose的成员变量在页面销毁的时候扎堆解除订阅。这时候就需要CompositeDisposable,简单的理解,就是可以对Dispose进行批量的处理,类似于List集合,其内部实现方法也很类似,包括add,addAll,delete,remove,clear,dispose,isDisposed
方法使用都很简单,我们把Dispose1,Dispose2,Dispose3使用add方法,添加到CompositeDisposable中,在页面销毁时调用dispose进行批量解除。
这里对dispose和clear方法进行单独说明下,dispose执行后,会改变CompositeDisposable的状态为disposed,即已完成订阅解除状态,而clear则只会批量解除订阅,不会改变整个CompositeDisposabledisposed状态。我们看下源码就知道了:
dispose方法:

dispose
而clear方法:
clear
差异就在红箭头那边。
而其他的方法都要去判断disposed状态,已经disposed的直接return,不会继续执行。

2.RxLifeCycle

接下来开始,就是比较骚的操作了。RxLifeCycle,顾名思义就是对Rxjava生命周期管理,也就是意味着,Rxjava长大,已经学会了自己该何时进行解除订阅。github直达链接

虽然没有中文文档,但摸索起来也不困难,首先添加核心依赖:

implementation 'com.trello.rxlifecycle3:rxlifecycle:3.1.0'
implementation 'com.trello.rxlifecycle3:rxlifecycle-components:3.1.0'

RxLifeCycle使用需要继承RxAppCompatActivity,Fragment也需要继承RxFragment,当然还有一些其他扩展,比如RxDialogFragment等,大家自己去体验吧。
使用起来也是很简单,直接看代码吧:

        Observable.just(1)
                .compose(this.<Integer>bindToLifecycle())
                .subscribe();

或者:

        Observable.just(1)
                .compose(this.<Integer>bindUntilEvent(ActivityEvent.DESTROY))
                .subscribe();

核心使用就这两个方法,这俩方法也是有些许的区别。

bindToLifecycle():自动识别在合适的生命周期内解除绑定。

bindUntilEvent(ActivityEvent):在指定的生命周期内解除绑定。

对于bindUntilEvent(ActivityEvent)很容易理解,指定一个生命周期解除绑定,但对于bindToLifecycle()如何自动识别生命周期有些疑问,我们不妨写个demo测试下效果如何:

    private void test() {
        subscribe = Observable.interval(0, 2, TimeUnit.SECONDS)
                .map(new Function<Long, Long>() {
                    @Override
                    public Long apply(Long aLong) throws Exception {
                        Log.d(TAG, "当前发射数值:" + aLong);
                        return aLong;
                    }
                })
                .compose(this.<Long>bindToLifecycle())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "当前接收数值:" + aLong);
                    }
                });
    }

每个2s发射一个数值,无限的发,调用bindToLifecycle(),然后我们在onCreate方法调用,最终打印效果:

D/zdu_Rxdemo: onCreate,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:0
D/zdu_Rxdemo: 当前接收数值:0
D/zdu_Rxdemo: onStart,subscribe.isDisposed():false
D/zdu_Rxdemo: onResume,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:1
D/zdu_Rxdemo: 当前接收数值:1
D/zdu_Rxdemo: 当前发射数值:2
D/zdu_Rxdemo: 当前接收数值:2
D/zdu_Rxdemo: onPause,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:3
D/zdu_Rxdemo: 当前接收数值:3
D/zdu_Rxdemo: onStop,subscribe.isDisposed():false
D/zdu_Rxdemo: onDestroy,subscribe.isDisposed():true

onDestroy方法中自动解除订阅了,而代码中并没有主动去调用dispose方法,可见自动解除订阅生效了。

那如果在onStart方法订阅的话,解除订阅的生命周期又不一样了:

D/zdu_Rxdemo: onStart,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:0
D/zdu_Rxdemo: 当前接收数值:0
D/zdu_Rxdemo: onResume,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:1
D/zdu_Rxdemo: 当前接收数值:1
D/zdu_Rxdemo: 当前发射数值:2
D/zdu_Rxdemo: 当前接收数值:2
D/zdu_Rxdemo: onPause,subscribe.isDisposed():false
D/zdu_Rxdemo: onStop,subscribe.isDisposed():true
D/zdu_Rxdemo: onDestroy,subscribe.isDisposed():true

onStop生命周期内就被解除订阅了,那我们在onResume中订阅的话,是不是就会在onPause中解除订阅了呢?事实上确实是这样的,日志就不打印了,我们直接去看源码实现。

刚刚我们调用的是RxAppCompatActivity的两个方法:

    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(this.lifecycleSubject, event);
    }

    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindToLifecycle() {
        return RxLifecycleAndroid.bindActivity(this.lifecycleSubject);
    }

暂且现不管this.lifecycleSubject是什么,我们继续向下看源码:

    public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                                @Nonnull final R event) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(event, "event == null");
        return bind(takeUntilEvent(lifecycle, event));
    }

    private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
        return lifecycle.filter(new Predicate<R>() {
            @Override
            public boolean test(R lifecycleEvent) throws Exception {
                return lifecycleEvent.equals(event);
            }
        });
    }

bindUntilEvent就很明确了,底层用了filter过滤操作符,过滤了非指定生命周期。但生命周期是怎么下发下来的呢?最后再做解释。

public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
        return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
    }

bindToLifecycle的底层源码最终跟takeUntilEvent的底层源码一致,都指向了
RxLifecycle.bind方法。我们就看下bind方法到底是执行了什么?

先看bindUntilEvent

public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
        return new LifecycleTransformer<>(lifecycle);
    }

new了一个LifecycleTransformer,在看下内部实现:

public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
                                                      FlowableTransformer<T, T>,
                                                      SingleTransformer<T, T>,
                                                      MaybeTransformer<T, T>,
                                                      CompletableTransformer
{
    final Observable<?> observable;

    LifecycleTransformer(Observable<?> observable) {
        checkNotNull(observable, "observable == null");
        this.observable = observable;
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(observable);
    }

    @Override
    public Publisher<T> apply(Flowable<T> upstream) {
        return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(observable.firstOrError());
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(observable.firstElement());
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
    }
}

原来是一个实现ObservableTransformer等接口的类,到这里也明白了为什么RxLifeCycle要用compose操作符,并且其内部实现使用了takeUntil操作符,在符合条件后,打断上游链。这也说明了,我们要在订阅前一刻执行这个自动解除订阅打断上游链,而对下游链没有作用。

再看下bindToLifecyclebind实现,与takeUntilEvent稍微有点不同的是它要自动判断生命周期:

@NonNull
    @CheckResult
    public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
        return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
    }

ACTIVITY_LIFECYCLE则是一个switch取值:

private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE = new Function<ActivityEvent, ActivityEvent>() {
        public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
            switch(lastEvent) {
            case CREATE:
                return ActivityEvent.DESTROY;
            case START:
                return ActivityEvent.STOP;
            case RESUME:
                return ActivityEvent.PAUSE;
            case PAUSE:
                return ActivityEvent.STOP;
            case STOP:
                return ActivityEvent.DESTROY;
            case DESTROY:
                throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
            default:
                throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
            }
        }
    };

这就印证了我们之前的demo,在create的时候返回是ActivityEvent.DESTORY,对应START返回的就是STOP生命周期等等。

public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,  @Nonnull final Function<R, R> correspondingEvents) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(correspondingEvents, "correspondingEvents == null");
        return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
    }

    private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                       final Function<R, R> correspondingEvents) {
        return Observable.combineLatest(
            lifecycle.take(1).map(correspondingEvents),
            lifecycle.skip(1),
            new BiFunction<R, R, Boolean>() {
                @Override
                public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(bindUntilEvent);
                }
            })
            .onErrorReturn(Functions.RESUME_FUNCTION)
            .filter(Functions.SHOULD_COMPLETE);
    }

这个takeUntilCorrespondingEvent就是生命周期判断,内部使用了combineLatest操作符,简单的说就是该操作符接收多个Observable以及一个函数作为参数,并且函数的签名为这些Observable发射的数据类型。当以上的任意一个Observable发射数据之后,会去取其它Observable 最近一次发射的数据,回调到函数当中,但是该函数回调的前提是所有的Observable都至少发射过一个数据项。

看不懂上面的没有关系,takeUntilCorrespondingEvent的作用就是筛选过滤生命周期,那么问题又来了,这个生命周期到底是哪发射来的呢?

其实是用了 BehaviorSubject。在特定条件下,Subject既可以发送事件,也可以接收事件,而BehaviorSubject接收到订阅前的最后一条数据和订阅后的所有数据。BehaviorSubject在Activity的每个生命周期都发射了一个生命周期事件:

    @CallSuper
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        this.lifecycleSubject.onNext(ActivityEvent.CREATE);
    }

    @CallSuper
    protected void onStart() {
        super.onStart();
        this.lifecycleSubject.onNext(ActivityEvent.START);
    }
......

其他生命周期也类似,不再贴出来了。如此,完整的RxLifeCycle源码执行分析到此为止。

结语

RxLifeCycle做为自动解除绑定的一个三方库,源码实现比较简单易读,一定程度上可以帮助我们解决Rxjava内存泄漏的问题,但不可否认的说,它也有弊端:

上一篇下一篇

猜你喜欢

热点阅读