
Understanding and Analyzing the OkHttp Networking Framework Source Code

2019-03-16  请叫我张懂

Design patterns involved

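OkHttp synchronous requests

After the request is sent, the calling thread (the UI thread, if the call is made there) blocks until the request completes.

1. Create the OkHttpClient and Request objects

2. Wrap the Request in a Call object

3. Call the Call's execute() method to run the request synchronously

OkHttp asynchronous requests

After the request is sent, the UI thread is not blocked.

1. Create the OkHttpClient and Request objects

2. Wrap the Request in a Call object

3. Call the Call's enqueue() method to run the request asynchronously (the Callback's onResponse and onFailure methods both run on a worker thread)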


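We start with the first and second steps of a request, because they are identical for synchronous and asynchronous requests.

Step 1

The OkHttpClient.Builder constructor (builder design pattern)

The Request.Builder constructor (builder design pattern)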
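To make the builder pattern concrete, here is a minimal sketch. MiniRequest and its fields are hypothetical names invented for illustration; this is not OkHttp's Request class, only the same shape: defaults live in the builder, setters return the builder, and build() validates and produces an immutable object.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical immutable request type, used only to illustrate the builder pattern. */
final class MiniRequest {
    final String url;
    final String method;
    final Map<String, String> headers;

    private MiniRequest(Builder builder) {
        this.url = builder.url;
        this.method = builder.method;
        // Copy so that later changes to the builder cannot affect this object.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(builder.headers));
    }

    static final class Builder {
        private String url;
        private String method = "GET";  // default value chosen by the builder
        private final Map<String, String> headers = new LinkedHashMap<>();

        Builder url(String url) { this.url = url; return this; }
        Builder method(String method) { this.method = method; return this; }
        Builder header(String name, String value) { headers.put(name, value); return this; }

        MiniRequest build() {
            if (url == null) throw new IllegalStateException("url == null");
            return new MiniRequest(this);
        }
    }

    public static void main(String[] args) {
        MiniRequest request = new MiniRequest.Builder()
            .url("https://example.com/")
            .header("Accept", "text/html")
            .build();
        System.out.println(request.method + " " + request.url + " " + request.headers);
    }
}
```

The caller only sets what differs from the defaults, and a half-configured object can never escape because validation happens in build().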

A side note

The Dispatcher class is central and comes up in many places later, so we analyze it in detail first.

Excerpt from Dispatcher.java (queues)

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

Excerpt from Dispatcher.java (thread pool)

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}
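The pool configuration above (core size 0, unbounded maximum, SynchronousQueue) means a submitted task is never queued: it is either handed to an idle thread or gets a new thread. The following sketch uses only the JDK to show that behavior. DispatcherPoolSketch and threadsCreatedFor are hypothetical names, and the default thread factory is used instead of OkHttp's Util.threadFactory.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DispatcherPoolSketch {
    /** Submits {@code tasks} blocking tasks and returns how many threads the pool holds. */
    static int threadsCreatedFor(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();  // keep this worker busy so it cannot be reused
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No worker was idle, so every task forced the creation of a new thread.
            return pool.getPoolSize();
        } finally {
            release.countDown();  // let the workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads for 5 busy tasks: " + threadsCreatedFor(5));
    }
}
```

Because the pool itself imposes no limit, the cap on concurrency has to come from somewhere else, which is what the Dispatcher's maxRequests and maxRequestsPerHost checks are for.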

Excerpt from Dispatcher.java (executed, enqueue)

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}
  
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

Excerpt from Dispatcher.java (finished())

 /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
    
    public synchronized int runningCallsCount() {
        return runningAsyncCalls.size() + runningSyncCalls.size();
    }
    
    private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
  }
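The interplay of enqueue(), finished() and promoteCalls() can be tried out in isolation. The sketch below is a simplified model, not the real Dispatcher: MiniDispatcher is a hypothetical class, calls are represented only by their host name, and nothing is actually executed. The admission and promotion logic follows the excerpts above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> readyCalls = new ArrayDeque<>();    // waiting calls, by host
    private final Deque<String> runningCalls = new ArrayDeque<>();  // in-flight calls, by host

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    synchronized void enqueue(String host) {
        if (runningCalls.size() < maxRequests && runningCallsForHost(host) < maxRequestsPerHost) {
            runningCalls.add(host);  // the real Dispatcher also hands the call to the executor here
        } else {
            readyCalls.add(host);
        }
    }

    synchronized void finished(String host) {
        if (!runningCalls.remove(host)) throw new AssertionError("Call wasn't in-flight!");
        promoteCalls();
    }

    private void promoteCalls() {
        if (runningCalls.size() >= maxRequests) return;  // already at capacity
        for (Iterator<String> i = readyCalls.iterator(); i.hasNext(); ) {
            String host = i.next();
            if (runningCallsForHost(host) < maxRequestsPerHost) {
                i.remove();
                runningCalls.add(host);
            }
            if (runningCalls.size() >= maxRequests) return;  // reached capacity
        }
    }

    private int runningCallsForHost(String host) {
        int count = 0;
        for (String running : runningCalls) {
            if (running.equals(host)) count++;
        }
        return count;
    }

    synchronized int runningCount() { return runningCalls.size(); }
    synchronized int readyCount() { return readyCalls.size(); }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher(2, 1);
        dispatcher.enqueue("a.example");  // runs
        dispatcher.enqueue("a.example");  // waits: per-host limit reached
        dispatcher.enqueue("b.example");  // runs
        dispatcher.enqueue("c.example");  // waits: global limit reached
        dispatcher.finished("b.example"); // frees a slot; c.example is promoted, a.example is not
        System.out.println(dispatcher.runningCount() + " running, " + dispatcher.readyCount() + " ready");
    }
}
```

The limits here are tiny to keep the example readable; OkHttp's defaults are 64 requests overall and 5 per host.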
    

Step 2

1. The OkHttpClient.newCall() method

 @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

2. The RealCall.newRealCall() method

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

3. The RealCall constructor

  private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
  }

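Synchronous requests: the call.execute() method

Because Call is an interface, what we need to study is the execute() method of its implementation class, RealCall.

Excerpt from RealCall.java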

  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

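Asynchronous requests: the call.enqueue() method

Because Call is an interface, what we need to study is the enqueue() method of its implementation class, RealCall.

Excerpt from RealCall.java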

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

AsyncCall analysis

AsyncCall extends NamedRunnable, and NamedRunnable implements the Runnable interface.

Excerpt from NamedRunnable.java

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
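NamedRunnable is a small template method: run() is final and handles the thread name, while subclasses supply execute(). The sketch below reproduces that idea with hypothetical names (NamedTask, NamedTaskDemo) and runs the task on the current thread so the renaming is easy to observe.

```java
/** Hypothetical stand-in for NamedRunnable: renames the thread only while the task runs. */
abstract class NamedTask implements Runnable {
    private final String name;

    NamedTask(String name) {
        this.name = name;
    }

    @Override public final void run() {
        Thread current = Thread.currentThread();
        String oldName = current.getName();
        current.setName(name);
        try {
            execute();
        } finally {
            current.setName(oldName);  // restore the name even if execute() throws
        }
    }

    protected abstract void execute();
}

final class NamedTaskDemo {
    /** Returns the thread name seen inside the task, then the name seen after it. */
    static String[] namesSeen(String taskName) {
        final String[] seen = new String[2];
        new NamedTask(taskName) {
            @Override protected void execute() {
                seen[0] = Thread.currentThread().getName();
            }
        }.run();
        seen[1] = Thread.currentThread().getName();
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = namesSeen("OkHttp demo call");
        System.out.println("during: " + seen[0] + ", after: " + seen[1]);
    }
}
```

Naming the thread after the call makes thread dumps and logs readable, which matters because the dispatcher's pool threads are shared by many calls.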

Excerpt from AsyncCall (an inner class of RealCall)

   @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

At this point we have a basic picture of the synchronous and asynchronous request flows and of the Dispatcher. Next we analyze interceptors.

Source: OkHttp wiki

Interceptors are a powerful mechanism that can monitor, rewrite, and retry calls.

In other words, interceptors are the mechanism OkHttp provides for processing HTTP requests and responses in one place: they can monitor traffic, rewrite requests and responses, and retry failed requests.

OkHttp uses lists to track interceptors, and interceptors are called in order.

Interceptors are registered as either application or network interceptors.

[Figure: interceptors]

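Differences between application interceptors and network interceptors

Application interceptors:

Don't need to worry about intermediate responses like redirects and retries.

They run above the retry and follow-up logic, so they only see the final response.

Are always invoked once, even if the HTTP response is served from the cache.

They run once per call, whether the response comes from the network or from the cache.

Observe the application's original intent. Unconcerned with OkHttp-injected headers like If-None-Match.

They see the request as the application built it, before OkHttp adds headers such as If-None-Match.

Permitted to short-circuit and not call Chain.proceed().

They may return a response themselves without calling Chain.proceed(), in which case the remaining interceptors never run.

Permitted to retry and make multiple calls to Chain.proceed().

They may call Chain.proceed() more than once, for example to retry a failed request.

Network interceptors:

Able to operate on intermediate responses like redirects and retries.

They run for every network request, including the intermediate ones produced by redirects and retries.

Not invoked for cached responses that short-circuit the network.

They are skipped when the cache answers the request without touching the network.

Observe the data just as it will be transmitted over the network.

They see the request exactly as it will be sent over the network.

Access to the Connection that carries the request.

They can reach the Connection that carries the request.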


The above is taken from the official OkHttp documentation.
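The practical consequence of where each kind of interceptor sits can be shown with a small model. The sketch below is an assumption-laden simplification, not OkHttp code: InterceptorPlacementSketch is a hypothetical class, requests and responses are plain strings, and a single hard-coded redirect from /old to /new stands in for the server. It counts how often the application layer and the network layer are entered for one call.

```java
final class InterceptorPlacementSketch {
    int applicationCalls;  // how often the application-interceptor layer ran
    int networkCalls;      // how often the network-interceptor layer ran

    /** Outermost layer: where application interceptors run. */
    String call(String url) {
        applicationCalls++;
        return followRedirects(url);
    }

    /** Middle layer: stands in for the retry and follow-up logic. */
    private String followRedirects(String url) {
        String response = network(url);
        while (response.startsWith("REDIRECT ")) {
            response = network(response.substring("REDIRECT ".length()));
        }
        return response;
    }

    /** Innermost layer: where network interceptors run, once per network request. */
    private String network(String url) {
        networkCalls++;
        // Hypothetical server behavior: /old redirects to /new, everything else succeeds.
        return url.equals("/old") ? "REDIRECT /new" : "200 OK " + url;
    }

    public static void main(String[] args) {
        InterceptorPlacementSketch sketch = new InterceptorPlacementSketch();
        String response = sketch.call("/old");
        System.out.println(response);
        System.out.println("application layer: " + sketch.applicationCalls
            + ", network layer: " + sketch.networkCalls);
    }
}
```

One call with one redirect enters the application layer once and the network layer twice, which is the difference the two lists above describe.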

Now turn to RealCall's getResponseWithInterceptorChain() method.

Excerpt from RealCall.java

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    
    return chain.proceed(originalRequest);
}

Excerpt from RealInterceptorChain.java

@Override
public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
}
  
  
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    calls++;
    ...
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    ...
    return response;
  }

[Figure: interceptors 2]
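The core of proceed() is the chain-of-responsibility pattern: each call creates a new chain object pointing at index + 1 and passes it to the interceptor at index. The following sketch strips that down to strings. ChainSketch and its nested types are hypothetical and carry none of the stream, codec or connection state of RealInterceptorChain.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainSketch {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        String proceed(String request) {
            if (index >= interceptors.size()) throw new AssertionError();
            // Hand the interceptor at 'index' a chain that starts at the next interceptor.
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // Each interceptor may rewrite the request on the way in and the response on the way out.
        interceptors.add(chain -> "A(" + chain.proceed(chain.request() + ">a") + ")");
        interceptors.add(chain -> "B(" + chain.proceed(chain.request() + ">b") + ")");
        // The last interceptor produces the response and never calls proceed().
        interceptors.add(chain -> "response to " + chain.request());
        return new Chain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("req"));  // prints "A(B(response to req>a>b))"
    }
}
```

Requests travel down the list and responses travel back up in reverse order, so the first interceptor added is the first to see the request and the last to see the response.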

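The real work therefore happens in each interceptor's intercept() method. Next we analyze the five interceptors that OkHttp itself provides, ignoring user-defined interceptors.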

1. RetryAndFollowUpInterceptor

Excerpt from RetryAndFollowUpInterceptor.java

@Override 
public Response intercept(Chain chain) throws IOException {
    ...
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    
    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      ...
      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
          if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getFirstConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
         boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
            if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
      ...
      Request followUp;
      try {
        followUp = followUpRequest(response, streamAllocation.route());
      } catch (IOException e) {
        streamAllocation.release();
        throw e;
      }
    
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }
    
      closeQuietly(response.body());
    
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
      ...
      request = followUp;
      priorResponse = response;
    }
}
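Leaving the recovery branches aside, the while (true) loop above is a bounded follow-up loop. The sketch below models just that part. FollowUpSketch is a hypothetical class, a map lookup stands in for followUpRequest(), and an unchecked IllegalStateException replaces the ProtocolException thrown by the real code so the example stays free of checked exceptions.

```java
import java.util.HashMap;
import java.util.Map;

final class FollowUpSketch {
    static final int MAX_FOLLOW_UPS = 20;  // the same limit OkHttp uses

    /** Follows entries in {@code redirects} until a URL has no follow-up or the limit is exceeded. */
    static String resolve(String url, Map<String, String> redirects) {
        int followUpCount = 0;
        while (true) {
            String followUp = redirects.get(url);  // stands in for followUpRequest(response, route)
            if (followUp == null) {
                return url;  // no follow-up needed: this is the final response
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            url = followUp;
        }
    }

    public static void main(String[] args) {
        Map<String, String> redirects = new HashMap<>();
        redirects.put("/a", "/b");
        redirects.put("/b", "/c");
        System.out.println(resolve("/a", redirects));  // prints "/c"
    }
}
```

The counter is what prevents a redirect cycle from looping forever.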

2. BridgeInterceptor

Excerpt from BridgeInterceptor.java

@Override
public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    
    RequestBody body = userRequest.body();
    // Mainly adds request header fields; omitted here
    ...
    
    Response networkResponse = chain.proceed(requestBuilder.build());
    
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
    
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
    
    return responseBuilder.build();
}
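The response half of BridgeInterceptor is transparent gzip handling: when the response body is gzip-encoded, it is wrapped in a decompressing stream and the Content-Encoding and Content-Length headers are removed because they no longer describe what the caller reads. The sketch below shows the same idea with JDK classes only. TransparentGzipSketch is a hypothetical class, headers are a plain map, and java.util.zip replaces Okio's GzipSource.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class TransparentGzipSketch {
    /** Compresses text the way a server would before sending it with Content-Encoding: gzip. */
    static byte[] gzip(String text) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(text.getBytes(StandardCharsets.UTF_8));
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decompresses a gzip-encoded body and strips the headers that no longer apply. */
    static String decode(Map<String, String> headers, byte[] body) {
        try {
            InputStream in = new ByteArrayInputStream(body);
            if ("gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) {
                in = new GZIPInputStream(in);
                headers.remove("Content-Encoding");
                headers.remove("Content-Length");
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            for (int n; (n = in.read(buffer)) != -1; ) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = gzip("hello");
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Encoding", "gzip");
        headers.put("Content-Length", String.valueOf(body.length));
        System.out.println(decode(headers, body) + " " + headers);
    }
}
```

The caller ends up with plain text and headers that match it, without ever knowing compression was involved.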

3. CacheInterceptor

Before looking at CacheInterceptor, we first need to look at Cache.java.

Example:

   OkHttpClient okHttpClient = new OkHttpClient().newBuilder()
                .cache(new Cache(new File("cache"), 8 * 1024 * 1024))
                .build();

The Cache.put() method

Excerpt from Cache.java

@Nullable
CacheRequest put(Response response) {
    String requestMethod = response.request().method();
    if (HttpMethod.invalidatesCache(response.request().method())) {
      try {
        remove(response.request());
      } catch (IOException ignored) {
      }
      return null;
    }
    if (!requestMethod.equals("GET")) {
      return null;
    }
    ...
    Entry entry = new Entry(response);
    DiskLruCache.Editor editor = null;
    try {
      editor = cache.edit(key(response.request().url()));
      if (editor == null) {
        return null;
      }
      entry.writeTo(editor);
      return new CacheRequestImpl(editor);
    } catch (IOException e) {
      abortQuietly(editor);
      return null;
    }
}

// Constructor of the nested Cache.Entry class
Entry(Response response) {
    this.url = response.request().url().toString();
    this.varyHeaders = HttpHeaders.varyHeaders(response);
    this.requestMethod = response.request().method();
    this.protocol = response.protocol();
    this.code = response.code();
    this.message = response.message();
    this.responseHeaders = response.headers();
    this.handshake = response.handshake();
    this.sentRequestMillis = response.sentRequestAtMillis();
    this.receivedResponseMillis = response.receivedResponseAtMillis();
}

Excerpt from HttpMethod.java

 public static boolean invalidatesCache(String method) {
    return method.equals("POST")
        || method.equals("PATCH")
        || method.equals("PUT")
        || method.equals("DELETE")
        || method.equals("MOVE");     // WebDAV
  }
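Taken together, put() and invalidatesCache() implement a simple rule: only GET responses are stored, and a request with a modifying method removes whatever was cached for that URL. The sketch below models the rule with an in-memory map. CachePolicySketch is a hypothetical class and leaves out everything the elided code and DiskLruCache handle.

```java
import java.util.HashMap;
import java.util.Map;

final class CachePolicySketch {
    private final Map<String, String> entries = new HashMap<>();  // url -> cached body

    static boolean invalidatesCache(String method) {
        return method.equals("POST")
            || method.equals("PATCH")
            || method.equals("PUT")
            || method.equals("DELETE")
            || method.equals("MOVE");
    }

    /** Applies the caching rule to a response and returns true if it was stored. */
    boolean put(String method, String url, String body) {
        if (invalidatesCache(method)) {
            entries.remove(url);  // the resource was modified, so the cached copy is stale
            return false;
        }
        if (!method.equals("GET")) {
            return false;  // for example HEAD: not stored, and nothing is invalidated
        }
        entries.put(url, body);
        return true;
    }

    String get(String url) {
        return entries.get(url);
    }

    public static void main(String[] args) {
        CachePolicySketch cache = new CachePolicySketch();
        cache.put("GET", "/users", "[alice]");
        System.out.println(cache.get("/users"));  // prints "[alice]"
        cache.put("POST", "/users", "created");
        System.out.println(cache.get("/users"));  // prints "null"
    }
}
```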
  

Excerpt from Cache.java

   CacheRequestImpl(final DiskLruCache.Editor editor) {
      this.editor = editor;
      this.cacheOut = editor.newSink(ENTRY_BODY);
      this.body = new ForwardingSink(cacheOut) {
        @Override public void close() throws IOException {
          synchronized (Cache.this) {
            if (done) {
              return;
            }
            done = true;
            writeSuccessCount++;
          }
          super.close();
          editor.commit();
        }
      };
    }

The Cache.get() method

Excerpt from Cache.java

@Nullable
Response get(Request request) {
    String key = key(request.url());
    DiskLruCache.Snapshot snapshot;
    Entry entry;
    try {
      snapshot = cache.get(key);
      if (snapshot == null) {
        return null;
      }
    } catch (IOException e) {
      // Give up because the cache cannot be read.
      return null;
    }
    try {
      entry = new Entry(snapshot.getSource(ENTRY_METADATA));
    } catch (IOException e) {
      Util.closeQuietly(snapshot);
      return null;
    }
    
    Response response = entry.response(snapshot);
    
    if (!entry.matches(request, response)) {
      Util.closeQuietly(response.body());
      return null;
    }
    
    return response;
}

public Response response(DiskLruCache.Snapshot snapshot) {
    String contentType = responseHeaders.get("Content-Type");
    String contentLength = responseHeaders.get("Content-Length");
    Request cacheRequest = new Request.Builder()
      .url(url)
      .method(requestMethod, null)
      .headers(varyHeaders)
      .build();
    return new Response.Builder()
      .request(cacheRequest)
      .protocol(protocol)
      .code(code)
      .message(message)
      .headers(responseHeaders)
      .body(new CacheResponseBody(snapshot, contentType, contentLength))
      .handshake(handshake)
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(receivedResponseMillis)
      .build();
}

CacheResponseBody(final DiskLruCache.Snapshot snapshot,
    String contentType, String contentLength) {
  this.snapshot = snapshot;
  this.contentType = contentType;
  this.contentLength = contentLength;

  Source source = snapshot.getSource(ENTRY_BODY);
  bodySource = Okio.buffer(new ForwardingSource(source) {
    @Override public void close() throws IOException {
      snapshot.close();
      super.close();
    }
  });
}

4. ConnectInterceptor

Excerpt from ConnectInterceptor.java

public final class ConnectInterceptor implements Interceptor {
    public final OkHttpClient client;
    
    public ConnectInterceptor(OkHttpClient client) {
      this.client = client;
    }
    
    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
        
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }
}

So we need to focus on the most important object here, streamAllocation.

Excerpt from StreamAllocation.java

public HttpCodec newStream(
  OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
}

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
  int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
  boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);
    
      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }
    
      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }
    
      return candidate;
    }
}

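So what makes a RealConnection unhealthy?

1. The socket has been closed;
2. The socket's input or output stream has been shut down;
3. The Http2Connection has been shut down, and so on.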
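The socket-level part of that check can be expressed with three java.net.Socket queries. The sketch below is an approximation under the assumption that the health check starts with the socket state; SocketHealthSketch is a hypothetical class, it uses an unconnected socket, and it does not cover the HTTP/2 or extensive read checks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

final class SocketHealthSketch {
    /** A socket is treated as unhealthy once it is closed or either direction is shut down. */
    static boolean isHealthy(Socket socket) {
        return !(socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown());
    }

    /** Returns the health of a fresh socket, then of the same socket after close(). */
    static boolean[] demo() {
        Socket socket = new Socket();  // unconnected, which is enough to show the closed check
        boolean before = isHealthy(socket);
        try {
            socket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        boolean after = isHealthy(socket);
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("before close: " + result[0] + ", after close: " + result[1]);
    }
}
```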

To understand how a RealConnection is created, we look further into the findConnection method.

Excerpt from StreamAllocation.java

/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
  int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    ...
    synchronized (connectionPool) {
      ...
      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      releasedConnection = this.connection;
      if (this.connection != null) {
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      ...
    
      if (result == null) {
        // Attempt to get a connection from the pool.
        Internal.instance.acquire(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    ...

    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }
    
    ...
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    ...
    synchronized (connectionPool) {
      reportedAcquired = true;
    
      // Pool the connection.
      Internal.instance.put(connectionPool, result);
      ...
    }
    ...
    return result;
}

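ConnectionPool analysis

OkHttp abstracts the connection between client and server as Connection, and RealConnection is its implementation class. To manage connections over a period of time and to reuse them, OkHttp provides the ConnectionPool.

Analysis of the ConnectionPool.acquire/get method

Excerpt from ConnectionPool.java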

 void acquire(Address address, StreamAllocation streamAllocation, @Nullable Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return;
      }
    }
  }

Excerpt from StreamAllocation.java

public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    
    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}

Excerpt from RealConnection.java

/** Current streams carried by this connection. */
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

Analysis of the ConnectionPool.put method

Excerpt from ConnectionPool.java

private final Deque<RealConnection> connections = new ArrayDeque<>();

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
}
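The order in findConnection (try the pool, otherwise connect and then pool the result) combined with acquire() and put() gives connection reuse. The sketch below models only that flow. MiniConnectionPool and Conn are hypothetical, an address is a plain string, eligibility is reduced to address equality, and no cleanup thread is started.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class MiniConnectionPool {
    /** Hypothetical connection: just an address and a count of streams using it. */
    static final class Conn {
        final String address;
        int allocations;

        Conn(String address) {
            this.address = address;
        }
    }

    private final Deque<Conn> connections = new ArrayDeque<>();

    /** Returns a pooled connection to {@code address}, or null if none is eligible. */
    synchronized Conn acquire(String address) {
        for (Conn connection : connections) {
            if (connection.address.equals(address)) {
                connection.allocations++;  // record that one more stream uses this connection
                return connection;
            }
        }
        return null;
    }

    synchronized void put(Conn connection) {
        connections.add(connection);
    }

    synchronized int size() {
        return connections.size();
    }

    /** Tries the pool first; otherwise creates a connection and pools it for later reuse. */
    Conn find(String address) {
        Conn pooled = acquire(address);
        if (pooled != null) return pooled;
        Conn created = new Conn(address);  // the real code performs the TCP and TLS handshakes here
        created.allocations++;
        put(created);
        return created;
    }

    public static void main(String[] args) {
        MiniConnectionPool pool = new MiniConnectionPool();
        Conn first = pool.find("example.com:443");
        Conn second = pool.find("example.com:443");
        System.out.println("reused: " + (first == second) + ", pool size: " + pool.size());
    }
}
```

The second lookup for the same address returns the first connection, so the expensive handshake step is skipped.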

Connection cleanup

Excerpt from ConnectionPool.java

private final Runnable cleanupRunnable = () -> {
    while (true) {
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (ConnectionPool.this) {
          try {
            ConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
};

 

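The cleanup() method resembles the mark-and-sweep algorithm used in garbage collection, so we do not go into it in depth. Instead we focus on how the least active connection is found, starting with the helper that counts how many streams still use a connection.

Excerpt from ConnectionPool.java (called by cleanup)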

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);
    
      if (reference.get() != null) {
        i++;
        continue;
      }
    
      ....
    
      references.remove(i);
      connection.noNewStreams = true;
    
      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    
    return references.size();
}
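The loop above relies on weak references: if a StreamAllocation was garbage collected without being released, its reference returns null and is pruned. The sketch below shows that pruning pattern on its own. PruneSketch is a hypothetical class, plain Objects stand in for StreamAllocation, and Reference.clear() simulates the garbage collector so the result does not depend on when a collection happens.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

final class PruneSketch {
    // Held in a static field so it stays strongly reachable for the whole demo.
    private static final Object LIVE = new Object();

    /** Removes cleared references and returns how many live ones remain. */
    static int pruneAndCount(List<Reference<Object>> references) {
        for (int i = 0; i < references.size(); ) {
            if (references.get(i).get() != null) {
                i++;  // still in use, keep it
                continue;
            }
            references.remove(i);  // the referent is gone: treat it as a leaked allocation
        }
        return references.size();
    }

    static int demo() {
        List<Reference<Object>> references = new ArrayList<>();
        references.add(new WeakReference<>(LIVE));
        WeakReference<Object> leaked = new WeakReference<>(new Object());
        references.add(leaked);
        leaked.clear();  // simulate the garbage collector clearing the reference
        return pruneAndCount(references);
    }

    public static void main(String[] args) {
        System.out.println("allocations still in use: " + demo());
    }
}
```

A connection whose count drops to zero this way has no users left, which is why the real method marks it as eligible for immediate eviction.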

5. CallServerInterceptor

Excerpt from CallServerInterceptor.java

public final class CallServerInterceptor implements Interceptor {
  ...
  @Override public Response intercept(Chain chain) throws IOException {
    final RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    final HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();
    ...
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
       ...
      }

      if (responseBuilder == null) {
        if (request.body() instanceof DuplexRequestBody) {
          ...
          request.body().writeTo(bufferedRequestBody);
        } else {
         ...
        }
      } else if (!connection.isMultiplexed()) {
        ...
      }
    }
    ...
    httpCodec.finishRequest();
    ...
    responseBuilder = httpCodec.readResponseHeaders(false);
    ...
    if (forWebSocket && code == 101) {
        ...
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }
    ...

    return response;
  }
}

Learning resources
