Rxjava2应用篇
2018-12-21 本文已影响0人
钉某人
Rxjava2实例.png
Github地址:https://github.com/DingMouRen/RxJava2ExamplesDemo
1.后台耗时操作,前台实时更新UI(下载文件,实时更新进度)
- 自定义ResponseBody--ProgressResponseBody,获取到从网络读取的数据,并通过EventBus发送通知
- 自定义拦截器ProgressInterceptor,对请求返回的响应Response进行处理,
- 定义一个抽象类FileCallBack,1.用于封装类似Observer的接口,2.注册EventBus来接收文件的接收进度,
- 自定义Observer--FileDownloadObserver,用于包裹FileCallBack
final FileCallBack<ResponseBody> fileCallBack = new FileCallBack<ResponseBody>("","") {
@Override
public void onSuccess(ResponseBody responseBody) {
Log.e(TAG,"onSuccess:"+responseBody.toString());
}
@Override
public void progress(long progress, long total) {
Log.e(TAG,total+"/"+progress);
mProgressBar.setMax((int) total);
mProgressBar.setProgress((int) progress);
DecimalFormat decimalFormat = new DecimalFormat("0.00");
String scaleStr = decimalFormat.format(progress * 1f/ total );
mTvProgress.setText( (int)(Float.parseFloat(scaleStr) * 100) +"%");
}
@Override
public void onStart(Disposable disposable) {
}
@Override
public void onCompleted() {
Log.e(TAG,"onComplete");
show("下载完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError:"+e.getMessage());
}
};
HttpManager.createService(Api.class,new ProgressInterceptor())
.downloadApk()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.doOnNext(new Consumer<ResponseBody>() {
@Override
public void accept(ResponseBody responseBody) throws Exception {
// fileCallBack.saveFile(responseBody);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new FileDownloadObserver<ResponseBody>(fileCallBack));
2.计算一段时间内数据的平均值
- 通过操作符buffer(3000, TimeUnit.MILLISECONDS),将这段时间发射的数据缓存在集合中
- 在Observer的onNext中对接收到的集合数据进行求平均值
/**
* rxjava处理
*/
private void rxjavaCompose() {
mPublishSubject = PublishSubject.create();
DisposableObserver<List<Double>> disposableObserver = new DisposableObserver<List<Double>>() {
@Override
public void onNext(List<Double> temperatureList) {
double resultSum = 0;//温度的和
double resultAvera = 0;
Log.e("onNext","接收到集合的大小:"+temperatureList.size());
if (temperatureList.size() > 0){
for(Double temperature : temperatureList){
resultSum += temperature;
}
resultAvera = resultSum / temperatureList.size();
}
Log.e(mActivity.getClass().getSimpleName(),"更新平均温度:"+resultAvera);
final double finalResultAvera = resultAvera;
mTvAveraTemperature.post(new Runnable() {
@Override
public void run() {
mTvAveraTemperature.append("平均3秒温度:"+ finalResultAvera +"℃ 时间:"+ new Date().toLocaleString()+"\n");
int scrollAmount = mTvAveraTemperature.getLayout().getLineTop(mTvAveraTemperature.getLineCount()) - mTvAveraTemperature.getHeight();
if (scrollAmount > 0){
mTvAveraTemperature.scrollTo(0,scrollAmount);
}else {
mTvAveraTemperature.scrollTo(0,0);
}
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.buffer(3000, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable = new CompositeDisposable();//用于管理订阅与解除订阅
mCompositeDisposable.add(disposableObserver);
3.搜索联想功能优化
需要优化的问题
- 避免用户连续输入时造成发起不必要的请求。(debounce操作符来解决)
- 避免用户输入未空时发起不必要的请求。(filter操作符来解决)
- 避免前后发起两个请求,后面请求响应先于前面请求响应返回。(switch操作符来解决)
/**
* 初始化Observable
*/
private void initObservable() {
mPublishSubject = PublishSubject.create();
mDisposableObserver = new DisposableObserver<MyResponse<String>>() {//Disposable是一个抽象的观察者,可以通过disposable进行异步取消
@Override
public void onNext(MyResponse<String> myResponse) {
Gson gson = new Gson();
mTvLog.setText(JsonUtils.formatJson(gson.toJson(myResponse)));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.debounce(200, TimeUnit.MILLISECONDS)//不会发射时间间隔小于200毫秒的,
.filter(new Predicate<String>() {//过滤操作符,只有字符串长度大于0才能发射
@Override
public boolean test(String s) throws Exception {
return s.length() > 0;
}
}).switchMap(new Function<String, ObservableSource<MyResponse<String>>>() {//switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果
@Override
public ObservableSource<MyResponse<String >> apply(String s) throws Exception {
return HttpManager.createService(Api.class).search(s).subscribeOn(Schedulers.io());
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(mDisposableObserver);
mCompositeDisposable = new CompositeDisposable();//用于取消订阅关系
mCompositeDisposable.add(mDisposableObserver);//添加到订阅关系
}
4.轮询操作
应用场景:有的时候需要我们尝试间隔一段时间就向服务器发起一次请求,但是又不适合引入长连接的场景。
可以使用intervalRange操作符,参数含义:
- start:发送数据的起始值,为Long型。
- count:总共发送多少项数据。
- initialDelay:发送第一个数据项时的起始时延。
- period:两项数据之间的间隔时间。
- TimeUnit:时间单位。
/**
* 固定时间间隔的轮询
*/
private void startFixPolling() {
Observable<MyResponse<String>> observableFix = Observable.intervalRange(0,5,0,1000, TimeUnit.MILLISECONDS)
.take(5)
.flatMap(new Function<Long, ObservableSource<MyResponse<String>>>() {
@Override
public ObservableSource<MyResponse<String>> apply(Long aLong) throws Exception {
return HttpManager.createService(Api.class).polling().subscribeOn(Schedulers.io());
}
});
DisposableObserver<MyResponse<String>> disposableObserverFix = new DisposableObserver<MyResponse<String>>() {
@Override
public void onNext(MyResponse<String> response) {
mTvFix.append(response.data+"\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observableFix.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableObserverFix);
mCompositeDisposable.add(disposableObserverFix);
}
5.根据错误类型进行重试请求
网络请求出错,重试的情况下需要处理的问题:
- 限制重试的次数
- 根据错误类型,判断是否需要重新请求
- 根据错误类型,等待特定的时间后再去重新请求
retryWhen操作符可以实现重新订阅,由onError事件来触发。
private void retryDemo() {
Observable<MyResponse<String>> observable = HttpManager.createService(Api.class).retry()
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount;
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
long waitTime = 0;//等待时间
if (throwable instanceof ConnectException){
mainThreadTextChange("ConnectException异常\n");
waitTime = 2000;
}
mRetryCount++;
if (waitTime > 0){
mainThreadTextChange("2秒后重新发起请求\n");
}
return waitTime > 0 && mRetryCount <= 4 ?
Observable.timer(waitTime,TimeUnit.MILLISECONDS):
Observable.error(throwable);
}
});
}
});
DisposableObserver<MyResponse<String>> disposableObserver = new DisposableObserver<MyResponse<String>>() {
@Override
public void onNext(MyResponse<String> response) {
Gson gson = new Gson();
mTv.append("onNext:\n"+ JsonUtils.formatJson(gson.toJson(response))+"\n");
}
@Override
public void onError(Throwable e) {
mTv.append("onError:"+e.getMessage()+"\n");
}
@Override
public void onComplete() {
mTv.append("onComplete\n");
}
};
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
private void mainThreadTextChange(final String content){
runOnUiThread(new Runnable() {
@Override
public void run() {
mTv.append(content);
}
});
}
6.多个表单的验证
应用场景:登录场景中,需要账户是一定的长度,密码也有特定的长度,此时使用操作符combineLatest来实现需求。
combineLatest可以接受多个Observable和一个函数作为参数。当其中的任意一个Observable发射数据后,会去获取其他的Observable最后一次发射的数据,回调到函数中。(此函数回调的前提是都至少发射过一次数据)
private void initRxjava2() {
mCompositeDisposable = new CompositeDisposable();
mAccoountPublishSubject = PublishSubject.create();
mPwdPublishSubject = PublishSubject.create();
Observable<Boolean> observable = Observable.combineLatest(mAccoountPublishSubject, mPwdPublishSubject,
new BiFunction<String, String, Boolean>() {
@Override
public Boolean apply(String account, String pwd) throws Exception {
int nameLength = account.length();
int pwdLength = pwd.length();
return (nameLength >=3 && nameLength <=5) && (pwdLength >=6 && pwdLength <=10);
}
});
DisposableObserver<Boolean> disposableObserver = new DisposableObserver<Boolean>() {
@Override
public void onNext(Boolean aBoolean) {
if (aBoolean){//两个输入框的内容都符合要求
mTvLogin.setEnabled(true);
mTvLogin.setBackgroundColor(Color.GREEN);
}else {//两个输入框有不符合要求的内容
mTvLogin.setEnabled(false);
mTvLogin.setBackgroundColor(Color.GRAY);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
@Override
public void initListener() {
mEditAccount.addTextChangedListener(new EditTextWatcher(mAccoountPublishSubject));
mEditPwd.addTextChangedListener(new EditTextWatcher(mPwdPublishSubject));
mTvLogin.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Toast.makeText(mActivity,"登录成功",Toast.LENGTH_SHORT).show();
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
if (mCompositeDisposable != null) mCompositeDisposable.clear();
}
/**
* EditText的TextWatcher
*/
public static class EditTextWatcher implements TextWatcher{
private PublishSubject mPublishSubject;
public EditTextWatcher(PublishSubject publishSubject){
this.mPublishSubject = publishSubject;
}
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
mPublishSubject.onNext(s.toString());
}
}
7.优先加载本地缓存,同时发起网络请求
应用场景:进入新页面,为了提升用户体验,再网络请求没有返回时,优先显示缓存数据。
要求:同时发起请求网络数据和加载本地缓存数据。在网络数据未返回时,显示本地缓存数据;网络数据返回时,显示最新的网络数据
几种实现方式的缺点:
- concat实现:concat连接发射器A和发射器B,发射器A加载本地缓存数据,发射器B请求网络数据,此时只有发射器A读取完本地缓存数据后,发射器B才会去请求网络。这样的时间是加载本地缓存和请求网络的时间和。
- concatEager实现:可以解决concat无法同时加载本地缓存和请求网络的需求,但是当读取本地缓存的时间大于请求网络的时间时,请求到的网络数据必须等本地缓存读取结束之后,才能传递给下游。所耗时间为加载本地缓存的时间,但是网络数据已经返回,本不需要等待加载本地缓存数据的。
- merge实现:可以让多个observable同时发射数据,也不需要observable之间的相互等待,直接发射给下游。但是如果加载本地缓存的时间大于请求网络数据的时间,数据本来已经是最新的数据,但是此时又会被刷新成过时的本地缓存数据。
好的实现方式:publish+merge+takeUntil
- publish操作符:将普通的Observable转变成可连接的Observable,可连接的Observable类似于普通的Observable,除了它在订阅时不会开始发出项目,只有当Connect操作符应用于它时才开始。
- takeUntil操作符:当事件满足设定的条件时,该事件的下一个事件不会被发送了。
/**
* 优先加载本地缓存数据,同时请求网络数据
*/
private void requestData(final long delayTimeLocal, long delayTimeNet) {
mProgressBar.setVisibility(View.VISIBLE);
Observable<MyResponse<List<CacheToNetData>>> observable =
getNetData(delayTimeNet).publish(new Function<Observable<MyResponse<List<CacheToNetData>>>, ObservableSource<MyResponse<List<CacheToNetData>>>>() {
@Override
public ObservableSource<MyResponse<List<CacheToNetData>>> apply(Observable<MyResponse<List<CacheToNetData>>> netResponseObservable) throws Exception {
return Observable.merge(getLocalCacheData(delayTimeLocal),netResponseObservable )
.takeUntil(new Predicate<MyResponse<List<CacheToNetData>>>() {
@Override
public boolean test(MyResponse<List<CacheToNetData>> listMyResponse) throws Exception {
mainThreadTextLog("获取到的数据类型:"+listMyResponse.msg);
return listMyResponse.msg.equals("成功");
}
});
}
});
DisposableObserver<MyResponse<List<CacheToNetData>>> disposableObserver =
new DisposableObserver<MyResponse<List<CacheToNetData>>>() {
@Override
public void onNext(MyResponse<List<CacheToNetData>> listMyResponse) {
mProgressBar.setVisibility(View.GONE);
if (listMyResponse.code == 1) {
if (listMyResponse.msg.equals("本地数据")) {
mainThreadTextLog("onNext --- 加载了本地数据");
} else {
mainThreadTextLog("onNext --- 加载了网络数据");
}
mAdapter.setData(listMyResponse.data);
}
}
@Override
public void onError(Throwable e) {
mainThreadTextLog("onError:" + e.getMessage());
}
@Override
public void onComplete() {
mainThreadTextLog("onComplete");
}
};
observable.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
@Override
protected void onDestroy() {
super.onDestroy();
if (mCompositeDisposable != null) mCompositeDisposable.clear();
}
/**
* 获取本地缓存数据
*/
public Observable<MyResponse<List<CacheToNetData>>> getLocalCacheData(final long delayTime) {
return Observable.create(new ObservableOnSubscribe<MyResponse<List<CacheToNetData>>>() {
@Override
public void subscribe(ObservableEmitter<MyResponse<List<CacheToNetData>>> emitter) throws Exception {
try {
mainThreadTextLog("开始加载本地缓存数据");
Thread.sleep(delayTime);
List<CacheToNetData> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CacheToNetData bean = new CacheToNetData("来自本地缓存", "数据项 --- " + i);
list.add(bean);
}
mainThreadTextLog("结束加载本地缓存数据");
emitter.onNext(new MyResponse<List<CacheToNetData>>("本地数据", 1, list));
emitter.onComplete();
} catch (Exception e) {
mainThreadTextLog("加载本地缓存数据异常:" + e.getMessage());
if (!emitter.isDisposed()) emitter.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
/**
* 获取网络数据
*
* @param delayTime
* @return
*/
public Observable<MyResponse<List<CacheToNetData>>> getNetData(long delayTime) {
mainThreadTextLog("开始请求网络数据");
return HttpManager.createService(Api.class)
.getNetData(delayTime)
.subscribeOn(Schedulers.io())
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends MyResponse<List<CacheToNetData>>>>() {
@Override
public ObservableSource<? extends MyResponse<List<CacheToNetData>>> apply(Throwable throwable) throws Exception {
mainThreadTextLog("请求网络数据失败:" + throwable.getMessage());
return Observable.never();
}
});
}
/**
* 主线程更新UI日志
*
* @param content
*/
private void mainThreadTextLog(final String content) {
mActivity.runOnUiThread(new Runnable() {
@Override
public void run() {
mTvLog.append(content + "\n");
}
});
}
8.倒计时
使用intervalRange操作符实现倒计时功能
/**
* 开始倒计时
* @param countDownTimeLong
*/
private void startCountDown(final long countDownTimeLong) {
mCompositeDisposable.clear();
Observable<Long> observable = Observable.intervalRange(0,countDownTimeLong + 1,0,1, TimeUnit.SECONDS);
DisposableObserver<Long> disposableObserver = new DisposableObserver<Long>() {
@Override
public void onNext(final Long aLong) {
runOnUiThread(new Runnable() {
@Override
public void run() {
mTvTime.setText(formatDuring((countDownTimeLong - aLong) * 1000));
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
9.嵌套请求
应用场景:第一个网络请求之后,再进行一次网络请求,才能拿到需要得数据。
例子:有两道门,宝藏在第二道门后面。需要先后打开两道门,必须打开第一道门之后,才能获取到第二道门得开门密码。
private void requestData() {
String inputStr = mEditText.getText().toString().trim();
if (TextUtils.isEmpty(inputStr)){
Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show();
return;
}
int intputInt = Integer.parseInt(inputStr);
HttpManager.createService(Api.class)
.openFirstDoor(intputInt)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MyResponse<Nest1Bean>>() {
@Override
public void accept(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception {
mainThreadTextLog("doOnNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest1BeanMyResponse)));
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<MyResponse<Nest1Bean>, ObservableSource<MyResponse<Nest2Bean>>>() {
@Override
public ObservableSource<MyResponse<Nest2Bean>> apply(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception {
mainThreadTextLog("获取第二道门得密码,去打开第二道门");
return HttpManager.createService(Api.class).openSecondDoor(nest1BeanMyResponse.data.password);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MyResponse<Nest2Bean>>() {
@Override
public void onSubscribe(Disposable d) {
mainThreadTextLog("onSubscribe");
}
@Override
public void onNext(MyResponse<Nest2Bean> nest2BeanMyResponse) {
mainThreadTextLog("onNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest2BeanMyResponse)));
}
@Override
public void onError(Throwable e) {
mainThreadTextLog("onError:"+e.getMessage());
}
@Override
public void onComplete() {
mainThreadTextLog("onComplete\n\n");
}
});
}
10.合并两个网络请求的数据
应用场景:有的时候我们需要的数据,可能需要请求两个不同的接口才能得到,使用zip操作符可以实现需求
private void requestData() {
String gradeStr = mEditGrade.getText().toString().trim();
if (TextUtils.isEmpty(gradeStr)){
Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show();
return;
}
int gradeInt = Integer.parseInt(gradeStr);
Observable<MyResponse<Teacher>> observableTeacher = HttpManager.createService(Api.class).getTeacher(gradeInt);
Observable<MyResponse<List<Student>>> observableStudents = HttpManager.createService(Api.class).getStudents(gradeInt);
Observable.zip(observableTeacher, observableStudents,
new BiFunction<MyResponse<Teacher>, MyResponse<List<Student>>, ClassBean>() {
@Override
public ClassBean apply(MyResponse<Teacher> teacherMyResponse, MyResponse<List<Student>> studentListMyResponse) throws Exception {
mainThreadTextLog("请求到得老师数据:\n"+JsonUtils.formatJson(new Gson().toJson(teacherMyResponse))+
"\n请求到得学生数据:\n"+JsonUtils.formatJson(new Gson().toJson(studentListMyResponse)));
String teacherName = teacherMyResponse.data.name;
String grade = teacherMyResponse.data.grade;
List<Student> studentList = studentListMyResponse.data;
ClassBean classBean = new ClassBean(teacherName,grade,studentList);
return classBean;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ClassBean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ClassBean classBean) {
mainThreadTextLog("onNext合并后得数据:\n"+JsonUtils.formatJson(new Gson().toJson(classBean)));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
mainThreadTextLog("onComplete\n\n");
}
});
}