AutoDispose 源码解析

2020-03-23  本文已影响0人  leilifengxingmw

AutoDispose是RxJava2+的一个工具,通过disposal或cancellation将RxJava流的执行自动绑定到提供的作用域。

说人话就是:

AutoDispose是配合RxJava2使用的一个工具,我们提供一个作用域(在Android中,Activity的生命周期就是可以看作一个作用域)。AutoDispose会自动将RxJava的数据流和我们提供的作用域进行绑定,当作用域到达结束状态的时候(比如onDestroy),AutoDispose会自动取消RxJava数据流的执行,在Android中可以用来避免RxJava造成的内存泄漏。

举个例子,我们间隔一秒输出一个整数,可以发现当我们点击返回键的时候,数字还是会一直打印,造成内存泄漏。

代码使用Kotlin演示

Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Observer<Long> {
                    override fun onComplete() {
                        Log.d(TAG, "onComplete: ")
                    }
                    override fun onSubscribe(d: Disposable) {
                        Log.d(TAG, "onSubscribe: ")
                    }

                    override fun onNext(t: Long) {
                        Log.d(TAG, "onNext: $t")
                    }

                    override fun onError(e: Throwable) {
                        Log.d(TAG, "onError: ${e.message}")
                    }
                })

一直输出

D/AutoDisposeActivity: onSubscribe: 
D/AutoDisposeActivity: onNext: 0
D/AutoDisposeActivity: onNext: 1
D/AutoDisposeActivity: onNext: 2
D/AutoDisposeActivity: onNext: 3
D/AutoDisposeActivity: onNext: 4
D/AutoDisposeActivity: onNext: 5
D/AutoDisposeActivity: onNext: 6
...

如果解决这种内存泄漏问题呢,我们可以保存Disposable的引用,然后在onDestroy的时候手动调用Disposable的dispose方法,如下所示

    var disposable: Disposable? = null

    fun onClick(view: View) {
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Observer<Long> {
                    override fun onComplete() {
                        Log.d(TAG, "onComplete: ")
                    }

                    override fun onSubscribe(d: Disposable) {
                        Log.d(TAG, "onSubscribe: ")
                        disposable = d
                    }

                    override fun onNext(t: Long) {
                        Log.d(TAG, "onNext: $t")
                    }

                    override fun onError(e: Throwable) {
                        Log.d(TAG, "onError: ${e.message}")
                    }

                })

    }

    override fun onDestroy() {
        super.onDestroy()
        disposable?.dispose()
    }

通过这种方式虽然能解决问题,但是每次都手写,也是太枯燥了吧。所以AutoDispose便应运而生了。使用方式如下所示:

Observable.interval(1, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //注释1处
        .`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
        .subscribe(object : Observer<Long> {
                override fun onComplete() {
                    Log.d(TAG, "onComplete: ")
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "onSubscribe: ")
                }

                override fun onNext(t: Long) {
                    Log.d(TAG, "onNext: $t")
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "onError: ${e.message}")
                }
    })

注释1处,一行代码,就解决,也是美滋滋。

//传入的this对象是一个Activity对象
.`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))

下面就探索一下其中实现的原理:

先来一张图


AutoDispose.jpg

AndroidLifecycleScopeProvider的from方法

public static AndroidLifecycleScopeProvider from(LifecycleOwner owner) {
    return from(owner.getLifecycle());
}

传入的LifecycleOwner对象在这个例子中是一个Activity,owner.getLifecycle()就是返回和Activity关联的Lifecycle对象,然后内部调用重载方法

public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle) {
    return from(lifecycle, DEFAULT_CORRESPONDING_EVENTS);
}

传入的DEFAULT_CORRESPONDING_EVENTS对象

private static final CorrespondingEventsFunction<Lifecycle.Event> DEFAULT_CORRESPONDING_EVENTS =
      lastEvent -> {
        switch (lastEvent) {
          case ON_CREATE:
            return Lifecycle.Event.ON_DESTROY;
          case ON_START:
            return Lifecycle.Event.ON_STOP;
          case ON_RESUME:
            return Lifecycle.Event.ON_PAUSE;
          case ON_PAUSE:
            return Lifecycle.Event.ON_STOP;
          case ON_STOP:
          case ON_DESTROY:
          default:
            throw new LifecycleEndedException("Lifecycle has ended! Last event was " + lastEvent);
        }
    };

CorrespondingEventsFunction是一个接口,上面是lambda表达式写法

public interface CorrespondingEventsFunction<E> extends Function<E, E> {

  /**
   * 指定一个event,返回对应的event,收到对应的event的时候,lifecycle应该dispose
   *
   * @param event the source or start event.
   * @return the target event that should signal disposal.
   * @throws OutsideScopeException if the lifecycle exceeds its scope boundaries.
   */
  @Override
  E apply(E event) throws OutsideScopeException;
}

从DEFAULT_CORRESPONDING_EVENTS可以看出来:

  1. 在ON_CREATE中订阅会在ON_DESTROY中取消订阅
  2. 在ON_START中订阅会在ON_STOP中取消订阅
  3. 在ON_RESUME中订阅会在ON_PAUSE中取消订阅
  4. 在ON_PAUSE中订阅会在ON_STOP中取消订阅
  5. 如果在ON_STOP或者ON_DESTROY中订阅,直接抛出异常。
public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle,
             CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
    return new AndroidLifecycleScopeProvider(lifecycle, boundaryResolver);
}
private AndroidLifecycleScopeProvider(
    Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
    //注释1处
    this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
    this.boundaryResolver = boundaryResolver;
}

注释1处,创建了一个LifecycleEventsObservable对象,从名称就可以看出来,这个Observable是用来观察生命周期事件的。

最后就是返回了一个AndroidLifecycleScopeProvider对象

AutoDispose的autoDisposable(final ScopeProvider provider)方法

public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
    //调用重载函数
    return autoDisposable(completableOf(provider));
}

先是调用ScopescompletableOf(ScopeProvider scopeProvider)方法,返回了一个Completable对象,然后调用AutoDispose的重载函数

public static Completable completableOf(ScopeProvider scopeProvider) {
    return Completable.defer(
        () -> {
          try {
            //注释1处
            return scopeProvider.requestScope();
          } catch (OutsideScopeException e) {
            Consumer<? super OutsideScopeException> handler =
                AutoDisposePlugins.getOutsideScopeHandler();
            if (handler != null) {
              handler.accept(e);
              return Completable.complete();
            } else {
              return Completable.error(e);
            }
          }
        });
  }

注释注释1处,调用scopeProvider的requestScope方法获取一个作用域。在这个例子中,scopeProvider是一个AndroidLifecycleScopeProvider对象。

AutoDispose的autoDisposable(final CompletableSource scope)方法简化版

public static <T> AutoDisposeConverter<T> autoDisposable(final CompletableSource scope) {
    //注释1处,返回了一个AutoDisposeConverter对象
    return new AutoDisposeConverter<T>() {
            
        //...

        //注释2处
        @Override
        public ObservableSubscribeProxy<T> apply(final Observable<T> upstream) {
            //...
            //注释3处
            return new ObservableSubscribeProxy<T>() {
                    
                //...
                @Override
                public void subscribe(Observer<? super T> observer) {
                    //注释4处
                    new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
                }
            };
        }
    };
}

注释1处,返回了一个AutoDisposeConverter对象

Observable的as方法简化版

public final <R> R as(ObservableConverter<T, ? extends R> converter) {
      return converter.apply(this);
}

内部就是调用传入的converter的apply方法,将上游的Observable转化成另外一个值。在这个例子中,就是将上游的Observable转化成ObservableSubscribeProxy对象,如注释2处所示。

ObservableConverter的apply方法

public interface ObservableConverter<T, R> {
    R apply(Observable<T> upstream);
}

AutoDisposeConverter接口

public interface AutoDisposeConverter<T>
    extends FlowableConverter<T, FlowableSubscribeProxy<T>>,
        ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>,
        ObservableConverter<T, ObservableSubscribeProxy<T>>,
        MaybeConverter<T, MaybeSubscribeProxy<T>>,
        SingleConverter<T, SingleSubscribeProxy<T>>,
        CompletableConverter<CompletableSubscribeProxy> {

}

从AutoDisposeConverter接口的继承结构可以看出来,AutoDisposeConverter可以转换的上游的数据类型有6种。

我们省略了大部分代码,只关注上游数据类型是Observable的情况。

注释2处,当上游数据类型是Observable的时候,AutoDisposeConverter的apply方法返回了一个ObservableSubscribeProxy对象。

然后在注释4处,ObservableSubscribeProxy的subscribe(Observer<? super T> observer)方法

@Override
public void subscribe(Observer<? super T> observer) {
    new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
}

注意了:这里使用上游的Observable和传入的作用域scope构建了一个AutoDisposeObservable对象,然后调用subscribe方法订阅下游的观察者observer。

然后我们查看一下AutoDisposeObservable类

final class AutoDisposeObservable<T> extends Observable<T> implements 
   ObservableSubscribeProxy<T> {
        private final ObservableSource<T> source;
        private final CompletableSource scope;

  AutoDisposeObservable(ObservableSource<T> source, CompletableSource scope) { 
    //上游数据
    this.source = source;
    //作用域对象
    this.scope = scope;
  }

  @Override
  protected void subscribeActual(Observer<? super T> observer) {
    //注释1处
    source.subscribe(new AutoDisposingObserverImpl<>(scope, observer));
  }
}

注释1处,使用传入的作用域scope和下游的观察者observer包装成一个AutoDisposingObserverImpl对象,然后上游观察者source订阅AutoDisposingObserverImpl对象。

AutoDisposingObserverImpl部分代码

final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDisposingObserver<T> {

  final AtomicReference<Disposable> mainDisposable = new AtomicReference<>();

  final AtomicReference<Disposable> scopeDisposable = new AtomicReference<>();

  private final AtomicThrowable error = new AtomicThrowable();
  // 传入的作用域对象scope
  private final CompletableSource scope;
  //下游的代理观察者
  private final Observer<? super T> delegate;

  AutoDisposingObserverImpl(CompletableSource scope, Observer<? super T> delegate) {
    this.scope = scope;
    this.delegate = delegate;
  }

  //.... 
}

首先看一下AutoDisposingObserverImpl的onNext方法

@Override
public void onNext(T value) {
    if (!isDisposed()) {
      //注释1处
      if (HalfSerializer.onNext(delegate, value, this, error)) {
        // Terminal event occurred and was forwarded to the delegate, so clean up here
        mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
        AutoDisposableHelper.dispose(scopeDisposable);
      }
    }
}

注释1处,当没有取消订阅的时候,调用HalfSerializer的onNext方法,向下游观察者发送数据

public static <T> boolean onNext(Observer<? super T> observer, T value, AtomicInteger wip, AtomicThrowable error) {
    //处理多线程的情况
    if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
      //向下游发射数据
      observer.onNext(value);
      //如果处理多线程出了问题,调用下游观察者的onError或onComplete方法,并返回true。
      if (wip.decrementAndGet() != 0) {
        Throwable ex = error.terminate();
        if (ex != null) {
          observer.onError(ex);
        } else {
          observer.onComplete();
        }
        return true;
      }
    }
    return false;
}

正常情况下,向下游发射数据成功并返回fasle。如果处理多线程出了问题,调用下游观察者的onError或onComplete方法,并返回true。如果返回了true,那么AutoDisposingObserverImpl就会将自身状态置为isDisposed,就不会再向下游发射数据了,也不会调用下游观察者的的onError或onComplete方法。

到这里我们要明白一点:如果AutoDisposingObserverImpl自身状态为isDisposed的时候,就不会调用我们下游观察者的onNext,onError或onComplete方法了,即切断了下游观察者的订阅,这样就避免了内存泄漏。

AutoDisposingObserverImpl是何时将自身状态置为isDisposed呢?当然是在收到适当的生命周期事件的时候呀,我们继续往下看。

AutoDisposingObserverImpl的onSubscribe方法,这里才是生命周期作用域发挥作用的方法。

@Override
public void onSubscribe(final Disposable d) {
    //注释1处
    DisposableCompletableObserver o =
        new DisposableCompletableObserver() {
            @Override
            public void onError(Throwable e) {
                scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                //将自身状态置为isDisposed
                AutoDisposingObserverImpl.this.onError(e);
            }

            @Override
            public void onComplete() {
                scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                //将自身状态置为isDisposed
                AutoDisposableHelper.dispose(mainDisposable);
            }
        };
    //注释2处
    if (AutoDisposeEndConsumerHelper.setOnce(scopeDisposable, o, getClass())) {
        //注释3处
        delegate.onSubscribe(this);
        //注释4处
        scope.subscribe(o);
        //注释5处
        AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
    }
}

注释1处,初始化了一个DisposableCompletableObserver对象o,在该对象的onError方法和onComplete方法中会将AutoDisposingObserverImpl当前状态置为isDisposed。

注释2处,为scopeDisposable赋值为o
注释3处,调用下游观察者的onSubscribe方法。
注释4处,scope订阅DisposableCompletableObserver对象o。在这个例子中,scope就是AndroidLifecycleScopeProvider的requestScope方法返回的对象。
注释5处,为mainDisposable赋值为d

注意:注释4处,scope订阅DisposableCompletableObserver对象o,可以猜测,当作用域对象scope处于某个状态的时候,会调用对象o的onError方法或onComplete方法,让AutoDisposingObserverImpl将自身状态置为isDisposed,从而切断上游观察者和下游观察者的联系,也就是取消订阅。

在这个例子中,scope就是AndroidLifecycleScopeProvider的requestScope方法返回的对象。

AndroidLifecycleScopeProvider的requestScope方法

@Override
public CompletableSource requestScope() {
    return LifecycleScopes.resolveScopeFromLifecycle(this);
}

LifecycleScopes的resolveScopeFromLifecycle方法

public static <E> CompletableSource resolveScopeFromLifecycle(
      final LifecycleScopeProvider<E> provider) throws OutsideScopeException {
    return resolveScopeFromLifecycle(provider, true);
}
public static <E> CompletableSource resolveScopeFromLifecycle(
      final LifecycleScopeProvider<E> provider, final boolean checkEndBoundary)
      throws OutsideScopeException {
    //注释1处
    E lastEvent = provider.peekLifecycle();
    //注释2处
    CorrespondingEventsFunction<E> eventsFunction = provider.correspondingEvents();
    if (lastEvent == null) {
      throw new LifecycleNotStartedException();
    }
    E endEvent;
    try {
      //注释3处
      endEvent = eventsFunction.apply(lastEvent);
    } catch (Exception e) {
      if (checkEndBoundary && e instanceof LifecycleEndedException) {
        Consumer<? super OutsideScopeException> handler =
            AutoDisposePlugins.getOutsideScopeHandler();
        if (handler != null) {
          try {
            handler.accept((LifecycleEndedException) e);

            // Swallowed the end exception, just silently dispose immediately.
            //返回数据类型Completable.complete()
            return Completable.complete();
          } catch (Exception e1) {
            //返回数据类型Completable.error()
            return Completable.error(e1);
          }
        }
        throw e;
      }
      //注释4处
      return Completable.error(e);
    }
    //注释5处,调用重载方法
    return resolveScopeFromLifecycle(provider.lifecycle(), endEvent);
}

注释1处,AndroidLifecycleScopeProvider的peekLifecycle方法

@Override
public Lifecycle.Event peekLifecycle() {
    lifecycleObservable.backfillEvents();
    return lifecycleObservable.getValue();
}

我们不去仔细分析,这个方法就是获取当前的Lifecycle.Event。

注释2处,eventsFunction就是AndroidLifecycleScopeProvider类中的DEFAULT_CORRESPONDING_EVENTS。

注释3处,获取对应的取消订阅的Lifecycle.Event。

然后我们注意到,返回的数据有三种类型

  1. Completable.complete()
  2. Completable.error()
  3. 注释5处返回的数据类型

前两种类型,会导致DisposableCompletableObserver对象立即complete或者error,从而让AutoDisposingObserverImpl将状态置为isDisposed,从而切断上下游的联系。我们再看看注释5处

注释5处,调用重载方法

public static <E> CompletableSource resolveScopeFromLifecycle(
      Observable<E> lifecycle, final E endEvent) {
    @Nullable Comparator<E> comparator = null;
    if (endEvent instanceof Comparable) {
      //noinspection unchecked
      comparator = (Comparator<E>) COMPARABLE_COMPARATOR;
    }
    //注释1处,调用重载方法
    return resolveScopeFromLifecycle(lifecycle, endEvent, comparator);
  }

注释1处,调用重载方法,传入的comparator是null

public static <E> CompletableSource resolveScopeFromLifecycle(
      Observable<E> lifecycle, final E endEvent, final Comparator<E> comparator) {
    Predicate<E> equalityPredicate;
    if (comparator != null) {
      equalityPredicate = e -> comparator.compare(e, endEvent) >= 0;
    } else {
      //注释1处
      equalityPredicate = e -> e.equals(endEvent);
    }
    //注释2处,如果条件满足,会调用下游观察者的onComplete方法
    return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
  }

注释1处,equalityPredicate是lambda表达式写法

public interface Predicate<T> {
    /**
     * 测试指定的输入值,返回一个boolean值。
     */
    boolean test(@NonNull T t) throws Exception;
}

注释2处,传入的lifecycle是是AndroidLifecycleScopeProvider类中的lifecycleObservable对象

private AndroidLifecycleScopeProvider(Lifecycle lifecycle, 
            CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
    this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
    this.boundaryResolver = boundaryResolver;
}
return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();

注释2处的这行代码含义就是:

  1. 先忽略一个当前的生命周期事件,为什么要忽略一个呢?暂时还没明白。
  2. takeUntil:返回一个ObservableTakeUntilPredicate,每发射一个元素以后,都会判断equalityPredicate条件是否满足,如果满足就发射一个complete事件。
  3. ignoreElements:返回了一个ObservableIgnoreElementsCompletable对象,忽略takeUntil发射的正常元素,只接收complete事件,或者error事件,这种情况下,会导致DisposableCompletableObserver对象立即complete或者error,让AutoDisposingObserverImpl将状态置为isDisposed,从而切断上下游的联系。

我们看看LifecycleEventsObservable类,我们知道当Observable调用subscribe方法订阅观察者的时候,内部会调用subscribeActual方法。

class LifecycleEventsObservable extends Observable<Event> {

  private final Lifecycle lifecycle;
  private final BehaviorSubject<Event> eventsObservable = BehaviorSubject.create();

  LifecycleEventsObservable(Lifecycle lifecycle) {
    //传入的是和Activity关联的生命周期
    this.lifecycle = lifecycle;
  }

 //...

  @Override
  protected void subscribeActual(Observer<? super Event> observer) {
    ArchLifecycleObserver archObserver =
        new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
    observer.onSubscribe(archObserver);
    if (!isMainThread()) {
      observer.onError(
          new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
      return;
    }
   //注释1处
    lifecycle.addObserver(archObserver);
    if (archObserver.isDisposed()) {
      lifecycle.removeObserver(archObserver);
    }
  }
  //ArchLifecycleObserver实现了LifecycleObserver接口
  static final class ArchLifecycleObserver extends MainThreadDisposable
      implements LifecycleObserver {
    private final Lifecycle lifecycle;
    private final Observer<? super Event> observer;
    private final BehaviorSubject<Event> eventsObservable;

    ArchLifecycleObserver(
        Lifecycle lifecycle,
        Observer<? super Event> observer,
        BehaviorSubject<Event> eventsObservable) {
      this.lifecycle = lifecycle;
      this.observer = observer;
      this.eventsObservable = eventsObservable;
    }

    @Override
    protected void onDispose() {
      lifecycle.removeObserver(this);
    }

    @OnLifecycleEvent(Event.ON_ANY)
    void onStateChange(LifecycleOwner owner, Event event) {
      if (!isDisposed()) {
        if (!(event == ON_CREATE && eventsObservable.getValue() == event)) {
          // Due to the INITIALIZED->ON_CREATE mapping trick we do in backfill(),
          // we fire this conditionally to avoid duplicate CREATE events.
          eventsObservable.onNext(event);
        }
        //注释2处
        observer.onNext(event);
      }
    }
  }
}

首先ArchLifecycleObserver实现了LifecycleObserver接口,然后在注释2处使用OnLifecycleEvent注解了onStateChange方法,所以当Android生命周期发生改变的时候,都会回调这个方法,当发射了相应的生命周期事件以后,比如说onDestroy的时候
lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();条件满足,takeUntil会发射一个complete事件,从而导致DisposableCompletableObserver回调onComplete方法,从而导致AutoDisposingObserverImpl状态变为isDisposed,从而切断上下游的联系,避免内存泄漏。

疑问:在LifecycleScopes类的resolveScopeFromLifecycle方法中

lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements()

为什么这里要skip一个事件呢?,目前还没想明白。

参考链接:

上一篇下一篇

猜你喜欢

热点阅读