OkHttp源码分析与手写实现

2020-06-09  本文已影响0人  Coder_Sven

1,使用

implementation 'com.squareup.okhttp3:okhttp:3.14.2'

1.1异步GET请求

// Asynchronous GET: the call is handed to enqueue() and the result arrives through the Callback.
String url = "http://www.baidu.com";
OkHttpClient okHttpClient = new OkHttpClient();
final Request request = new Request.Builder()
        .url(url)
        .get() // GET is already the builder's default method, so this call is optional.
        .build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        // Pass the exception to the logger so the failure cause is not lost.
        Log.d(TAG, "onFailure: ", e);
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        Log.d(TAG, "onResponse: " + response.body().string());
    }
});

1.2同步GET请求

// Synchronous GET: call.execute() is invoked directly, here from a dedicated worker thread.
String url = "http://www.baidu.com";
OkHttpClient okHttpClient = new OkHttpClient();
final Request request = new Request.Builder()
        .url(url)
        .build();
final Call call = okHttpClient.newCall(request);
new Thread(new Runnable() {
    @Override
    public void run() {
        // try-with-resources closes the response even if reading the body throws.
        try (Response response = call.execute()) {
            Log.d(TAG, "run: " + response.body().string());
        } catch (IOException e) {
            // Log through the same channel as the rest of the example instead of stderr.
            Log.e(TAG, "run: request failed", e);
        }
    }
}).start();

2,分析源码

/** Creates a client configured from a freshly constructed {@code Builder}. */
public OkHttpClient() {
  this(new Builder());
}

  /**
   * Creates a client by copying every setting from {@code builder}.
   *
   * <p>The two interceptor lists are wrapped with {@code Util.immutableList}. When the builder
   * supplies no SSL socket factory and at least one connection spec uses TLS, the socket factory
   * and certificate chain cleaner are derived from the platform trust manager instead.
   *
   * @param builder source of the configuration values
   * @throws IllegalStateException if either interceptor list contains {@code null}
   */
  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    this.proxy = builder.proxy;
    this.protocols = builder.protocols;
    this.connectionSpecs = builder.connectionSpecs;
    this.interceptors = Util.immutableList(builder.interceptors);
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
    this.eventListenerFactory = builder.eventListenerFactory;
    this.proxySelector = builder.proxySelector;
    this.cookieJar = builder.cookieJar;
    this.cache = builder.cache;
    this.internalCache = builder.internalCache;
    this.socketFactory = builder.socketFactory;

    // True as soon as any configured connection spec reports TLS.
    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    // Take the builder's SSL settings when a factory was supplied or when no spec needs TLS;
    // otherwise build both from the platform trust manager.
    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = Util.platformTrustManager();
      this.sslSocketFactory = newSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    if (sslSocketFactory != null) {
      Platform.get().configureSslSocketFactory(sslSocketFactory);
    }

    this.hostnameVerifier = builder.hostnameVerifier;
    // The pinner is re-created so that it uses the chain cleaner chosen above.
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);
    this.proxyAuthenticator = builder.proxyAuthenticator;
    this.authenticator = builder.authenticator;
    this.connectionPool = builder.connectionPool;
    this.dns = builder.dns;
    this.followSslRedirects = builder.followSslRedirects;
    this.followRedirects = builder.followRedirects;
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    this.callTimeout = builder.callTimeout;
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
    this.pingInterval = builder.pingInterval;

    // Reject null entries in either interceptor list.
    if (interceptors.contains(null)) {
      throw new IllegalStateException("Null interceptor: " + interceptors);
    }
    if (networkInterceptors.contains(null)) {
      throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
    }
  }

new Request.Builder().url(url).build()

[---------->Request.java]

/** Creates a request builder whose method starts as {@code "GET"} with a new headers builder. */
public Builder() {
  this.method = "GET";
  this.headers = new Headers.Builder();
}

...
/**
 * Builds the {@code Request} described by this builder.
 *
 * @return a new request created from this builder
 * @throws IllegalStateException if no URL has been set
 */
public Request build() {
    if (url == null) {
        throw new IllegalStateException("url == null");
    }
    return new Request(this);
}

okHttpClient.newCall

[------->OkHttpClient.java]

/** Creates a call for {@code request} by delegating to {@code RealCall.newRealCall}; never a web socket call. */
@Override public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

/**
 * Creates a {@code RealCall} for the request and attaches a new {@code Transmitter} to it.
 *
 * @param client the client issuing the call
 * @param originalRequest the request as supplied by the caller
 * @param forWebSocket whether the call is for a web socket
 * @return the call with its transmitter already assigned
 */
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  final RealCall realCall = new RealCall(client, originalRequest, forWebSocket);
  realCall.transmitter = new Transmitter(client, realCall);
  return realCall;
}

call.enqueue

[--------->RealCall.java]

/**
 * Schedules this call for asynchronous execution through the client's dispatcher.
 *
 * @param responseCallback receives the outcome of the call
 * @throws IllegalStateException if this call has already been executed or enqueued
 */
@Override public void enqueue(Callback responseCallback) {
  // A call may be started only once; the flag is checked and flipped under the call's own lock.
  synchronized (this) {
    if (executed) {
      throw new IllegalStateException("Already Executed");
    }
    executed = true;
  }
  transmitter.callStart();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

client.dispatcher()

[---------->Dispatcher.java]

/**
 * Adds {@code call} to the ready queue and then tries to promote ready calls to running.
 */
void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);

    // For anything but a web socket call, let the new call share the per-host counter
    // (an AtomicInteger) of an existing call to the same host, if there is one.
    if (!call.get().forWebSocket) {
      AsyncCall sameHostCall = findExistingCallWithHost(call.host());
      if (sameHostCall != null) {
        call.reuseCallsPerHostFrom(sameHostCall);
      }
    }
  }
  promoteAndExecute();
}
/**
 * Moves eligible calls from the ready queue to the running set and starts them on the executor
 * service. Must not be called while holding this dispatcher's lock.
 *
 * @return true if any call is running once promotion has been done
 */
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  final List<AsyncCall> promoted = new ArrayList<>();
  final boolean anyRunning;
  synchronized (this) {
    Iterator<AsyncCall> ready = readyAsyncCalls.iterator();
    while (ready.hasNext()) {
      AsyncCall candidate = ready.next();

      if (runningAsyncCalls.size() >= maxRequests) {
        break; // Max capacity.
      }
      if (candidate.callsPerHost().get() >= maxRequestsPerHost) {
        continue; // Host max capacity.
      }

      ready.remove();
      candidate.callsPerHost().incrementAndGet();
      promoted.add(candidate);
      runningAsyncCalls.add(candidate);
    }
    anyRunning = runningCallsCount() > 0;
  }

  // Start the promoted calls only after the lock has been released.
  for (AsyncCall promotedCall : promoted) {
    promotedCall.executeOn(executorService());
  }

  return anyRunning;
}

executorService()

/**
 * Returns the executor service used to run asynchronous calls, creating it on first use.
 */
public synchronized ExecutorService executorService() {
  if (executorService != null) {
    return executorService;
  }
  // Create the thread pool lazily.
  executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
      new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  return executorService;
}

asyncCall.executeOn

[------->RealCall#AsyncCall.java]

/**
 * Submits this call to {@code executorService}. If the executor rejects it, the failure is
 * reported to the callback and the dispatcher is told the call has finished.
 */
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean submitted = false;
  try {
    executorService.execute(this);
    submitted = true;
  } catch (RejectedExecutionException rejected) {
    // Surface the rejection to the caller as an I/O failure that keeps the original cause.
    InterruptedIOException failure = new InterruptedIOException("executor rejected");
    failure.initCause(rejected);
    transmitter.noMoreExchanges(failure);
    responseCallback.onFailure(RealCall.this, failure);
  } finally {
    if (!submitted) {
      client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}

executorService.execute(this)

[---------->NamedRunnable.java]

/**
 * Runs {@code execute()} with the current thread temporarily renamed to {@code name}; the
 * previous thread name is restored afterwards, even if {@code execute()} throws.
 */
@Override public final void run() {
  final Thread current = Thread.currentThread();
  final String previousName = current.getName();
  current.setName(name);
  try {
    execute();
  } finally {
    current.setName(previousName);
  }
}

execute()

[------->RealCall#AsyncCall.java]

/**
 * Runs the call through the interceptor chain and reports the outcome to
 * {@code responseCallback}. The dispatcher is always told that this call has finished.
 */
@Override protected void execute() {
  // Set just before the response is handed over, so that an IOException thrown by the
  // callback's own onResponse() is only logged instead of also triggering onFailure().
  boolean signalledCallback = false;
  transmitter.timeoutEnter();
  try {
    Response response = getResponseWithInterceptorChain();
    signalledCallback = true;
    responseCallback.onResponse(RealCall.this, response);
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}
    /**
     * Assembles the full interceptor list for this call, runs the request through it and returns
     * the resulting response.
     *
     * @throws IOException if the chain fails, or if the call was canceled ("Canceled")
     */
    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        // (1) Interceptors the user registered on the client; they come first in the chain.
        interceptors.addAll(client.interceptors());
        // Retry and follow-up (redirect) interceptor.
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        // Bridge interceptor: bridges the application layer and the network layer, adding the
        // required headers.
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        // Cache handling.
        interceptors.add(new CacheInterceptor(client.internalCache()));
        // Connect interceptor.
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            // (2) Network interceptors registered via OkHttpClient.Builder#addNetworkInterceptor();
            // they are added only when this call is not for a web socket.
            interceptors.addAll(client.networkInterceptors());
        }
        // Interceptor that talks to the server.
        interceptors.add(new CallServerInterceptor(forWebSocket));

        // The chain starts at index 0 with no exchange yet.
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
                originalRequest, this, client.connectTimeoutMillis(),
                client.readTimeoutMillis(), client.writeTimeoutMillis());

        // Tracks whether noMoreExchanges() was already invoked on the failure path, so the
        // finally block does not invoke it a second time.
        boolean calledNoMoreExchanges = false;
        try {
            Response response = chain.proceed(originalRequest);
            if (transmitter.isCanceled()) {
                closeQuietly(response);
                throw new IOException("Canceled");
            }
            return response;
        } catch (IOException e) {
            calledNoMoreExchanges = true;
            throw transmitter.noMoreExchanges(e);
        } finally {
            if (!calledNoMoreExchanges) {
                transmitter.noMoreExchanges(null);
            }
        }
    }

走到这里,就到了通过拦截器链发起请求并返回 Response 的环节。

总结一下上面部分的时序图

okhttp 时序图_wps图片.jpg

拦截器-interceptor

用户可以传入的interceptor分为两类:

①一类是全局的 interceptor,该类 interceptor 在整个拦截器链中最早被调用,通过 OkHttpClient.Builder#addInterceptor(Interceptor) 传入;

②另外一类是网络拦截器(network interceptor),通过 OkHttpClient.Builder#addNetworkInterceptor(Interceptor) 传入;这类拦截器只会在非 WebSocket 请求中被调用(对应源码中的 if (!forWebSocket) 判断),并且是在组装完请求之后、真正发起网络请求之前被调用。所有的 interceptor 都被保存在 List<Interceptor> interceptors 集合中,按照添加顺序逐个调用,具体可参考 RealCall#getResponseWithInterceptorChain() 方法。

这里举一个简单的例子,例如有这样一个需求,我要监控App通过 OkHttp 发出的所有原始请求,以及整个请求所耗费的时间,针对这样的需求就可以使用第一类全局的 interceptor 在拦截器链头去做。

// Register LoggingInterceptor on the client, then issue an asynchronous request through it.
OkHttpClient okHttpClient = new OkHttpClient.Builder()
        .addInterceptor(new LoggingInterceptor())
        .build();
Request request = new Request.Builder()
        .url("http://www.publicobject.com/helloworld.txt")
        .header("User-Agent", "OkHttp Example")
        .build();
okHttpClient.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.d(TAG, "onFailure: " + e.getMessage());
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        // try-with-resources closes the body on every path; a null resource is simply skipped.
        try (ResponseBody body = response.body()) {
            if (body != null) {
                // Read from the null-checked local instead of calling response.body() again.
                Log.d(TAG, "onResponse: " + body.string());
            }
        }
    }
});
/**
 * Interceptor that logs each request before it proceeds down the chain and logs the response,
 * together with the elapsed time, once it comes back.
 */
public class LoggingInterceptor implements Interceptor {
    private static final String TAG = "LoggingInterceptor";

    /**
     * Logs the request, lets the chain proceed, then logs the response and how long it took.
     *
     * @param chain the interceptor chain this interceptor is part of
     * @return the response produced by the rest of the chain, unchanged
     * @throws IOException if proceeding down the chain fails
     */
    @Override
    public Response intercept(Chain chain) throws IOException {
        final Request outgoing = chain.request();

        final long sentAtNanos = System.nanoTime();
        final String requestLine = String.format("Sending request %s on %s%n%s",
                outgoing.url(), chain.connection(), outgoing.headers());
        Log.d(TAG, requestLine);

        final Response incoming = chain.proceed(outgoing);

        final long receivedAtNanos = System.nanoTime();
        // Convert nanoseconds to milliseconds for the log line.
        final double elapsedMillis = (receivedAtNanos - sentAtNanos) / 1e6d;
        final String responseLine = String.format("Received response for %s in %.1fms%n%s",
                incoming.request().url(), elapsedMillis, incoming.headers());
        Log.d(TAG, responseLine);

        return incoming;
    }
}

针对这个请求,打印出来的结果

Sending request http://www.publicobject.com/helloworld.txt on null
User-Agent: OkHttp Example
        
Received response for https://publicobject.com/helloworld.txt in 1265.9ms
Server: nginx/1.10.0 (Ubuntu)
Date: Wed, 28 Mar 2018 08:19:48 GMT
Content-Type: text/plain
Content-Length: 1759
Last-Modified: Tue, 27 May 2014 02:35:47 GMT
Connection: keep-alive
ETag: "5383fa03-6df"
Accept-Ranges: bytes

分析一下拦截器源码

Interceptor.Chain chain = new RealInterceptorChain

/**
 * Creates a chain over {@code interceptors} that is positioned at {@code index}.
 *
 * @param interceptors the full interceptor list shared by every chain of the call
 * @param transmitter the call's transmitter
 * @param exchange the current exchange, or null if there is none yet
 * @param index position in {@code interceptors} of the interceptor this chain invokes next
 * @param request the request to pass along
 * @param call the call this chain belongs to
 * @param connectTimeout connect timeout value stored on the chain
 * @param readTimeout read timeout value stored on the chain
 * @param writeTimeout write timeout value stored on the chain
 */
public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
    @Nullable Exchange exchange, int index, Request request, Call call,
    int connectTimeout, int readTimeout, int writeTimeout) {
  this.interceptors = interceptors;
  this.transmitter = transmitter;
  this.exchange = exchange;
  this.index = index;
  this.request = request;
  this.call = call;
  this.connectTimeout = connectTimeout;
  this.readTimeout = readTimeout;
  this.writeTimeout = writeTimeout;
}

Response response = chain.proceed(originalRequest);

/** Proceeds with {@code request}, reusing this chain's own transmitter and exchange. */
@Override public Response proceed(Request request) throws IOException {
  return proceed(request, transmitter, exchange);
}

/**
 * Invokes the interceptor at this chain's {@code index}, handing it a new chain positioned at
 * {@code index + 1}, and returns that interceptor's response after validating it.
 *
 * @throws AssertionError if {@code index} is past the end of the interceptor list
 * @throws IllegalStateException if an interceptor violates the checks made once an exchange
 *     exists, or if the returned response has no body
 * @throws NullPointerException if the interceptor returns null
 */
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
    throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  // Counts how many times proceed() has been called on this chain instance.
  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.exchange != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  // The key is index + 1: each interceptor receives a chain that points at the following
  // interceptor, so when it calls proceed() the next one runs, and so on down the list until
  // the final Response is produced.
  RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
      index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }

  return response;
}

拦截器是从 interceptors 列表的 0 下标开始逐个调用的。下面以 ConnectInterceptor 为例,看看它的源码:

/**
 * Obtains a new {@code Exchange} from the call's transmitter and proceeds down the chain with it.
 *
 * @param chain the chain this interceptor is part of; cast to {@code RealInterceptorChain}
 * @return the response produced by the rest of the chain
 * @throws IOException if proceeding down the chain fails
 */
@Override public Response intercept(Chain chain) throws IOException {
  final RealInterceptorChain realChain = (RealInterceptorChain) chain;
  final Request request = realChain.request();
  final Transmitter transmitter = realChain.transmitter();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  // Every method other than GET asks for the extensive health checks.
  final boolean isGet = request.method().equals("GET");
  final Exchange exchange = transmitter.newExchange(chain, !isGet);

  return realChain.proceed(request, transmitter, exchange);
}

transmitter.newExchange

/**
 * Creates the {@code Exchange} for a new request and records it on this transmitter.
 *
 * @throws IllegalStateException if no more exchanges are allowed, or if a previous exchange is
 *     still set on this transmitter
 */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  synchronized (connectionPool) {
    if (noMoreExchanges) {
      throw new IllegalStateException("released");
    }
    if (exchange != null) {
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    }
  }

  // The codec lookup runs outside the connectionPool lock.
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

  // Publish the new exchange and reset its completion flags under the lock.
  synchronized (connectionPool) {
    this.exchange = result;
    this.exchangeRequestDone = false;
    this.exchangeResponseDone = false;
    return result;
  }
}

exchangeFinder.find

[------->ExchangeFinder.java]

/**
 * Finds a healthy connection for the call and returns a codec created on it.
 *
 * @param client supplies the ping interval and the retry-on-connection-failure setting
 * @param chain supplies the connect, read and write timeouts
 * @param doExtensiveHealthChecks passed through to the connection health check
 * @return a codec obtained from the healthy connection
 * @throws RouteException if finding a connection fails; an {@code IOException} is wrapped in one
 */
public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  final int connectTimeoutMillis = chain.connectTimeoutMillis();
  final int readTimeoutMillis = chain.readTimeoutMillis();
  final int writeTimeoutMillis = chain.writeTimeoutMillis();
  final int pingIntervalMillis = client.pingIntervalMillis();
  final boolean retryOnFailure = client.retryOnConnectionFailure();

  try {
    return findHealthyConnection(connectTimeoutMillis, readTimeoutMillis, writeTimeoutMillis,
        pingIntervalMillis, retryOnFailure, doExtensiveHealthChecks)
        .newCodec(client, chain);
  } catch (RouteException routeFailure) {
    trackFailure();
    throw routeFailure;
  } catch (IOException ioFailure) {
    trackFailure();
    throw new RouteException(ioFailure);
  }
}
/**
 * Keeps asking {@code findConnection} for a connection until one is acceptable: either a
 * connection whose {@code successCount} is still zero, or one that passes the health check.
 * A connection that fails the health check is barred from new exchanges before retrying.
 */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  for (;;) {
    final RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    final boolean brandNew;
    synchronized (connectionPool) {
      brandNew = candidate.successCount == 0;
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good.
    if (brandNew || candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate;
    }

    // Not healthy: stop it from carrying new exchanges and start again.
    candidate.noNewExchanges();
  }
}
/**
 * Returns a connection to host a new exchange. This prefers the transmitter's existing
 * connection, then a connection from the pool, and finally connects a new one.
 *
 * @throws IOException if the call is canceled ("Canceled") or connecting fails
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false; // This is a fresh attempt.

    // Attempt to use an already-allocated connection. We need to be careful here because our
    // already-allocated connection may have been restricted from creating new exchanges.
    releasedConnection = transmitter.connection;
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;

    if (transmitter.connection != null) {
      // We had an already-allocated connection and it's good.
      result = transmitter.connection;
      releasedConnection = null;
    }

    if (result == null) {
      // Attempt to get a connection from the pool.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry;
        nextRouteToTry = null;
      } else if (retryCurrentRoute()) {
        selectedRoute = transmitter.connection.route();
      }
    }
  }
  // Socket closing and listener callbacks happen after the pool lock has been released.
  closeQuietly(toClose);

  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
    return result;
  }

  // If we need a route selection, make one. This is a blocking operation.
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }

  List<Route> routes = null;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection.getAll();
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      }
    }

    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
  connectionPool.routeDatabase.connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    connectingConnection = null;
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;
    } else {
      // The new connection is kept: add it to the pool and attach it to the transmitter.
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    }
  }
  closeQuietly(socket);

  eventListener.connectionAcquired(call, result);
  return result;
}

result.connect

[--------------->RealConnection.java]

/**
 * Connects this connection over its route: through a tunnel when the route requires one,
 * otherwise with a plain socket, and then establishes the protocol. Failed attempts are retried
 * while {@code connectionRetryEnabled} is set and the connection spec selector allows it.
 *
 * @throws IllegalStateException if this connection is already connected
 * @throws RouteException if the configuration forbids the connection, if connecting fails and
 *     no retry is allowed, or if the tunnel could not be established
 */
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
    EventListener eventListener) {
  if (protocol != null) throw new IllegalStateException("already connected");

  RouteException routeException = null;
  List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
  ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

  // Without an SSL socket factory the address is cleartext: it must be enabled on the client
  // and permitted by the platform. With one, H2_PRIOR_KNOWLEDGE is rejected.
  if (route.address().sslSocketFactory() == null) {
    if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication not enabled for client"));
    }
    String host = route.address().url().host();
    if (!Platform.get().isCleartextTrafficPermitted(host)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication to " + host + " not permitted by network security policy"));
    }
  } else {
    if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
      throw new RouteException(new UnknownServiceException(
          "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
    }
  }

  // Attempt loop: left by break on success (or on a closed-down tunnel), or by throwing.
  while (true) {
    try {
      if (route.requiresTunnel()) {
        connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
        if (rawSocket == null) {
          // We were unable to connect the tunnel but properly closed down our resources.
          break;
        }
      } else {
        connectSocket(connectTimeout, readTimeout, call, eventListener);
      }
      establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
      eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
      break;
    } catch (IOException e) {
      // Reset every piece of connection state before deciding whether to try again.
      closeQuietly(socket);
      closeQuietly(rawSocket);
      socket = null;
      rawSocket = null;
      source = null;
      sink = null;
      handshake = null;
      protocol = null;
      http2Connection = null;

      eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);

      // Accumulate the failures of every attempt in a single RouteException.
      if (routeException == null) {
        routeException = new RouteException(e);
      } else {
        routeException.addConnectException(e);
      }

      if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
        throw routeException;
      }
    }
  }

  if (route.requiresTunnel() && rawSocket == null) {
    ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
        + MAX_TUNNEL_ATTEMPTS);
    throw new RouteException(exception);
  }

  if (http2Connection != null) {
    synchronized (connectionPool) {
      allocationLimit = http2Connection.maxConcurrentStreams();
    }
  }
}

connectSocket

/**
 * Creates the raw socket for this route, connects it to the route's socket address and wraps
 * its streams in buffered Okio source and sink.
 *
 * @throws IOException if connecting fails; a {@code ConnectException} is rethrown with the
 *     target address in its message and the original exception as its cause
 */
private void connectSocket(int connectTimeout, int readTimeout, Call call,
    EventListener eventListener) throws IOException {
  final Proxy proxy = route.proxy();
  final Address address = route.address();

  // DIRECT and HTTP proxies use a socket from the address's factory; any other proxy type
  // gets a socket bound to the proxy itself.
  if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP) {
    rawSocket = address.socketFactory().createSocket();
  } else {
    rawSocket = new Socket(proxy);
  }

  eventListener.connectStart(call, route.socketAddress(), proxy);
  rawSocket.setSoTimeout(readTimeout);
  try {
    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
  } catch (ConnectException cause) {
    ConnectException wrapped =
        new ConnectException("Failed to connect to " + route.socketAddress());
    wrapped.initCause(cause);
    throw wrapped;
  }

  // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
  // More details:
  // https://github.com/square/okhttp/issues/3245
  // https://android-review.googlesource.com/#/c/271775/
  try {
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  } catch (NullPointerException npe) {
    // Only the known NPE is converted to an IOException; any other NPE is dropped here.
    if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
      throw new IOException(npe);
    }
  }
}

Platform.get().connectSocket

[--------->Platform.java]

/**
 * Connects {@code socket} to {@code address}, passing {@code connectTimeout} as the timeout of
 * {@code Socket.connect}.
 *
 * @throws IOException if the connection attempt fails
 */
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
    throws IOException {
  socket.connect(address, connectTimeout);
}

连接拦截器的时序图:

连接拦截器_wps图片.jpg

连接池(connectionPool)

在上面exchangeFinder.find的findConnection我们发现引入了connectionPool,顾名思义,这是一个连接池。


okhttp连接池复用_wps图片.jpg

每一个创建的socket对象都会放入到连接池中进行准备复用,并且给每个socket对象设置超时时间,在未超过时间时,同一个IP,端口的链接可以进行socket对象的复用,这样就可以避免重复打开链接所带来的性能下降。

我们看看connectionPool是怎么实现的

[-------->ExchangeFinder.java]

/**
 * Creates a finder for {@code address} that works against {@code connectionPool}; its route
 * selector is built on the pool's route database.
 */
ExchangeFinder(Transmitter transmitter, RealConnectionPool connectionPool,
    Address address, Call call, EventListener eventListener) {
  this.transmitter = transmitter;
  this.connectionPool = connectionPool;
  this.address = address;
  this.call = call;
  this.eventListener = eventListener;
  this.routeSelector = new RouteSelector(
      address, connectionPool.routeDatabase, call, eventListener);
}

[---------->Transmitter.java]

  /**
   * Creates the transmitter for {@code call}. The connection pool is the
   * {@code RealConnectionPool} resolved from the client's {@code ConnectionPool}, the event
   * listener comes from the client's listener factory, and the timeout is set to the client's
   * call timeout in milliseconds.
   */
  public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
  }

this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
    call, eventListener);

[-------->OkHttpClient.java]

/** Returns the connection pool this client was configured with. */
public ConnectionPool connectionPool() {
  return connectionPool;
}
public Builder() {
    ....
    connectionPool = new ConnectionPool();
}

[------------>ConnectionPool.java]

public final class ConnectionPool {
  final RealConnectionPool delegate;
  ....
}

[---------->RealConnectionPool.java]

/**
 * Abridged excerpt of the pool that keeps {@code RealConnection}s so they can be reused.
 * Both methods shown here assert that the caller already holds this pool's lock.
 */
public final class RealConnectionPool {

  /** Executor on which the cleanup runnable is started. */
  private static final Executor executor = new ThreadPoolExecutor(
      0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */,
      60L /* keepAliveTime */,
      TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      Util.threadFactory("OkHttp ConnectionPool", true));

  private final Deque<RealConnection> connections = new ArrayDeque<>();

  /** Adds {@code connection} to the pool, starting the cleanup task first if it is not running. */
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

  /**
   * Looks for a pooled connection eligible for {@code address} and {@code routes} and, if one is
   * found, lets {@code transmitter} acquire it.
   *
   * @param requireMultiplexed when true, connections that are not multiplexed are skipped
   * @return true if a connection was acquired, false if none matched
   */
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection pooled : connections) {
      final boolean multiplexingSatisfied = !requireMultiplexed || pooled.isMultiplexed();
      if (multiplexingSatisfied && pooled.isEligible(address, routes)) {
        // Found a reusable connection in the pool.
        transmitter.acquireConnectionNoEvents(pooled);
        return true;
      }
    }
    return false;
  }

}

手写okhttp架构

https://github.com/games2sven/okhttp

上一篇下一篇

猜你喜欢

热点阅读