使用Retrofit2+RxJava2+ProtoBuf实现网络
引言
Retrofit 是一个用于 Android 和 Java 平台的类型安全的,底层使用OkHttp实现网络请求框架。Retrofit 通过将 API 抽象成 Java 接口而让我们连接到 REST web 服务变得很轻松。
RxJava 提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的。
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC 数据交换格式。
主要讲解如何使用各个库封装网络请求,不讲解各库如何使用,具体可查看Rxjava2、Retrofit2 、ProtoBuf。
依赖
implementation 'com.squareup.retrofit2:retrofit:2.5.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'com.squareup.wire:wire-runtime:2.3.0-RC1'
implementation 'com.squareup.retrofit2:retrofit-adapters:2.5.0'
implementation 'com.squareup.retrofit2:converter-wire:2.5.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
其中RxAndroid是为优雅地处理异步请求和解决线程调度问题;Wire用于将请求结果转换为实体类型,并且wire是生成ProtoBuf文件的一种,没有用官方的protobuf生成java文件,主要是为了解决64k限制,减少生成java的代码量,但需要注意的是wire生成的java文件需要判断null,而官方的protobuf生成java文件是有默认值的,无需判断null。
proto文件
在客户端可能需要相关客户端信息,例如设备信息、设备类型,app信息、网络信息等。
// 设备类型
enum PBDeviceType {
DEVICE_ANDROID = 0; // 安卓
DEVICE_IOS = 1; // 苹果
DEVICE_PC = 2; // PC
}
// 设备
message PBDevice {
string deviceId = 1; // 设备ID
string deviceOs = 2; // 设备操作系统
string deviceModel = 3; // 设备模型
PBDeviceType deviceType = 4; // 设备类型,参考PBDeviceType
}
// 网络类型
enum PBNetworkType {
NET_UNKNOWN = 0; // 未知网络
NET_WIFI = 1; // WIFI
NET_2G = 2; // 2G网络
NET_3G = 3; // 3G网络
NET_4G = 4; // 4G网络
}
// APP信息
message PBAppInfo {
string versionName = 1; // 应用程序版本名
uint32 versionCode = 2; // 应用程序版本号
PBNetworkType network = 3; // 网络信息
PBDevice device = 4; // 设备信息
}
定义Request和Response
// 消息请求包
message PBRequest {
uint32 type = 1; // 消息类型
bytes messageData = 2; // 请求数据
uint64 timestamp = 3; // 客户端时间戳
PBAppInfo appInfo = 4; // APP信息
}
// 消息响应包
message PBResponse {
uint32 type = 1; // 消息类型
bytes messageData = 2; // 返回数据
uint32 resultCode = 3; // 返回的结果码
string resultInfo = 4; // 返回的结果消息提示文本(用于错误提示)
}
使用命令生成相关的Java文件
java -jar ${wire_compiler} --proto_path=${protoPath} --java_out=${modelPath} $1
- wire_compiler:wire.jar的存放路径
- protoPath:proto文件存放路径
-
modelPath:生成java文件存放路径
图片.png
请求接口
定义一个Service的接口类,管理所有请求接口
public interface Service {
@POST("users/new")
Observable<Response<PBResponse>> sendMessage(@Body PBRequest request);
}
请求处理
定义一个RetrofitHandler类,处理retrofit的初始化和发送请求,该类使用静态单例进行初始化。
private Service mService; // 请求接口
private static String mServiceUrl; // 请求url
private static class Holder {
private static final RetrofitHandler INSTANCE = new RetrofitHandler();
}
private RetrofitHandler() {
init();
}
public static RetrofitHandler getInstance(String serviceUrl) {
mServiceUrl = serviceUrl;
return Holder.INSTANCE;
}
retrofit的初始化,使用wire的factory进行数据转换,使用rxjava2进行适配。
private void init() {
assert mServiceUrl != null;
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(mServiceUrl)
.client(createClient())
.addConverterFactory(WireConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
mService = retrofit.create(Service.class);
}
private OkHttpClient createClient() {
return new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.connectionPool(new ConnectionPool(5, 1, TimeUnit.MINUTES))
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(20, TimeUnit.SECONDS)
.writeTimeout(5, TimeUnit.SECONDS)
.build();
}
retrofit进行接口调用发送请求
public Observable<PBResponse> send(final PBRequest request) {
return mService.sendMessage(request)
.map(new Function<Response<PBResponse>, PBResponse>() {
@Override
public PBResponse apply(Response<PBResponse> response) throws Exception {
if (response == null) {
return failed(request.type, 1001, "未知错误");
}
if (response.code() != 200 || response.body() == null) {
return failed(request.type, response.code(), response.message());
}
return response.body();
}
});
}
private PBResponse failed(int type, int code, String info) {
return new PBResponse.Builder()
.type(type)
.resultCode(code)
.resultInfo(info)
.build();
}
返回处理
定义ApiResult的泛型返回模型,对返回数据进行二次处理
public class ApiResult<T extends Message> {
private int code; // 返回码
private String message; // 返回信息
private T response; // 返回数据
public ApiResult(int code, String message, T response) {
this.code = code;
this.message = message;
this.response = response;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
public T getResponse() {
return response;
}
}
定义ApiCallback泛型回调处理类,给业务层进行相关的业务处理,如刷新UI等。
public class ApiCallback<T extends Message> {
private boolean isDisposed = false; // 返回处理标志
public boolean isDisposed() {
return isDisposed;
}
public void setDisposed(boolean disposed) {
isDisposed = disposed;
}
public void onStart() {
// 请求开始
}
public void onSuccess(T response) {
}
public void onFailure(int code, String message) {
}
public void onCompleted() {
// 请求完成
}
}
定义IRquest接口,给业务端初始化相关客户端信息
public interface IRequest {
String getVersionName();
Integer getVersionCode();
PBNetworkType getNetworkType();
PBDevice getDevice();
}
定义HttpManager类,使用单例进行实例化
private static class Holder {
private static final HttpManager INSTANCE = new HttpManager();
}
private HttpManager() {
}
public static HttpManager getInstance() {
return HttpManager.Holder.INSTANCE;
}
定义一个初始化方法,可以在application进行初始化
public void init(IRequest request, String url){
mRequest = request;
mHandler = RetrofitHandler.getInstance(url);
}
使用Rxjava2进行返回处理
@SuppressLint("CheckResult")
public <T extends Message, P extends Message> Disposable request(final T request, final int messageType,
final Class<P> pClass, final ApiCallback<P> apiCallback) {
Observable observable = mHandler.send(buildBody(request, messageType)) // 发送请求
.map(new Function<PBResponse, ApiResult<? extends Message>>() {
@Override
public ApiResult<? extends Message> apply(PBResponse pbResponse) throws Exception {
return apiResult(pClass, pbResponse);
}
});
observable.doOnDispose(new Action() { // 业务端取消订阅时调用
@Override
public void run() throws Exception {
apiCallback.setDisposed(true);
}
});
return toSubscribe(observable, new Consumer<ApiResult<P>>() {
@Override
public void accept(ApiResult<P> apiResult) throws Exception {
if (apiCallback == null || apiCallback.isDisposed()) return;
try {
if (apiResult.getCode() == 200) {
apiCallback.onSuccess(apiResult.getResponse());
} else {
apiCallback.onFailure(apiResult.getCode(), apiResult.getMessage());
}
} catch (Exception ex) {
apiCallback.onFailure(1004, "客户端处理异常");
} finally {
apiCallback.onCompleted();
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
if (apiCallback == null || apiCallback.isDisposed()) return;
//客户端本地的异常,如断网等
try {
apiCallback.onFailure(1005, "网络连接错误");
} catch (Exception e) {
} finally {
apiCallback.onCompleted();
}
}
}, new Action() {
@Override
public void run() {
if (apiCallback == null || apiCallback.isDisposed()) return;
//onCompleted will not be called when occurs network exception, like disconnected/timeout, replace invoking at onNext/onError
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) {
if (apiCallback == null || apiCallback.isDisposed()) return;
apiCallback.onStart();
}
});
}
// 异步订阅
private <T> Disposable toSubscribe(Observable<T> o, Consumer<T> onNext, Consumer<Throwable> onError, Action onComplete, Consumer<Disposable> onSubscribe) {
return o.subscribeOn(Schedulers.io())// io线程处理
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())// 主线程处理
.subscribe(onNext, onError, onComplete, onSubscribe);
}
使用
初始化
HttpManager.getInstance().init(new RequestInfo(), mServerUrl);
使用MVP或者MVVM的架构,在P或者VM层调用请求
HttpManager.getInstance().request(new PBAppInfo.Builder().build(), 1001, PBResponse.class, new ApiCallback<PBResponse>() {
@Override
public void onSuccess(PBResponse response) {
super.onSuccess(response);
}
@Override
public void onFailure(int code, String message) {
super.onFailure(code, message);
}
});