RxJava学习总结

2018-03-07  本文已影响41人  Neo_duan

四个基本概念

Observable(被观察者)
Observer(观察者)
subscribe(订阅)
事件

基本流程:Observable和Observer通过subscribe订阅,从而Observable完成
某些操作,获取结果回调触发事件通知Oberver

水管模型

作用

异步回调
简便易读的链式调用

简单使

//1.创建被观察者 -----------水管的上游
    Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            //do something
            e.onNext("subscribe  哈哈");
            e.onComplete();
            
            //解读
            ObservableEmitter:发射器,发出事件,同时发出onComplete和onError报异常
                Observer是要接收到onComplete或onError,将不会再次接收其他事件,即终止
        }
    });

// 2.创建观察者-----------水管下游
Observer<String> observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        log("onSubscribe");
        
        //解读
        d.dispose(); 相当于取消队列(切换水管)。不再接收事件,
            如:Observable发送多次onNext事件,如果调用d.dispose(),则会不会接收onNext等后续事件的回调,但是Observable的事件还是正常发出来,只不过在此中断了。
    }

    @Override
    public void onNext(String str) {
        log("onNext " + str);
    }

    @Override
    public void onError(Throwable e) {
        log("onError");
    }

    @Override
    public void onComplete() {
        log("onComplete");
    }
};

//订阅------连接水管
observable.subscribe(observer);

回调事件

onSubscribe():第一个回调,相当于onStart().用于解除订阅
    执行在subscribe线程中,注意UI操作
onNext(): Observable调用onNext()时候调用
onComplete():事件队列完结时调用,当不再有onNext()事件发出时触发,队列终止
onError():事件队列异常,同时队列终止,不会有其他事件发生


注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

observable.subscribe的方法说明

subscribe():不关心事件流
subscribe(Consumer):只关心onNext()方法
     Observable.just("hello", "world").subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            log(s);
            //打印
            hello
            world
        }
    });

操作符

1.just

just(T...): 将传入的参数依次发出

Observable.just("hello", "world", "abc").subscribe(new Observer<String>() {}

Log:

12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity:  onSubscribe 
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onNext hello
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onNext world
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onNext abc
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onComplete 

fromArray(T[]) / fromIterable(Iterable<? extends T>): 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

依次发出数组或者集合中的对象,打印如Just操作符输出

map操作符

将上游的操作符进行转化为下游操作符
.map(new Function<Integer, String>() {
    //接收的参数Integer
    @Override
    public String apply(Integer str) throws Exception {           
         //new Function<Integer, String>
        //Integer:定义传入参数,ObservableEmitter发射器传入的参数
        //String:定义返回参数
        return "This is result :" + str;
    }
})

flatMap操作符

把一个Observable转换为另一个Observable

zip操作符

将多个Observable合成一个Obsevable输出
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 
组合的顺序是严格按照事件发送的顺利 来进行的.最短的先取完并组合,多余的不发出事件

Observable observable1 = Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            e.onNext("1");
            log("onNext1");
            e.onNext("2");
            log("onNext2");
            e.onNext("3");
            log("onNext3");
        }
    }).observeOn(Schedulers.newThread());

    Observable observable2 = Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            e.onNext("a");
            log("onNexta");
            e.onNext("b");
            log("onNextb");
            e.onNext("c");
            log("onNextc");
        }
    }).observeOn(Schedulers.newThread());

    Observable.zip(observable1, observable2, new BiFunction<String, String, String>() {
        @Override
        public String apply(String str1, String str2) throws Exception {
            return str1 + str2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String str) throws Exception {
            log(str);
        }
    });
    
    log:
    
    12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNext1
    12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNext2
    12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNext3
    12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNexta
    12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNextb
    12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNextc
    12-20 17:21:34.244 8092-8114/com.neo.duan.rxjava D/MainActivity: 1a
    12-20 17:21:34.244 8092-8114/com.neo.duan.rxjava D/MainActivity: 2b
    12-20 17:21:34.244 8092-8114/com.neo.duan.rxjava D/MainActivity: 3c

Scheduler调度器

Schedulers.computation():指cupu密集计算,不会被io等操作限制性能的操作,例如图形计算,不要把io操作放在里面
Schedulers.io():I/O操作(读写文件、读写数据库、网络交互)。行为i额模式
    和newThread差不多,区别是io()的内部实现是用一个无数量上限的线程池,可以重用空闲线程,所以多数情况下io/比newThread更有效率。不要把
    计算工作放在io中,可以避免创建不必要的线程
Schedulers.newThread():启用一个常规的新线程执行操作
Schedulers.single():
AndroidSchedulers.mainThread():Android UI线程

调度器使用

observable.subscribeOn(Schedulers.newThread())//observable上游发送事件线程
            .observeOn(Schedulers.io()) //下游接收事件线程
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    log(s);
                    log("accept thread:" + Thread.currentThread().getName());
                }
            });
            
subscribeO(Scheduler):多次指定上游线程,只有第一次指定有效
observeOn(Scheduler):多次指定下游线程,每调依次,线程切换一次

//TODO 源码解析

//TODO Android Studio运行Java项目

1.新建一个Android项目
2.在Android项目中 new module--选择Java Libary即可

gradle中引用RxJava,运行报错Caused by: java.lang.ClassNotFoundException: io.reactivex.ObservableOnSubscribe
解决方式:
上一篇 下一篇

猜你喜欢

热点阅读