Rxjava中使用zip合并数据源
我们在应用开发的时候。往往会遇到这种情况:一个页面会有不同的数据源,这些数据源有的还来自不同的服务器,我们渲染页面的时候都想等数据请求完成,在统一渲染页面,但不同数据源的接口响应时间是不同的,怎么完美的解决这个问题呢?今天,我将为大家带来 Rxjava创建操作符的常见开发应用场景:合并数据源需求 ,并结合Retrofit 与RxJava 实现,希望大家会喜欢。
即,同时向4个数据源获取数据 -> 合并数据 -> 统一展示到客户端
此处采用Merge() & Zip()操作符进行讲解,其中:
Merge()例子 :实现较为简单的从(网络 + 本地)获取数据 & 统一展示
Zip()例子:结合Retrofit 与RxJava,实现较为复杂的合并2个网络请求向2个服务器获取数据 & 统一展示
我这里结合项目重点说明永Zip命令怎么合并数据源。结合保险详情页面四个接口的数据合并
一、首先创建4个被观察者:
public Flowable getUserList(String template) {
return homeService.getUserList(template)
.map(new Function, HomeUserCardListBean>() {
@Override
public HomeUserCardListBean apply(QSCResponse response) throws Exception {
if (response.code != AppConstants.HTTP_CODE) {
throw new QSCExplicitException(response);
}
return response.data;
}
})
.onErrorResumeNext(new Function>() {
@Override
public Publisher apply(Throwable throwable) throws Exception {
return QSCExceptionUtil.getNetworkError(new QSCExplicitException("获取用户配置信息数据失败", QSCExplicitException.CODE_UNKNOWN_ERROR));
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io());
}
public Flowable getCardList(String template) {
return homeService.getCardList(template)
.map(new Function, HomeCardListBean>() {
@Override
public HomeCardListBean apply(QSCResponse response) throws Exception {
if (response.code != AppConstants.HTTP_CODE) {
throw new QSCExplicitException(response);
}
return response.data;
}
})
.onErrorResumeNext(new Function>() {
@Override
public Publisher apply(Throwable throwable) throws Exception {
return QSCExceptionUtil.getNetworkError(new QSCExplicitException("获取身份列表数据失败", QSCExplicitException.CODE_UNKNOWN_ERROR));
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io());
}
public Flowable getPriceList(String template) {
return homeService.getPriceList(template)
.map(new Function, HomePriceListBean>() {
@Override
public HomePriceListBean apply(QSCResponse response) throws Exception {
if (response.code != AppConstants.HTTP_CODE) {
throw new QSCExplicitException(response);
}
return response.data;
}
})
.onErrorResumeNext(new Function>() {
@Override
public Publisher apply(Throwable throwable) throws Exception {
return QSCExceptionUtil.getNetworkError(new QSCExplicitException("获取价格列表数据失败", QSCExplicitException.CODE_UNKNOWN_ERROR));
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io());
}
public Flowable getInsurancedPosition(String template) {
return homeService.getInsurancedPosition(template)
.map(new Function, HomeInsurancedPeopleCard.HomeInsurancedPeople>() {
@Override
public HomeInsurancedPeopleCard.HomeInsurancedPeople apply(QSCResponse response) throws Exception {
if (response.code != AppConstants.HTTP_CODE) {
throw new QSCExplicitException(response);
}
return response.data;
}
})
.onErrorResumeNext(new Function>() {
@Override
public Publisher apply(Throwable throwable) throws Exception {
return QSCExceptionUtil.getNetworkError(new QSCExplicitException("获取保单详情数据失败", QSCExplicitException.CODE_UNKNOWN_ERROR));
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io());
}
二、合并数据源
//合并数据源
disposableRecharge = Flowable.zip(getUserList(template), getCardList(template), getPriceList(template), getInsurancedPosition(template), new Function4<HomeUserCardListBean, HomeCardListBean, HomePriceListBean, HomeInsurancedPeopleCard.HomeInsurancedPeople, HomeInsurancedPeopleCard>() {
@Override
public HomeInsurancedPeopleCard apply(HomeUserCardListBean homeUserCardListBean, HomeCardListBean homeCardListBean, HomePriceListBean homePriceListBean, HomeInsurancedPeopleCard.HomeInsurancedPeople homeInsurancedPeopleCard) throws Exception {
//
HomeInsurancedPeopleCard insurancedPeopleCard = new HomeInsurancedPeopleCard(template);
//
insurancedPeopleCard.homeUserCardListBean = homeUserCardListBean;
insurancedPeopleCard.homeCardListBean = homeCardListBean;
insurancedPeopleCard.homePriceListBean = homePriceListBean;
insurancedPeopleCard.homeInsurancedPeople = homeInsurancedPeopleCard;
//
return insurancedPeopleCard;
}
}).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(new Function>() {
@Override
public Publisher apply(Throwable throwable) throws Exception {
return QSCExceptionUtil.getNetworkError(new QSCExplicitException("获取投保详情页面数据失败", QSCExplicitException.CODE_UNKNOWN_ERROR));
}
}).subscribe(new Consumer() {
//
@Override
public void accept(HomeInsurancedPeopleCard homeInsurancedPeopleDataCard) throws Exception {
host.tv_username.setText(homeInsurancedPeopleDataCard.homePriceListBean.title);
//
host.clearCardAdapter();
host.addCard(homeInsurancedPeopleDataCard);
//alert
host.hideAnimation();
host.onCompleteRefresh();
}
}, new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
host.netError(QSCExceptionUtil.getNetErrorCode(throwable));
host.onCompleteRefresh();
disposables.offer(disposableData);
}
})
;
disposables.offer(disposableData);
}
合并数据源的时候根据具体需求可以切换不同线程,看数据请求是同步获异步,observeOn(AndroidSchedulers.mainThread())这是主线程,subscribeOn(Schedulers.io())子线程,两者相同是同步、不同是异步