Android开发学习

RxJava2.x-线程调度

2018-08-22  本文已影响56人  河马过河

一、线程调度

 public Observable<Integer> getRxJavaCreateExampleData() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);

                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);

            }
        });
    }

  public void rxJavaSchedulersExample() {
        Disposable disposable = model.getRxJavaCreateExampleData()
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-doOnNext-:" + integer);
                    }
                })
                .observeOn(Schedulers.newThread())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-filter-:" + integer);
                        return integer > 2;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                    }
                });
        compositeDisposable.add(disposable);
    }

日志

08-22 11:31:40.252 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:1
08-22 11:31:40.253 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:2
08-22 11:31:40.253 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:1
08-22 11:31:40.254 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:3
    getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:4
08-22 11:31:40.254 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:2
    rxJavaSchedulersExample--:main-doOnNext-:3
08-22 11:31:40.255 26676-26853/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:1
    rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:2
    rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:3
08-22 11:31:40.255 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-Consumer-:3

总结

1、Schedules线程:① io②newThread③computation ④single⑤trampoline
2、AndroidSchedules线程:AndroidSchedulers.mainThread()
3、subscribeOn 控制的是上游被观察者 ,调用多次,只有第一次起作用;observeOn控制的是下游观察者,可以多次调用

备注

Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

河马过河微信公众号.jpg
上一篇下一篇

猜你喜欢

热点阅读