OkHttp

2018-12-14  本文已影响0人  ewinddc

介绍

一个现代的Http请求客户端,可以在java或者android使用,有以下特点

收获

sample

OkHttpClient client = new OkHttpClient();

  Request request = new Request.Builder()
      .url(url)
      .build();

  Response response = client.newCall(request).execute();
  return response.body().string();

调用流程

接口分析

构建OKHttpClient

一如既往,提供一个外观类OKHttpClient封装了所有的配置,这个类毫无意外也是通过Builder构建。
Builder

构建Request

也提供Builder

Call

public interface Call extends Cloneable {
 
  Response execute() throws IOException;
  void enqueue(Callback responseCallback);
  void cancel();
 
  Request request();

  interface Factory {
    Call newCall(Request request);
  }
}

提供同步和异步方法,注意OKHttp enqueue后的callback返回并不是UI线程,Retrofit帮我们转接了。

框架设计

okhttp架构.png
这是原文,这个图大致按照调用栈大致描绘了层次关系。

拦截器

拦截器是OKHttp的一大特性,它是典型的责任链模式,链式递归调用

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();
    Response proceed(Request request) throws IOException;
    Call call();
}

分为application和network拦截器,主要处理request和response
一个interceptor通常步骤

  1. 处理Request
  2. 调用Chain#proceed
  3. 处理Response并返回

我们知道OkHttp通过newCall,返回的其实是RealCall,然后我们看RealCall#execute方法

public Response execute() throws IOException {
 
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      return result;
    }finally {
      client.dispatcher().finished(this);
    }
  }
   Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

getResponseWithInterceptorChain干了4件事

public final class RealInterceptorChain implements Interceptor.Chain {
 public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
 
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    return response;
  }
}

这个RealInterceptorChain#proceed里又构建了RealInterceptorChain,调用了拦截器链表的下一个,每个拦截器的intercept方法需要调用的chain都是这个RealInterceptorChain,只不过新的实例,新的参数。Interceptor负责调chain#proceed触发下一个拦截器


拦截器.png

内置拦截器

自定义拦截器

总结拦截器我们发现,整个流程一层层往下贯穿,再一层层往上,跟网络协议栈的思路是一样的。这里其实也可以用装饰者来实现

interface ITask {
    Response call(Requst requst);
  }
  
  class TaskImpl implements ITask{
    private ITask nextTask;
    public TaskImpl(ITask task){
      nextTask = task;
    }

    public Response call(Requst requst) {
    // 在此处可以处理request
      if(nextTask != null){
        response = nextTask.call(requst);
      }
    // 在此处可以处理response
      return response;
    }
  }

class main(){
    ITask a = new TaskImpl();
    ITask b = new TaskImpl(a);
    ITask c = new TaskImpl(b);
    c.call(request);
  }

任务调度

通常我们会调用call#enqueu(callback)异步方法等待结果返回,OKHttp内部维护线程池用来执行请求,具体实现类是Dispatcher

public final class Dispatcher {
private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
 /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
    }
    promoteAndExecute();
  }

  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
}

内部维护了3个任务队列来存储请求,一个线程池来执行任务
enqueue

缓存

CacheInterceptor来拦截缓存,使用DiskLruCache来实现缓存,CacheStrategy做缓存策略

public final class CacheInterceptor implements Interceptor {
  public Response intercept(Chain chain) throws IOException {
     Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

     // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
  }
}

CacheStrategy

public final class CacheStrategy {
/** The request to send on the network, or null if this call doesn't use the network. */
  public final @Nullable Request networkRequest;
  /** The cached response to return or validate; or null if this call doesn't use a cache. */
  public final @Nullable Response cacheResponse;

  public static class Factory {
    final long nowMillis;
    final Request request;
    final Response cacheResponse;
    private Date servedDate;
    private Date lastModified;
     private Date expires;
    private String etag;
  }

  public CacheStrategy get() {
      CacheStrategy candidate = getCandidate();
      if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
        // We're forbidden from using the network and the cache is insufficient.
        return new CacheStrategy(null, null);
      }
      return candidate;
    }

    private CacheStrategy getCandidate() {
     CacheControl requestCaching = request.cacheControl();
      if (requestCaching.noCache() || hasConditions(request)) {
        return new CacheStrategy(request, null);
      }

  if (etag != null) {
        conditionName = "If-None-Match";
        conditionValue = etag;
      } else if (lastModified != null) {
        conditionName = "If-Modified-Since";
        conditionValue = lastModifiedString;
      } else if (servedDate != null) {
        conditionName = "If-Modified-Since";
        conditionValue = servedDateString;
      } else {
        return new CacheStrategy(request, null); // No condition! Make a regular request.
      }
      。。。
    }
}

连接池

性能提升的关键,为了实现http1.1的长连接和http2的多路复用

地址

流程

public interface Connection {
  Route route();
  //TCP连接
  Socket socket();
  //TLS
  Handshake handshake();
  //Http协议
  Protocol protocol();
}

ConnectInterceptor通过StreamAllocation#newStream获得Connection

CallServerInterceptor

真正的http请求和解析都在这个拦截器里面,依赖okio这个库。

/** Encodes HTTP requests and decodes HTTP responses.  */
interface ExchangeCodec {

  /** Returns an output stream where the request body can be streamed.  */
  fun createRequestBody(request: Request, contentLength: Long): Sink

  /** This should update the HTTP engine's sentRequestMillis field.  */
  fun writeRequestHeaders(request: Request)

  /** Flush the request to the underlying socket and signal no more bytes will be transmitted.  */
  fun finishRequest()

 fun readResponseHeaders(expectContinue: Boolean): Response.Builder?

  fun openResponseBodySource(response: Response): Source
}

okhttp迁移很多文件为Kotlin,我们至少要大致能看懂Kotlin代码

public final class Http1ExchangeCodec implements ExchangeCodec {
 /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private final OkHttpClient client;

  /** The connection that carries this stream. */
  private final RealConnection realConnection;
  //socket对应的输入流
  private final BufferedSource source;
  //socket对应的输出流
  private final BufferedSink sink;

 /** HTTP协议标准,写入request到流,requestline和header */
  public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
  }

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
       StatusLine statusLine = StatusLine.parse(readHeaderLine());

      Response.Builder responseBuilder = new Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(readHeaders());

      return responseBuilder;
 
  }

/** 按照http标准读取header,一行一行的读 */
  private Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = readHeaderLine()).length() != 0; ) {
      addHeaderLenient(headers, line);
    }
    return headers.build();
  }

 @Override public Source openResponseBodySource(Response response) {
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }

  //分段读取
    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }

  // size已知
    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    return newUnknownLengthSource();
  }
}

[如何调试](https://blog.csdn.net/alvinhuai/article/details/81288270,用Android Studio跑OkHttp的sampleClient模块,加一些配置,可在本机直接跑,也可以用AS的Debugger断点调试。像Retrofit这种纯java的库,都可以在本机调试,效率高。
)

reference

上一篇 下一篇

猜你喜欢

热点阅读