RxJava2基础教程一

2018-04-26  本文已影响0人  lurenjia

RxJava基本上是面试必问的东西,虽然现在的项目的里面没有用到,但是学一下这个东西是必然的,不然面试一问你这么火的RxJava是什么东西,你都说不上个123来,那面试肯定挂。

首先RxJava到底是什么呢?
a library for composing asynchronous and event-based programs by using observable sequences. 这个是官网的一句话说是一个用于可观测的序列来组成的异步的、基于事件的库。
现在有一个很火的名词,叫做响应式编程,什么是相应式编程呢? 响应式编程其实就是一种基于数据流编程概念的模式。[可以边看下这篇文章]https://www.jianshu.com/p/c95e29854cb1/)

RxJava正是基于这种模式来实现的,他可以把一些很复杂的逻辑串联成一条线,看起来很直观清楚。网上有一句话是说他可以使你的程序变得很简洁,但是随着程序业务的越来越复杂,他依然能够保持简洁。

设计模式中有一种模式叫做观察者模式,就是观察者对被观察者的某一种行为或者属性很感兴趣,当被观察者这个行为发生或者某个属性改变时观察者需要作出相应的改变。

RxJava真是基于这种设计模式而来。
学习Rxjava先从基本的学起,Observer观察者,Observable被观察者, subscribe注册,还有一个Flowable这个是Rxjava2中才有的跟被压有关,这个以后再说。
先从基本的学起观察者,被观察者与注册之间的关系:看代码


 Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
                emitter.onComplete();
            }
        });

        Observer observer1 = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {

                Log.i(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Object o) {

                Log.i(TAG, "onNext: " + o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: " );
            }
        };

        observable.subscribe(observer1);

运行起来:

onSubscribe: 
onNext: 1
onNext: 2
onNext: 3
onNext: 4

看起来其实挺长的,可以直接用流式一下写下来:

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

其实是一样的。上面的代码每次next都会调用观察者中的onNext,complete与之对应,还有error也可以调用但是不能和complete同时调用,在被观察者中抛异常也会调用到error中。 看什么的方法其实很啰嗦,其实不是每一次都要实现那么多没有用的方法,Rxjava中提供了一些更简洁的方法,比如你可能只需要实现onNext方法就行:


        Observable.create(new ObservableOnSubscribe<String>() {
             @Override
             public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                 emitter.onNext("1");
             }
         }).subscribe(new Consumer<String>() {
             @Override
             public void accept(String s) throws Exception {

                 Log.i(TAG, "accept: s = " + s);
             }
         });
         结果
         : accept: s = 1

当然你也可以直接在subscribe中在直接new Consumer来表示可能的error。

不知道你有没有注意到在程序中都是被观察者subscribe的观察者,感觉从观察者模式上来说好像应该是观察者注册被观察者才对,其实从响应式编程来说,在一条河流中,上游发生了下游作出相应好像更符合逻辑,哈哈 不知道对不对。
下面说下线程调度,在Android开发中都知道不能在UI线程中做耗时的操作,正常的开发中都是开一个子线程来做耗时操作然后使用Handler机制来更新UI,这样做很麻烦,搞不好还可能内存泄漏,RxJava给我们提供了更方便的机制来切换线程,这就用到了subscribeOn 和observeOn。subscribeOn 表示注册时的线程其实就上又的线程,observerOn表示的是下游的线程。

Observable.create(new ObservableOnSubscribe<String>() {
             @Override
             public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                 Log.i(TAG, "subscribe: currentThread =" + Thread.currentThread().getName());
                 emitter.onNext("1");
             }
         }).subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
             @Override
             public void accept(String s) throws Exception {
                 Log.i(TAG, "accept: currentThread = " + Thread.currentThread().getName());
                 Log.i(TAG, "accept: s = " + s);
             }
         });
    运行结果:
     subscribe: currentThread =RxNewThreadScheduler-1
     accept: currentThread = main
     accept: s = 1
         

其中SchedulersRxjava提供给我们的一些线程的调度者, 它包含如下几个:

Schedulers.newThread(): 开启一个新的线程

Schedulers.io(): io线程,比如读写文件,数据库,访问后台等

Schedulers.computation(): 这个用于大量计算,比如说压缩图片

Schedulers.trampoline(): 在当前的线程立即执行任务,会停掉当前线程执行的其他任务

Schedulers.single():说是拥有一个线程单例,具体不知道是干啥的没用过

还有最后一个 AndroidSchedulers.mainThread() 主线程。这个需要引入单独的库

上面我们是从Observable.create开始的,RxJava还给我们提供了很多用于各种场景的API,下面一一介绍,最后介绍RxJava的应用场景,最后再分析一下他的源码。

上一篇下一篇

猜你喜欢

热点阅读