RxJava <Part 5>: Creating the Five Kinds of Observables

2019-03-13  NoBugException

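RxJava has five kinds of observables (sources): Observable, Flowable, Single, Completable, and Maybe.
They can be converted into one another with methods such as toObservable, toFlowable, toSingle, toCompletable, and toMaybe. Not every type offers every one of these methods: Observable, for instance, has no toSingle() and uses operators such as firstOrError() or ignoreElements() instead.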

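(1) Observable

Observable offers the following subscribe overloads; each argument is a callback:

subscribe(onNext)
subscribe(onNext, onError)
subscribe(onNext, onError, onComplete)
subscribe(onNext, onError, onComplete, onSubscribe)

The implementation was covered in the previous installments, so it is not repeated here.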

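(2) Flowable

Flowable can be seen as the backpressure-aware counterpart of Observable: of the five types, only Flowable supports backpressure.

Backpressure itself is covered in a later installment.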
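
As a rough preview of the idea: Flowable follows the Reactive Streams model, in which the subscriber tells the source how many items it is ready for by calling request(n). The sketch below is not RxJava code. It assumes Java 9+ and uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces; BackpressureSketch, RangePublisher, and run are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; uses only the JDK's Flow interfaces (Java 9+), not RxJava.
final class BackpressureSketch {

    // A synchronous publisher of the integers 1..count that emits only what was requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;          // next value to emit
                private long pending = 0;      // outstanding demand from the subscriber
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;
                    pending += n;
                    if (emitting) return;      // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (pending > 0 && next <= count && !done) {
                        pending--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Subscribes with a fixed batch size and returns a log of requests and signals.
    static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request " + batch);
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext " + item);
                if (++receivedInBatch == batch) {   // batch consumed: ask for the next one
                    receivedInBatch = 0;
                    log.add("request " + batch);
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) { log.add("onError"); }

            @Override
            public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With run(5, 2) the log alternates between a request for 2 items and at most two onNext calls, then ends with onComplete; the publisher never emits more than the subscriber has asked for.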

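(3) Single

Single has only the onSuccess and onError callbacks. It signals exactly once: either one value through onSuccess or one error through onError.
The implementation is as follows, subscribing first with two Consumer callbacks and then with a full SingleObserver: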

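    // RxJava 2 imports: io.reactivex.*, io.reactivex.functions.Consumer, io.reactivex.disposables.Disposable
    // CountBean is a simple bean with getCount()/setCount(int), defined elsewhere
    // Variant 1: subscribe with two Consumer callbacks (success, error)
    Single.create(new SingleOnSubscribe<CountBean>() {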
        @Override
        public void subscribe(SingleEmitter<CountBean> e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                // count is 0 here, so the error branch below is taken
                if(countBean.getCount() == 1){
                    e.onSuccess(countBean);
                }else{
                    e.onError(new IllegalStateException("count is not 1"));
                }
            }
        }
    }).subscribe(new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("count:" + countBean.getCount());
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println("exception:" + throwable.getMessage());
        }
    });

    // Variant 2: subscribe with a full SingleObserver, which also receives onSubscribe
    Single.create(new SingleOnSubscribe<CountBean>() {
        @Override
        public void subscribe(SingleEmitter<CountBean> e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                // count is 0 here, so the error branch below is taken
                if(countBean.getCount() == 1){
                    e.onSuccess(countBean);
                }else{
                    e.onError(new IllegalStateException("count is not 1"));
                }
            }
        }
    }).subscribe(new SingleObserver<CountBean>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("The observer is now subscribed to the source!");
        }

        @Override
        public void onSuccess(CountBean countBean) {
            System.out.println("count:"+countBean.getCount());
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("exception:"+e.getMessage());
        }
    });

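(4) Completable

Completable has only the onComplete and onError events. Unlike Single, it emits no data; it only reports that the work finished or failed.

The implementation is as follows: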

    // Variant 1: subscribe with an Action (completion) and a Consumer (error)
    Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                // count is 0 here, so the error branch below is taken
                if(countBean.getCount() == 1){
                    e.onComplete();
                }else{
                    e.onError(new IllegalStateException("count is not 1"));
                }
            }
        }
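    }).subscribe(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("onComplete");
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println("exception:" + throwable.getMessage());
        }
    });

    // Variant 2: subscribe with a full CompletableObserver
    Completable.create(new CompletableOnSubscribe() {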
        @Override
        public void subscribe(CompletableEmitter e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                // count is 0 here, so the error branch below is taken
                if(countBean.getCount() == 1){
                    e.onComplete();
                }else{
                    e.onError(new IllegalStateException("count is not 1"));
                }
            }
        }
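    }).subscribe(new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("exception:" + e.getMessage());
        }
    });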

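(5) Maybe

Maybe has no onNext method; like Single, it delivers its value through onSuccess. It emits either zero items or one item, so it ends with exactly one of onSuccess, onComplete (no item), or onError. Any further emissions are ignored.

The implementation is as follows: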

    Maybe.create(new MaybeOnSubscribe<String>() {

        @Override
        public void subscribe(MaybeEmitter<String> e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(1);
                // count is 1 here, so onSuccess is called; a count of 0 would complete without a value
                if(countBean.getCount() == 1){
                    e.onSuccess("aaaaa");
                }else if(countBean.getCount() == 0){
                    e.onComplete();
                }else{
                    e.onError(new IllegalStateException("unexpected count"));
                }
            }
        }
    }).subscribe(new MaybeObserver<String>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(String s) {
            System.out.println("onSuccess:"+s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
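
The rule that only the first signal counts and later ones are ignored can be pictured with a small plain-Java model. This is a sketch under stated assumptions, not RxJava's implementation: OneShotEmitterSketch, OneShotEmitter, and emitRepeatedly are hypothetical names, and the model simply drops late signals, whereas RxJava itself reports a late error to RxJavaPlugins.onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical plain-Java model of the "one signal only" rule; not RxJava's implementation.
final class OneShotEmitterSketch {

    // Mimics the three terminal calls of MaybeEmitter by appending to a log.
    static final class OneShotEmitter {
        private final List<String> log;
        // Flips to true on the first signal; every later signal is dropped.
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        OneShotEmitter(List<String> log) { this.log = log; }

        void onSuccess(String value) {
            if (terminated.compareAndSet(false, true)) log.add("onSuccess:" + value);
        }

        void onComplete() {
            if (terminated.compareAndSet(false, true)) log.add("onComplete");
        }

        void onError(Throwable error) {
            // Simplification: RxJava hands a late error to RxJavaPlugins.onError instead.
            if (terminated.compareAndSet(false, true)) log.add("onError:" + error.getMessage());
        }
    }

    // Sends three signals in a row and returns what the "observer" actually saw.
    static List<String> emitRepeatedly() {
        List<String> log = new ArrayList<>();
        OneShotEmitter emitter = new OneShotEmitter(log);
        emitter.onSuccess("first");
        emitter.onSuccess("second"); // ignored: a signal was already delivered
        emitter.onComplete();        // ignored for the same reason
        return log;
    }

    public static void main(String[] args) {
        System.out.println(emitRepeatedly()); // prints [onSuccess:first]
    }
}
```

The same picture applies to Single (onSuccess or onError) and Completable (onComplete or onError): whichever signal arrives first ends the sequence.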