RxJava

2017-06-12  本文已影响0人  allencaicai

http://www.jianshu.com/p/fe08ce770c15

什么是Rx

Rx是响应式编程的意思,本质上就是观察者设计模式,是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式

在Android编程的时候,经常使用后台线程,那么就可以使用这种方式,能够使得逻辑比较清晰明了(有的人说会增加好多的代码,但是我觉得代码的链式结构让代码看起来更加简洁明了)

Rx模式以及有点

优势一

创建:Rx可以方便的创建事件流和数据流

组合:Rx使用查询式的操作符合组合和变换数据流

监听:Rx可以订阅任何可观察的数据量并执行操作

优势二(简化代码)

函数式风格:对可观察数据流使用无副作用的输入流输出函数,避免了程序里面的错综复杂的状态

简化代码:Rx的操作符通常可以将复杂的难题简化成很少的几行代码(配合lambda表达式还能简化)

异步错误处理:Rx提供了何时的错误处理机制

轻松使用并发:Rx的Observables和Schedulers让开发着可以很方便的切换UI线程和子线程,摆脱底层的线程同步和各种并发问题

响应式编程

Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和符合变得非常高效。

你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好,使用Observable,在数据准备好的时候,生产者将数据推送给消费者,数据可以同步或者异步的到达,方式更加灵活。

RxJava观察者模式

需求:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应。

RxJava四个基本概念

Observable(被观察者)

Observer(观察者)

subscribe(订阅)

事件

Observable和Observer通过subscribe()方法实现订阅的关系,从而Observable可以在需要的时候发出事件来通知Observer。

手动实现观察者模式

首先我们需要有观察者和被观察者。

被观察者接口(里面简单的定义添加观察者,移除观察者,通知观察者三个方法)

publicinterfaceWatched{//添加观察者publicvoidaddWatcher(Watcher watcher);//移除观察者publicvoidremoveWatcher(Watcher watcher);//通知观察者publicvoidnotifyWathers(String str);}

观察者接口(定义更新的方法)

publicinterfaceWatcher{//数据变化进行更新publicvoidupdate(Stringstr);}

被观察者实现类

publicclassConcreteWathedimplementsWatched{//观察者List mList =newArrayList<>();@OverridepublicvoidaddWatcher(Watcher watcher){        mList.add(watcher);    }@OverridepublicvoidremoveWatcher(Watcher watcher){        mList.remove(watcher);    }@OverridepublicvoidnotifyWathers(String str){for(Watcher w : mList) {            w.update(str);        }    }}

观察者实现类

publicclassConcreteWatherimplementsWatcher{    @Overridepublicvoidupdate(Stringstr) {        System.out.println(str);    }}

测试类

publicstaticvoid main(String[] args){        Watched watched =newConcreteWathed();        Watcher watcher1 =newConcreteWather();        Watcher watcher2 =newConcreteWather();        Watcher watcher3 =newConcreteWather();        watched.addWatcher(watcher1);        watched.addWatcher(watcher2);        watched.addWatcher(watcher3);        watched.notifyWathers("I go");    }

输出结果

IgoIgoIgo

当然了,这只是简单的实现,只要晓得原理就行,除了自己实现,官方也给我们提供了观察者与被观察者接口。只要我们去实现接口就可以了。

利用系统提供的类和接口实现观察者模式

被观察者

publicclassXTObservableextendsObservable{privateintdata =0;publicintgetData(){returndata;    }publicvoidsetData(inti){if(this.data != i){this.data = i;            setChanged();//发生改变notifyObservers();//通知观察者}    }}

观察者

publicclassXTobserverimplementsObserver{publicXTobserver(XTObservable observable){        observable.addObserver(this);    }@Overridepublicvoidupdate(Observable observable, Object o){        System.out.println("data is changed"+ ((XTObservable) observable).getData());    }}

测试类

publicclassTest{publicstaticvoid main(String[] args) {        XTObservable mObservable =newXTObservable();        XTobserver mXTobserver =newXTobserver(mObservable);        mObservable.setData(1);        mObservable.setData(2);        mObservable.setData(3);    }}

输出结果

datais changed1datais changed2datais changed3

上面已经手动实现观察者模式和通过系统提供类实现,当然这都不是重点,重点是Rx响应式编程

RxAndroid使用

一:使用前配置

在项目工程的build.gradle文件添加这样的一句话(如果使用lambda)

classpath'me.tatarka:gradle-retrolambda:2.5.0'(这一句在gradle版本下面紧接着)

在该module工程的build.gradle文件中添加

applyplugin:'me.tatarka.retrolambda'(使用lambda)在文件的第二行

在buildTypes节点的下(不是节点内)添加下面一句

compileOptions {        sourceCompatibility JavaVersion.VERSION_1_8targetCompatibility JavaVersion.VERSION_1_8}

然后在依赖中添加下面几句(没有提示一定添加的可以根据自己选择性添加)

//rx一定添加compile'io.reactivex:rxjava:1.1.0'compile'io.reactivex:rxandroid:1.1.0'compile'com.google.code.gson:gson:2.4'compile'com.jakewharton:butterknife:7.0.1'compile'com.squareup.picasso:picasso:2.5.2'//添加compile'com.squareup.okhttp3:okhttp:3.+'

至此,使用环境已经配置好了,接下来我们来简单的使用一下。

利用create创建来使用Rx

/**

* 使用create方式

*/publicstaticvoidcreateObserable(){//定义被观察者Observable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {//观察者和被观察者还有订阅消息subscriber.onNext("hello");//返回的数据subscriber.onNext("hi");                    subscriber.onNext(getUserName());//因为是传入的是字符串泛型subscriber.onCompleted();//完成}            }        });//定义观察者Subscriber showSub =newSubscriber() {@OverridepublicvoidonCompleted(){                Log.i(TAG,"onCompleted");//用于对话框消失}@OverridepublicvoidonError(Throwable e){                Log.i(TAG, e.getMessage());//错误处理}@OverridepublicvoidonNext(Object o){                Log.i(TAG, o.toString());            }        };        observable.subscribe(showSub);//两者产生订阅}/**    * 可以用来写成我们的下载返回数据    *    *@return*/publicstaticStringgetUserName(){return"jsonName";    }

在主activity中调用,我们来看下控制台输出的结果:

也是一个测试,打印

/**

* 打印的功能  链式结构,更加易于代码的可毒性

*/publicstaticvoidcreatePrint(){        Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {for(inti =0; i <10; i++) {                        subscriber.onNext(i);                    }                    subscriber.onCompleted();                }            }        }).subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){                Log.i(TAG,"onCompleted");            }@OverridepublicvoidonError(Throwable e){                Log.i(TAG, e.getMessage());            }@OverridepublicvoidonNext(Integer integer){                Log.i(TAG,"result--->:"+ integer);            }        });    }

看下控制台结果

from函数

/**

* 使用在被观察者,返回的对象一般都是数据类型

* 它接收一个集合作为输入,然后每次输出一个元素给subscriber

*/publicstaticvoidfrom(){        Integer[] items = {1,2,3,4,5,6,7,8};        Observable onservable = Observable.from(items);        onservable.subscribe(newAction1() {            @Overridepublicvoidcall(Object o){                Log.i(TAG, o.toString());            }        });    }

控制台结果

interval函数

/**

* 指定某一时刻进行数据发送

* interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位

*/publicstaticvoidinterval(){        Integer[] items = {1,2,3,4};        Observable observable = Observable.interval(1,1, TimeUnit.SECONDS);        observable.subscribe(newAction1() {@Overridepublicvoidcall(Object o){                Log.i(TAG, o.toString());            }        });    }

just函数

/**

* 假如我们只有3个独立的AppInfo对象并且我们想把他们转化为Observable并填充到RecyclerView的item中:

* 这里我们有两个数组,然后通过转化为Observable组成一个item

*/publicstaticvoidjust(){        Integer[] items1 = {1,2,3,4};        Integer[] items2 = {2,4,6,8};        Observable observable = Observable.just(items1, items2);        observable.subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){                Log.i(TAG,"onCompleted");            }@OverridepublicvoidonError(Throwable e){                Log.i(TAG, e.getMessage());            }@OverridepublicvoidonNext(Integer[] integers){for(inti =0; i < integers.length; i++) {                    Log.i(TAG,"result--->"+ i);                }            }        });    }

输出结果:

range函数

/**

* 指定输出数据的范围

*/publicstaticvoidrange() {        Observable observable = Observable.range(1,4);        observable.subscribe(newSubscriber() {            @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted");            }            @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage());            }            @OverridepublicvoidonNext(Integero) {Log.i(TAG,"next---->"+ o);            }        });    }

输出结果:

filter函数

/**

* 使用过滤功能  发送消息的时候,先过滤在发送

*/publicstaticvoidfilter() {        Observable observable = Observable.just(1,2,3,4,5,6);        observable.filter(newFunc1() {            @OverridepublicBooleancall(Integero) {returno <5;            }        }).observeOn(Schedulers.io()).subscribe(newSubscriber() {            @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted");            }            @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage());            }            @OverridepublicvoidonNext(Object o) {Log.i(TAG, o.toString());            }        });    }

输出结果:

好了,几个常用到的函数已经介绍完了,接下来就用几个例子来说验证一下吧。

使用Rx+OkHttp下载图片

Rx下载的封装

/**

* 声明一个被观察者对象,作为结果返回

*/publicObservable downLoadImage(String path) {returnObservable.create(newObservable.OnSubscribe(){@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {//存在订阅关系//访问网络操作//请求体Request request =newRequest.Builder().url(path).get().build();//异步回调mOkHttpClient.newCall(request).enqueue(newCallback() {@OverridepublicvoidonFailure(Call call, IOException e){                            subscriber.onError(e);                        }@OverridepublicvoidonResponse(Call call, Response response)throwsIOException{if(response.isSuccessful()) {byte[] bytes = response.body().bytes();if(bytes !=null) {                                    subscriber.onNext(bytes);//返回结果}                            }                            subscriber.onCompleted();//访问完成}                    });                }            }        });    }

在使用的时候调用

//使用HTTP协议获取数据mUtils.downLoadImageOne(url)                    .subscribeOn(Schedulers.io())//在子线程请求.observeOn(AndroidSchedulers.mainThread())//结果返回到主线程这一步很厉害啊,不用我们去用handler或者async切换线程了// 主要我们去调用一下代码,就已经帮我们切换好了线程,是不是感觉有点很厉害啊.subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){                    Log.i(TAG,"onCompleted");//对话框消失}@OverridepublicvoidonError(Throwable e){                    Log.i(TAG,e.getMessage());                }@OverridepublicvoidonNext(byte[] bytes){                    Bitmap bitmap = BitmapFactory.decodeByteArray(bytes,0,bytes.length);                    mImageView.setImageBitmap(bitmap);                }            });

Rx+okhttp实现登录

/**

*

* @param url  登录地址

* @param params  请求参数

* @return  后台返回的数据

*/publicObservable login(Stringurl,Mapparams) {returnObservable.create((Observable.OnSubscribe) subscriber -> {if(!subscriber.isUnsubscribed()) {//创建formbodyFormBody.Builder builder =newFormBody.Builder();if(params!=null&& !params.isEmpty()) {//循环获取body中的数据for (Map.Entry entry :params.entrySet()) {                        builder.add(entry.getKey(), entry.getValue());                    }                }//请求体RequestBody requestBody = builder.build();                Request request =newRequest.Builder().url(url).post(requestBody).build();                mOkHttpClient.newCall(request).enqueue(newCallback() {                    @OverridepublicvoidonFailure(Call call, IOException e) {                        subscriber.onError(e);                    }                    @OverridepublicvoidonResponse(Call call, Response response) throws IOException {if(response.isSuccessful()) {//交给观察者处理数据subscriber.onNext(response.body().string());                        }//完成的回调subscriber.onCompleted();                    }                });            }        });    }

登录调用

Mapparams=newHashMap();params.put("username", userName.getText().toString().trim());params.put("password", passWord.getText().toString().trim());            mUtils.login(url,params).subscribeOn(Schedulers.io())                    .observeOn(AndroidSchedulers.mainThread()).subscribe(newSubscriber() {                @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted");                }                @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage());                }                @OverridepublicvoidonNext(Strings) {if(JsonUtils.parse(s)) {                        Intent intent =newIntent(LoginActivity.this, ContentActivity.class);                        startActivity(intent);                    }                }            });

如果有想需要代码的,可以看这里,所有代码已经传至github。https://github.com/wuyinlei/RxAndroidDemo

上一篇下一篇

猜你喜欢

热点阅读