AutoDispose 源码解析
AutoDispose是RxJava2+的一个工具,通过disposal或cancellation将RxJava流的执行自动绑定到提供的作用域。
说人话就是:
AutoDispose是配合RxJava2
使用的一个工具,我们提供一个作用域(在Android中,Activity的生命周期就是可以看作一个作用域)。AutoDispose会自动将RxJava的数据流和我们提供的作用域进行绑定,当作用域到达结束状态的时候(比如onDestroy),AutoDispose会自动取消RxJava数据流的执行,在Android中可以用来避免RxJava造成的内存泄漏。
举个例子,我们间隔一秒输出一个整数,可以发现当我们点击返回键的时候,数字还是会一直打印,造成内存泄漏。
代码使用Kotlin演示
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Long> {
override fun onComplete() {
Log.d(TAG, "onComplete: ")
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: ")
}
override fun onNext(t: Long) {
Log.d(TAG, "onNext: $t")
}
override fun onError(e: Throwable) {
Log.d(TAG, "onError: ${e.message}")
}
})
一直输出
D/AutoDisposeActivity: onSubscribe:
D/AutoDisposeActivity: onNext: 0
D/AutoDisposeActivity: onNext: 1
D/AutoDisposeActivity: onNext: 2
D/AutoDisposeActivity: onNext: 3
D/AutoDisposeActivity: onNext: 4
D/AutoDisposeActivity: onNext: 5
D/AutoDisposeActivity: onNext: 6
...
如果解决这种内存泄漏问题呢,我们可以保存Disposable的引用,然后在onDestroy的时候手动调用Disposable的dispose方法,如下所示
var disposable: Disposable? = null
fun onClick(view: View) {
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Long> {
override fun onComplete() {
Log.d(TAG, "onComplete: ")
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: ")
disposable = d
}
override fun onNext(t: Long) {
Log.d(TAG, "onNext: $t")
}
override fun onError(e: Throwable) {
Log.d(TAG, "onError: ${e.message}")
}
})
}
override fun onDestroy() {
super.onDestroy()
disposable?.dispose()
}
通过这种方式虽然能解决问题,但是每次都手写,也是太枯燥了吧。所以AutoDispose便应运而生了。使用方式如下所示:
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//注释1处
.`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
.subscribe(object : Observer<Long> {
override fun onComplete() {
Log.d(TAG, "onComplete: ")
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: ")
}
override fun onNext(t: Long) {
Log.d(TAG, "onNext: $t")
}
override fun onError(e: Throwable) {
Log.d(TAG, "onError: ${e.message}")
}
})
注释1处,一行代码,就解决,也是美滋滋。
//传入的this对象是一个Activity对象
.`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
下面就探索一下其中实现的原理:
先来一张图
AutoDispose.jpg
AndroidLifecycleScopeProvider的from方法
public static AndroidLifecycleScopeProvider from(LifecycleOwner owner) {
return from(owner.getLifecycle());
}
传入的LifecycleOwner对象在这个例子中是一个Activity,owner.getLifecycle()
就是返回和Activity关联的Lifecycle对象,然后内部调用重载方法
public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle) {
return from(lifecycle, DEFAULT_CORRESPONDING_EVENTS);
}
传入的DEFAULT_CORRESPONDING_EVENTS对象
private static final CorrespondingEventsFunction<Lifecycle.Event> DEFAULT_CORRESPONDING_EVENTS =
lastEvent -> {
switch (lastEvent) {
case ON_CREATE:
return Lifecycle.Event.ON_DESTROY;
case ON_START:
return Lifecycle.Event.ON_STOP;
case ON_RESUME:
return Lifecycle.Event.ON_PAUSE;
case ON_PAUSE:
return Lifecycle.Event.ON_STOP;
case ON_STOP:
case ON_DESTROY:
default:
throw new LifecycleEndedException("Lifecycle has ended! Last event was " + lastEvent);
}
};
CorrespondingEventsFunction是一个接口,上面是lambda表达式写法
public interface CorrespondingEventsFunction<E> extends Function<E, E> {
/**
* 指定一个event,返回对应的event,收到对应的event的时候,lifecycle应该dispose
*
* @param event the source or start event.
* @return the target event that should signal disposal.
* @throws OutsideScopeException if the lifecycle exceeds its scope boundaries.
*/
@Override
E apply(E event) throws OutsideScopeException;
}
从DEFAULT_CORRESPONDING_EVENTS可以看出来:
- 在ON_CREATE中订阅会在ON_DESTROY中取消订阅
- 在ON_START中订阅会在ON_STOP中取消订阅
- 在ON_RESUME中订阅会在ON_PAUSE中取消订阅
- 在ON_PAUSE中订阅会在ON_STOP中取消订阅
- 如果在ON_STOP或者ON_DESTROY中订阅,直接抛出异常。
public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle,
CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
return new AndroidLifecycleScopeProvider(lifecycle, boundaryResolver);
}
private AndroidLifecycleScopeProvider(
Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
//注释1处
this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
this.boundaryResolver = boundaryResolver;
}
注释1处,创建了一个LifecycleEventsObservable对象,从名称就可以看出来,这个Observable是用来观察生命周期事件的。
最后就是返回了一个AndroidLifecycleScopeProvider对象
AutoDispose的autoDisposable(final ScopeProvider provider)方法
public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
//调用重载函数
return autoDisposable(completableOf(provider));
}
先是调用Scopes
的completableOf(ScopeProvider scopeProvider)
方法,返回了一个Completable对象,然后调用AutoDispose的重载函数
public static Completable completableOf(ScopeProvider scopeProvider) {
return Completable.defer(
() -> {
try {
//注释1处
return scopeProvider.requestScope();
} catch (OutsideScopeException e) {
Consumer<? super OutsideScopeException> handler =
AutoDisposePlugins.getOutsideScopeHandler();
if (handler != null) {
handler.accept(e);
return Completable.complete();
} else {
return Completable.error(e);
}
}
});
}
注释注释1处,调用scopeProvider的requestScope方法获取一个作用域。在这个例子中,scopeProvider是一个AndroidLifecycleScopeProvider对象。
AutoDispose的autoDisposable(final CompletableSource scope)方法简化版
public static <T> AutoDisposeConverter<T> autoDisposable(final CompletableSource scope) {
//注释1处,返回了一个AutoDisposeConverter对象
return new AutoDisposeConverter<T>() {
//...
//注释2处
@Override
public ObservableSubscribeProxy<T> apply(final Observable<T> upstream) {
//...
//注释3处
return new ObservableSubscribeProxy<T>() {
//...
@Override
public void subscribe(Observer<? super T> observer) {
//注释4处
new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
}
};
}
};
}
注释1处,返回了一个AutoDisposeConverter对象
Observable的as方法简化版
public final <R> R as(ObservableConverter<T, ? extends R> converter) {
return converter.apply(this);
}
内部就是调用传入的converter
的apply方法,将上游的Observable转化成另外一个值。在这个例子中,就是将上游的Observable转化成ObservableSubscribeProxy对象,如注释2处所示。
ObservableConverter的apply方法
public interface ObservableConverter<T, R> {
R apply(Observable<T> upstream);
}
AutoDisposeConverter接口
public interface AutoDisposeConverter<T>
extends FlowableConverter<T, FlowableSubscribeProxy<T>>,
ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>,
ObservableConverter<T, ObservableSubscribeProxy<T>>,
MaybeConverter<T, MaybeSubscribeProxy<T>>,
SingleConverter<T, SingleSubscribeProxy<T>>,
CompletableConverter<CompletableSubscribeProxy> {
}
从AutoDisposeConverter接口的继承结构可以看出来,AutoDisposeConverter可以转换的上游的数据类型有6种。
- Flowable
- ParallelFlowable
- Observable
- Maybe
- Single
- Completable
我们省略了大部分代码,只关注上游数据类型是Observable的情况。
注释2处,当上游数据类型是Observable的时候,AutoDisposeConverter的apply
方法返回了一个ObservableSubscribeProxy对象。
然后在注释4处,ObservableSubscribeProxy的subscribe(Observer<? super T> observer)
方法
@Override
public void subscribe(Observer<? super T> observer) {
new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
}
注意了:这里使用上游的Observable和传入的作用域scope构建了一个AutoDisposeObservable对象,然后调用subscribe
方法订阅下游的观察者observer。
然后我们查看一下AutoDisposeObservable类
final class AutoDisposeObservable<T> extends Observable<T> implements
ObservableSubscribeProxy<T> {
private final ObservableSource<T> source;
private final CompletableSource scope;
AutoDisposeObservable(ObservableSource<T> source, CompletableSource scope) {
//上游数据
this.source = source;
//作用域对象
this.scope = scope;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//注释1处
source.subscribe(new AutoDisposingObserverImpl<>(scope, observer));
}
}
注释1处,使用传入的作用域scope和下游的观察者observer包装成一个AutoDisposingObserverImpl对象,然后上游观察者source订阅AutoDisposingObserverImpl对象。
AutoDisposingObserverImpl部分代码
final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDisposingObserver<T> {
final AtomicReference<Disposable> mainDisposable = new AtomicReference<>();
final AtomicReference<Disposable> scopeDisposable = new AtomicReference<>();
private final AtomicThrowable error = new AtomicThrowable();
// 传入的作用域对象scope
private final CompletableSource scope;
//下游的代理观察者
private final Observer<? super T> delegate;
AutoDisposingObserverImpl(CompletableSource scope, Observer<? super T> delegate) {
this.scope = scope;
this.delegate = delegate;
}
//....
}
首先看一下AutoDisposingObserverImpl的onNext方法
@Override
public void onNext(T value) {
if (!isDisposed()) {
//注释1处
if (HalfSerializer.onNext(delegate, value, this, error)) {
// Terminal event occurred and was forwarded to the delegate, so clean up here
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(scopeDisposable);
}
}
}
注释1处,当没有取消订阅的时候,调用HalfSerializer的onNext方法,向下游观察者发送数据
public static <T> boolean onNext(Observer<? super T> observer, T value, AtomicInteger wip, AtomicThrowable error) {
//处理多线程的情况
if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
//向下游发射数据
observer.onNext(value);
//如果处理多线程出了问题,调用下游观察者的onError或onComplete方法,并返回true。
if (wip.decrementAndGet() != 0) {
Throwable ex = error.terminate();
if (ex != null) {
observer.onError(ex);
} else {
observer.onComplete();
}
return true;
}
}
return false;
}
正常情况下,向下游发射数据成功并返回fasle。如果处理多线程出了问题,调用下游观察者的onError或onComplete方法,并返回true。如果返回了true,那么AutoDisposingObserverImpl就会将自身状态置为isDisposed,就不会再向下游发射数据了,也不会调用下游观察者的的onError或onComplete方法。
到这里我们要明白一点:如果AutoDisposingObserverImpl自身状态为isDisposed的时候,就不会调用我们下游观察者的onNext,onError或onComplete方法了,即切断了下游观察者的订阅,这样就避免了内存泄漏。
AutoDisposingObserverImpl是何时将自身状态置为isDisposed呢?当然是在收到适当的生命周期事件的时候呀,我们继续往下看。
AutoDisposingObserverImpl的onSubscribe方法,这里才是生命周期作用域发挥作用的方法。
@Override
public void onSubscribe(final Disposable d) {
//注释1处
DisposableCompletableObserver o =
new DisposableCompletableObserver() {
@Override
public void onError(Throwable e) {
scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
//将自身状态置为isDisposed
AutoDisposingObserverImpl.this.onError(e);
}
@Override
public void onComplete() {
scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
//将自身状态置为isDisposed
AutoDisposableHelper.dispose(mainDisposable);
}
};
//注释2处
if (AutoDisposeEndConsumerHelper.setOnce(scopeDisposable, o, getClass())) {
//注释3处
delegate.onSubscribe(this);
//注释4处
scope.subscribe(o);
//注释5处
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}
注释1处,初始化了一个DisposableCompletableObserver对象o
,在该对象的onError
方法和onComplete方法中会将AutoDisposingObserverImpl当前状态置为isDisposed。
注释2处,为scopeDisposable赋值为o
。
注释3处,调用下游观察者的onSubscribe方法。
注释4处,scope订阅DisposableCompletableObserver对象o
。在这个例子中,scope就是AndroidLifecycleScopeProvider的requestScope方法返回的对象。
注释5处,为mainDisposable赋值为d
。
注意:注释4处,scope订阅DisposableCompletableObserver对象o
,可以猜测,当作用域对象scope处于某个状态的时候,会调用对象o
的onError方法或onComplete方法,让AutoDisposingObserverImpl将自身状态置为isDisposed,从而切断上游观察者和下游观察者的联系,也就是取消订阅。
在这个例子中,scope就是AndroidLifecycleScopeProvider的requestScope方法返回的对象。
AndroidLifecycleScopeProvider的requestScope方法
@Override
public CompletableSource requestScope() {
return LifecycleScopes.resolveScopeFromLifecycle(this);
}
LifecycleScopes的resolveScopeFromLifecycle方法
public static <E> CompletableSource resolveScopeFromLifecycle(
final LifecycleScopeProvider<E> provider) throws OutsideScopeException {
return resolveScopeFromLifecycle(provider, true);
}
public static <E> CompletableSource resolveScopeFromLifecycle(
final LifecycleScopeProvider<E> provider, final boolean checkEndBoundary)
throws OutsideScopeException {
//注释1处
E lastEvent = provider.peekLifecycle();
//注释2处
CorrespondingEventsFunction<E> eventsFunction = provider.correspondingEvents();
if (lastEvent == null) {
throw new LifecycleNotStartedException();
}
E endEvent;
try {
//注释3处
endEvent = eventsFunction.apply(lastEvent);
} catch (Exception e) {
if (checkEndBoundary && e instanceof LifecycleEndedException) {
Consumer<? super OutsideScopeException> handler =
AutoDisposePlugins.getOutsideScopeHandler();
if (handler != null) {
try {
handler.accept((LifecycleEndedException) e);
// Swallowed the end exception, just silently dispose immediately.
//返回数据类型Completable.complete()
return Completable.complete();
} catch (Exception e1) {
//返回数据类型Completable.error()
return Completable.error(e1);
}
}
throw e;
}
//注释4处
return Completable.error(e);
}
//注释5处,调用重载方法
return resolveScopeFromLifecycle(provider.lifecycle(), endEvent);
}
注释1处,AndroidLifecycleScopeProvider的peekLifecycle方法
@Override
public Lifecycle.Event peekLifecycle() {
lifecycleObservable.backfillEvents();
return lifecycleObservable.getValue();
}
我们不去仔细分析,这个方法就是获取当前的Lifecycle.Event。
注释2处,eventsFunction就是AndroidLifecycleScopeProvider类中的DEFAULT_CORRESPONDING_EVENTS。
注释3处,获取对应的取消订阅的Lifecycle.Event。
然后我们注意到,返回的数据有三种类型
- Completable.complete()
- Completable.error()
- 注释5处返回的数据类型
前两种类型,会导致DisposableCompletableObserver对象立即complete或者error,从而让AutoDisposingObserverImpl将状态置为isDisposed,从而切断上下游的联系。我们再看看注释5处
注释5处,调用重载方法
public static <E> CompletableSource resolveScopeFromLifecycle(
Observable<E> lifecycle, final E endEvent) {
@Nullable Comparator<E> comparator = null;
if (endEvent instanceof Comparable) {
//noinspection unchecked
comparator = (Comparator<E>) COMPARABLE_COMPARATOR;
}
//注释1处,调用重载方法
return resolveScopeFromLifecycle(lifecycle, endEvent, comparator);
}
注释1处,调用重载方法,传入的comparator是null
public static <E> CompletableSource resolveScopeFromLifecycle(
Observable<E> lifecycle, final E endEvent, final Comparator<E> comparator) {
Predicate<E> equalityPredicate;
if (comparator != null) {
equalityPredicate = e -> comparator.compare(e, endEvent) >= 0;
} else {
//注释1处
equalityPredicate = e -> e.equals(endEvent);
}
//注释2处,如果条件满足,会调用下游观察者的onComplete方法
return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
}
注释1处,equalityPredicate是lambda表达式写法
public interface Predicate<T> {
/**
* 测试指定的输入值,返回一个boolean值。
*/
boolean test(@NonNull T t) throws Exception;
}
注释2处,传入的lifecycle是是AndroidLifecycleScopeProvider类中的lifecycleObservable
对象
private AndroidLifecycleScopeProvider(Lifecycle lifecycle,
CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
this.boundaryResolver = boundaryResolver;
}
return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
注释2处的这行代码含义就是:
- 先忽略一个当前的生命周期事件,为什么要忽略一个呢?暂时还没明白。
- takeUntil:返回一个ObservableTakeUntilPredicate,每发射一个元素以后,都会判断equalityPredicate条件是否满足,如果满足就发射一个complete事件。
- ignoreElements:返回了一个ObservableIgnoreElementsCompletable对象,忽略takeUntil发射的正常元素,只接收complete事件,或者error事件,这种情况下,会导致DisposableCompletableObserver对象立即complete或者error,让AutoDisposingObserverImpl将状态置为isDisposed,从而切断上下游的联系。
我们看看LifecycleEventsObservable类,我们知道当Observable调用subscribe
方法订阅观察者的时候,内部会调用subscribeActual方法。
class LifecycleEventsObservable extends Observable<Event> {
private final Lifecycle lifecycle;
private final BehaviorSubject<Event> eventsObservable = BehaviorSubject.create();
LifecycleEventsObservable(Lifecycle lifecycle) {
//传入的是和Activity关联的生命周期
this.lifecycle = lifecycle;
}
//...
@Override
protected void subscribeActual(Observer<? super Event> observer) {
ArchLifecycleObserver archObserver =
new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
observer.onSubscribe(archObserver);
if (!isMainThread()) {
observer.onError(
new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
return;
}
//注释1处
lifecycle.addObserver(archObserver);
if (archObserver.isDisposed()) {
lifecycle.removeObserver(archObserver);
}
}
//ArchLifecycleObserver实现了LifecycleObserver接口
static final class ArchLifecycleObserver extends MainThreadDisposable
implements LifecycleObserver {
private final Lifecycle lifecycle;
private final Observer<? super Event> observer;
private final BehaviorSubject<Event> eventsObservable;
ArchLifecycleObserver(
Lifecycle lifecycle,
Observer<? super Event> observer,
BehaviorSubject<Event> eventsObservable) {
this.lifecycle = lifecycle;
this.observer = observer;
this.eventsObservable = eventsObservable;
}
@Override
protected void onDispose() {
lifecycle.removeObserver(this);
}
@OnLifecycleEvent(Event.ON_ANY)
void onStateChange(LifecycleOwner owner, Event event) {
if (!isDisposed()) {
if (!(event == ON_CREATE && eventsObservable.getValue() == event)) {
// Due to the INITIALIZED->ON_CREATE mapping trick we do in backfill(),
// we fire this conditionally to avoid duplicate CREATE events.
eventsObservable.onNext(event);
}
//注释2处
observer.onNext(event);
}
}
}
}
首先ArchLifecycleObserver实现了LifecycleObserver接口,然后在注释2处使用OnLifecycleEvent注解了onStateChange
方法,所以当Android生命周期发生改变的时候,都会回调这个方法,当发射了相应的生命周期事件以后,比如说onDestroy的时候
lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
条件满足,takeUntil会发射一个complete事件,从而导致DisposableCompletableObserver回调onComplete方法,从而导致AutoDisposingObserverImpl状态变为isDisposed,从而切断上下游的联系,避免内存泄漏。
疑问:在LifecycleScopes类的resolveScopeFromLifecycle方法中
lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements()
为什么这里要skip一个事件呢?,目前还没想明白。
参考链接: