RxJava2系列第二篇---异步

2018-09-01  本文已影响0人  sofarsogoo_932d

目录

第一篇---基本使用
第二篇---异步
第一篇---操作符

异步

在该系列第一篇最开始,我们已经说了RxJava是一个异步编程框架,之所以这么说,就是因为它在线程的切换方面非常方便。
介绍异步之前,我们先看看下面几个方法

subscribe(Observer<? super T> observer)
subscribe(Consumer<? super T> onNext)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe)

在基本使用中,我们用的是第一个方法,但此方法需要重写所有的事件,但有的时候我们并不需要对所有的事件进行处理,因此就有了下面几种方法,看参数我们就知道每个方法分别处理的是什么事件,比如第二个方法,只处理next事件,第三个方法,只处理next和error事件等等。

为了代码的简洁性,接下来我将使用Consumer作为观察者。

Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG","subscribe:"+Thread.currentThread().getName());
            emitter.onNext("1");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("TAG","accept:"+Thread.currentThread().getName());
        }
});

输出结果

D/TAG: subscribe:main
D/TAG: accept:main

结果分析
如果我将上述代码,放在一个子线程中去,发现结果线程的名字将不再是main。说明在哪创建上述代码,则上游和下游就会处于那个线程,并且它们处于同一个线程。

如果我们要在子线程中发送交易,主线程更新UI,这种情况就满足不了我们的需求了。我们需要的是上线处于子线程,负责发送网络请求,下游处于主线程,负责更新UI。通过RxJava的线程调度器可以轻松实现上述需求。

在Observable中有两个方法

subscribeOn(Scheduler scheduler)  //指定上游所在的线程
observeOn(Scheduler scheduler)  //指定下游所在的线程

先来看看下面的代码

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
            emitter.onNext("1");
        }
    });
    Consumer<String> consumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
        }
    };
//关注点
observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);

输出结果

D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main

我们来看看Schedulers和AndroidSchedulers
这两个类并无继承关系,是相互独立的两个final类

AndroidSchedulers

/** A {@link Scheduler} which executes actions on the Android main thread.*/
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
    if (looper == null) throw new NullPointerException("looper == null");
    return new HandlerScheduler(new Handler(looper));
}

前者指定方法在主线程中执行
后者指定方法在哪个线程执行,由Looper所在的线程决定

Schedulers
该调度器里面有下面几个主要方法

//新的子线程
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//计算密集型任务
public static Scheduler computation() {
    return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
//io密集型任务
 public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
public static Scheduler trampoline() {
    return TRAMPOLINE;
}
public static Scheduler single() {
    return RxJavaPlugins.onSingleScheduler(SINGLE);
}

其他两个暂时没用到,就先不说明了。

回到开始的异步代码
修改关注点

observable.subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.io())
            .subscribe(consumer);

输出结果

D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:RxCachedThreadScheduler-2

结果分析
修改后的代码指定了2次上游发送事件的线程,下游也指定了2次线程,通过输出结果,我们可以得出结论:上游线程只有第一次指定的有效,下游线程最终会切换至最后一个指定的线程。

为了更加清晰的知道下游线程的切换过程,我们修改代码如下

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onNext("3");
        }
    });
    Consumer<String> consumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
        }
    };
    observable.subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
                }
            })
            .observeOn(Schedulers.io())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
                }
            })
            .subscribe(consumer);

输出结果如下

D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main
D/TAG: accept:2:main
D/TAG: accept:3:main
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2

结果分析
从输出结果,我们可以看出,每一个doOnNext都会接受到全部事件,并且每一个observeOn指定的是它下面的那个事件所处的线程。

线程切换原理分析

RxJava线程切换图.png

来一段RxJava的调用链代码

Observable.just(1)
    .map(new Function<Integer, String>() {
      @Override
      public String apply(Integer integer) throws Exception {
        LogUtil.d("rxjava", "map1: " + Thread.currentThread().getId());
        return integer.toString();
      }
    })
    .subscribeOn(Schedulers.newThread()) // s1
    .map(new Function<String, Integer>() {
      @Override
      public Integer apply(String s) throws Exception {
        LogUtil.d("rxjava", "map2: " + Thread.currentThread().getId());
        return s.hashCode();
      }
    })
    .observeOn(Schedulers.newThread()) // o1
    .map(new Function<Integer, String>() {
      @Override
      public String apply(Integer integer) throws Exception {
        LogUtil.d("rxjava", "map3: " + Thread.currentThread().getId());
        return integer.toString();
      }
    })
    .subscribeOn(Schedulers.newThread()) // s2
    .observeOn(Schedulers.newThread()) // o2
    .subscribe(new Consumer<String>() {
      @Override
      public void accept(String s) throws Exception {
        LogUtil.d("rxjava", "accept: " + Thread.currentThread().getId());
      }
    });
结合图和代码来分析线程的切换过程

这段代码中包含了很多操作符,每一个点后面的都是RxJava的操作符,如just,map,subscribe等等,对应图中的lift
在这些操作符中,每调用一次操作符,都返回Observable,这就像Builder模式,只有subscribe返回的不是Observable,而是Disposable

subscribe意味着RxJava调用链开始启动,对应图中的底端的actual-subscriber。

自下而上找subscribeOn,每经过一个subscribeOn就切换一次线程(如果一个都没有,则线程默认为当前线程),直到到达顶端的Observable,对应图中的onSubscribe

自上而下找observeOn(图中的Subscribe),同样是每经过一个observeOn就切换一次线程

通过这个思路,大家想一想上述程序的打印结果
just,map1和map2处于一个线程,并且是s1所指定的线程
map3处于o1所处的线程
accept处于o2所处的线程

打印验证一下

map1: 732
map2: 732
map3: 733
accept: 734

结果一致

[]

上一篇下一篇

猜你喜欢

热点阅读