RxJava的并发实现
2021-11-03 本文已影响0人
我爱田Hebe
RxJava的并发实现
我们在开发App过程中,常常遇见这种需求,例如首页,仅一个界面就要请求3个甚至更多的接口,更变态的是这些接口必须按顺序请求,来以此展示返回结果,那么这样我们就无法用普通的并发去同时请求接口了,因为我们无法预知各个接口的请求完成时间,普通的也是最简单的办法就是依次请求接口了,A接口请求完成->B接口请求完成->C接口...简单粗暴有木有?并且在加载效率上(接口请求时间)会差很多,那么有没有更优雅的办法去解决这种需求呢?那必须有,利用RxJava的Observable.zip方法即可实现并发请求!
假如ApiService中有两个接口:
@GET("test1")
Observable<HttpResult<TestModel1>> test1(@QueryMap HashMap<String, String> options);
@GET("test2")
Observable<HttpResult<TestModel2>> test2(@QueryMap HashMap<String, String> options);
HttpResult为自定义数据结构:
public class HttpResult<T> {
public int status;
public String msg;
public T data;
}
TestModel1和TestModel2则分别为两个返回的数据结构!
接口封装后的请求方法: test1:
Observable o1 = Observable.create((ObservableOnSubscribe<TestModel1>) emitter ->
//接口请求
ApiUtil.getInstance()
.getApiService()
.test1()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<HttpResult<TestModel1>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(HttpResult<TestModel1> httpResult) {
emitter.onNext(httpResult.data);
emitter.onComplete();
}
@Override
public void onError(Throwable e) {
emitter.onNext(null);
emitter.onComplete();
}
@Override
public void onComplete() {
}
}));
注意: ObservableOnSubscribe的参数是o1 中emitter要传递的参数类型,也就是你接口得到的数据类型:TestModel1!
test2:
Observable o2 = Observable.create((ObservableOnSubscribe<TestModel2>) emitter ->
//接口请求
ApiUtil.getInstance()
.getApiService()
.test2()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<HttpResult<TestModel2>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(HttpResult<TestModel2> httpResult) {
emitter.onNext(httpResult.data);
emitter.onComplete();
}
@Override
public void onError(Throwable e) {
emitter.onNext(null);
emitter.onComplete();
}
@Override
public void onComplete() {
}
}));
两个接口请求,得到两个Observable:o1和o2!
合并:
Observable.zip(o1, o2, new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object o, Object o2) throws Exception {
TestModel1 t1 = (TestModel1) o;//o1得到的结果
TestModel2 t2 = (TestModel2) o2;//o2得到的结果
FinalData f=new FinalData();//最终结果合并
f.t1=t1;
f.t2=t2;
return f;
}
}).subscribeOn(Schedulers.io()).subscribe(o -> {
FinalData f=(FinalData)o;//获取最终结果
//处理数据...
});
注意: BiFunction中的3个Obj参数,前两个对应接口返回数据类型,最后一个对应apply方法返回的数据类型(最终结果)!
如果是3个或以上接口,那么合并时可以根据接口数量使用Function3,Function4...
Observable.zip(o1, o2,o3, new Function3<Object, Object, Object,Object>() {
@Override
public Object apply(Object o, Object o2,Object o3) throws Exception {
}
}).subscribeOn(Schedulers.io()).subscribe(o -> {
});
除了zip操作符,rxjava还提供了concat,merge,join等其它合并操作符,但它们又各有不同,有兴趣的可以去多了解一下!