网络请求Android知识Android开发

Retrofit源码分析(二)

2017-03-02  本文已影响580人  TruthKeeper

这篇就补上RxJavaAdapter的支持了,末尾附上笔者总结心得
⁄(⁄ ⁄•⁄ω⁄•⁄ ⁄)⁄

Retrofit源码分析(一)

介绍

RxJava现在绝对是个Android开发的大杀器了,面试的时候你要是说用过实际项目里肯定能在一定程度上加分的(๑•̀ㅂ•́)و✧
这里附上扔物线大大的文章:给 Android 开发者的 RxJava 详解
完整依赖

    compile 'com.squareup.retrofit2:retrofit:2.1.0'
    compile 'com.squareup.retrofit2:converter-gson:2.1.0'
    compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
    compile 'io.reactivex:rxandroid:1.2.1'
public interface API{
//一般用法
//@GET("users/search/{keyword}")
//Call<List<UserBean>> searchUserList(@Path("keyword") String keyword);

//Rx用法,Observable是rx包下的,不是android.database下的
@GET("users/search/{keyword}")
Observable<List<UserBean>> searchUserList(@Path("keyword") String keyword);
}
new Retrofit.Builder()
                //多加这一句
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();
//发起一个网络请求
 retrofit.create(API.class)
                .searchUserList("张三")
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<List<UserBean>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(List<UserBean> userBeen) {

                    }
                });

入口

前文我们已经分析过,默认发起的Call是封装成OkHttpCall,然后由serviceMethod里的adapter负责处理,那么现在对应的CallAdapter接口实现类是怎么处理呢?
其实这里我们就不是普通的司机来开了,而是一个精通排水沟漂移的老司机来开车,下面我们来领教下老司机的车技

public final class RxJavaCallAdapterFactory extends CallAdapter.Factory {
  
  public static RxJavaCallAdapterFactory create() {
    return new RxJavaCallAdapterFactory(null);
  }

  public static RxJavaCallAdapterFactory createWithScheduler(Scheduler scheduler) {
    if (scheduler == null) throw new NullPointerException("scheduler == null");
    return new RxJavaCallAdapterFactory(scheduler);
  }

  private final Scheduler scheduler;

  private RxJavaCallAdapterFactory(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  @Override
  public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    //可以看到这里对returnType做了一些校验
    //只支持rx.Observable,rx.Single,rx.Completable
    //然而笔者只认识Observable。。。
    Class<?> rawType = getRawType(returnType);
    String canonicalName = rawType.getCanonicalName();
    boolean isSingle = "rx.Single".equals(canonicalName);
    boolean isCompletable = "rx.Completable".equals(canonicalName);
    if (rawType != Observable.class && !isSingle && !isCompletable) {
      return null;
    }
    if (!isCompletable && !(returnType instanceof ParameterizedType)) {
      String name = isSingle ? "Single" : "Observable";
      throw new IllegalStateException(name + " return type must be parameterized"
          + " as " + name + "<Foo> or " + name + "<? extends Foo>");
    }

    if (isCompletable) {
      return CompletableHelper.createCallAdapter(scheduler);
    }
    //看这里,一般情况直接到这里然后return
    CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);
    if (isSingle) {
      return SingleHelper.makeSingle(callAdapter);
    }
    return callAdapter;
  }
  //这个方法实现了返回真正的CallAdapter
  private CallAdapter<Observable<?>> getCallAdapter(Type returnType, Scheduler scheduler) {
     //将returnType转换成内部Type
    Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    //获取到内部的Class
    Class<?> rawObservableType = getRawType(observableType);
    //Response的情况
    if (rawObservableType == Response.class) {
      if (!(observableType instanceof ParameterizedType)) {
        throw new IllegalStateException("Response must be parameterized"
            + " as Response<Foo> or Response<? extends Foo>");
      }
      Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
      return new ResponseCallAdapter(responseType, scheduler);
    }
    //Result的情况
    if (rawObservableType == Result.class) {
      if (!(observableType instanceof ParameterizedType)) {
        throw new IllegalStateException("Result must be parameterized"
            + " as Result<Foo> or Result<? extends Foo>");
      }
      Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
      return new ResultCallAdapter(responseType, scheduler);
    }
    //一般返回这个类型
    return new SimpleCallAdapter(observableType, scheduler);
  }
  //中间暂时省略2个类。

  static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
    private final Type responseType;
    private final Scheduler scheduler;

    ResponseCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType() {
      return responseType;
    }

    @Override public <R> Observable<Response<R>> adapt(Call<R> call) {
      Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
  }
  
  static final class SimpleCallAdapter implements CallAdapter<Observable<?>> {
    private final Type responseType;
    private final Scheduler scheduler;

    SimpleCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType() {
      return responseType;
    }

    @Override public <R> Observable<R> adapt(Call<R> call) {
      Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
          .lift(OperatorMapResponseToBodyOrError.<R>instance());
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
  }

  static final class ResultCallAdapter implements CallAdapter<Observable<?>> {
    private final Type responseType;
    private final Scheduler scheduler;

    ResultCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType() {
      return responseType;
    }

    @Override public <R> Observable<Result<R>> adapt(Call<R> call) {
      Observable<Result<R>> observable = Observable.create(new CallOnSubscribe<>(call)) //
          .map(new Func1<Response<R>, Result<R>>() {
            @Override public Result<R> call(Response<R> response) {
              return Result.response(response);
            }
          }).onErrorReturn(new Func1<Throwable, Result<R>>() {
            @Override public Result<R> call(Throwable throwable) {
              return Result.error(throwable);
            }
          });
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
  }
}

看了上面一坨源码(其实已经省略了2个类)不知道大家有没有晕,反正我是晕了(⌒▽⌒)
那么我们从入口方法开始看起

@Override
public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {

这个方法是CallAdapter.Factory的抽象方法,那么这个抽象方法的调用入口在哪里呢,没错我们又回到了ServiceMethod,在这个构建者模式的build方法中,对这个执行方法进行了判断。

public ServiceMethod build() {
      callAdapter = createCallAdapter();
      //以下省略。。。
}
private CallAdapter<?> createCallAdapter() {
        Type returnType = method.getGenericReturnType();
        //返回类型的校验
        if (Utils.hasUnresolvableType(returnType)) {
            throw methodError(
                    "Method return type must not include a type variable or wildcard: %s", returnType);
        }
        //返回是无类型的话
        if (returnType == void.class) {
            throw methodError("Service methods cannot return void.");
        }
        Annotation[] annotations = method.getAnnotations();
        try {
            //适配器模式
            return retrofit.callAdapter(returnType, annotations);
        } catch (RuntimeException e) { // Wide exception range because factories are user code.
            throw methodError(e, "Unable to create call adapter for %s", returnType);
        }
    }
  //Retrofit的方法
  public CallAdapter<?> callAdapter(Type returnType, Annotation[] annotations) {
    return nextCallAdapter(null, returnType, annotations);
  }

   //匹配到合适的Adapter
  public CallAdapter<?> nextCallAdapter(CallAdapter.Factory skipPast, Type returnType,
      Annotation[] annotations) {
    checkNotNull(returnType, "returnType == null");
    checkNotNull(annotations, "annotations == null");

    int start = adapterFactories.indexOf(skipPast) + 1;
    for (int i = start, count = adapterFactories.size(); i < count; i++) {
      CallAdapter<?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
      if (adapter != null) {
        return adapter;
      }
    }
    //省略下面的异常抛出代码
}

当当当当,看官可以看到,这里对adapterFactories进行了遍历,这个adapterFactories就是在我们给Retrofit添加的Adapter支持库,默认带有一个ExecutorCallAdapterFactory,好了那么调用的入口有了,我们也不用害怕了。

深入

ExecutorCallAdapterFactory我的上一篇文章中讲到内部就简单的用Handler Post到主线程来实现了。
在上文的get方法我们略过rx.Single,rx.Completable,只分析rx.Observable才不是我不会呢,在最后有一句

CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);

getCallAdapter的实现细节代码就不再次帖了,省得你们说我凑字数。在实际开发中我们都会对接口返回的数据做一层包装,例如咱家的后台给的,我们自然要一层封装

public class ResponseBean<T> {
    private int status;
    private String msg;
    private T data;
    private long time;
}

那么我们在API中会这样声明一个接口

public interface API{
//注意这个Call不是OkHttp3的Call
@GET("users/search/{keyword}")
Observable<ResponseBean<List<UserBean>>> searchUserList(@Path("keyword") String keyword);
}

getCallAdapter方法第一行我们看到了

Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);

这个方法呢,是将返回的类型中的泛型参数作为Type获取出来,
有关Type这个类笔者写了一个类来测试

public class Test {
    public static void main(String[] args) {
        Method method = null;
        try {
            Test t=new Test();
            method = t.getClass().getMethod("demoApi");
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
        Observable<ApplePen<Apple, Pen>> observable = demoApi();
        Type returnType = method.getGenericReturnType();
        Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
        System.out.println("Method的返回类型:" + returnType);
        System.out.println("Observable的类型" + observable.getClass().getGenericSuperclass());
        System.out.println("Observable的内部类型" + observableType);
    }

    /**
     * 获取参数化的类型
     * @param index 位置索引
     * @param type
     * @return
     */
    public static Type getParameterUpperBound(int index, ParameterizedType type) {
        Type[] types = type.getActualTypeArguments();
        if (index < 0 || index >= types.length) {
            throw new IllegalArgumentException(
                    "Index " + index + " not in range [0," + types.length + ") for " + type);
        }
        Type paramType = types[index];
        if (paramType instanceof WildcardType) {
            //通配符类型 <?>这种
            return ((WildcardType) paramType).getUpperBounds()[0];
        }
        return paramType;
    }

    /**
     * 模拟API中的网络接口
     * @return
     */
    public static Observable<ApplePen<Apple, Pen>> demoApi() {
        ApplePen<Apple, Pen> applePen = new ApplePen<>();
        return Observable.just(applePen);
    }

    public static class Apple {
    }

    public static class Pen {
    }

    public static class ApplePen<T1, T2> {
    }
}

最后输出的结果

Method的返回类型:rx.Observable<com.tk.test.Test.com.tk.test.Test$ApplePen<com.tk.test.Test$Apple, com.tk.test.Test$Pen>>
Observable的类型rx.Observable<T>
Observable的内部类型com.tk.test.Test.com.tk.test.Test$ApplePen<com.tk.test.Test$Apple, com.tk.test.Test$Pen>

相信到这里,大家应该也看懂了,getCallAdapter这个方法本质是将我们接口定义的Type类型构造出了CallAdapter,在API中定义的类型匹配如下:
Observable<Response<T>>,对应ResponseCallAdapter
Observable<Result<T>>,对应ResultCallAdapter
Observable<其他类型>,对应SimpleCallAdapter
一般我们都是根据项目封装自己的实体Bean,那我们来看看SimpleCallAdapter司机的开车方法adapt(),可以在源码看到其实方法很简单。

 @Override public <R> Observable<R> adapt(Call<R> call) {
      Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
          .lift(OperatorMapResponseToBodyOrError.<R>instance());
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }

默认scheduler调度器为空,我们也可以这样来让所有API中的Rx接口统一在io线程中处理。

new Retrofit.Builder()
                .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io()))
                .build();

那么我们顺藤摸瓜来看看

//RxJavaCallAdapterFactory的静态内部类
static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall;
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
      subscriber.add(requestArbiter);
      subscriber.setProducer(requestArbiter);
    }
  }

注意这里需要一定的RxJava基础了,不过就既然能坚持看到这里应该都是dalao了(;¬_¬)
可以看出主要逻辑在CallOnSubscribe这个类中实现,然后经过一个lift操作符变换,当Http响应非[200,300)时抛出一个HttpException异常,非常简单是不是,接下来我们来会会CallOnSubscribe,其实它也就做了三件事:

Ps:AtomicBoolean,用于线程更高并发的场景,对变量赋值和方法提供原子性操作,通俗来讲就是加锁,不知道有没有说错:-)

那么我们来看看RequestArbiterProducer的具体方法实现

@Override 
public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
        //同步发起网络请求
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response);
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
          subscriber.onError(t);
        }
        return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }

Producer在RxJava1.0中是一个可以向上游请求数据的类,RxJava2已经出了,据说有不小的改动,笔者这里就照自己的理解抛砖引玉了。

大部分情况下Subscriber(比如订报纸的用户) 都是被动接受Observable(报刊)发出的数据,但要是Observable发得太快(新闻太多),Subscriber 处理不过来(用户看不过来),在应对这种场景,RxJava诞生了一种Subscriber主动pull的机制,通过给Subscriber设置Producer(感兴趣的内容推送),通过 Subscriber#setProducer 方法,Subscriber就会通过Producer向Observable根据自己的能力来请求数据,通过 Producer#request 方法,在Producer收到请求之后在推给上游,上游再根据请求的量给 Subscriber 发数据(比如对编程感兴趣)

总结

分析到这里其实Retrofit的源码已经可见一斑了,最后我们整体来过一遍,开始高能

Retrofit完美的完成了OkHttp对其他扩展功能的松耦合,最后附上一张个人理解的图,如有错误,敬请指正!


“你们要进窄门。因为引到灭亡,那门是宽的,路是大的,进去的人也多;引到永生,那门是窄的,路是小的,找着的人也少。” (马太福音 7:13-14 和合本)

上一篇下一篇

猜你喜欢

热点阅读