Android DemoAndroid开发俱乐部Android开发

RxJava+Retrofit2搭建网络请求组件完整配置、示例代

2018-04-27  本文已影响678人  乱世白衣

本文是对之前写的两篇文章的综合概述及全流程梳理
Android 利用RxJava和Retrofit搭建网络请求组件——监听回调及部分源码解析
Android 利用RxJava和Retrofit搭建网络请求组件——基础配置及部分源码解析

基础配置

一.定义网络请求方法的接口文件:ApiService.class(名字随意)

ApiService.class定义了各个请求方法,请求方式及参数类型均以注解形式标识,示例文件仅描述了GET与POST请求方式

/**
 * 注意:ApiService 必须是接口,且不能实现其他接口
 * Created by cuiyan on 16-10-18.
*/
public interface ApiService {
    /*************************GET 请求方式*************************/
   
    /**
    * ApiConstants.QQ_SPORT_API 请求接口地址api(相对路径)
    * 域名或ip部分会在构建Retrofit时设置
    * 返回Observable对象(内部包装了Call实例),订阅(subscribe)执行时会调用Call.execute()方法发送网络请求。
    * 默认情况下,返回结果为retrofit2.Call对象,只有在构建Retrofit实例时设置了RxJava2CallAdapterFactory才支持请求方法返回Observable对象
    */
    @GET(ApiConstants.QQ_SPORT_API)
    Observable<NewsResult> getQQSportNews(@Query("baid") String baid, @Query("apikey") String apiKey);

    @GET(ApiConstants.QQ_SPORT_API)
    Observable<NewsResult> getQQSportNews1(@QueryMap Map<String, String> paramsMap);

    /*************************POST 请求方式*************************/

    /**
    * @Body 注解参数均表示以实体形式提交请求数据
    */
    @POST(ApiConstants.QQ_SPORT_API)
    Observable<NewsResult> getQQSportNews2(@Body NewsResult bodyParam);

    /**
    * @Field 或 @FieldMap 注解参数均表示以表单形式提交参数,相应的,
    * 请求方法必须添加 @FormUrlEncoded 注解
    */
    @FormUrlEncoded
    @POST(ApiConstants.QQ_SPORT_API)
    Observable<NewsResult> getQQSportNews3(@Field("baid") String baid, @Field("apikey") String   apiKey);

    @FormUrlEncoded
    @POST(ApiConstants.QQ_SPORT_API)
    Observable<NewsResult> getQQSportNews4(@FieldMap Map<String, String> paramsMap);
}
二.ApiService实例构建及相关配置
public class BaseServiceUtil {
    private static final int DEFAULT_TIMEOUT = 10;
   
    /**
     @param serviceClass 网络请求接口描述文件类,步骤一中的ApiService.class或其他类似class
     @return S S实例,即ApiService实例
    */
    public static synchronized <S> S createService(Class<S> serviceClass) {
        return createService(serviceClass, null);
    }

    public static <S> S createService(Class<S> serviceClass, String baseUrl) {
        CommonInterceptor interceptor = new CommonInterceptor();
        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder()
                .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
                .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
        Retrofit.Builder retrofitBuilder = new Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())   // 设置Converter.Factory(请求响应数据转换器),我们设置的是GsonConverterFactory
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create());  // 重点1:为Retrofit设置CallAdapter.Factory,这里我们设置的是RxJava2CallAdapterFactory。ApiService 中网络请求方法默认返回retrofit2.Call对象,添加RxJava2CallAdapterFactory后支持返回Observable对象
//      retrofitBuilder.callFactory()
        // 一定要设置且必须格式正确的baseUrl
        if (!TextUtils.isEmpty(baseUrl)) {
            retrofitBuilder.baseUrl(baseUrl);
        } else {
            retrofitBuilder.baseUrl(BuildConfig.BASE_URL);
        }
        clientBuilder.interceptors().clear();
        clientBuilder.interceptors().add(interceptor);  // 重点2:为OkHttpClient(实现了okhttp3.Call.Factory接口)设置拦截器

//        设置https证书
//        try {
//            clientBuilder.sslSocketFactory(RqbTrustManager.getInstance().getSSLSocketFactory("BKS", R.raw.rqb_ssl));
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        OkHttpClient client = clientBuilder.build();

//      重点3:为Retrofit设置okhttp3.Call.Factory,OkHttpClient实现了okhttp3.Call.Factory接口,该方法内实际调用了
//      callFactory(okhttp3.Call.Factory factory)方法为Retrofit设置okhttp3.Call.Factory。此行与下一行可合并,此处有意拆开做标注
        retrofitBuilder.client(client); 

//      重点4:构建Retrofit实例
        Retrofit retrofit = retrofitBuilder.build();   

//      重点5:构建ApiService实例
        return retrofit.create(serviceClass);   
    }
}
三.网络请求订阅者(观察者)
 /**
 * 网络请求订阅者
 * Created by cuiyan on 16/6/2 14:09
 */
public class NetRequestSubscriber<T> implements Observer<T> {
    private Dialog progressDialog;
    private Disposable disposable;
    private NetRequestCallback<T> netRequestCallback;
    private Context context;

    /**
     * @param netRequestCallback 网络请求回调
     */
    public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context) {
        this(netRequestCallback, context, false, null);
    }

    /**
     * @param netRequestCallback    网络请求回调
     * @param showProgress          是否显示网络请求加载对话框
     * @param progressTip           loading提示语
     * @see NetProgressDialog
     */
    public NetRequestSubscriber(@NonNull final NetRequestCallback<T> netRequestCallback, Context context, boolean showProgress, String progressTip) {
        this.netRequestCallback = netRequestCallback;
        this.context = context;
        if (showProgress) {
            progressDialog = NetProgressDialog.getInstance(context, progressTip, new OnNetProgressCancelListener() {
                @Override
                public void onCancelRequest() {
                    cancelRequest() 
                }
            });
        }
    }

    /**
     * @param netRequestCallback 网络请求回调
     * @param progressDialog     dialog 自定义对话框
     */
    public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context, @NonNull Dialog progressDialog) {
        this.netRequestCallback = netRequestCallback;
        this.context = context;
        this.progressDialog = progressDialog;
    }


    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        showProgress();
        onRequestStart();
    }

    @Override
    public synchronized void onNext(final T t) {
        if (t == null) {
            onRequestResultNull();
        } else {
            if (t instanceof BaseResult && !Config.REQUEST_SUCCESS_CODE.equals(((BaseResult) t).getCode())) {
                ToastUtil.showToast(context, ((BaseResult) t).getMessage());
            }
            onRequestSuccess(t);
        }
    }

    @Override
    public synchronized void onError(Throwable throwable) {
        dismissProgress();
        onRequestError(throwable);
        if (throwable instanceof HttpException) {
            ToastUtil.showToast(context, ((HttpException) throwable).message() + ((HttpException) throwable).code());
        } else {
            if (BuildConfig.DEBUG) {
                ToastUtil.showToast(context, "error:" + throwable.getMessage());
            } else {
                ToastUtil.showToast(context, context.getString(R.string.error_net_request_failed));
            }
        }
    }

    /**
     * {@link NetRequestSubscriber#onError(Throwable)}
     * {@link Observer#onError(Throwable)}
     * {@link Observer#onComplete()} (Throwable)}
     * 该方法与onError方法互斥
     */
    @Override
    public void onComplete() {
        dismissProgress();
        netRequestCallback.onFinish();
    }

    private void onRequestStart() {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onStart();
                }
            });
        } else {
            netRequestCallback.onStart();
        }
    }

    private void onRequestSuccess(final T t) {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onSuccess(t);
                }
            });
        } else {
            netRequestCallback.onSuccess(t);
        }
    }

    private void onRequestResultNull() {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onResultNull();
                }
            });
        } else {
            netRequestCallback.onResultNull();
        }
    }

    private void onRequestError(final Throwable throwable) {
        throwable.printStackTrace();
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onError(throwable);
                    netRequestCallback.onFinish();
                }
            });
        } else {
            netRequestCallback.onError(throwable);
            netRequestCallback.onFinish();
        }
    }

    /**
     *
     *
     */
    private void showProgress() {
        if (progressDialog != null && !progressDialog.isShowing()) {
            progressDialog.show();
        }
    }

    private void dismissProgress() {
        if (progressDialog != null && progressDialog.isShowing()) {
            progressDialog.dismiss();
        }
    }

    public void cancelRequest() {
        dismissProgress();
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
        netRequestCallback.onCancel();
        netRequestCallback.onFinish();
    }
}
四.网络请求控制器
public class BaseController {
    /**
     * @param subscriber 订阅者
     */
    @SuppressWarnings("unchecked")
    public static synchronized void sendRequest(final NetRequestSubscriber subscriber, Observable observable) {
        observable.subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(subscriber);
    }

    /**
     * @param activity   用于与observable绑定,activity生命周期结束时,自动取消订阅
     * @param observable 被观察者
     * @param subscriber 订阅者
     */
    @SuppressWarnings("unchecked")
    public static synchronized void sendRequest(RxActivity activity, final NetRequestSubscriber subscriber, Observable observable) {
        observable.subscribeOn(Schedulers.io())
                .compose(activity.bindToLifecycle()) //防止内存泄漏,activity生命周期结束后取消订阅
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);
    }

    /**
     * @param fragment   用于与observable绑定,fragment生命周期结束时,自动取消订阅
     * @param subscriber 订阅者
     */
    @SuppressWarnings("unchecked")
    public static synchronized void sendRequest(RxFragment fragment, final NetRequestSubscriber subscriber, Observable observable) {
        observable.compose(fragment.bindToLifecycle()) //防止内存泄漏,fragment生命周期结束后取消订阅
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);
    }
}

上述代码构建了完整网络请求功能,下面贴出应用示例。完整示例:https://github.com/670832188/TestApp

private void getNews() {
    NetRequestSubscriber<NewsResult> subscriber = new NetRequestSubscriber<>(new NetRequestCallback<NewsResult>() {
        @Override
        public void onStart() {
            setContentState(STATE_NET_PROGRESS);
        }

        @Override
        public void onSuccess(@NonNull NewsResult newsResult) {
            if (newsResult.getData() != null && newsResult.getData().size() > 0) {
                setContentState(STATE_DATA_CONTENT);
                newsAdapter.updateDataList(newsResult.getData());
            } else {
                setContentState(STATE_DATA_EMPTY);
            }
        }

        @Override
        public void onResultNull() {
            setContentState(STATE_NET_ERROR);
        }

        @Override
        public void onError(Throwable throwable) {
            setContentState(STATE_NET_ERROR);
        }

        @Override
        public void onCancel() {
            super.onCancel();
        }

        @Override
        public void onFinish() {
            super.onFinish();
        }
    }, this);

    Observable<NewsResult> observable = BaseServiceUtil.createService(ApiService.class, ApiConstants.JUHE_BASE_URL).getQQSportNews("69", Constant.JUHE_API_KEY);
    BaseController.sendRequest(this, subscriber, observable);
}

流程梳理

上述代码中标注了五处重点:
1.Retrofit设置RxJava2CallAdapterFactory;
2.OkHttpClient(okhttp3.Call.Factory)设置Interceptor;
3.Retrofit设置OkHttpClient(okhttp3.Call.Factory);
4.构建Retrofit实例
5.构建ApiService实例
下面从Retrofit和ApiService实例构建为入口,分析基本流程及上述上述几个配置是如何工作的
先看一下创建Retrofit实例的方法Retrofit.build()源码

 1.   public Retrofit build() {
 2.       if (baseUrl == null) {
 3.           throw new IllegalStateException("Base URL required.");
 4.       }

 5.       okhttp3.Call.Factory callFactory = this.callFactory;
 6.       if (callFactory == null) {
 7.           callFactory = new OkHttpClient();
 8.        }

 9.       Executor callbackExecutor = this.callbackExecutor;
 10.      if (callbackExecutor == null) {
 11.          callbackExecutor = platform.defaultCallbackExecutor();
 12.      }

 13.      // Make a defensive copy of the adapters and add the default Call adapter.
 14.      List<CallAdapter.Factory> adapterFactories = new ArrayList<>(this.adapterFactories);
 15.      adapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));

 16.      // Make a defensive copy of the converters.
 17.      List<Converter.Factory> converterFactories = new ArrayList<>(this.converterFactories);

 18.      return new Retrofit(callFactory, baseUrl, converterFactories, adapterFactories,callbackExecutor, validateEagerly);
 19.  }

第1-第3行:首先检测了之前设置的baseUrl,没有设置的话直接抛出异常。

第5-第8行:检测是否设置了okhttp3.Call.Factory,没有设置的话使用默认的OkHttpClient实例(前述代码中说过OkHttpClient实现了okhttp3.Call.Factory接口)。我们在代码中创建、配置了OkHttpClient实例(设置超时时间、拦截器),并设置于Retrofit实例。之所以没有使用默认的OkHttpClient,是因为我们需要根据需求自定义超时时间、拦截器等

第14-15行:获取CallAdapter.Factory列表,包括我们设置的RxJava2CallAdapterFactory和框架内置的默认CallAdapter.Factory,追踪源码可以看到,内置的是ExecutorCallAdapterFactory.ExecutorCallbackCall。
注意:列表的顺序,先添加的是我们设置的RxJava2CallAdapterFactory,请记住我们的RxJava2CallAdapterFactory是NO.1 NO.1 NO.1
第17行:获取Converter.Factory列表,包括我们设置的GsonConverterFactory和内置的Converter.Factory(参见Rerofit.Builder构造方法)。

第18行:callFactory、adapterFactories 等作为构建参数传入Retrofit构造方法创建Retrofit实例,看一下其构造方法:

Retrofit(okhttp3.Call.Factory callFactory, HttpUrl baseUrl,
        List<Converter.Factory> converterFactories, List<CallAdapter.Factory> adapterFactories,
        @Nullable Executor callbackExecutor, boolean validateEagerly) {
    this.callFactory = callFactory;
    this.baseUrl = baseUrl;
    this.converterFactories = unmodifiableList(converterFactories); // Defensive copy at call site.
    this.adapterFactories = unmodifiableList(adapterFactories); // Defensive copy at call site.
    this.callbackExecutor = callbackExecutor;
    this.validateEagerly = validateEagerly;
}

成员变量callFactory 是我们设置的OkHttpClient;converterFactories包含了我们设置的GsonConverterFactory;adapterFactories 包含了我们设置的RxJava2CallAdapterFactory。暂且先有些印象,方便后续分析。

下面看ApiService实例构建方法Retrofit .create(final Class<T> service)源码:

1.    public <T> T create(final Class<T> service) {
2.        Utils.validateServiceInterface(service);
3.        if (validateEagerly) {
4.            eagerlyValidateMethods(service);
5.        }
6.        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service }, 
7.            new InvocationHandler() {
8.                private final Platform platform = Platform.get();

9.                @Override 
10                public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
11.                   // If the method is a method from Object then defer to normal invocation.
12.                   if (method.getDeclaringClass() == Object.class) {
13.                       return method.invoke(this, args);
14.                   }
15.                   if (platform.isDefaultMethod(method)) {
16.                      return platform.invokeDefaultMethod(method, service, proxy, args);
17.                   }
18.                   ServiceMethod<Object, Object> serviceMethod = (ServiceMethod<Object, Object>) loadServiceMethod(method);
19.                   OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);  // 请留意okHttpCall 
20.                   return serviceMethod.callAdapter.adapt(okHttpCall);  // 请留心callAdapter
21.              }
22.         });
23.    }

很直接、很暴力啊,利用Proxy创建的ApiService代理对象。查看InvocationHandler的invoke方法,看看代理操作做了什么处理。我们只关心我们的网络请求方法,我们的网络请求方法都是ApiService定义的,第12行判断条件不满足,pass;第15行判断条件也不成立,具体原因可查看platform的创建方法。

现在只剩下18-20行是我们需要关心的。先看第20行,我们的网络请求被万恶的代理商劫持了,返回结果是serviceMethod.callAdapter.adapt(okHttpCall),如果你的记忆不是很差的话,应该记得我们的请求方法返回的Observable对象。是很明显,我们需要知道这个callAdapter是什么,了解它的adapt方法起到了什么作用。定位到第18行,深入loadServiceMethod方法窥视一下

1.   ServiceMethod<?, ?> loadServiceMethod(Method method) {
2.       ServiceMethod<?, ?> result = serviceMethodCache.get(method);
3.       if (result != null) return result;

4.       synchronized (serviceMethodCache) {
5.           result = serviceMethodCache.get(method);
6.           if (result == null) {
7.               result = new ServiceMethod.Builder<>(this, method).build();
8.               serviceMethodCache.put(method, result);
9.           }
10.      }
11.      return result;
12.  }

很遗憾,这个方法没有窥测到ServiceMethod是神马角色。定位到上述代码第7行,继续追击ServiceMethod(ServiceMethod.class)

1.   Builder(Retrofit retrofit, Method method) {
2.       this.retrofit = retrofit;
3.       this.method = method;
4.       this.methodAnnotations = method.getAnnotations();
5.       this.parameterTypes = method.getGenericParameterTypes();
6.       this.parameterAnnotationsArray = method.getParameterAnnotations();
7.   }

8.   public ServiceMethod build() {
9.       callAdapter = createCallAdapter();
10.      responseType = callAdapter.responseType();
11.      if (responseType == Response.class || responseType == okhttp3.Response.class) {
12.          throw methodError("'"+ Utils.getRawType(responseType).getName() + "' is not a valid response body type. Did you mean ResponseBody?");
13.      }
14.      responseConverter = createResponseConverter();

15.      for (Annotation annotation : methodAnnotations) {
16           parseMethodAnnotation(annotation);
17.      }

18.      // 此处省略一万个字...
19.      return new ServiceMethod<>(this);  // 此处要记住,后面有用
20.   }

// 构造方法一并看了
21.   ServiceMethod(Builder<R, T> builder) {
22.       this.callFactory = builder.retrofit.callFactory();  // 此处请留意
23.       this.callAdapter = builder.callAdapter;
24.       this.baseUrl = builder.retrofit.baseUrl();
25.       this.responseConverter = builder.responseConverter;
26.       this.httpMethod = builder.httpMethod;
27.       this.relativeUrl = builder.relativeUrl;
28.       this.headers = builder.headers;
29.       this.contentType = builder.contentType;
30.       this.hasBody = builder.hasBody;
31.       this.isFormEncoded = builder.isFormEncoded;
32.       this.isMultipart = builder.isMultipart;
33.       this.parameterHandlers = builder.parameterHandlers;
34.   }

看Builder构造方法第2行,ServiceMethod持有我们之前创建的Retrofit实例。再看第9行callAdapter = createCallAdapter(); 我们要找的callAdapter(CallAdapter)终于浮出水面。我们之前说过,创建Retrofit实例的时候我们为其设置了CallAdapter.Factory,就是那个RxJava2CallAdapterFactory...(此处伏笔~~)。
从第9行继续追踪源码,最终定位到Retrofit的nextCallAdapter方法,此方法返回的CallAdapter就是我们要寻找的目标了

1.   public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
2.       checkNotNull(returnType, "returnType == null");
3.       checkNotNull(annotations, "annotations == null");

4.       int start = adapterFactories.indexOf(skipPast) + 1;
5.       for (int i = start, count = adapterFactories.size(); i < count; i++) {
6.           CallAdapter<?, ?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
7.           if (adapter != null) {
8.               return adapter;
9.           }
10.      }
11.      // 此处省略一万字(异常处理相关)
12.   }

这段代码很就容易看懂了~。遍历CallAdapter.Factory列表,通过Factory的get方法获取匹配的CallAdapter,一猜就是通过我们设置的RxJava2CallAdapterFactory获取的。为什么这么说呢?还记得之前我们说过,我们设置的RxJava2CallAdapterFactory是第一顺位,状元郎。看一下第4行,遍历起点位置start值是多少?如果你自己追踪到这块代码,就会知道skipPast为null,所以start等于0,因此是从头到尾遍历列表,而我们的RxJava2CallAdapterFactory处于列表第一位。那就看一看RxJava2CallAdapterFactory部分源码吧

public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
  /**
   * Returns an instance which creates synchronous observables that do not operate on any scheduler
   * by default.
   */
  public static RxJava2CallAdapterFactory create() {
    return new RxJava2CallAdapterFactory(null, false);
  }

  private final @Nullable Scheduler scheduler;
  private final boolean isAsync;

  private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
  }

  @Override
  public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    Class<?> rawType = getRawType(returnType);

    if (rawType == Completable.class) {
      // Completable is not parameterized (which is what the rest of this method deals with) so it
      // can only be created with a single configuration.
      return new RxJava2CallAdapter(Void.class, scheduler, isAsync, false, true, false, false,
          false, true);
    }

    boolean isFlowable = rawType == Flowable.class;
    boolean isSingle = rawType == Single.class;
    boolean isMaybe = rawType == Maybe.class;
    if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
      return null;
    }

    boolean isResult = false;
    boolean isBody = false;
    Type responseType;
    if (!(returnType instanceof ParameterizedType)) {
      String name = isFlowable ? "Flowable"
          : isSingle ? "Single"
          : isMaybe ? "Maybe" : "Observable";
      throw new IllegalStateException(name + " return type must be parameterized"
          + " as " + name + "<Foo> or " + name + "<? extends Foo>");
    }

    Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    Class<?> rawObservableType = getRawType(observableType);
    if (rawObservableType == Response.class) {
      if (!(observableType instanceof ParameterizedType)) {
        throw new IllegalStateException("Response must be parameterized"
            + " as Response<Foo> or Response<? extends Foo>");
      }
      responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    } else if (rawObservableType == Result.class) {
      if (!(observableType instanceof ParameterizedType)) {
        throw new IllegalStateException("Result must be parameterized"
            + " as Result<Foo> or Result<? extends Foo>");
      }
      responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
      isResult = true;
    } else {
      responseType = observableType;
      isBody = true;
    }

    return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
        isSingle, isMaybe, false);
  }
}

上述源码做了删减,只保留了我们关心的部分。我们在设置RxJava2CallAdapterFactory时,使用的是RxJava2CallAdapterFactory无参静态方法create()创建的RxJava2CallAdapterFactory实例,显然scheduler为null,isAsync为false。继续看get方法,定位到return语句,返回的是RxJava2CallAdapter,构造RxJava2CallAdapter实例时传入了一堆参数,我们关心一下scheduler、isAsync和isBody参数,不过多解析该方法,请自行深度查阅。scheduler为null,isAsync为false,isBody为true,以此为基础继续看RxJava2CallAdapter源码,代码不多,全部贴出

final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> {
    private final Type responseType;
    private final @Nullable Scheduler scheduler;
    private final boolean isAsync;
    private final boolean isResult;
    private final boolean isBody;
    private final boolean isFlowable;
    private final boolean isSingle;
    private final boolean isMaybe;
    private final boolean isCompletable;
    // 构造方法
    RxJava2CallAdapter(Type responseType, @Nullable Scheduler scheduler, boolean isAsync,
            boolean isResult, boolean isBody, boolean isFlowable, boolean isSingle, boolean isMaybe,
            boolean isCompletable) {
        this.responseType = responseType;
        this.scheduler = scheduler;
        this.isAsync = isAsync;
        this.isResult = isResult;
        this.isBody = isBody;
        this.isFlowable = isFlowable;
        this.isSingle = isSingle;
        this.isMaybe = isMaybe;
        this.isCompletable = isCompletable;
    }

    @Override public Type responseType() {
        return responseType;
    }

    @Override 
    public Object adapt(Call<R> call) {
        Observable<Response<R>> responseObservable = isAsync
        ? new CallEnqueueObservable<>(call)
        : new CallExecuteObservable<>(call);

        Observable<?> observable;
        if (isResult) {
            observable = new ResultObservable<>(responseObservable);
        } else if (isBody) {
            // 这个是该方法最终返回的Observable对象
            observable = new BodyObservable<>(responseObservable);
        } else {
           observable = responseObservable;
        }

        if (scheduler != null) {
            observable = observable.subscribeOn(scheduler);
        }

        if (isFlowable) {
            return observable.toFlowable(BackpressureStrategy.LATEST);
        }
        if (isSingle) {
            return observable.singleOrError();
        }
        if (isMaybe) {
            return observable.singleElement();
        }
        if (isCompletable) {
            return observable.ignoreElements();
        }
        return observable;
    }
}

直接看adapt方法第一行responseObservable,是一个Observable对象,该方法最终返回的也是Observable对象,且与responseObservable相关。先看看responseObservable吧,前述说过isAsync为false,所以responseObservable是CallExecuteObservable实例,感兴趣您也可以看看CallEnqueueObservable源码(Android 利用RxJava和Retrofit搭建网络请求组件——监听回调及部分源码解析中有所提及)。之前还说过scheduler为null,isBody为true,综合下来,最终return的是BodyObservable对象(对于其他情形可自行查阅相关代码),该BodyObservable实例的构造参数是responseObservable(CallExecuteObservable)。不管怎样,BodyObservable确实是Observable,看来代理商并没有做什么坏事,真的与ApiService中我们期望的结果一样。到此为止,我们已经知晓ApiService是如何返回Observable对象了,主动脉已经打通。不再分析BodyObservable源码,读者可自行查看,只是做了相关封装处理,相对比较简单。接下来继续分析CallExecuteObservable源码

final class CallExecuteObservable<T> extends Observable<Response<T>> {
  private final Call<T> originalCall;

  CallExecuteObservable(Call<T> originalCall) {
    this.originalCall = originalCall;
  }

  @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Since Call is a one-shot type, clone it for each new observer.
1.    Call<T> call = originalCall.clone();
2.    observer.onSubscribe(new CallDisposable(call));

3.    boolean terminated = false;
4.    try {
5.     Response<T> response = call.execute();
      if (!call.isCanceled()) {
        observer.onNext(response);
      }
      if (!call.isCanceled()) {
        terminated = true;
        observer.onComplete();
      }
    } catch (Throwable t) {
      Exceptions.throwIfFatal(t);
      if (terminated) {
        RxJavaPlugins.onError(t);
      } else if (!call.isCanceled()) {
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }
  }

  private static final class CallDisposable implements Disposable {
    private final Call<?> call;

    CallDisposable(Call<?> call) {
      this.call = call;
    }

    @Override public void dispose() {
      call.cancel();
    }

    @Override public boolean isDisposed() {
      return call.isCanceled();
    }
  }

先看一下我标记的第5行Response<T> response = call.execute();这行代码就是执行网络请求了,至于call实例的具体实现方式稍后分析,目前您只需要知道这么多。
继续分析该方法之前,我们先回忆一下,还记得订阅者NetRequestSubscriber中有个cancelRequest()方法吗,调用了onSubscribe(@NonNull Disposable d)方法中传入的Disposable对象的dispose()方法来取消网络请求,为什么这个方法能取消网络请求呢?回答这个问题要继续看一下subscribeActual方法。看我标记的第2行observer.onSubscribe(new CallDisposable(call));CallDisposable是CallExecuteObservable定义的内部类,实现了Disposable接口,dispose()方法中调用了call.cancel()方法来取消网络请求,是不是与NetRequestSubscriber中的取消请求方法对上号了~。

到此为止,通过分析Retrofit实例、ApiService实例的构建及RxJava2CallAdapterFactory源码追踪,我们已经知道了网络请求的执行、取消以及如何返回的Observable,之前提及的几处重点已经粗略了解大半。但是我们还不知道拦截器如何被触发工作的。下面继续分析一下call实例是如何实现的,call.execute()方法到底做了什么勾当,或许它与拦截器暗中勾结呢
回顾ApiService实例的构建方法,第18行

OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);  // 请留意okHttpCall

上面提及的call实例就是这个okHttpCall啦。没办法,只能再看一看OkHttpCall源码了,做一下简单了解,打入OkHttpCall内部,直接定位到execute()方法

@Override public Response<T> execute() throws IOException {
    okhttp3.Call call;

    synchronized (this) {
      if (executed) throw new IllegalStateException("Already executed.");
      executed = true;

      if (creationFailure != null) {
        if (creationFailure instanceof IOException) {
          throw (IOException) creationFailure;
        } else {
          throw (RuntimeException) creationFailure;
        }
      }

      call = rawCall;
      if (call == null) {
        try {
          // 注释1:继续追踪createRawCall()方法吧...
          call = rawCall = createRawCall();
        } catch (IOException | RuntimeException e) {
          creationFailure = e;
          throw e;
        }
      }
    }

    if (canceled) {
      call.cancel();
    }
    // 此处请关联上述注释1
    return parseResponse(call.execute());
  }

看过源码很头疼,显然我们还要继续追踪createRawCall()方法,继续吧

1.  private okhttp3.Call createRawCall() throws IOException {
2.       Request request = serviceMethod.toRequest(args);
          // 此处想哭
3.       okhttp3.Call call = serviceMethod.callFactory.newCall(request);
4.       if (call == null) {
5.           throw new NullPointerException("Call.Factory returned null.");
6.       }
7.       return call;
8. }

代码简洁明了,继续调查第3行吧,如果您的记忆不算差,应该记得之前我们为了寻找CallAdapter简单分析过此serviceMethod的创建过程,现在它又回来了:我马三立又回来了...
在ServiceMethod的构造方法中,可以知道callFactory其实是Retrofit实例的callFactory,而Retrofit实例的callFactory是我们设置的OkHttpClient实例,如果您已经没有印象请返回查看。好啦,直接看OkHttpClient的newwCall(Request request)方法,了解Call实例时如何创建的

 @Override
 public Call newCall(Request request) {
     // 心中一万个草泥马,又牵涉到RealCall这个类
     return new RealCall(this, request, false /* for web socket */);
 }

哎,没办法,继续查看RealCall这个类吧,直接定位其execute()方法及相关联方法

1.   final class RealCall implements Call {

2.       @Override
3.       public Response execute() throws IOException {
4.           synchronized (this) {
5.               if (executed) throw new IllegalStateException("Already Executed");
6.               executed = true;
7.           }
8.           captureCallStackTrace();
9.           try {
10.              client.dispatcher().executed(this);
11.              Response result = getResponseWithInterceptorChain();
12.              if (result == null) throw new IOException("Canceled");
13.              return result;
14.          } finally {
15.              client.dispatcher().finished(this);
16.          }
17.      }

18.      Response getResponseWithInterceptorChain() throws IOException {
19.          // Build a full stack of interceptors.
20.          List<Interceptor> interceptors = new ArrayList<>();
21.          interceptors.addAll(client.interceptors());  // 我们设置的拦截器,处于列表第一位
               // 以下几个为内置拦截器
22.          interceptors.add(retryAndFollowUpInterceptor); 
23.          interceptors.add(new BridgeInterceptor(client.cookieJar()));
24.          interceptors.add(new CacheInterceptor(client.internalCache()));
25.          interceptors.add(new ConnectInterceptor(client));
26.          if (!forWebSocket) {
27.              interceptors.addAll(client.networkInterceptors());
28.          }
29.          interceptors.add(new CallServerInterceptor(forWebSocket));
               // 注意此处传入的index参数为0,拦截器列表也被传入RealInterceptorChain构造方法
30.          Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest);
31.          return chain.proceed(originalRequest);
32.      }
33.   }

看一下第11行代码,执行这行代码获取网络请求响应数据,但是仍然看不出拦截器是如何起作用的,但是请注意注释部分,追击getResponseWithInterceptorChain()方法试试。看第31行代码:执行chain.proceed(originalRequest)获取的响应数据,硬着头皮看看chain是什么鬼。定位到第30行,查阅一下RealInterceptorChain构造方法及proceed方法

1.  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection, int index, Request request) {
2.       this.interceptors = interceptors;
3.       this.connection = connection;
4.       this.streamAllocation = streamAllocation;
5.       this.httpCodec = httpCodec;
6.       // 上一步传入此处的index为0
7.       this.index = index;
8.       this.request = request;
9.  }

10. public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
11.     if (index >= interceptors.size()) throw new AssertionError();
12.     calls++;
13.     // If we already have a stream, confirm that the incoming request will use it.
14.     if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
15.         throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                   + " must retain the same host and port");
16.     }

17.      // If we already have a stream, confirm that this is the only call to chain.proceed().
18.     if (this.httpCodec != null && calls > 1) {
             throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                   + " must call proceed() exactly once");
19      }

20.     // Call the next interceptor in the chain.
21.     RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request);
          // 上一步传入的index 为0,因此取我们设置的拦截器
22.     Interceptor interceptor = interceptors.get(index);
23.     Response response = interceptor.intercept(next);

24.     // Confirm that the next interceptor made its required call to chain.proceed().
25.     if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
            throw new IllegalStateException("network interceptor " + interceptor
                       + " must call proceed() exactly once");
26.     }

27.     // Confirm that the intercepted response isn't null.
28.     if (response == null) {
            throw new NullPointerException("interceptor " + interceptor + " returned null");
29.     }

30.     return response;
31. }

定位到第22行,终于看到了拦截器,取index为0,所以取第一个,就是我们设置的拦截器了;再看第23行,执行interceptor.intercept(next)拦截请求。拦截器触发流程解析就此终结,至于拦截器的用法请参考Retrofit Interceptor(拦截器) 拦截请求并做相关处理

到此为止,我们已经梳理了网络请求大致流程,我们做的配置也做了解析,本文也到此结束


上一篇下一篇

猜你喜欢

热点阅读