RxJava2.X版本-zip语法

2018-08-08  本文已影响547人  河马过河

一:zip语法练习

  public Observable<Integer> getRxJavaCreateExampleData() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);

                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);

                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);

            }
        });
    }
 public Observable<String> getRxJavaStringData() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaStringData---:" + Thread.currentThread().getName() + "--:A");
                emitter.onNext("A");
                LogUtils.debug(TAG, "getRxJavaStringData---:" + Thread.currentThread().getName() + "--:B");
                emitter.onNext("B");
                LogUtils.debug(TAG, "getRxJavaStringData---:" + Thread.currentThread().getName() + "--:C");
                emitter.onNext("C");
            }
        });
    }
    public void rxJavaZipExample() {
        Disposable subscribe = io.reactivex.Observable.zip(model.getRxJavaCreateExampleData(), model.getRxJavaStringData(), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                LogUtils.error(TAG, "rxJavaZipExample--zip--:" + Thread.currentThread().getName() + "--:" + integer + "--:" + s);
                return "this is zip method --:" + integer + "--:" + s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.error(TAG, "rxJavaZipExample--Consumer--:" + Thread.currentThread().getName() + "--:" + s);
            }
        });
        compositeDisposable.add(subscribe);

    }

日志

08-08 14:34:07.429 18625-18625/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:main--:1
08-08 14:34:07.430 18625-18625/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:main--:2
    getRxJavaCreateExampleData---:main--:3
    getRxJavaCreateExampleData---:main--:4
    getRxJavaStringData---:main--:A
08-08 14:34:07.430 18625-18625/com.example.zhang E/MainPresenter: rxJavaZipExample--zip--:main--:1--:A
    rxJavaZipExample--Consumer--:main--:this is zip method --:1--:A
08-08 14:34:07.430 18625-18625/com.example.zhang D/MainModel: getRxJavaStringData---:main--:B
08-08 14:34:07.430 18625-18625/com.example.zhang E/MainPresenter: rxJavaZipExample--zip--:main--:2--:B
    rxJavaZipExample--Consumer--:main--:this is zip method --:2--:B
08-08 14:34:07.431 18625-18625/com.example.zhang D/MainModel: getRxJavaStringData---:main--:C
08-08 14:34:07.431 18625-18625/com.example.zhang E/MainPresenter: rxJavaZipExample--zip--:main--:3--:C
    rxJavaZipExample--Consumer--:main--:this is zip method --:3--:C

总结

1、组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的
2、最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同
二、线程调度

1、线程调度

    public void rxJavaZipExample() {
        Disposable subscribe = io.reactivex.Observable.zip(model.getRxJavaCreateExampleData().subscribeOn(Schedulers.io()), model.getRxJavaStringData().subscribeOn(Schedulers.io()), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                LogUtils.error(TAG, "rxJavaZipExample--zip--:" + Thread.currentThread().getName() + "--:" + integer + "--:" + s);
                return "this is zip method --:" + integer + "--:" + s;
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        LogUtils.error(TAG, "rxJavaZipExample--Consumer--:" + Thread.currentThread().getName() + "--:" + s);
                    }
                });
        compositeDisposable.add(subscribe);

    }

日志

08-08 14:47:33.312 26412-26827/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:1
08-08 14:47:33.312 26412-26828/com.example.zhang D/MainModel: getRxJavaStringData---:RxCachedThreadScheduler-2--:A
    getRxJavaStringData---:RxCachedThreadScheduler-2--:B
    getRxJavaStringData---:RxCachedThreadScheduler-2--:C
08-08 14:47:33.312 26412-26827/com.example.zhang E/MainPresenter: rxJavaZipExample--zip--:RxCachedThreadScheduler-1--:1--:A
08-08 14:47:33.312 26412-26827/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:2
08-08 14:47:33.313 26412-26827/com.example.zhang E/MainPresenter: rxJavaZipExample--zip--:RxCachedThreadScheduler-1--:2--:B
08-08 14:47:33.313 26412-26827/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:3
08-08 14:47:33.313 26412-26827/com.example.zhang E/MainPresenter: rxJavaZipExample--zip--:RxCachedThreadScheduler-1--:3--:C
08-08 14:47:33.313 26412-26827/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:4
08-08 14:47:33.314 26412-26412/com.example.zhang E/MainPresenter: rxJavaZipExample--Consumer--:main--:this is zip method --:1--:A
    rxJavaZipExample--Consumer--:main--:this is zip method --:2--:B
    rxJavaZipExample--Consumer--:main--:this is zip method --:3--:C
河马过河微信公众号.jpg
上一篇下一篇

猜你喜欢

热点阅读