RxJava原理分析
之前看有的项目上使用到了RxJava,最近比较系统的学习了一下,将学习收获整理一下。
一.RxJava简介
a.定义
定义 | 作用 | 特点 |
---|---|---|
一个基于事件流、实现 异步操作的库 |
实现异步操作 类似与AyncTask、Handler的使用 |
逻辑简洁 实现优雅:基于事件流的链式调用 使用简单:随着程序的复杂,依然保持简洁优雅 |
b.思路
被观察者 (Observable) 通过订阅(Subscribe)按顺序发送事件给观察者(Observer)
观察者(Observer)按顺序接收事件&作出对应的响应动作。
二.使用流程
a:创建被观察者(Observable)&定义需发送的事件
Observable<String> obervable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
}
});
b:创建观察者(Observer) & 定义响应事件的行为
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//该方法最先调用
}
@Override
public void onNext(String value) {
//事件接收
Log.e(TAG, "value is: " + value);
}
@Override
public void onError(Throwable e) {
//异常回调
}
@Override
public void onComplete() {
//事件发送完毕回调
}
}
从方法名字可以得到:事件的接收处理是在onNext()里面,异常处理是在onError()里面,事件发送完毕处理是在onComplete()里面,事件的最早触发是在onSubscribe()里面[后面源码会分析到];
c:通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);
Observable的subscribe()具备多个重载的方法,可以灵活运用,来实现自己的需求;
//表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe() {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者对被观察者发送的任何事件都作出响应(一般采用此种方式)
public final void subscribe(Observer<? super T> observer) {}
通过subscribe传入Consumer实例时,执行对应事件的响应回调都是在accept里面进行处理;
/**
* A functional interface (callback) that accepts a single value.
* @param <T> the value type
*/
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
输出以下结果:
value is: 事件1
value is: 事件2
value is: 事件3
以上就完成了基于事件流的链式调用过程,接下来通过源码来分析一下详细工作过程:
三.源码分析
a:创建被观察者(Observable)& 定义需发送的事件
//调用create时创建Observable
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//最终是创建了ObservableCreate类对象
//将创建的ObservableOnSubscribe对象传入
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
通过以上可以看到,通过create()方法,创建了ObservableCreate对象,然后将ObservableOnSubscribe变量source作为参数传入。
接下来看一下ObservableCreate的实现:
//ObservableCreate类,继承Observable类
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//复写了subscribeActual()
//作用:订阅时,通过接口回调调用Observerable与Observer的方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//先调用oberver的onSubscribe方法
//印证了上述onSubscribe先调用的猜想
observer.onSubscribe(parent);
try {
//调用创建的ObservableOnSubscribe对象的subscribe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//执行e.onNext("事件1")方法
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//observer是订阅前创建的Observer
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//发送的事件不可为空
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//若无断开连接(未调用Disposable.dispose())
if (!isDisposed()) {
//调用Observer的onNext()方法
observer.onNext(t);
}
}
// onError()及onComplete()方法调用后都会调用dispose();就不会发送onNext()事件了
// 印证了Observer在收到onError()或onComplete()后,就不会再收到onNext()了
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
通过源码可以看到在执行Observable.create()实际上是创建了一个ObservableCreate对象,并将创建的ObservableOnSubscribe对象作为参数传入,复写了subscribeActual()方法,在方法内创建了携带Observer的CreateEmitter对象,并分别回调了Observer的onSubscribe,ObservableOnSubscribe的subscribe方法。在create时,仅仅是定义,即:subscribeActual()此时还未被回调。
b:创建观察者(Observer) & 定义响应事件的行为
//Observer.java
public interface Observer<T> {
//Observer是一个接口
// 接口内含4个方法,分别用于响应对应于被观察者发送的不同事件
void onSubscribe(Disposable d); // 内部参数:Disposable 对象,可结束事件
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//该方法最先调用
}
@Override
public void onNext(String value) {
//事件接收
Log.e(TAG, "value is: " + value);
}
@Override
public void onError(Throwable e) {
//异常回调
}
@Override
public void onComplete() {
//事件发送完毕回调
}
}
c:通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);
//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//subscribeActual属于抽象方法,由子类实现;即由创建Observable时创建的ObservableCreate类对象
//即在调用subscribe时,实际上是调用了创建Observable时创建的ObservableCreate类对象里面的subscribeActual()方法
//印证了前面所说的在Observable.create()时未执行subscribeActual()
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
}
}
//抽象方法,需要非抽象子类实现
protected abstract void subscribeActual(Observer<? super T> observer);
从源码可以看到,在执行subscribe()时,最终是调用了ObservableCreate类里面的subscribeActual()方法,在subscribeActual()方法内先是调用了Observer的onSubscribe(),接下来调用source即:ObservableOnSubscribe复写的subscribe()方法,在ObservableOnSubscribe的subscribe()内调用了ObservableEmitter的onNext()、onError()等方法,就开始了事件流的执行。
总结
步骤 | 逻辑实现 | 源码分析 |
---|---|---|
创建Observable&定义发送事件 | 1.调用Observable.create() 2.创建ObservableOnSubscribe对象 3.复写subscribe() |
1.创建ObservableCreate类对象 2.复写subscribeActual()方法 |
创建Observer&定义响应事件的行为 | 1.创建Observer类对象 2.复写onSubscribe(),onNext(),onError(),onComplete()方法 |
1.Observer是一个接口 2.接口内有四个方法,分别响应Observable发送的不同事件 |
subscribe订阅 | 调用Observable.subscribe(observer) | 1.最终调用Observable的子类对象ObservableCreate类的subscribeActual()方法,主要调用如下: a.创建CreateEmitter对象; b. 调用ObservableOnSubscribe对象复写的subscribe(); c.调用Observer复写的onSubscribe(); d.在subscribe()里面调用onNext(),onError(),onComplete()再回调Observer复写的对应方法; |
用一张类图来表示一下类之间的联系:
四.线程切换
observable.subscribeOn(Schedulers.io())//切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread())//切换回到主线程 处理请求结果
RxJava2的订阅原理是执行subscribe()时从下往上依次调用Observable的各个子类的subscribeActual()方法,在最上层调用onNext()等方法时,会从上往下依次调用Observer的onNext()等方法,最终会调用app传入的observer的next()等方法。
a.subscribeOn(Schedulers.io())
首先subscribeOn(Schedulers.io())最终会调用ObservableSubscribeOn.subscribeActual()方法,内部是将source.subscribe()放到一个Runnable执行,该source就是ObservableCreate(),即会调用到ObservableCreate.subscribeActual(),最终会调用到ObservableOnSubscribe.subscribe(CreateEmitter):
//Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
......
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//切换线程
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//在新线程里面执行subscribe
source.subscribe(parent);
}
}));
}
......
}
Schedulers.io()返回一个IoScheduler,该类继承Scheduler,看一下scheduleDirect(new Runnable())执行了什么操作:
//IoScheduler.java从Scheduler.java继承
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
通过以上可以看到,在执行scheduleDirect(x,x,x)后,会先执行createWorker(),接着执行w.schedule(Runnable),看一下实现逻辑:
//IoScheduler.java
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
......
......
@Override
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
......
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
//NewThreadWorker.java
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
parent.remove(sr);
RxJavaPlugins.onError(ex);
}
return sr;
}
可以看到,把传入的Runnable封装成为一个ScheduleRunnable对象。并把这个对象放入线程池中去执行,执行的时候会运行ScheduleRunnable的run方法,最终又会调用ObservableSubscribeOn的run方法,进而调用source.subscribe(),至此subscribeOn()的线程切换就完成了。
b.observeOn(AndroidSchedulers.mainThread())
observeOn()时会传入AndroidSchedulers.mainThread(),会创建HandlerScheduler,然后创建Handler,将主线程的Looper传入Handler,后续消息队列都是在主线程执行的,看一下具体逻辑:
//AndroidSchedulers.java
public final class AndroidSchedulers {
private static final class MainHolder {
//创建HandlerScheduler,内部持有主线程Handler的引用
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
......
}
然后在Observable.observeOn()时会创建ObservableObserveOn,并把上述创建的HandlerScheduler传入,先看一下ObservableObserveOn的逻辑实现:
//ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
......
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
.......
.......
@Override
public void onNext(T t) {
........
//在onNext()中执行schdule()
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//执行worker的schedule方法,该worker是HandlerScheduler中的HandlerWorker,后面会讲到
worker.schedule(this);
}
}
void drainNormal() {
......
//a就是app传入的Observer
a.onNext(v);
......
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
}
当执行subscribe(observer)时,会先调用ObservableObserveOn. subscribeActual(),从上面可以看到:
会先执行scheduler.createWorker(),这个Worker 对象实际上是在AndroidSchedulers.mainThread()内部的HandlerScheduler中生成的,接下来会讲到;
然后执行source.subscribe(ObserveOnObserver),该ObserveOnObserver对app传入的observer进行了封装,当最上层调用onNext()等方法后,会最终调用到ObserveOnObserver内部onNext()等方法,从上面逻辑实现可以看到,进而会调用schedule()---->worker.schedule(this)[ObserveOnObserver本身是一个Runnable],该worker就是HandlerWorker,接下来执行到HandlerWorker的schedule(x,x,x),这里面会有一个主线程的Handler对象,然后把特定的线程任务[ObserveOnObserver]通过handler.sendMessageDelayed()方法转移到主线程中去执行,一起看一下HandlerScheduler的实现逻辑:
final class HandlerScheduler extends Scheduler {
........
@Override
public Worker createWorker() {
//该handler是主线程的handler
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
......
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
.......
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
.......
return scheduled;
}
........
}
private static final class ScheduledRunnable implements Runnable, Disposable {
.......
@Override
public void run() {
try {
//该delegate就是ObserveOnObserver[本身就是一个runnable],最终在主线程调用run()
delegate.run();
}
.......
}
最后会在主线程里面执行observer的onNext()等方法,以上就是observer线程切换。
简单总结一下:
subscribe(observer)后涉及到Observable类的执行顺序:ObservableObserveOn-->ObservableSubscribeOn-->ObservableCreate-->ObservableOnSubscribe.subscribe(CreateEmitter e);
e.onNext()后涉及到Observer类的执行顺序:CreateEmitter-->SubscribeOnObserver-->ObserveOnObserver-->Observer(app)
五.实例分析
由于RxJava是开源的库,要想使用的话需要添加依赖,在AndroidStudio里面添加如下:
// Android 支持 Rxjava,使用RxJava2的版本
implementation 'io.reactivex.rxjava2:rxjava:2.0.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
a.简单事件流处理
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("你好");
e.onNext("RxJava");
e.onNext("今天学习一下RxJava");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
.subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
// 该方法最先调用
Log.e(TAG, "-----onSubscribe()------");
mDisposable = d;
}
@Override
public void onNext(String value) {
Log.e(TAG, "-----onNext(): " + value);
//1.dispose()后,observer就不再接收后面的消息,即"今天学习一下RxJava"接收不到了
if (value.equals("RxJava")) {
mDisposable.dispose();
}
//2.在收到onComplete()或onError()之后,就不会回调该方法了
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e(TAG, "-----onComplete()");
}
});
/**
*以下两种方式可以代替e.onNext("你好");e.onNext("RxJava");e.onNext("今天学习一下RxJava");
*/
//1.Observable observable = Observable.just("你好","RxJava","今天学习一下RxJava");
//String[] words = {"你好","RxJava","今天学习一下RxJava"};
//2.Observable observable = Observable.fromArray(words);
未加dispose()输出结果为:
-----onNext():你好
-----onNext():RxJava
-----onNext():今天学习一下RxJava
加dispose()输出结果为:
-----onNext():你好
-----onNext():RxJava
b.Retrofit+RxJava网络请求
Retrofit是square开源的网络Restful请求框架,底层是基于okhttp的,开发者只需要定义接口就可以了,Retrofit提供了注解可以表示该接口请求的请求方式、参数、url等。定义好了接口以后,在调用该远程接口的时候直接使用该接口就好像通过RPC方式使用本地类一样方便。
1.加入依赖
// Android 支持 Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.1.0'
// 衔接 Retrofit & RxJava,要注意使用RxJava2的版本
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
// 支持Gson解析
implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
2.创建接收服务器返回数据的类
根据请求返回的Json数据格式,来定义与其对应的类,本实例是参考天气查询,返回的数据格式如下:
{"data":{"yesterday":{"date":"10日星期四","high":"高温 28℃","fx":"西南风","low":"低温 20℃","fl":"<![CDATA[2级]]>","type":"小雨"},
"city":"青岛",
"forest":[{"date":"11日星期五","fengli":"<![CDATA[3级]]>","dengxiang":"南风","high":"高温 25℃","low":"低温 21℃","type":"小雨"},{"date":"12日星期六","fengli":"<![CDATA[3级]]>","dengxiang":"东风","high":"高温 25℃","low":"低温 20℃","type":"小雨"},{"date":"13日星期天","fengli":"<![CDATA[2级]]>","dengxiang":"东风","high":"高温 26℃","low":"低温 20℃","type":"阴"},{"date":"14日星期一","fengli":"<![CDATA[2级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 22℃","type":"晴"},{"date":"15日星期二","fengli":"<![CDATA[4级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 20℃","type":"小雨"}],
"ganmao":"感冒低发期,天气舒适,请注意多吃蔬菜水果,多喝水哦。","wendu":"22"},
"status":1000,
"desc":"OK"}
创建返回数据对应的类WeatherInfo.java
public class WeatherInfo {
private DataBean data;
private int status;
private String desc;
public DataBean getData() {
return data;
}
public void setData(DataBean data) {
this.data = data;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public static class DataBean {
private YesterdayBean yesterday;
private String city;
private String aqi;
private String ganmao;
private String wendu;
private List<ForecastBean> forecast;
public YesterdayBean getYesterday() {
return yesterday;
}
public void setYesterday(YesterdayBean yesterday) {
this.yesterday = yesterday;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getAqi() {
return aqi;
}
public void setAqi(String aqi) {
this.aqi = aqi;
}
public String getGanmao() {
return ganmao;
}
public void setGanmao(String ganmao) {
this.ganmao = ganmao;
}
public String getWendu() {
return wendu;
}
public void setWendu(String wendu) {
this.wendu = wendu;
}
public List<ForecastBean> getForecast() {
return forecast;
}
public void setForecast(List<ForecastBean> forecast) {
this.forecast = forecast;
}
public static class YesterdayBean {
private String date;
private String high;
private String fx;
private String low;
private String fl;
private String type;
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getHigh() {
return high;
}
public void setHigh(String high) {
this.high = high;
}
public String getFx() {
return fx;
}
public void setFx(String fx) {
this.fx = fx;
}
public String getLow() {
return low;
}
public void setLow(String low) {
this.low = low;
}
public String getFl() {
return fl;
}
public void setFl(String fl) {
this.fl = fl;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String toString() {
return "{" + "\"date\":\"" + getDate() + "\"," + "\"high\":\"" + getHigh() + "\","
+ "\"fx\":\"" + getFx() + "\"," + "\"low\":\"" + getLow() + "\","
+ "\"fl\":\"" + getFl() +
"\"," + "\"type\":\"" + getType() + "\"}";
}
}
public static class ForecastBean {
private String date;
private String high;
private String fengli;
private String low;
private String fengxiang;
private String type;
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getHigh() {
return high;
}
public void setHigh(String high) {
this.high = high;
}
public String getFengli() {
return fengli;
}
public void setFengli(String fengli) {
this.fengli = fengli;
}
public String getLow() {
return low;
}
public void setLow(String low) {
this.low = low;
}
public String getFengxiang() {
return fengxiang;
}
public void setFengxiang(String fengxiang) {
this.fengxiang = fengxiang;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String toString() {
return "{" + "\"date\":\"" + getDate() + "\"," + "\"fengli\":\"" + getFengli()
+ "\"," + "\"dengxiang\":\"" + getFengxiang() + "\"," + "\"high\":\""
+ getHigh() + "\"," + "\"low\":\"" + getLow() + "\"," + "\"type\":\""
+ getType() + "\"}";
}
}
public String toString() {
String s = "";
for (ForecastBean fb : forecast) {
s += fb.toString() + ",";
}
return "\"data\":" + "{" + "\"yesterday\":" + getYesterday().toString() + "," +
"\"city\":\"" + getCity() + "\"," + "\"forest\":" + "[" + s + "]" + "," +
"\"ganmao\":\"" + getGanmao() + "\"," + "\"wendu\":\"" + getWendu() + "\""
+ "}";
}
}
public String toString() {
return "{" + getData().toString() + "," + "\"status\":" + getStatus() + "," + "\"desc\":\""
+ getDesc() + "\"" + "}";
}
}
3.创建用于描述网络请求的接口RetrofitApi
采用注解 + Observable<...>接口描述网络请求参数
public interface RetrofitApi {
/**
* 注解里传入 网络请求 的部分URL地址
* Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
* 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
* 采用Observable<...>接口
* getWeatherInfo()是接受网络请求数据的方法
*/
@GET("weather_mini")
Observable<WeatherInfo> getWeatherInfo(@Query("city") String city);
//可以直接在GET中加入请求的参数
@GET("weather_mini?city=青岛")
Observable<WeatherInfo> getWeatherInfoTwo();
}
4.创建Retrofit实例及执行请求
private void getWeatherInfo() {
//1.创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://wthrcdn.etouch.cn/") // 设置网络请求Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(可将得到的Json串转换为对应的WeatherInfo类)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
//2.创建网络请求接口的实例
final RetrofitApi request = retrofit.create(RetrofitApi.class);
//3.采用Observable<...>形式对网络请求进行封装
Observable<WeatherInfo> observable = request.getWeatherInfo("青岛");
//4.通过线程切换发送网络请求
observable.subscribeOn(Schedulers.io())//切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread())//切换回到主线程 处理请求结果
.subscribe(new Observer<WeatherInfo>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(WeatherInfo result) {
//5.接收服务器返回的数据
Log.e(TAG, "Weather info is: " + result.toString());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "请求失败 : " + e.toString());
}
@Override
public void onComplete() {
}
});
}
请求后onNext()里面打印如下:
Weather info is: {"data":{"yesterday":{"date":"10日星期四","high":"高温 28℃","fx":"西南风","low":"低温 20℃","fl":"<![CDATA[2级]]>","type":"小雨"},
"city":"青岛",
"forest":[{"date":"11日星期五","fengli":"<![CDATA[3级]]>","dengxiang":"南风","high":"高温 25℃","low":"低温 21℃","type":"小雨"},{"date":"12日星期六","fengli":"<![CDATA[3级]]>","dengxiang":"东风","high":"高温 25℃","low":"低温 20℃","type":"小雨"},{"date":"13日星期天","fengli":"<![CDATA[2级]]>","dengxiang":"东风","high":"高温 26℃","low":"低温 20℃","type":"阴"},{"date":"14日星期一","fengli":"<![CDATA[2级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 22℃","type":"晴"},{"date":"15日星期二","fengli":"<![CDATA[4级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 20℃","type":"小雨"}],
"ganmao":"感冒低发期,天气舒适,请注意多吃蔬菜水果,多喝水哦。","wendu":"22"},
"status":1000,
"desc":"OK"}
c.功能防抖及联想搜索优化
功能防抖主要是为了频繁点击只处理其中一次事件;联想搜索优化主要是为了在输入文字时不频繁去服务器进行请求。以上两种实现都是在observer中对onNext()事件进行拦截处理,看是否满足条件,满足就执行下一步observer.onNext(),不满足就不往下执行。
/*对控件点击及文字输入进行监听,采用了RxBinding,需要加入依赖
// Rxbinding
implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
功能防抖实现:
//使用throttleFirst(1, TimeUnit.SECONDS):参数1:指定的时间段内;参数2:指定时间的单位
//在1s内多次点击,只响应第一次
private void throttleOperation() {
RxView.clicks(clickBtn).throttleFirst(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e(TAG, "执行点击事件");
getWeatherInfo();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("Seven", "对throttle Complete事件作出响应");
}
});
}
//频繁点击,输出如下:
09-11 10:19:15.138 E/TAG (31259): 执行点击事件
09-11 10:19:16.221 E/TAG (31259): 执行点击事件
09-11 10:19:17.306 E/TAG (31259): 执行点击事件
看一下关键代码实现是在ObservableThrottleFirstTimed.java里面的DebounceTimedObserver类:
static final class DebounceTimedObserver<T>
extends AtomicReference<Disposable>
implements Observer<T>, Disposable, Runnable {
......
......
@Override
public void onNext(T t) {
if (!gate && !done) {
gate = true;
actual.onNext(t);
Disposable d = get();
if (d != null) {
d.dispose();
}
DisposableHelper.replace(this, worker.schedule(this, timeout, unit));
}
}
@Override
public void run() {
gate = false;
}
......
......
}
通过以上逻辑可以看到,在首次执行完onNext()后,会将gate设为true,然后执行worker.schedule(this,timeout, unit),延时timeout执行该runnable,在run()中把gate设为false,下次onNext()就可以执行了。
联想搜索优化:
//联想搜索优化,根据指定时间过滤事件的过滤操作符
//比如在搜索框输入文字时,在指定时间内不再有文字输入时,才会发送请求,否则不发送
//若在这段时间内,输入框有文字输入或变化,则继续等待该段时间,循环上述过程
private void debounceOperation() {
RxTextView.textChanges(editTxt)
.debounce(1, TimeUnit.SECONDS).skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CharSequence>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(CharSequence charSequence) {
resultTxt.setText("服务器请求字符串 = " + charSequence.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
看一下关键代码实现是在ObservableDebounceTimed.java里面的DebounceTimedObserver类:
static final class DebounceTimedObserver<T> implements Observer<T>, Disposable {
......
......
@Override
public void onNext(T t) {
if (done) {
return;
}
long idx = index + 1;
index = idx;
Disposable d = timer.get();
if (d != null) {
d.dispose();
}
DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
if (timer.compareAndSet(d, de)) {
d = worker.schedule(de, timeout, unit);
de.setResource(d);
}
}
......
......
void emit(long idx, T t, DebounceEmitter<T> emitter) {
if (idx == index) {
actual.onNext(t);
emitter.dispose();
}
}
}
static final class DebounceEmitter<T> extends AtomicReference<Disposable> implements Runnable, Disposable {
......
......
@Override
public void run() {
if (once.compareAndSet(false, true)) {
parent.emit(idx, value, this);
}
}
.......
}
通过以上逻辑可以看到,在执行onNext()时并没有直接执行下一个observer.onNext(),而是执行了worker.schedule(de,timeout, unit),延时timeout执行该runnable,在run()中执行该observer的emit(),然后在执行下一个observer的onNext()。
d.操作符使用案例**
merge()
/**
* 使用Merge操作符,合并两个Observable
* 如果两个Observable<T>,T是相同的类型,比如String, 那么在subscribe里面的new Observer<String>和 onNext(String)
* 如果两个Observable<T>,T不是相同的类型,如下: 那么在subscribe里面的new Observer<Object>和 onNext(Object)即可
*/
private void mergeOperation() {
//设置第1个Observable:获取字符串
Observable<String> strOb = Observable.just("字符串");
//设置第2个Observable:请求获取整数
Observable<Integer> intOb = Observable.just(1000);
//通过merge()合并事件 & 同时发送事件
Observable.merge(strOb, intOb)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
if (value instanceof String) {
result += value;
} else {
Log.d(TAG, "数据为: " + (Integer)value);
}
}
@Override
public void onError(Throwable e) {
}
// 接收合并事件后,统一展示
@Override
public void onComplete() {
Log.d(TAG, "获取数据完成");
setMergeTxt(result);
}
});
}
//打印如下:
09-11 10:31:14.873 D/TAG (31259): 数据为: 1000
09-11 10:31:14.873 D/TAG (31259): 获取数据完成
09-11 10:31:14.875 D/TAG (31259): result is: 字符串
zip()
@SuppressLint("CheckResult")
private void zipOperation() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
RetrofitApi request = retrofit.create(RetrofitApi.class);
//采用Observable<...>形式 对 2个网络请求 进行封装
//即2个网络请求异步 & 同时发送
Observable<Translation> observable = request.getTranslation().subscribeOn(
Schedulers.io()); // 新开线程进行网络请求1
Observable<Translation1> observable1 = request.getTranslationTwo().subscribeOn(
Schedulers.io());// 新开线程进行网络请求2
// 通过使用Zip()对两个网络请求进行合并再发送
Observable.zip(observable, observable1,
new BiFunction<Translation, Translation1, String>() {
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
@Override
public String apply(Translation translation,
Translation1 translation1) throws Exception {
return translation.show() + " & " + translation1.show();
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
// 成功返回数据时调用, Consumer<T>, T对应apply的返回值,apply对应BiFunction最后的参数类型
@Override
public void accept(String combineInfo) throws Exception {
// 结合显示2个网络请求的数据结果
Log.d(TAG, "最终接收到的数据是:" + combineInfo);
setZipTxt(combineInfo);
}
}, new Consumer<Throwable>() {
// 网络请求错误时调用
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "请求失败: " + throwable.toString() );
}
});
}
e.其他
RxJava对应很多操作符供开发使用,详情请参考大神整理的以下表格:
RxJava操作符.png
六.总结
基于事件流的异步操作库,事件流是通过subscribe触发的(实际上是subscribeActual),每个Observable子类的subscribeActual实现逻辑不同。
从下往上订阅,不断执行Observable的subscribeActual();
事件从上往下发射,不断执行Observer的onNext()等方法。
非常感谢网上大神整理的一些文章,学习之后记录下来备后续使用。