RxLifecycle 的使用以及相关源码
我们正在开发的几个项目,都用到了这个开源库:https://github.com/trello/RxLifecycle
官方对这个库的原话解释:Lifecycle handling APIs for Android apps using RxJava
This library allows one to automatically complete sequences based on a second lifecycle stream.
This capability is useful in Android, where incomplete subscriptions can cause memory leaks.
该库允许基于第二个生命周期流自动完成序列。
此功能在 Android 中很有用,因为不完整的订阅会导致内存泄漏。
如何使用呢:
您必须从代表生命周期流的 Observable<T> 开始。 然后使用 RxLifecycle 将序列绑定到该生命周期。
您可以在生命周期发出任何内容时进行绑定:
myObservable
.compose(RxLifecycle.bind(lifecycle))
.subscribe();
或者您可以绑定到特定生命周期事件发生时:
myObservable
.compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
.subscribe();
或者,您可以让 RxLifecycle 确定结束序列的适当时间:
myObservable
.compose(RxLifecycleAndroid.bindActivity(lifecycle))
.subscribe();
它假设您想在相反的生命周期事件中结束序列——例如,如果在 START
期间订阅,它将在 STOP
时终止。 如果您在 PAUSE
之后订阅,它将在下一个销毁事件中终止(例如,PAUSE
将在 STOP
中终止)。
Providers
生命周期从何而来? 通常,它们由适当的 LifecycleProvider<T>
提供。 但那些实施在哪里?
public interface LifecycleProvider<E> {
/**
* 返回一系列生命周期事件
*/
Observable<E> lifecycle();
/**
* 绑定源直到特定事件发生。
*
* @param event 触发退订的事件
* @return a reusable {@link LifecycleTransformer} which unsubscribes when the event triggers.
*/
<T> LifecycleTransformer<T> bindUntilEvent(@Nonnull E event);
/**
* 绑定一个源,直到下一个合理的事件发生。
*
* @return a reusable {@link LifecycleTransformer} which unsubscribes at the correct time.
*/
<T> LifecycleTransformer<T> bindToLifecycle();
}
你有几个选择:
- 使用 rxlifecycle-components 并对提供的
RxActivity
、RxFragment
等类进行子类化。 - 使用 Android's lifecycle + rxlifecycle-android-lifecycle 生成提供者。
- 自己编写实现。
如果您使用 rxlifecycle-components,只需扩展相应的类,然后使用内置的 bindToLifecycle()
(或 bindUntilEvent()
)方法:
public class MyActivity extends RxActivity {
@Override
public void onResume() {
super.onResume();
myObservable
.compose(bindToLifecycle())
.subscribe();
}
}
如果您使用 rxlifecycle-android-lifecycle,那么您只需将 LifecycleOwner
传递给 AndroidLifecycle
即可生成提供程序:
public class MyActivity extends LifecycleActivity {
private final LifecycleProvider<Lifecycle.Event> provider
= AndroidLifecycle.createLifecycleProvider(this);
@Override
public void onResume() {
super.onResume();
myObservable
.compose(provider.bindToLifecycle())
.subscribe();
}
}
退订
RxLifecycle 实际上并没有取消订阅该序列。 相反,它终止了序列。 它这样做的方式因类型而异:
-
Observable
、Flowable
和Maybe
- 发出onCompleted()
-
Single
和Completable
- 发出onError(CancellationException)
如果一个序列需要 Subscription.unsubscribe()
行为,那么建议您自己手动处理 Subscription
并在适当的时候调用 unsubscribe()
。
以下是 RxActivity 的源码 :
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
//释放订阅前最后一个数据和订阅后接收到的所有数据
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
@Override
@NonNull
@CheckResult
public final Observable<ActivityEvent> lifecycle() {
return lifecycleSubject.hide();
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
//实际上返回了一个LifecycleTransformer
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
//bindToLifecycle,本质上RxActivity 中的BehaviorSubject成员变量(它本身就是一个Observable)!
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
//Activity不同的生命周期,BehaviorSubject对象会发射对应的ActivityEvent
@Override
@CallSuper
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleSubject.onNext(ActivityEvent.CREATE);
}
@Override
@CallSuper
protected void onStart() {
super.onStart();
lifecycleSubject.onNext(ActivityEvent.START);
}
@Override
@CallSuper
protected void onResume() {
super.onResume();
lifecycleSubject.onNext(ActivityEvent.RESUME);
}
@Override
@CallSuper
protected void onPause() {
lifecycleSubject.onNext(ActivityEvent.PAUSE);
super.onPause();
}
@Override
@CallSuper
protected void onStop() {
lifecycleSubject.onNext(ActivityEvent.STOP);
super.onStop();
}
@Override
@CallSuper
protected void onDestroy() {
lifecycleSubject.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
这个 BehaviorSubject
会在不同的生命周期发射不同的 ActivityEvent
,比如在 onCreate()
生命周期发射 ActivityEvent.CREATE
,在 onStop()
发射 ActivityEvent.STOP
。
public final class BehaviorSubject<T> extends Subject<T> {
final AtomicReference<Object> value; //原子操作类,当前接收到的最后一个数据
final AtomicReference<BehaviorDisposable<T>[]> observers;//原子操作类,BehaviorDisposable内部存储了所有接受到的数据
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];//标记,意味着一个空的BehaviorDisposable
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0]; //标记,意味着已经达到了TERMINATED,终止数据的发射
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Throwable> terminalEvent;
...省略
}
- LifecycleTransformer
/**
* Transformer that continues a subscription until a second Observable emits an event.
*/
@ParametersAreNonnullByDefault
public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
FlowableTransformer<T, T>,
SingleTransformer<T, T>,
MaybeTransformer<T, T>,
CompletableTransformer
{
final Observable<?> observable;
LifecycleTransformer(Observable<?> observable) {
checkNotNull(observable, "observable == null");
this.observable = observable;
}
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
//observable为 true 为终止
return upstream.takeUntil(observable);
}
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
}
@Override
public SingleSource<T> apply(Single<T> upstream) {
return upstream.takeUntil(observable.firstOrError());
}
@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
return upstream.takeUntil(observable.firstElement());
}
@Override
public CompletableSource apply(Completable upstream) {
return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
}
@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
LifecycleTransformer<?> that = (LifecycleTransformer<?>) o;
return observable.equals(that.observable);
}
@Override
public int hashCode() {
return observable.hashCode();
}
@Override
public String toString() {
return "LifecycleTransformer{" +
"observable=" + observable +
'}';
}
}
- RxLifecycle
public class RxLifecycle {
private RxLifecycle() {
throw new AssertionError("No instances");
}
/**
* Binds the given source to a lifecycle.
* <p>
* When the lifecycle event occurs, the source will cease to emit any notifications.
*
* @param lifecycle the lifecycle sequence
* @param event the event which should conclude notifications from the source
* @return a reusable {@link LifecycleTransformer} that unsubscribes the source at the specified event
*/
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
@Nonnull final R event) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(event, "event == null");
return bind(takeUntilEvent(lifecycle, event));
}
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
return lifecycle.filter(new Predicate<R>() {
@Override
public boolean test(R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(event);
}
});
}
/**
* Binds the given source to a lifecycle.
* <p>
* This helper automatically determines (based on the lifecycle sequence itself) when the source
* should stop emitting items. Note that for this method, it assumes <em>any</em> event
* emitted by the given lifecycle indicates that the lifecycle is over.
*
* @param lifecycle the lifecycle sequence
* @return a reusable {@link LifecycleTransformer} that unsubscribes the source whenever the lifecycle emits
*/
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
return new LifecycleTransformer<>(lifecycle);
}
/**
* Binds the given source to a lifecycle.
* <p>
* This method determines (based on the lifecycle sequence itself) when the source
* should stop emitting items. It uses the provided correspondingEvents function to determine
* when to unsubscribe.
* <p>
* Note that this is an advanced usage of the library and should generally be used only if you
* really know what you're doing with a given lifecycle.
*
* @param lifecycle the lifecycle sequence
* @param correspondingEvents a function which tells the source when to unsubscribe
* @return a reusable {@link LifecycleTransformer} that unsubscribes the source during the Fragment lifecycle
*/
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(correspondingEvents, "correspondingEvents == null");
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
//combineLatest方法,是将两个数据源的释放item放到一起通过一个func(某种方法)返回你想要的东西
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
}
最终发现是调用 combineLatest
方法,逐步分析一下:
-
lifecycle.take(1)
指的是最近发射的事件,比如说我们在onCreate()
中执行了bindToLifecycle
,那么lifecycle.take(1)
指的就是ActivityEvent.CREATE
,经过map(correspondingEvents)
,这个map
中传的函数就是RxLifecycleAndroid
中的ACTIVITY_LIFECYCLE
。
也就是说,lifecycle.take(1).map(correspondingEvents)
实际上是返回了CREATE
对应的事件DESTROY
, 它意味着本次订阅将在Activity
的onDestory
进行取消。 -
lifecycle.skip(1)
就简单了,除去第一个保留剩下的。 -
第三个参数 意味着,
lifecycle.take(1).map(correspondingEvents)
的序列和lifecycle.skip(1)
进行combine
,形成一个新的序列。即是说,当Activity
走到onStart
生命周期时,为false
,这次订阅不会取消,直到onDestroy
,为true
则订阅取消。 -
而后的
onErrorReturn
和filter
是对异常的处理和判断是否应该结束订阅。