RxJava初探-Consumer理解和使用

2019-12-10  本文已影响0人  kevinsEegets

在上一章我们了解了如何如何用RxJava实现一个简单消息发送,我们先回顾一下上节的代码


 Observable.create(ObservableOnSubscribe<Int> {
            it?.onNext(1)
            it?.onNext(2)
            it?.onNext(3)
            it?.onComplete()
        }).subscribe(object: Observer<Int> {
            override fun onComplete() {
                debugMsg("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                debugMsg("onSubscribe")
            }

            override fun onNext(value: Int?) {
                debugMsg("==",value)
            }

            override fun onError(e: Throwable?) {
                debugMsg("onError")
            }
        })

如上代码我们看到,每次我们想要通过RxJava实现一个事件时,都需要实现 onSubscribe(), onComplete() onNext(),onError() 我们实际开发中有时候是用不到这么多回调方法的, 有没有更好的方法呢?

这时候我们就可以用到Consumer和Action了

我们先看看Consumer源码的解释,很简单

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

我们看看Consumer源码的解释

A functional interface (callback) that accepts a single value. 只接受单个值返回接口

可以看到只有一个方法accept(T t), Consumer参数的方法表示下游只关心onNext事件, 其他的事件我们不考虑

Consumer的accept()带有一个T参数,负责接收上游发送过来的事件

我们用Consumer修改一下我们上一章最后的代码

Observable.create(ObservableOnSubscribe<Int> {
            it?.onNext(1)
            it?.onNext(2)
            it?.onNext(3)
            it?.onComplete()
        }).subscribe(object: Consumer<Int> {
            override fun accept(t: Int?) {
                debugMsg("msg--->$t")
            }
        })

日志输出

D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->1
D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->2
D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->3

可以看到我们的代码少了很多, 日志也输出了我们发送的事件

那问题来了,假如我现在就想用Consumer来关心完成和别的事件呢,其实也是可以的,我们看下图

2019-12-10_09-50.png

可以看到subscribe参数支持了很多, 包括我们熟悉的Consumer<T>, Consumer<Throwable>以及Action

我们再看看Action的源码解释

/**
 * A functional interface similar to Runnable but allows throwing a checked exception.
 */
public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}

可以看到有一个可以输出异常的run()方法,定义的是类似Runnable的接口

2019-12-10_10-00.png

如上图,我们可以看出此处的Action代表的是onComplete, 我们就明白了,在run()方法中可以执行完成后的操作.我们看看subscribe全部参数的使用例子

Observable.create (ObservableOnSubscribe<Int>{
            it.onNext(1)
            "".debugMsg("MainActivity", "emit 2")
            it.onNext(2)
            "".debugMsg("MainActivity","emit 3")
            it.onNext(3)
            "".debugMsg("MainActivity","emit complete")
            it.onComplete()
        }).subscribe(object :Consumer<Int> {
            override fun accept(it: Int?) {
                debugMsg("onNext", it)
            }
        }, object :Consumer<Throwable> {
            override fun accept(it: Throwable?) {
                debugMsg("throwable",it)
            }

        }, object :Action {
            override fun run() {
                debugMsg("complete")
            }
        }, object :Consumer<Disposable> {
            override fun accept(it: Disposable?) {
                debugMsg("disposable",it)
            }
        })

日志输出

D/com.eegegts: .MainActivity$loadRxjava3_2$5--->disposable null
D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 1
D/com.eegegts: String--->MainActivity emit 2
D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 2
D/com.eegegts: String--->MainActivity emit 3
D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 3
D/com.eegegts: String--->MainActivity emit complete
D/com.eegegts: .MainActivity$loadRxjava3_2$4--->complete

通过日志可以看出,我们的complete, disposable以及throwable都输出了,我们总结得出 Consumer其实可以做Observable的所有事情.

下一章我们将看看如何用RxJava有哪些线程以及轻松的实现线程切换~

上一篇下一篇

猜你喜欢

热点阅读