RxJava系列专题(Android方向)Android开发经验谈Android开发

RxJava操作符(二)

2017-09-30  本文已影响155人  小白要超神

注:只包含标准包中的操作符,用于个人学习及备忘
参考博客:http://blog.csdn.net/maplejaw_/article/details/52396175

本篇将介绍rxjava中的错误处理/重试机制、连接操作、阻塞操作以及工具集的使用,只针对用法不涉及原理,对RxJava不熟悉的可参考:http://gank.io/post/560e15be2dca930e00da1083

错误处理 / 重试机制

连接操作

//创建缓存2个数据的可连接的Observable
private ConnectableObservable<Long> relayCountObserver() {
    return Observable.interval(1, TimeUnit.SECONDS)
            .observeOn(Schedulers.newThread())
            .take(6)
            .replay(2);
}

//创建缓存3秒前的数据的可连接的Observable
private ConnectableObservable<Long> relayTimeObserver() {
    return Observable.interval(1, TimeUnit.SECONDS)
            .observeOn(Schedulers.newThread())
            .take(6)
            .replay(3, TimeUnit.SECONDS);
}

//创建缓存3秒前的最后2个数据的可连接的Observable
private ConnectableObservable<Long> relayTimeAndCountObserver() {
    return Observable.interval(1, TimeUnit.SECONDS)
            .observeOn(Schedulers.newThread())
            .take(6)
            .replay(2, 3, TimeUnit.SECONDS);
}

//创建两个观察者对可连接的Obserable进行订阅,第一个观察者直接订阅,
// 第二个观察者在Observable发射4个数据后再进行订阅
private void subscribe(ConnectableObservable cb) {
    Action1<Long> action2 = new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.d("debug", "action2:" + aLong);
        }
    };

    Action1<Long> action1 = new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.d("debug", "action1:" + aLong);
            if (aLong == 3) {   //发射第4个数据时(即过了4秒),action2开始订阅
                cb.subscribe(action2);
            }
        }
    };

    cb.subscribe(action1);
    cb.connect();
}

    //创建不同的可连接的Obserable进行订阅,并观察打印结果
    //订阅缓存两个数据的可连接的Observable
    subscribe(relayCountObserver());
    /**
     * action1:打印0,1,2,3,4,5
     * action2:打印2,3,4,5(action2订阅前已经发射了4个数据,即0,1,2,3,
     * 其中2,3被可连接的Observable缓存,因此可获取到)
     */

    //订阅缓存3秒的可连接的Observable
    subscribe(relayTimeObserver());
    /**
     * action1:打印0,1,2,3,4,5
     * action2:打印1,2,3,4,5(action2订阅前已经发射了4秒,可连接的Observable缓存了3秒内的数据,
     * 即1,2,3,因此可获取到
     */

    //订阅缓存3秒前最后2个数据的可连接的Observable
    subscribe(relayTimeAndCountObserver());
    /**
     * action1:打印0,1,2,3,4,5
     * action2:打印2,3,4,5(action2订阅前已经发射了4秒,3秒内发射的数据为1,2,3,最后2位数据被
     * Observable缓存,即2,3,因此可获取到
     */

阻塞操作

BlockingObservable:一个阻塞的Observable,普通的Observable可使用Observable.toBlocking( )方法或者BlockingObservable.from( )方法转换成BlockingObservable

*first / firstOrDefault / last / lastOrDefault:这几个操作符的用法与在普通的Observable中用法相同,用于阻塞操作时会直接返回发射的数据(使用first或last时,找不到指定元素会报异常)

    Integer first = just(1, 2, 3).toBlocking().first();
    Log.d("debug", first.toString());   //打印1

    Integer first1 = just(1, 2, 3).toBlocking().first(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 2;
        }
    });
    Log.d("debug", first1.toString());  //打印3

    Integer first3 = Observable.just(1, 2, 3).toBlocking().firstOrDefault(4, new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 3;
        }
    });
            
    Log.d("debug", first3.toString());  //打印4

    //last与lastOrDefault也是一样,不重复了

工具集

最后

Rxjava标准库的操作符已经介绍完毕

上一篇下一篇

猜你喜欢

热点阅读