Android开发Android源码剖析Android开发经验谈

OkHttp源码解析

2019-05-01  本文已影响28人  正规程序员

OkHttp简单使用

 implementation 'com.squareup.okhttp3:okhttp:3.14.1'
<uses-permission android:name="android.permission.INTERNET" />
OkHttpClient okHttpClient = new OkHttpClient();

final Request request = new Request.Builder()
        .url("http://www.baidu.com")
        .method("GET", null)
        .build();

okHttpClient.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.d("net", "onFailure=" + e.getMessage());
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        //成功失败的回调都是在工作线程中
        String content = response.networkResponse().toString();
        Log.d("net", "onResponse=" + content);
    }
});

核心源码解析

void enqueue(Callback responseCallback, boolean forWebSocket) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    //通过任务调度器执行线程池任务,并控制请求任务执行或等待状态
    client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
private int maxRequests = 64;//最大并发请求数
private int maxRequestsPerHost = 5;//每个主机的最大请求数
private ExecutorService executorService;//线程池
//异步请求 等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步请求 运行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步请求 运行队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

//带自定义线程池的构造方法
public Dispatcher(ExecutorService executorService) {
  this.executorService = executorService;
}
//默认线程池的构造方法
public Dispatcher() {
}
private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); 
        i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; 
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; 

        i.remove();
        //移除等待队列中任务,添加到运行队列中和线程池任务中。
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

拦截器原理

拦截器主要用来添加、移除、转换请求或响应的头部信息。比如替换域名为IP地址,在请求头部添加公共参数等。

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;
    
    @Nullable Connection connection();
    ...
  }
}

请求体Request request = chain.request();
响应体Response response = chain.proceed(request);

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //client中自定义拦截器
    interceptors.addAll(client.interceptors());
    //重定向与失败重连拦截器
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    //请求报头、响应报头处理,Cookie持久化策略
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //客户端的缓存策略,请求、响应缓存信息读写
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //与服务器建立连接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //客户端网络环境拦截器
      interceptors.addAll(client.networkInterceptors());
    }
    //发送请求,读取服务器响应
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //设置完整拦截器链
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      //通过拦截器获取响应结果
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
}

OkHttp的拦截器列表按顺序调用,默认先调用用户自定义拦截器 client.interceptors()。

public Response proceed(Request request, Transmitter transmitter, 
 @Nullable Exchange exchange)throws IOException {
    ...
    //责任链模式,数组内拦截器依次调用Interceptor#intercept(),RealInterceptorChain#proceed()
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, 
        readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    ...
    return response;
}

责任链模式:在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,直到链上的某一个对象决定处理此请求。

网络通信缓存策略

网络请求中的缓存分为:服务器缓存,第三方缓存,客户端缓存。其中客户端缓存是性价比最高的,消耗服务器资源最少。

客户端缓存需要服务器确定数据的有效期,通常在请求报头和响应报头中通知缓存状态。

在OkHttp中的网络通信缓存策略采用的是Last-Modified/If-Modified-Since和Cache-Control结合的,后者优先级高。

@Override public Response intercept(Chain chain) throws IOException {
 ...
 if (cacheResponse != null) {
     //状态码304表示服务器上对应数据未过期(未更改),本地缓存(如果有的话)仍有效
     if (networkResponse.code() == HTTP_NOT_MODIFIED) {
         Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), 
                networkResponse.headers()))
            .sentRequestAtMillis(
                networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(
                networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        //关闭网络请求连接,返回本地缓存的所有信息    
        networkResponse.body().close();
   
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
 ...
}

连接池

频繁的建立和断开的Socket连接(三次握手、四次挥手)是非常消耗网络资源的,HTTP协议中报文属性Keep-Alive对降低延迟,提升效率很有帮助。

复用连接就要维护一个连接池,并对连接的添加、回收和状态检测进行管理。

//连接池中维护一个双端队列Deque来存储连接
private final Deque<RealConnection> connections = new ArrayDeque<>();

/**
 * maxIdleConnections:每个地址的最大空闲连接数,默认5个
 * keepAliveDuration:连接持续时间,默认5分钟
 */
public RealConnectionPool(int maxIdleConnections, 
  long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " 
          + keepAliveDuration);
    }
}

//连接池中维护一个单线程的线程池,用来清理过期的链接(移出双端队列)。
 private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));

//如果空闲连接超过5个或者keepalive时间大于5分钟,则将该连接清理掉。
 private final Runnable cleanupRunnable = () -> {
    while (true) {
     //遍历链接并标记空闲连接,全部为活跃链接时,过5分钟再检测清理。
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<Transmitter>> references = connection.transmitters;
    //遍历Transmitter弱引用列表(低版本OkHttp是StreamAllocation)
    for (int i = 0; i < references.size(); ) {
      Reference<Transmitter> reference = references.get(i);
      //若引用不为空,则continue
      if (reference.get() != null) {
        i++;
        continue;
      }

      TransmitterReference transmitterRef = (TransmitterReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
      //若Transmitter未被使用,则移除引用
      references.remove(i);
      connection.noNewExchanges = true;
      //若列表为空,则说明该连接没有被使用(弱引用不被持有说明是GC回收对象),即空连接。
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    return references.size();
  }
上一篇下一篇

猜你喜欢

热点阅读