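A First Read Through the Retrofit2 Source (Part 2)
1. Introduction
The Retrofit2 + RxJava2 + Glide + MVP stack is very popular right now. For learning purposes, this series goes through the stack module by module to get familiar with the source code and implementation of each part.
When should you use Retrofit2 + RxJava2? Their main advantage is decoupling, and the more complex the project, the more clearly that advantage shows.
2. The details
Continuing from the previous part: the CallAdapter.Factory created in this analysis is ExecutorCallAdapterFactory: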
final class ExecutorCallAdapterFactory extends CallAdapter.Factory {
final Executor callbackExecutor;
ExecutorCallAdapterFactory(Executor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}
@Override public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
// returnType was analyzed earlier in this series: it is Call<DataModel>, so getRawType returns Call
if (getRawType(returnType) != Call.class) {
return null;
}
// returnType is Call<DataModel> in this article, so responseType is DataModel
final Type responseType = Utils.getCallResponseType(returnType);
return new CallAdapter<Object, Call<?>>() {
@Override public Type responseType() {
return responseType;
}
@Override public Call<Object> adapt(Call<Object> call) {
return new ExecutorCallbackCall<>(callbackExecutor, call);
}
};
}
...
}
The argument passed in is Call<DataModel>. The code below uses reflection to extract the generic type argument, DataModel:
static Type getCallResponseType(Type returnType) {
if (!(returnType instanceof ParameterizedType)) {
throw new IllegalArgumentException(
"Call return type must be parameterized as Call<Foo> or Call<? extends Foo>");
}
return getParameterUpperBound(0, (ParameterizedType) returnType);
}
static Type getParameterUpperBound(int index, ParameterizedType type) {
Type[] types = type.getActualTypeArguments();
if (index < 0 || index >= types.length) {
throw new IllegalArgumentException(
"Index " + index + " not in range [0," + types.length + ") for " + type);
}
Type paramType = types[index];
if (paramType instanceof WildcardType) {
return ((WildcardType) paramType).getUpperBounds()[0];
}
return paramType;
}
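As you can see, if you do not add a CallAdapter yourself, the declared return type of a service method must be Call. This would be the natural place to analyze adapt, which converts the call into that Call type, but adapt is used later in the article, so we skip it for now.
The responseType returned by the CallAdapter is very important: it is the actual type argument of the generic return type, and it is used again below.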
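The type-argument extraction described above can be tried without Retrofit. The following is a minimal sketch using only java.lang.reflect; FakeCall, DataModel, WeatherService and responseTypeOf are hypothetical stand-ins invented for illustration, not Retrofit classes.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;

class ResponseTypeSketch {
  /** Hypothetical stand-in for retrofit2.Call, so the sketch needs no dependencies. */
  interface FakeCall<T> {}

  /** Hypothetical response model. */
  static class DataModel {}

  /** Hypothetical service interface. */
  interface WeatherService {
    FakeCall<DataModel> loadWeatherInfo(String cityId);
    FakeCall<? extends DataModel> loadWildcard(String cityId);
  }

  /** Returns the first type argument of the named method's generic return type. */
  static Type responseTypeOf(Class<?> service, String methodName) {
    for (Method method : service.getDeclaredMethods()) {
      if (!method.getName().equals(methodName)) continue;
      Type returnType = method.getGenericReturnType();
      if (!(returnType instanceof ParameterizedType)) {
        throw new IllegalArgumentException("Return type must be parameterized: " + returnType);
      }
      Type argument = ((ParameterizedType) returnType).getActualTypeArguments()[0];
      // For FakeCall<? extends Foo>, fall back to the upper bound Foo.
      if (argument instanceof WildcardType) {
        return ((WildcardType) argument).getUpperBounds()[0];
      }
      return argument;
    }
    throw new IllegalArgumentException("No method named " + methodName);
  }

  public static void main(String[] args) {
    System.out.println(responseTypeOf(WeatherService.class, "loadWeatherInfo"));
    System.out.println(responseTypeOf(WeatherService.class, "loadWildcard"));
  }
}
```

Both lookups resolve to DataModel, which is the same value that Retrofit passes on as responseType.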
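Once the call has been adapted, the data format also needs to be adapted:
// responseType is used here; remember that it is the actual type to be returned, DataModel in this article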
Converter<ResponseBody, ResponseT> responseConverter =
createResponseConverter(retrofit, method, responseType);
private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(
Retrofit retrofit, Method method, Type responseType) {
Annotation[] annotations = method.getAnnotations();
try {
return retrofit.responseBodyConverter(responseType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(method, e, "Unable to create converter for %s", responseType);
}
}
public <T> Converter<ResponseBody, T> responseBodyConverter(Type type, Annotation[] annotations) {
return nextResponseBodyConverter(null, type, annotations);
}
public <T> Converter<ResponseBody, T> nextResponseBodyConverter(
@Nullable Converter.Factory skipPast, Type type, Annotation[] annotations) {
checkNotNull(type, "type == null");
checkNotNull(annotations, "annotations == null");
int start = converterFactories.indexOf(skipPast) + 1;
for (int i = start, count = converterFactories.size(); i < count; i++) {
Converter<ResponseBody, ?> converter =
converterFactories.get(i).responseBodyConverter(type, annotations, this);
if (converter != null) {
//noinspection unchecked
return (Converter<ResponseBody, T>) converter;
}
}
StringBuilder builder = new StringBuilder("Could not locate ResponseBody converter for ")
.append(type)
.append(".\n");
if (skipPast != null) {
builder.append(" Skipped:");
for (int i = 0; i < start; i++) {
builder.append("\n * ").append(converterFactories.get(i).getClass().getName());
}
builder.append('\n');
}
builder.append(" Tried:");
for (int i = start, count = converterFactories.size(); i < count; i++) {
builder.append("\n * ").append(converterFactories.get(i).getClass().getName());
}
throw new IllegalArgumentException(builder.toString());
}
Converters are handled much like CallAdapters. When no Converter is declared, the defaults are a new BuiltInConverters() plus platform.defaultConverterFactories():
List<? extends Converter.Factory> defaultConverterFactories() {
return emptyList();
}
defaultConverterFactories returns an empty List, so is BuiltInConverters the one used by default?
final class BuiltInConverters extends Converter.Factory {
/** Not volatile because we don't mind multiple threads discovering this. */
private boolean checkForKotlinUnit = true;
@Override public @Nullable Converter<ResponseBody, ?> responseBodyConverter(
Type type, Annotation[] annotations, Retrofit retrofit) {
// From the earlier analysis, type is the actual type argument of Call, DataModel in this article
if (type == ResponseBody.class) {
return Utils.isAnnotationPresent(annotations, Streaming.class)
? StreamingResponseBodyConverter.INSTANCE
: BufferingResponseBodyConverter.INSTANCE;
}
if (type == Void.class) {
return VoidResponseBodyConverter.INSTANCE;
}
if (checkForKotlinUnit) {
try {
if (type == Unit.class) {
return UnitResponseBodyConverter.INSTANCE;
}
} catch (NoClassDefFoundError ignored) {
checkForKotlinUnit = false;
}
}
return null;
}
...
}
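As shown here, without a configured Converter only ResponseBody and Void (plus Kotlin's Unit when it is on the classpath) can be handled, so the demo in this article has to declare a Converter that can handle custom classes.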
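The lookup in nextResponseBodyConverter boils down to "ask each factory in order, and the first non-null answer wins, starting just after skipPast". Below is a minimal sketch of that idea with plain JDK types; the Factory interface, forType, defaultFactories and canConvert are hypothetical simplifications and do not match Retrofit's real signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ConverterChainSketch {
  /** Hypothetical, simplified factory: returns null when it cannot handle the type. */
  interface Factory {
    Function<String, Object> converterFor(Class<?> type);
  }

  /** Builds a factory that handles exactly one type. */
  static Factory forType(Class<?> handled, Function<String, Object> converter) {
    return type -> type == handled ? converter : null;
  }

  /** Two illustrative factories: one for String bodies, one for Integer bodies. */
  static List<Factory> defaultFactories() {
    List<Factory> factories = new ArrayList<>();
    factories.add(forType(String.class, body -> body));
    factories.add(forType(Integer.class, body -> Integer.valueOf(body.trim())));
    return factories;
  }

  /** Returns the first converter found after skipPast (or from the start when skipPast is null). */
  static Function<String, Object> nextConverter(
      List<Factory> factories, Factory skipPast, Class<?> type) {
    // indexOf returns -1 when skipPast is null or unknown, so the search then starts at 0.
    int start = factories.indexOf(skipPast) + 1;
    for (int i = start; i < factories.size(); i++) {
      Function<String, Object> converter = factories.get(i).converterFor(type);
      if (converter != null) return converter;
    }
    throw new IllegalArgumentException("Could not locate converter for " + type);
  }

  /** Convenience wrapper that reports whether any factory handles the type. */
  static boolean canConvert(List<Factory> factories, Factory skipPast, Class<?> type) {
    try {
      nextConverter(factories, skipPast, type);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<Factory> factories = defaultFactories();
    System.out.println(nextConverter(factories, null, Integer.class).apply(" 42 "));
    System.out.println(canConvert(factories, null, Double.class));
  }
}
```

A type that no factory claims ends in the same kind of IllegalArgumentException that Retrofit raises, which is why a custom model class needs an extra converter factory.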
In this article Gson maps the JSON data to objects, configured as follows:
addConverterFactory(GsonConverterFactory.create())
public final class GsonConverterFactory extends Converter.Factory {
/**
* Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
* decoding from JSON (when no charset is specified by a header) will use UTF-8.
*/
public static GsonConverterFactory create() {
return create(new Gson());
}
/**
* Create an instance using {@code gson} for conversion. Encoding to JSON and
* decoding from JSON (when no charset is specified by a header) will use UTF-8.
*/
@SuppressWarnings("ConstantConditions") // Guarding public API nullability.
public static GsonConverterFactory create(Gson gson) {
if (gson == null) throw new NullPointerException("gson == null");
return new GsonConverterFactory(gson);
}
private final Gson gson;
private GsonConverterFactory(Gson gson) {
this.gson = gson;
}
...
}
GsonConverterFactory is a thin wrapper around Gson. Its response conversion function is:
@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
return new GsonResponseBodyConverter<>(gson, adapter);
}
public <T> TypeAdapter<T> getAdapter(TypeToken<T> type) {
TypeAdapter<?> cached = typeTokenCache.get(type == null ? NULL_KEY_SURROGATE : type);
if (cached != null) {
return (TypeAdapter<T>) cached;
}
Map<TypeToken<?>, FutureTypeAdapter<?>> threadCalls = calls.get();
boolean requiresThreadLocalCleanup = false;
if (threadCalls == null) {
threadCalls = new HashMap<TypeToken<?>, FutureTypeAdapter<?>>();
calls.set(threadCalls);
requiresThreadLocalCleanup = true;
}
// the key and value type parameters always agree
FutureTypeAdapter<T> ongoingCall = (FutureTypeAdapter<T>) threadCalls.get(type);
if (ongoingCall != null) {
return ongoingCall;
}
try {
FutureTypeAdapter<T> call = new FutureTypeAdapter<T>();
threadCalls.put(type, call);
for (TypeAdapterFactory factory : factories) {
TypeAdapter<T> candidate = factory.create(this, type);
if (candidate != null) {
call.setDelegate(candidate);
typeTokenCache.put(type, candidate);
return candidate;
}
}
throw new IllegalArgumentException("GSON cannot handle " + type);
} finally {
threadCalls.remove(type);
if (requiresThreadLocalCleanup) {
calls.remove();
}
}
}
As the function above shows, the matching TypeAdapter is looked up in factories. Going back to this call:
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
This involves TypeToken:
public static TypeToken<?> get(Type type) {
return new TypeToken<Object>(type);
}
TypeToken(Type type) {
// The argument passed in is the actual type argument of Call, DataModel in this article
this.type = $Gson$Types.canonicalize($Gson$Preconditions.checkNotNull(type));
this.rawType = (Class<? super T>) $Gson$Types.getRawType(this.type);
this.hashCode = this.type.hashCode();
}
public static Type canonicalize(Type type) {
// type is DataModel in this article, a plain class, so the first branch below applies
if (type instanceof Class) { // plain class
Class<?> c = (Class<?>) type;
return c.isArray() ? new GenericArrayTypeImpl(canonicalize(c.getComponentType())) : c;
} else if (type instanceof ParameterizedType) { // parameterized type
ParameterizedType p = (ParameterizedType) type;
return new ParameterizedTypeImpl(p.getOwnerType(),
p.getRawType(), p.getActualTypeArguments());
} else if (type instanceof GenericArrayType) { // generic array type
GenericArrayType g = (GenericArrayType) type;
return new GenericArrayTypeImpl(g.getGenericComponentType());
} else if (type instanceof WildcardType) { // wildcard type
WildcardType w = (WildcardType) type;
return new WildcardTypeImpl(w.getUpperBounds(), w.getLowerBounds());
} else {
// type is either serializable as-is or unsupported
return type;
}
}
public static Class<?> getRawType(Type type) {
// In this article this simply returns DataModel
if (type instanceof Class<?>) {
// type is a normal class.
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// I'm not exactly sure why getRawType() returns Type instead of Class.
// Neal isn't either but suspects some pathological case related
// to nested classes exists.
Type rawType = parameterizedType.getRawType();
checkArgument(rawType instanceof Class);
return (Class<?>) rawType;
} else if (type instanceof GenericArrayType) {
Type componentType = ((GenericArrayType)type).getGenericComponentType();
return Array.newInstance(getRawType(componentType), 0).getClass();
} else if (type instanceof TypeVariable) {
// we could use the variable's bounds, but that won't work if there are multiple.
// having a raw type that's more general than necessary is okay
return Object.class;
} else if (type instanceof WildcardType) {
return getRawType(((WildcardType) type).getUpperBounds()[0]);
} else {
String className = type == null ? "null" : type.getClass().getName();
throw new IllegalArgumentException("Expected a Class, ParameterizedType, or "
+ "GenericArrayType, but <" + type + "> is of type " + className);
}
}
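To see the branches of getRawType in action, the sketch below feeds it the different kinds of java.lang.reflect.Type. It is a simplified re-implementation for illustration only, not Gson's code; Holder, fieldType and typeArgument are hypothetical helpers that exist only to produce Type instances.

```java
import java.lang.reflect.Array;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.List;
import java.util.Map;

class RawTypeSketch {
  /** Hypothetical holder whose fields only exist to produce Type instances. */
  static class Holder {
    String plain;
    List<String> parameterized;
    List<String>[] genericArray;
    Map<String, ? extends Number> wildcardValue;
  }

  /** Simplified raw-type resolution, following the same branches as the code above. */
  static Class<?> rawTypeOf(Type type) {
    if (type instanceof Class<?>) return (Class<?>) type;
    if (type instanceof ParameterizedType) {
      return (Class<?>) ((ParameterizedType) type).getRawType();
    }
    if (type instanceof GenericArrayType) {
      Type component = ((GenericArrayType) type).getGenericComponentType();
      return Array.newInstance(rawTypeOf(component), 0).getClass();
    }
    if (type instanceof WildcardType) {
      return rawTypeOf(((WildcardType) type).getUpperBounds()[0]);
    }
    return Object.class; // for example a TypeVariable
  }

  /** Looks up the generic type of one of Holder's fields. */
  static Type fieldType(String name) {
    try {
      return Holder.class.getDeclaredField(name).getGenericType();
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /** Returns the type argument at index of a parameterized type. */
  static Type typeArgument(Type type, int index) {
    return ((ParameterizedType) type).getActualTypeArguments()[index];
  }

  public static void main(String[] args) {
    System.out.println(rawTypeOf(fieldType("plain")));
    System.out.println(rawTypeOf(fieldType("parameterized")));
    System.out.println(rawTypeOf(fieldType("genericArray")));
    System.out.println(rawTypeOf(typeArgument(fieldType("wildcardValue"), 1)));
  }
}
```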
When Gson is initialized, it registers a whole range of TypeAdapter factories:
Gson(final Excluder excluder, final FieldNamingStrategy fieldNamingStrategy,
final Map<Type, InstanceCreator<?>> instanceCreators, boolean serializeNulls,
boolean complexMapKeySerialization, boolean generateNonExecutableGson, boolean htmlSafe,
boolean prettyPrinting, boolean lenient, boolean serializeSpecialFloatingPointValues,
LongSerializationPolicy longSerializationPolicy,
List<TypeAdapterFactory> typeAdapterFactories) {
this.constructorConstructor = new ConstructorConstructor(instanceCreators);
this.excluder = excluder;
this.fieldNamingStrategy = fieldNamingStrategy;
this.serializeNulls = serializeNulls;
this.generateNonExecutableJson = generateNonExecutableGson;
this.htmlSafe = htmlSafe;
this.prettyPrinting = prettyPrinting;
this.lenient = lenient;
List<TypeAdapterFactory> factories = new ArrayList<TypeAdapterFactory>();
// built-in type adapters that cannot be overridden
factories.add(TypeAdapters.JSON_ELEMENT_FACTORY);
factories.add(ObjectTypeAdapter.FACTORY);
// the excluder must precede all adapters that handle user-defined types
factories.add(excluder);
// user's type adapters
factories.addAll(typeAdapterFactories);
// type adapters for basic platform types
factories.add(TypeAdapters.STRING_FACTORY);
factories.add(TypeAdapters.INTEGER_FACTORY);
factories.add(TypeAdapters.BOOLEAN_FACTORY);
factories.add(TypeAdapters.BYTE_FACTORY);
factories.add(TypeAdapters.SHORT_FACTORY);
TypeAdapter<Number> longAdapter = longAdapter(longSerializationPolicy);
factories.add(TypeAdapters.newFactory(long.class, Long.class, longAdapter));
factories.add(TypeAdapters.newFactory(double.class, Double.class,
doubleAdapter(serializeSpecialFloatingPointValues)));
factories.add(TypeAdapters.newFactory(float.class, Float.class,
floatAdapter(serializeSpecialFloatingPointValues)));
factories.add(TypeAdapters.NUMBER_FACTORY);
factories.add(TypeAdapters.ATOMIC_INTEGER_FACTORY);
factories.add(TypeAdapters.ATOMIC_BOOLEAN_FACTORY);
factories.add(TypeAdapters.newFactory(AtomicLong.class, atomicLongAdapter(longAdapter)));
factories.add(TypeAdapters.newFactory(AtomicLongArray.class, atomicLongArrayAdapter(longAdapter)));
factories.add(TypeAdapters.ATOMIC_INTEGER_ARRAY_FACTORY);
factories.add(TypeAdapters.CHARACTER_FACTORY);
factories.add(TypeAdapters.STRING_BUILDER_FACTORY);
factories.add(TypeAdapters.STRING_BUFFER_FACTORY);
factories.add(TypeAdapters.newFactory(BigDecimal.class, TypeAdapters.BIG_DECIMAL));
factories.add(TypeAdapters.newFactory(BigInteger.class, TypeAdapters.BIG_INTEGER));
factories.add(TypeAdapters.URL_FACTORY);
factories.add(TypeAdapters.URI_FACTORY);
factories.add(TypeAdapters.UUID_FACTORY);
factories.add(TypeAdapters.CURRENCY_FACTORY);
factories.add(TypeAdapters.LOCALE_FACTORY);
factories.add(TypeAdapters.INET_ADDRESS_FACTORY);
factories.add(TypeAdapters.BIT_SET_FACTORY);
factories.add(DateTypeAdapter.FACTORY);
factories.add(TypeAdapters.CALENDAR_FACTORY);
factories.add(TimeTypeAdapter.FACTORY);
factories.add(SqlDateTypeAdapter.FACTORY);
factories.add(TypeAdapters.TIMESTAMP_FACTORY);
factories.add(ArrayTypeAdapter.FACTORY);
factories.add(TypeAdapters.CLASS_FACTORY);
// type adapters for composite and user-defined types
factories.add(new CollectionTypeAdapterFactory(constructorConstructor));
factories.add(new MapTypeAdapterFactory(constructorConstructor, complexMapKeySerialization));
this.jsonAdapterFactory = new JsonAdapterAnnotationTypeAdapterFactory(constructorConstructor);
factories.add(jsonAdapterFactory);
factories.add(TypeAdapters.ENUM_FACTORY);
factories.add(new ReflectiveTypeAdapterFactory(
constructorConstructor, fieldNamingStrategy, excluder, jsonAdapterFactory));
this.factories = Collections.unmodifiableList(factories);
}
The core of the lookup for a matching TypeAdapter is:
for (TypeAdapterFactory factory : factories) {
TypeAdapter<T> candidate = factory.create(this, type);
if (candidate != null) {
call.setDelegate(candidate);
typeTokenCache.put(type, candidate);
return candidate;
}
}
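A factory's TypeAdapter is used only when its create method returns non-null for the given type, and the first such factory in the list wins. Note that type here is the actual type argument of the declared generic, which in this article is DataModel.
Take one entry in factories as an example: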
factories.add(TypeAdapters.JSON_ELEMENT_FACTORY);
public static final TypeAdapterFactory JSON_ELEMENT_FACTORY
= newTypeHierarchyFactory(JsonElement.class, JSON_ELEMENT);
/**
* Returns a factory for all subtypes of {@code typeAdapter}. We do a runtime check to confirm
* that the deserialized type matches the type requested.
*/
public static <T1> TypeAdapterFactory newTypeHierarchyFactory(
final Class<T1> clazz, final TypeAdapter<T1> typeAdapter) {
return new TypeAdapterFactory() {
@SuppressWarnings("unchecked")
@Override public <T2> TypeAdapter<T2> create(Gson gson, TypeToken<T2> typeToken) {
// From the TypeToken source, this returns the actual type, DataModel in this article
final Class<? super T2> requestedType = typeToken.getRawType();
if (!clazz.isAssignableFrom(requestedType)) {
return null;
}
return (TypeAdapter<T2>) new TypeAdapter<T1>() {
@Override public void write(JsonWriter out, T1 value) throws IOException {
typeAdapter.write(out, value);
}
@Override public T1 read(JsonReader in) throws IOException {
T1 result = typeAdapter.read(in);
if (result != null && !requestedType.isInstance(result)) {
throw new JsonSyntaxException("Expected a " + requestedType.getName()
+ " but was " + result.getClass().getName());
}
return result;
}
};
}
@Override public String toString() {
return "Factory[typeHierarchy=" + clazz.getName() + ",adapter=" + typeAdapter + "]";
}
};
}
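The clazz.isAssignableFrom(requestedType) guard in the factory above is easy to misread, because the receiver is the supertype and the argument is the candidate subtype. A minimal sketch of its behavior follows; the applies helper and the DataModel class are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class AssignableSketch {
  /** Hypothetical response model with no special supertype. */
  static class DataModel {}

  /** Mirrors the guard: does a factory registered for clazz apply to requestedType? */
  static boolean applies(Class<?> clazz, Class<?> requestedType) {
    return clazz.isAssignableFrom(requestedType);
  }

  public static void main(String[] args) {
    System.out.println(applies(List.class, ArrayList.class));        // supertype vs subtype
    System.out.println(applies(ArrayList.class, List.class));        // reversed
    System.out.println(applies(CharSequence.class, DataModel.class)); // unrelated
    System.out.println(applies(Object.class, int.class));            // primitive
  }
}
```

The last case is the same test that ReflectiveTypeAdapterFactory uses to reject primitives: Object is not assignable from int.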
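isAssignableFrom decides whether JSON_ELEMENT_FACTORY applies. Clearly it does not here, since DataModel is not a JsonElement, so which TypeAdapterFactory is used? Reading on, the last entry in the list is a ReflectiveTypeAdapterFactory: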
factories.add(new ReflectiveTypeAdapterFactory(
constructorConstructor, fieldNamingStrategy, excluder, jsonAdapterFactory));
public ReflectiveTypeAdapterFactory(ConstructorConstructor constructorConstructor,
FieldNamingStrategy fieldNamingPolicy, Excluder excluder,
JsonAdapterAnnotationTypeAdapterFactory jsonAdapterFactory) {
this.constructorConstructor = constructorConstructor;
this.fieldNamingPolicy = fieldNamingPolicy;
this.excluder = excluder;
this.jsonAdapterFactory = jsonAdapterFactory;
}
@Override public <T> TypeAdapter<T> create(Gson gson, final TypeToken<T> type) {
Class<? super T> raw = type.getRawType();
if (!Object.class.isAssignableFrom(raw)) {
return null; // it's a primitive!
}
ObjectConstructor<T> constructor = constructorConstructor.get(type);
return new Adapter<T>(constructor, getBoundFields(gson, type, raw));
}
private Map<String, BoundField> getBoundFields(Gson context, TypeToken<?> type, Class<?> raw) {
Map<String, BoundField> result = new LinkedHashMap<String, BoundField>();
if (raw.isInterface()) {
return result;
}
Type declaredType = type.getType();
while (raw != Object.class) {
Field[] fields = raw.getDeclaredFields();
for (Field field : fields) {
boolean serialize = excludeField(field, true);
boolean deserialize = excludeField(field, false);
if (!serialize && !deserialize) {
continue;
}
field.setAccessible(true);
Type fieldType = $Gson$Types.resolve(type.getType(), raw, field.getGenericType());
List<String> fieldNames = getFieldNames(field);
BoundField previous = null;
for (int i = 0, size = fieldNames.size(); i < size; ++i) {
String name = fieldNames.get(i);
if (i != 0) serialize = false; // only serialize the default name
BoundField boundField = createBoundField(context, field, name,
TypeToken.get(fieldType), serialize, deserialize);
BoundField replaced = result.put(name, boundField);
if (previous == null) previous = replaced;
}
if (previous != null) {
throw new IllegalArgumentException(declaredType
+ " declares multiple JSON fields named " + previous.name);
}
}
type = TypeToken.get($Gson$Types.resolve(type.getType(), raw, raw.getGenericSuperclass()));
raw = type.getRawType();
}
return result;
}
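The code above uses reflection to map the fields of the class dynamically, and the read and write methods then take the data apart and put it back together: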
public static final class Adapter<T> extends TypeAdapter<T> {
private final ObjectConstructor<T> constructor;
private final Map<String, BoundField> boundFields;
Adapter(ObjectConstructor<T> constructor, Map<String, BoundField> boundFields) {
this.constructor = constructor;
this.boundFields = boundFields;
}
@Override public T read(JsonReader in) throws IOException {
if (in.peek() == JsonToken.NULL) {
in.nextNull();
return null;
}
T instance = constructor.construct();
try {
in.beginObject();
while (in.hasNext()) {
String name = in.nextName();
BoundField field = boundFields.get(name);
if (field == null || !field.deserialized) {
in.skipValue();
} else {
field.read(in, instance);
}
}
} catch (IllegalStateException e) {
throw new JsonSyntaxException(e);
} catch (IllegalAccessException e) {
throw new AssertionError(e);
}
in.endObject();
return instance;
}
@Override public void write(JsonWriter out, T value) throws IOException {
if (value == null) {
out.nullValue();
return;
}
out.beginObject();
try {
for (BoundField boundField : boundFields.values()) {
if (boundField.writeField(value)) {
out.name(boundField.name);
boundField.write(out, value);
}
}
} catch (IllegalAccessException e) {
throw new AssertionError(e);
}
out.endObject();
}
}
This is how Gson adapts data dynamically.
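The reflective binding can be reduced to a few lines. The sketch below collects the fields of a class and its superclasses, then fills an instance from name/value pairs that are assumed to be already parsed. It illustrates the technique only and is not Gson's implementation; Base, WeatherInfo, boundFields and read are hypothetical names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

class ReflectiveBindingSketch {
  /** Hypothetical model classes. */
  static class Base { String city; }
  static class WeatherInfo extends Base { int temperature; transient String ignored; }

  /** Collects non-static, non-transient fields of raw and its superclasses, keyed by name. */
  static Map<String, Field> boundFields(Class<?> raw) {
    Map<String, Field> result = new LinkedHashMap<>();
    while (raw != null && raw != Object.class) {
      for (Field field : raw.getDeclaredFields()) {
        int modifiers = field.getModifiers();
        if (Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers)
            || field.isSynthetic()) {
          continue;
        }
        field.setAccessible(true);
        if (result.put(field.getName(), field) != null) {
          throw new IllegalArgumentException("Duplicate field name " + field.getName());
        }
      }
      raw = raw.getSuperclass(); // walk up the hierarchy, as getBoundFields does
    }
    return result;
  }

  /** Fills instance from name/value pairs; names without a bound field are skipped. */
  static <T> T read(T instance, Map<String, Object> values) {
    Map<String, Field> fields = boundFields(instance.getClass());
    for (Map.Entry<String, Object> entry : values.entrySet()) {
      Field field = fields.get(entry.getKey());
      if (field == null) continue; // same idea as in.skipValue()
      try {
        field.set(instance, entry.getValue());
      } catch (IllegalAccessException e) {
        throw new AssertionError(e);
      }
    }
    return instance;
  }

  public static void main(String[] args) {
    Map<String, Object> values = new LinkedHashMap<>();
    values.put("city", "Sample City");
    values.put("temperature", 21);
    values.put("unknown", "skipped");
    WeatherInfo info = read(new WeatherInfo(), values);
    System.out.println(info.city + " " + info.temperature);
  }
}
```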
After loadWeatherInfo(***) is called, the dynamic proxy forwards the call to HttpServiceMethod:
private HttpServiceMethod(RequestFactory requestFactory, okhttp3.Call.Factory callFactory,
CallAdapter<ResponseT, ReturnT> callAdapter,
Converter<ResponseBody, ResponseT> responseConverter) {
this.requestFactory = requestFactory;
this.callFactory = callFactory;
this.callAdapter = callAdapter;
this.responseConverter = responseConverter;
}
@Override ReturnT invoke(Object[] args) {
return callAdapter.adapt(
new OkHttpCall<>(requestFactory, args, callFactory, responseConverter));
}
In the end this calls adapt on the callAdapter configured earlier. I flagged above that adapt would be explained later, and this is where it comes in.
Let's go back and look at what adapt does:
static final class ExecutorCallbackCall<T> implements Call<T> {
final Executor callbackExecutor;
final Call<T> delegate;
ExecutorCallbackCall(Executor callbackExecutor, Call<T> delegate) {
this.callbackExecutor = callbackExecutor;
this.delegate = delegate;
}
@Override public void enqueue(final Callback<T> callback) {
checkNotNull(callback, "callback == null");
delegate.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, final Response<T> response) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
if (delegate.isCanceled()) {
// Emulate OkHttp's behavior of throwing/delivering an IOException on cancellation.
callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
} else {
callback.onResponse(ExecutorCallbackCall.this, response);
}
}
});
}
@Override public void onFailure(Call<T> call, final Throwable t) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(ExecutorCallbackCall.this, t);
}
});
}
});
}
@Override public boolean isExecuted() {
return delegate.isExecuted();
}
@Override public Response<T> execute() throws IOException {
return delegate.execute();
}
@Override public void cancel() {
delegate.cancel();
}
@Override public boolean isCanceled() {
return delegate.isCanceled();
}
@SuppressWarnings("CloneDoesntCallSuperClone") // Performing deep clone.
@Override public Call<T> clone() {
return new ExecutorCallbackCall<>(callbackExecutor, delegate.clone());
}
@Override public Request request() {
return delegate.request();
}
}
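On Android, callbackExecutor.execute is essentially Handler.post. The adapt method of ExecutorCallAdapterFactory wraps the Call in an ExecutorCallbackCall, which can be thought of as Handler + Call: its enqueue calls OkHttpCall.enqueue and then posts the response back to the application through the main-thread Handler.
As for why a Handler is used, my own understanding is:
1. The Handler's message queue puts the callbacks in order.
2. Some callbacks need to be delivered on the main thread.
These are the only two uses I can think of so far, because the thread scheduling for the actual asynchronous work happens in the thread pool of OkHttp's Dispatcher, to which OkHttpCall delegates.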
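The wrapping done by ExecutorCallbackCall can be sketched without Android: a delegate completes on a worker thread, and the wrapper re-dispatches the callback on a chosen Executor. This is a minimal sketch under stated assumptions; AsyncCall, adapt and deliverAndReportThread are hypothetical names, and a single-thread executor named "callback-thread" stands in for the main-thread Handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class CallbackExecutorSketch {
  /** Hypothetical minimal asynchronous call. */
  interface AsyncCall<T> {
    void enqueue(Consumer<T> callback);
  }

  /** Wraps delegate so that its callback is re-dispatched on callbackExecutor. */
  static <T> AsyncCall<T> adapt(Executor callbackExecutor, AsyncCall<T> delegate) {
    return callback ->
        delegate.enqueue(result -> callbackExecutor.execute(() -> callback.accept(result)));
  }

  /** Runs one call and reports the name of the thread on which the callback was delivered. */
  static String deliverAndReportThread() {
    ExecutorService worker =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
    ExecutorService callbackThread =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
    try {
      // The raw call completes on the worker thread, like an OkHttp dispatcher thread.
      AsyncCall<String> raw = callback -> worker.execute(() -> callback.accept("response"));
      CompletableFuture<String> deliveredOn = new CompletableFuture<>();
      adapt(callbackThread, raw)
          .enqueue(result -> deliveredOn.complete(Thread.currentThread().getName()));
      return deliveredOn.join();
    } finally {
      worker.shutdown();
      callbackThread.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(deliverAndReportThread());
  }
}
```

The caller's callback never runs on the worker thread, which is the property an Android UI needs from the main-thread executor.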
Call<DataModel> call = RetrofitUtils.getDeafault(WeatherService.WEATHERSERVICE_URL)
.create(WeatherService.class)
.loadWeatherInfo("101190201");
call.enqueue(new Callback<DataModel>() {
@Override
public void onResponse(Call<DataModel> call, Response<DataModel> response) {
DataModel model = response.body();
DataModel.WeatherInfoBean bean = model.getWeatherinfo();
Log.d(TAG, "onResponse:" + bean.getCity());
}
@Override
public void onFailure(Call<DataModel> call, Throwable t) {
Log.d(TAG, "onFailure:" + t.getMessage());
}
});
In the code above, call is actually an ExecutorCallbackCall.
call.enqueue is therefore handled by OkHttpCall.enqueue, because ExecutorCallbackCall delegates to the OkHttpCall it wraps:
@Override public void enqueue(final Callback<T> callback) {
checkNotNull(callback, "callback == null");
okhttp3.Call call;
Throwable failure;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
call = rawCall;
failure = creationFailure;
if (call == null && failure == null) {
try {
call = rawCall = createRawCall();
} catch (Throwable t) {
throwIfFatal(t);
failure = creationFailure = t;
}
}
}
if (failure != null) {
callback.onFailure(this, failure);
return;
}
if (canceled) {
call.cancel();
}
call.enqueue(new okhttp3.Callback() {
@Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse) {
Response<T> response;
try {
response = parseResponse(rawResponse);
} catch (Throwable e) {
throwIfFatal(e);
callFailure(e);
return;
}
try {
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
t.printStackTrace();
}
}
@Override public void onFailure(okhttp3.Call call, IOException e) {
callFailure(e);
}
private void callFailure(Throwable e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
t.printStackTrace();
}
}
});
}
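Retrofit 2 uses OkHttp by default, and Retrofit's enqueue is built on OkHttp's enqueue: it creates an OkHttp Call, hands the actual network request to OkHttp's queue, and finally returns the result to the caller through the callback. This also shows how Retrofit differs from the two usual styles of code, where a function either does the work and returns the data synchronously, or a callback is registered first and the data arrives through it asynchronously. With Retrofit, calling a service method (loadWeatherInfo in this article) does not perform any work such as requesting data from the network. The work only happens on execute or enqueue, which is when the network request is actually created from the configuration.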
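This deferred execution can be illustrated with a JDK dynamic proxy: invoking the service method only builds a call object, and the work runs when execute is called. The sketch below is a simplified illustration, not Retrofit's code; LazyCall, WeatherService, create and the counter parameter are hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

class LazyCallSketch {
  /** Hypothetical call object: nothing happens until execute() is invoked. */
  interface LazyCall<T> {
    T execute();
  }

  /** Hypothetical service interface. */
  interface WeatherService {
    LazyCall<String> loadWeatherInfo(String cityId);
  }

  /** Creates a proxy whose methods only describe the request; requestsSent counts real work. */
  static WeatherService create(AtomicInteger requestsSent) {
    return (WeatherService) Proxy.newProxyInstance(
        WeatherService.class.getClassLoader(),
        new Class<?>[] {WeatherService.class},
        (proxy, method, args) -> {
          // Invoking the service method only captures the arguments.
          LazyCall<String> call = () -> {
            requestsSent.incrementAndGet(); // the simulated request happens here
            return "fake response for " + args[0];
          };
          return call;
        });
  }

  public static void main(String[] args) {
    AtomicInteger requestsSent = new AtomicInteger();
    LazyCall<String> call = create(requestsSent).loadWeatherInfo("101190201");
    System.out.println("after method call: " + requestsSent.get());
    System.out.println(call.execute());
    System.out.println("after execute: " + requestsSent.get());
  }
}
```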
private okhttp3.Call createRawCall() throws IOException {
okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
if (call == null) {
throw new NullPointerException("Call.Factory returned null.");
}
return call;
}
requestFactory.create(args) creates a Request, but where is callFactory created? Attentive readers may have spotted it earlier in this series:
public Retrofit build() {
if (baseUrl == null) {
throw new IllegalStateException("Base URL required.");
}
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
}
...
}
If no callFactory is set when Retrofit is initialized, OkHttpClient is used by default. In that case the flow is:
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
this.timeout = new AsyncTimeout() {
@Override protected void timedOut() {
cancel();
}
};
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
So the call.enqueue inside OkHttpCall resolves, through dynamic dispatch on the RealCall instance, to the following method:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
// A simple event callback; note that it runs on the calling thread
eventListener.callStart(this);
// This is where the asynchronous work is really handed off
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
Lifting the veil on the asynchronous mechanism:
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
The Dispatcher's default thread pool:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
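Before running a task, the Dispatcher counts the asynchronous calls that are already running and checks them against maxRequests overall and maxRequestsPerHost per host (64 and 5 by default), which keeps the load bounded. Only the calls that pass are promoted and handed to the thread pool: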
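The ready/running bookkeeping can be sketched in a few lines. The version below is a deliberately reduced illustration: it keeps only the overall limit, leaves out the per-host limit and the thread pool, and uses strings in place of AsyncCall. MiniDispatcherSketch and all of its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class MiniDispatcherSketch {
  private final int maxRequests;
  private final Deque<String> ready = new ArrayDeque<>();
  private final Deque<String> running = new ArrayDeque<>();

  MiniDispatcherSketch(int maxRequests) {
    this.maxRequests = maxRequests;
  }

  /** Queues a call and returns the calls that may start now. */
  synchronized List<String> enqueue(String call) {
    ready.add(call);
    return promote();
  }

  /** Marks a call as finished and returns the calls promoted into the freed slot. */
  synchronized List<String> finished(String call) {
    running.remove(call);
    return promote();
  }

  /** Moves calls from ready to running until the limit is reached. */
  private List<String> promote() {
    List<String> executable = new ArrayList<>();
    for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
      if (running.size() >= maxRequests) break; // max capacity
      String call = i.next();
      i.remove();
      running.add(call);
      executable.add(call);
    }
    return executable;
  }

  synchronized int runningCount() {
    return running.size();
  }

  synchronized int readyCount() {
    return ready.size();
  }

  public static void main(String[] args) {
    MiniDispatcherSketch dispatcher = new MiniDispatcherSketch(2);
    System.out.println(dispatcher.enqueue("a"));
    System.out.println(dispatcher.enqueue("b"));
    System.out.println(dispatcher.enqueue("c")); // has to wait in the ready queue
    System.out.println(dispatcher.finished("a")); // frees a slot for "c"
  }
}
```

In the real Dispatcher the promoted calls are then passed to executeOn with the executor service, which is the step this sketch omits.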
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
AsyncCall extends NamedRunnable:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
...
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
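NamedRunnable above renames the current thread for the duration of the task and restores the old name afterwards, which makes thread dumps and logs easier to read. The same idea as a small wrapper, assuming a hypothetical named helper that is not part of OkHttp:

```java
class NamedTaskSketch {
  /** Runs body with the current thread temporarily renamed, then restores the old name. */
  static Runnable named(String name, Runnable body) {
    return () -> {
      Thread current = Thread.currentThread();
      String oldName = current.getName();
      current.setName(name);
      try {
        body.run();
      } finally {
        current.setName(oldName); // restored even if body throws
      }
    };
  }

  public static void main(String[] args) {
    String before = Thread.currentThread().getName();
    named("OkHttp example", () ->
        System.out.println("inside: " + Thread.currentThread().getName())).run();
    System.out.println("restored: " + before.equals(Thread.currentThread().getName()));
  }
}
```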
The real work happens in execute, which through polymorphism resolves to AsyncCall.execute. The data request is already visible here, so let's see how it is implemented:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
Here the interceptors are added and a RealInterceptorChain is created:
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
// index starts at 0; passing index + 1 to the next chain is what makes the calls chain together
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// The list holds client.interceptors(), BridgeInterceptor, and the others added earlier
Interceptor interceptor = interceptors.get(index);
// Each interceptor gets its turn to process the request and the response
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
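The index + 1 hand-off in proceed above is a chain-of-responsibility pattern. Stripped of connections and validation, it can be sketched as follows; this is a simplified illustration with strings in place of Request and Response, and Interceptor, Chain, tagging, TERMINAL and run are hypothetical names rather than OkHttp's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class InterceptorChainSketch {
  /** Hypothetical interceptor over string requests and responses. */
  interface Interceptor {
    String intercept(Chain chain);
  }

  static final class Chain {
    private final List<Interceptor> interceptors;
    private final int index;
    private final String request;

    Chain(List<Interceptor> interceptors, int index, String request) {
      this.interceptors = interceptors;
      this.index = index;
      this.request = request;
    }

    String request() {
      return request;
    }

    /** Hands the request to the interceptor at index, giving it a chain positioned at index + 1. */
    String proceed(String request) {
      if (index >= interceptors.size()) throw new AssertionError("no interceptor left");
      Chain next = new Chain(interceptors, index + 1, request);
      return interceptors.get(index).intercept(next);
    }
  }

  /** An interceptor that marks the request on the way in and the response on the way out. */
  static Interceptor tagging(String tag) {
    return chain -> {
      String response = chain.proceed(chain.request() + ">" + tag);
      return response + "<" + tag;
    };
  }

  /** Last interceptor: produces the response and never calls proceed, like CallServerInterceptor. */
  static final Interceptor TERMINAL = chain -> "[" + chain.request() + "]";

  static String run(String request, Interceptor... interceptors) {
    List<Interceptor> all = new ArrayList<>(Arrays.asList(interceptors));
    all.add(TERMINAL);
    return new Chain(all, 0, request).proceed(request);
  }

  public static void main(String[] args) {
    System.out.println(run("req", tagging("A"), tagging("B"))); // prints [req>A>B]<B<A
  }
}
```

The output shows the nesting: interceptors see the request in list order and the response in reverse order, which is why application interceptors added first wrap everything else.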
Note that client.interceptors() are processed first. For example, a logging interceptor is declared like this:
OkHttpClient.Builder builder = new OkHttpClient.Builder();
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(loggingInterceptor);
}
// Build the client only after the interceptor has been added; a client built earlier would not include it.
OkHttpClient okHttpClient = builder.build();
After passing through the series of interceptors, we finally need the data returned by the network request, which is obtained in the last interceptor, CallServerInterceptor:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
The response obtained this way is handed to the okhttp3.Callback that OkHttpCall created with new okhttp3.Callback:
@Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse) {
Response<T> response;
try {
response = parseResponse(rawResponse);
} catch (Throwable e) {
throwIfFatal(e);
callFailure(e);
return;
}
try {
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
t.printStackTrace();
}
}
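Now that data can be fetched from the network, the network adaptation part is mostly covered. The fetched data still has to be converted into what the application needs. How is that done? Let's keep going: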
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();
int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}
if (code == 204 || code == 205) {
rawBody.close();
return Response.success(null, rawResponse);
}
ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
try {
// Important: the configured Converter is used here to convert the data
T body = responseConverter.convert(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}
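The three branches of parseResponse can be summarized in a small sketch. MiniResponse and MiniParser are hypothetical names, a String stands in for the raw body, and a java.util.function.Function stands in for the configured Converter. This is a simplification under those assumptions, not Retrofit's code.

```java
import java.util.function.Function;

// Hypothetical, simplified stand-in for retrofit2.Response<T>.
final class MiniResponse<T> {
    final int code;
    final T body;           // converted body; null for errors and for 204/205
    final String errorBody; // raw body, kept only for non-2xx responses

    private MiniResponse(int code, T body, String errorBody) {
        this.code = code;
        this.body = body;
        this.errorBody = errorBody;
    }

    static <T> MiniResponse<T> success(int code, T body) {
        return new MiniResponse<T>(code, body, null);
    }

    static <T> MiniResponse<T> error(int code, String rawBody) {
        return new MiniResponse<T>(code, null, rawBody);
    }

    boolean isSuccessful() {
        return code >= 200 && code < 300;
    }
}

final class MiniParser {
    static <T> MiniResponse<T> parse(int code, String rawBody, Function<String, T> converter) {
        if (code < 200 || code >= 300) {
            // Non-2xx: keep the raw body as an error body, the converter is never called.
            return MiniResponse.<T>error(code, rawBody);
        }
        if (code == 204 || code == 205) {
            // No content is expected, so there is nothing to convert.
            return MiniResponse.<T>success(code, null);
        }
        // Normal case: the converter turns the raw body into T.
        return MiniResponse.success(code, converter.apply(rawBody));
    }
}
```

For example, MiniParser.<Integer>parse(404, "not found", s -> Integer.parseInt(s)) returns an unsuccessful response without ever running the converter, which is why application code should check isSuccessful() before reading body().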
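As the code shows, in the normal case (a 2xx response other than 204 or 205) Retrofit uses the configured Converter to turn the data fetched from the network into the format the application layer needs. Do you remember what responseConverter is? Look back through the article if not. In this analysis it is GsonResponseBodyConverter: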
@Override public T convert(ResponseBody value) throws IOException {
JsonReader jsonReader = gson.newJsonReader(value.charStream());
try {
// Where does this adapter come from? Do you still remember?
T result = adapter.read(jsonReader);
if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
throw new JsonIOException("JSON document was not fully consumed.");
}
return result;
} finally {
value.close();
}
}
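The shape of a response converter, including the try/finally that always closes the body, can be sketched without Gson. MiniConverter, IntBodyConverter and IntBodyConverterDemo are hypothetical, and a java.io.Reader stands in for ResponseBody. The sketch only shows the pattern and does not parse JSON.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

// Hypothetical, simplified stand-in for retrofit2.Converter<F, T>.
interface MiniConverter<F, T> {
    T convert(F value) throws IOException;
}

// Converts a character stream holding a single integer.
final class IntBodyConverter implements MiniConverter<Reader, Integer> {
    @Override
    public Integer convert(Reader value) throws IOException {
        try {
            StringBuilder text = new StringBuilder();
            int c;
            while ((c = value.read()) != -1) {
                text.append((char) c);
            }
            return Integer.valueOf(text.toString().trim());
        } finally {
            // Like GsonResponseBodyConverter, release the body whether parsing worked or not.
            value.close();
        }
    }
}

final class IntBodyConverterDemo {
    // Reports the conversion outcome and whether the reader was closed afterwards.
    static String run(String text) {
        StringReader reader = new StringReader(text);
        String outcome;
        try {
            outcome = "value=" + new IntBodyConverter().convert(reader);
        } catch (IOException | NumberFormatException e) {
            outcome = "failed";
        }
        boolean closed;
        try {
            reader.read();
            closed = false;
        } catch (IOException e) {
            // A closed StringReader throws on read().
            closed = true;
        }
        return outcome + ",closed=" + closed;
    }
}
```

Both a well-formed body and a malformed one leave the reader closed, which is the point of putting close() in the finally block.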
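TypeAdapter was introduced earlier in this article. The adapter used here is the one created by ReflectiveTypeAdapterFactory, and its read method uses reflection to map the JSON onto the target type dynamically: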
@Override public T read(JsonReader in) throws IOException {
if (in.peek() == JsonToken.NULL) {
in.nextNull();
return null;
}
T instance = constructor.construct();
try {
in.beginObject();
while (in.hasNext()) {
String name = in.nextName();
BoundField field = boundFields.get(name);
if (field == null || !field.deserialized) {
in.skipValue();
} else {
field.read(in, instance);
}
}
} catch (IllegalStateException e) {
throw new JsonSyntaxException(e);
} catch (IllegalAccessException e) {
throw new AssertionError(e);
}
in.endObject();
return instance;
}
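The reflection-based mapping can be illustrated with the JDK alone. In the sketch below, CityInfo is a hypothetical model class, a Map<String, String> stands in for the JSON object that JsonReader would stream, and only String fields are bound. Gson's real adapter handles far more cases, so this only shows the name-to-field lookup and the skipping of unknown names.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model class, invented for this example.
final class CityInfo {
    String city;
    String temp;

    CityInfo() {
    }
}

final class MiniReflectiveReader {
    static <T> T read(Class<T> type, Map<String, String> json) {
        try {
            // Bind every non-static String field by name, similar in spirit to boundFields.
            Map<String, Field> boundFields = new HashMap<>();
            for (Field field : type.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.getType() != String.class) {
                    continue;
                }
                field.setAccessible(true);
                boundFields.put(field.getName(), field);
            }
            // Create the instance through its no-argument constructor.
            Constructor<T> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            T instance = constructor.newInstance();
            for (Map.Entry<String, String> entry : json.entrySet()) {
                Field field = boundFields.get(entry.getKey());
                if (field == null) {
                    continue; // unknown name: skipped, like in.skipValue()
                }
                field.set(instance, entry.getValue());
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Reading a map with the keys "city" and "unknown" fills CityInfo.city, ignores "unknown", and leaves temp as null because the input never mentions it.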
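The resulting type T is the actual type argument of Call, which is DataModel in this article. Once the data has been converted successfully, the instance is returned to the application through the application-layer callback: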
try {
// This response is a Response<T>, and T has already been resolved to the actual type
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
t.printStackTrace();
}
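The callback reaches the application layer in the following form. The new Callback<T> in the source below is the same object as the callback in the snippet above: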
delegate.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, final Response<T> response) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
if (delegate.isCanceled()) {
// Emulate OkHttp's behavior of throwing/delivering an IOException on cancellation.
callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
} else {
callback.onResponse(ExecutorCallbackCall.this, response);
}
}
});
}
There is one layer of dispatch through ExecutorCallbackCall in between, but the call still ends up at the application-layer Callback:
call.enqueue(new Callback<DataModel>() {
@Override
public void onResponse(Call<DataModel> call, Response<DataModel> response) {
DataModel model = response.body();
DataModel.WeatherInfoBean bean = model.getWeatherinfo();
Log.d(TAG, "onResponse:" + bean.getCity());
}
@Override
public void onFailure(Call<DataModel> call, Throwable t) {
Log.d(TAG, "onFailure:" + t.getMessage());
}
});
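The extra hop through ExecutorCallbackCall can be sketched with a plain java.util.concurrent.Executor. MiniCallback, MiniCall, ExecutorMiniCall and ExecutorMiniCallDemo are hypothetical names. The demo uses a recording executor that only queues tasks, so it is visible that nothing reaches the application callback until the executor runs them. On Android the real executor posts to the main thread instead.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-ins for retrofit2.Callback<T> and retrofit2.Call<T>.
interface MiniCallback<T> {
    void onResponse(T body);

    void onFailure(Throwable t);
}

interface MiniCall<T> {
    void enqueue(MiniCallback<T> callback);

    boolean isCanceled();
}

// Wraps a delegate call and re-dispatches its callbacks on callbackExecutor.
final class ExecutorMiniCall<T> implements MiniCall<T> {
    private final Executor callbackExecutor;
    private final MiniCall<T> delegate;

    ExecutorMiniCall(Executor callbackExecutor, MiniCall<T> delegate) {
        this.callbackExecutor = callbackExecutor;
        this.delegate = delegate;
    }

    @Override
    public void enqueue(final MiniCallback<T> callback) {
        delegate.enqueue(new MiniCallback<T>() {
            @Override
            public void onResponse(final T body) {
                callbackExecutor.execute(() -> {
                    if (delegate.isCanceled()) {
                        callback.onFailure(new IOException("Canceled"));
                    } else {
                        callback.onResponse(body);
                    }
                });
            }

            @Override
            public void onFailure(final Throwable t) {
                callbackExecutor.execute(() -> callback.onFailure(t));
            }
        });
    }

    @Override
    public boolean isCanceled() {
        return delegate.isCanceled();
    }
}

final class ExecutorMiniCallDemo {
    static List<String> run() {
        final List<Runnable> queue = new ArrayList<>();
        final List<String> events = new ArrayList<>();
        Executor recording = task -> queue.add(task); // queues instead of running
        MiniCall<String> delegate = new MiniCall<String>() {
            @Override
            public void enqueue(MiniCallback<String> callback) {
                callback.onResponse("Beijing"); // pretend the network answered at once
            }

            @Override
            public boolean isCanceled() {
                return false;
            }
        };
        new ExecutorMiniCall<String>(recording, delegate).enqueue(new MiniCallback<String>() {
            @Override
            public void onResponse(String body) {
                events.add("onResponse:" + body);
            }

            @Override
            public void onFailure(Throwable t) {
                events.add("onFailure:" + t.getMessage());
            }
        });
        events.add("queued:" + queue.size());
        for (Runnable task : queue) {
            task.run();
        }
        return events;
    }
}
```

The design choice this illustrates is that the network layer never calls application code directly. It only hands a Runnable to the executor, and the executor decides on which thread the application's onResponse or onFailure runs.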
3. Conclusion
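This was a fairly rough pass through the Retrofit source. It covered part of the implementation and showed that the library really is well decoupled. It is only a first attempt at reading the Retrofit source, enough for a picture of the overall framework, and the series will be filled in as I learn more.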
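An analysis of RxJava is planned next. These are all tough nuts to crack, so let's keep at it together! The analysis in this article is not guaranteed to be entirely correct, and corrections are welcome.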