RxJava

RxJava<第二篇>:Consumer和Actio

2019-03-12  本文已影响23人  NoBugException

第一篇我们讲到创建观察者的代码如下:

//创建观察者
Observer<String> observer = new Observer<String>(){

  @Override
  public void onSubscribe(Disposable d) {
      System.out.println("开始采用subscribe连接!");
  }

  @Override
  public void onNext(String value) {
      System.out.println("对Next事件作出响应:"+value);
  }

  @Override
  public void onError(Throwable e) {
      System.out.println("对Error事件作出响应!");
  }

  @Override
  public void onComplete() {
      System.out.println("事件执行完毕!");
  }
};

代码中有四个回调方法,但是我们不一定会用到所有的回调方法,那么怎么才能让代码更加简洁呢?

这样我们就会用到Consumer和Action了。
以下是subscribe中可用参数的截图。

图片.png

(1)subscribe()

不带任何参数,也就是说观察者没有任何回调。

(2)subscribe(Observer<? super T> observer)

将Observer作为参数,它有四个回调方法,文章开头就说明了。

(3)subscribe(Consumer<? super T> onNext)

将Consumer作为参数,Consumer中有个回调方法accept,accept带有一个参数,接收被观察者发射过来的数据

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

当被观察者发射onNext时,accept将被执行。

(4)subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

带有两个Consumer参数,分别负责onNext和onError的回调。

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            emitter.onError(new Throwable("this is nullpointException"));
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable.getMessage());
        }
    });

执行效果:

图片.png

(5)subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

带有三个参数,分别负责onNext、onError和onComplete的回调。

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            //emitter.onError(new Throwable("this is nullpointException"));
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable.getMessage());
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("onComplete");
        }
    });

执行效果:

图片.png

(6)subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            //emitter.onError(new Throwable("this is nullpointException"));
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable.getMessage());
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("onComplete");
        }
    }, new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("观察者和被观察者已被连接,disposable可以立即中断连接!");
        }
    });

执行效果:

图片.png

想要非常清楚Consumer和Action的用法,必须了解他们是什么东西?他们是干什么的?

(1)他们是什么?

他们都是接口,源码如下

public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}
public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}

(2)他们是干什么的?

他们都是为了触发回调的,Consumer自带一个参数,Action不带参数。

当被观察者发射onNext时,由于onNext带有参数,所有使用Consumer;
当被观察者发射onComplete时,由于onComplete不带参数,所以使用Action;

上一篇 下一篇

猜你喜欢

热点阅读