RxJava1中关于Subscription的一些误解纠正
通常我们使用RxJava时会这样写:
Subscription subscription = getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
得到一个Subscription对象之后,在Activity销毁的时候去取消订阅以防内存泄漏:
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
}
这样做固然没错, 可是真的每次都需要手动取消订阅吗? 答案显然不是.
我们来看看源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
}
当调用subscribe时,调用了第二个方法, 当subscriber不是SafeSubscriber的实例时, 则创建一个SafeSubscriber对象. 那么来看看这个SafeSubscriber是个什么鬼:
public class SafeSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {
...
try {
actual.onCompleted();
} catch (Throwable e) {
...
} finally { // NOPMD
try {
unsubscribe();
} catch (Throwable e) {}
}
}
@Override
public void onError(Throwable e) {
...
_onError(e);
}
@Override
public void onNext(T args) {}
protected void _onError(Throwable e) { // NOPMD
...
try {
unsubscribe();
} catch (Throwable unsubscribeException) {}
}
}
可以看出该类仅仅是对Subscriber做了个包装.
仔细查看代码, 发现在onComplated() 方法和onError() 方法中, 都调用了unsubscribe() 方法进行取消订阅.
因此我们可以大胆猜测, 只要OnCompleted() 和 onError() 执行之后, 都会取消订阅.
下面就简单的进行一下测试:
Subscription subscription = Observable.just(1)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer:" + integer);
}
});
boolean flag = subscription.isUnsubscribed();
System.out.println("flag:" + flag);
运行结果为:
integer:1
complete
flag: true
说明onCompleted执行之后的确取消了订阅. 再来看看OnError:
Subscription subscription = Observable.just(1)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException("Oops!");
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer:" + integer);
}
});
boolean flag = subscription.isUnsubscribed();
System.out.println("flag:" + flag);
运行结果为:
error
flag: true
说明OnError执行之后也取消了订阅.
那么究竟什么时候需要手动取消订阅呢? 我们再看一个例子:
final Subscription subscription = Observable.just(1)
.subscribeOn(Schedulers.io())
.delay(2, SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer:" + integer);
}
});
new Thread(new Runnable() {
@Override
public void run() {
while (true){
System.out.println("flag:" + subscription.isUnsubscribed());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
我们模拟平时App网络请求的例子, 在后台线程中发起网络请求, 在主线程更新界面, 并延时两秒模拟网络不好的情况, 同时新启一个线程, 每隔一秒打印一次当前Subscription的状态, 下面是运行结果:
I/System.out: flag:false
I/System.out: flag:false
I/System.out: integer:1
I/System.out: complete
I/System.out: flag:true
I/System.out: flag:true
可以看到在前两秒时由于还在后台执行网络请求,所以并没有取消订阅, 而当complete执行之后, 订阅就取消了.
因此我们在开发中, 如果有异步的操作还正在进行, 在Activity销毁时才需要手动取消订阅, 而如果是同步的操作, 或者异步操作已经完成, 则并不需要手动取消订阅.
最后
在现在这个金三银四的面试季,我自己在网上也搜集了很多资料做成了文档和架构视频资料免费分享给大家【包括高级UI、性能优化、架构师课程、NDK、Kotlin、混合式开发(ReactNative+Weex)、Flutter等架构技术资料】,希望能帮助到您面试前的复习且找到一个好的工作,也节省大家在网上搜索资料的时间来学习。