Android 开发技术分享Android Developers

Okhttp源码解读第一篇——整体架构

2021-03-11  本文已影响0人  A_si

OkHttp概述

OkHttp 是适用于 Android 和 Java 应用程序的高效HTTP客户端。最早的版本是基于 HttpURLConnection 和 HttpClient 的封装,之后 Android 6.0 版移除了对 Apache HTTP 客户端的支持,OkHttp 也移除了 HttpClient,然后移除 HttpURLConnection 的底层支持,连接底层到应用全部自己实现,成为业界公认最佳方案,HttpURLConnection 的底层实现也改为了 OkHttp 的实现方式。

优势:

使用

使用 OkHttp 同步请求

   OkHttpClient client = new OkHttpClient();

   Request request = new Request.Builder()
      .url(BASE_URL + "/date")
      .build();

    Call call = client.newCall(request);
    Response response = call.execute();

使用 OkHttp 异步请求

  OkHttpClient client = new OkHttpClient();

  Request request = new Request.Builder()
      .url(BASE_URL + "/date")
      .build();

    Call call = client.newCall(request);
    call.enqueue(new Callback() {
        public void onResponse(Call call, Response response) 
          throws IOException {
            // ...
        }
        
        public void onFailure(Call call, IOException e) {
            fail();
        }
    });

源码走读

异步请求

call.enqueue开始,查看方法实现:

  fun enqueue(responseCallback: Callback)

这是一个抽象方法,所以需要回到Call call = client.newCall(request),查看newCall的实现,

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

那么真正调用enqueue的是RealCall,查看 RealCallenqueue的实现:

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

callStart()是一个监听器,监听请求的开始,重点在这里:client.dispatcher.enqueue(AsyncCall(responseCallback))。点进dispatcher

/**
 * Policy on when async requests are executed.
 *
 * Each dispatcher uses an [ExecutorService] to run calls internally. If you supply your own
 * executor, it should be able to run [the configured maximum][maxRequests] number of calls
 * concurrently.
 */
class Dispatcher constructor() {}

通过注释,可以知道这是一个使用 ExecutorService 的线程调度器。

  /**
   * The maximum number of requests to execute concurrently. Above this requests queue in memory,
   * waiting for the running calls to complete.
   *
   * If more than [maxRequests] requests are in flight when this is invoked, those requests will
   * remain in flight.
   */
  @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }

可以 Set 的同时执行的最大请求数,默认同时执行64个请求。

 /**
   * The maximum number of requests for each host to execute concurrently. This limits requests by
   * the URL's host name. Note that concurrent requests to a single IP address may still exceed this
   * limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy.
   *
   * If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests
   * will remain in flight.
   *
   * WebSocket connections to hosts **do not** count against this limit.
   */
  @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }

每个主机能同时执行的最大请求数,默认是5个。

  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

内部使用一个核心线程数为0、容量无限大、60秒超时的线程池。

查看enqueue

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

readyAsyncCalls.add(call)把准备执行但未执行的 AsyncCall 放入队列,然后记录主机连接数,最后调用promoteAndExecute()

 private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

遍历队列,挑选符合条件可以被执行的 Call ,把他们放在正在执行的队列,然后调用asyncCall.executeOn(executorService)执行:

  fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

executorService.execute(this)这里既然传入了this,那么这个类肯定实现了 Runnable 接口:

  inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable 

我们查看 AsyncCall 的 run方法,找出怎么执行的:

 override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

val response = getResponseWithInterceptorChain()这里直接返回的是 Response,说明getResponseWithInterceptorChain里会发起请求和返回,核心也就在这里。

同步请求

下面看下同步请求的执行:

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

很简单,直接调用getResponseWithInterceptorChain()

整体架构分析到此,下一篇详细分析getResponseWithInterceptorChain()

上一篇下一篇

猜你喜欢

热点阅读