Rxjava相关Android收藏Retrofit RXjava 网络框架

RxJava 从入门到放弃再到不离不弃

2017-05-31  本文已影响1602人  戴定康

作者寄语

很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,看过网络诸多大神的博客,静下心来开始写这个专题,为什么叫入门到放弃呢;相信大家学习新框架的时候,尤其是像Rxjava或者Dagger等等这种新的编程思想;需要一定的阅读理解能力和思维逻辑;那么本专题旨在帮助大家不要太过急功近利,不要被冗长的代码和文章,晦涩的思想所打败,相信大家只要坚持看完,一定会有所收获的;废话不多说,那么这个专题开篇就以RxJava来讲吧,预计后面还会有几篇大型框架的讲解,想想还有点小激动;

友情提示:文章较长,请耐心看完;

使用下面方式,最后发现“OnSubscribe”还是在默认线程中运行;原因是subscribeOn这类操作后,返回的是一个新的Observable。

observable.subscribeOn(Schedulers.io());
observable.observeOn(AndroidSchedulers.mainThread());
observable .subscribe(subscribe);

可以修改为下面两种方式:

observable = observable.subscribeOn(Schedulers.io());
observable = observable.observeOn(AndroidSchedulers.mainThread());
observable .subscribe(subscribe);
//OR
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscribe);

前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

答案是:能。
因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2) // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread) 
        .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

不过,不同于 observeOn()subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

Scheduler 的原理

其实, subscribeOn()observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):

subscribeOn()原理图:

observeOn() 原理图:

从图中可以看出,subscribeOn()observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

最后,我用一张图来解释当多个 subscribeOn()observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

延伸:doOnSubscribe()

doOnSubscribe()一般用于执行一些初始化操作.

然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲 Subscriber 的时候,提到过 SubscriberonStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

示例:

    Observable.create(onSubscribe)
        .subscribeOn(Schedulers.io())
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);

如上,在 doOnSubscribe() 的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

RxJava 的适用场景和使用方式

RxJava + Retrofit

Retrofit 是 Square 的一个著名的网络请求库。对于Retrofit不了解的同学
可以参考我之前写的文章:全新的网络加载框架Retrofit2,上位的小三

Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 RetrofitRxJavaAPI 和传统版本的区别。

以获取一个 MovieEntity 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

    @GET("top250")
    Call<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//正常返回Call对象

我们来写getMovie方法的代码:

    //进行网络请求
    private void getMovie(){
        String baseUrl = "https://api.douban.com/v2/movie/";
    
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(baseUrl)
                .addConverterFactory(GsonConverterFactory.create())
                .build();
    
        MovieService movieService = retrofit.create(MovieService.class);
        Call<MovieEntity> call = movieService.getTopMovie(0, 10);
        call.enqueue(new Callback<MovieEntity>() {
            @Override
            public void onResponse(Call<MovieEntity> call, Response<MovieEntity> response) {
                resultTV.setText(response.body().toString());
            }
    
            @Override
            public void onFailure(Call<MovieEntity> call, Throwable t) {
                resultTV.setText(t.getMessage());
            }
        });
    }

以上为没有经过封装的、原生态的Retrofit写网络请求的代码。

而使用 RxJava 形式的 API,定义同样的请求是这样的:

    @GET("top250")
    Observable<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//RxJava返回Observable对象

Retrofit本身对Rxjava提供了支持,getMovie方法改为:

    //进行网络请求
    private void getMovie(){
        String baseUrl = "https://api.douban.com/v2/movie/";
    
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(baseUrl)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())//提供RXjava支持
                .build();
    
        MovieService movieService = retrofit.create(MovieService.class);
    
        movieService.getTopMovie(0, 10)//返回Observable对象
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<MovieEntity>() {
                    @Override
                    public void onCompleted() {
                        Toast.makeText(MainActivity.this, "Get Top Movie Completed", Toast.LENGTH_SHORT).show();
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        resultTV.setText(e.getMessage());
                    }
    
                    @Override
                    public void onNext(MovieEntity movieEntity) {
                        resultTV.setText(movieEntity.toString());
                    }
                });
    }

这样基本上就完成了RetrofitRxjava的结合,大家可以自己进行封装;那么用上了RxJava,我们就可以用它强大的操作符来对数据进行处理和操作,各位看官可以具体去实现,我在这里不做多做赘述。

参考文章:RxJava 与 Retrofit 结合的最佳实践

RxBinding

RxBindingJake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJavaBinding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API

举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });

看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() 操作符,用于去抖动,也就是消除手抖导致的快速连环点击:

RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);

如果想对 RxBinding 有更多了解,可以去它的 GitHub 项目 下面看看。

RxLifecyle

RxLifecycle 配合 Activity/Fragment 生命周期来管理订阅的。 由于 RxJava Observable 订阅后(调用 subscribe 函数),一般会在后台线程执行一些操作(比如访问网络请求数据),当后台操作返回后,调用 ObserveronNext 等函数,然后在 更新 UI 状态。 但是后台线程请求是需要时间的,如果用户点击刷新按钮请求新的微博信息,在刷新还没有完成的时候,用户退出了当前界面返回前面的界面,这个时候刷新的 Observable 如果不取消订阅,则会导致之前的 Activity 无法被 JVM 回收导致内存泄露。 这就是 Android 里面的生命周期管理需要注意的地方,RxLifecycle 就是用来干这事的。比如下面的示例:

myObservable
    .compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
    .subscribe();

这样Activitydestroy的时候就会自动取消这个observer

RxBus

RxBus并不是一个库,而是一种模式。相信大多数开发者都使用过EventBus或者Otto,作为事件总线通信库,如果你的项目已经加入RxJavaEventBus,不妨用RxBus代替EventBus,以减少库的依赖。RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。

拓展链接:
用RxJava实现事件总线(Event Bus)
[深入RxBus]:支持Sticky事件

RxPermission

RxPermission是基于RxJava开发的用于帮助在Android 6.0中处理运行时权限检测的框架。在Android 6.0中,系统新增了部分权限的运行时动态获取。而不再是在以前的版本中安装的时候授予权限。

拓展链接:
使用RxPermission框架对android6.0权限进行检测

总结

简而言之Rxjava是一个很牛逼的库,如果你的项目中还没有使用RxJava的话,建议可以尝试去集成使用;对大多数人而已RxJava是一个比较难上手的库了,不亚于Dagger的上手难度;不过当你认识学习使用过了,你就会发现RxJava的魅力所在;如果看一遍没有看懂的童鞋,建议多看几次;动手写写代码,我想信本文可以给到你们一些帮助;你们真正的体会到什么是 从入门到放弃再到不离不弃 ;这就是RxJava的魅力所在。

拓展阅读:

我所理解的RxJava——上手其实很简单

深入浅出RxJava - 大头鬼

给 Android 开发者的 RxJava 详解 - 抛物线

如果有疑问和见解,也欢迎大家在下面留言,我会一一回复大家

以上

上一篇下一篇

猜你喜欢

热点阅读