okhttp3学习笔记

2020-01-15  本文已影响0人  阿丹_90

源码版本:com.squareup.okhttp3:okhttp:4.3.0

1 okhttp简单应用

通过okhttp发起异步post请求示例:

// 创建OkHttpClient单例
val okHttpClient = OkHttpClient()

/* 媒体类型 用于描述http请求或响应主体的内容类型。
* 官方注释:一种[RFC 2045][rfc_2045]媒体类型,适合描述http请求或响应体的内容类型。
* 默认charset=utf-8。*/
val contentType: MediaType = "application/json; charset=utf-8".toMediaType()
// json格式请求参数
val body: RequestBody = "{\"userId\": \"001\", \"password\": \"123\"}".toRequestBody(contentType)
// 新建post请求
val post: Request = Request.Builder()
                .url("")
                .post(body)
                .build()

// 异步发起,回调形式返回结果
okHttpClient.newCall(post).enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {
            }

            @Throws(IOException::class)
            override fun onResponse(call: Call, response: Response) {
                val request = call.request()
                val requestBody = request.body
                val responseBody = response.body
            }
        })

可以看出用okhttp发送请求需要3步

  1. 创建一个单例OkHttpClient对象
  2. 创建原始请求Request对象
  3. 用OkHttpClient对象把Request对象包装成直接可以用Call,并排队执行。

1.1 OkHttpClient

关于OkHttpClient的官方注释:OkHttpClient是http请求(Call)的工厂,可以用来发送http请求和接受响应。只创建一个OkHttpClient实例并将其用于所有HTTP调用时,OkHttp的性能最佳。因为每个OkHttpClient实例都拥有自己的连接池和线程池。重用连接和线程可以减少延迟并节省内存。相反,为每个请求创建一个OkHttpClient实例会浪费空闲池的资源。
OkHttpClient可以直接通过构造方法创建也可以通过OkHttpClient.Builder创建。

    // 通过OkHttpClient.Builder创建OkHttpClient实例
    val okHttpClient = OkHttpClient.Builder()
            .eventListener(object : EventListener() {
                override fun callStart(call: Call) {
                    Log.d("network", "发起请求${call.request().url}")
                }
            })
            .build()

    // 通过无参构造创建OkHttpClient实例
    val okHttpClient = OkHttpClient()
    // OkHttpClient.Builder的build()方法
    fun build(): OkHttpClient = OkHttpClient(this)
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
    // OkHttpClient无参构造
    constructor() : this(Builder())
    // 省略其他代码
    ...
}

查看源码可以看出两种方法其实是一种,但如果想自定义OkHttpClient属性则需通过OkHttpClient.Builder。我们来看一下OkHttpClient.Builder的属性和默认值,这些最后会赋值给OkHttpClient。

    // 调度器
    internal var dispatcher: Dispatcher = Dispatcher()
    // 连接池 底层用ArrayDeque
    internal var connectionPool: ConnectionPool = ConnectionPool()
    // 用户责任链/拦截器链 ArrayList
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    // 网络责任链/拦截器链 ArrayList
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    // EventListener工厂
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    // 连接失败是否重试
    internal var retryOnConnectionFailure = true
    // 身份验证
    internal var authenticator: Authenticator = Authenticator.NONE
    internal var followRedirects = true
    internal var followSslRedirects = true
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    internal var cache: Cache? = null
    internal var dns: Dns = Dns.SYSTEM
    internal var proxy: Proxy? = null
    internal var proxySelector: ProxySelector? = null
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = OkHttpClient.DEFAULT_CONNECTION_SPECS
    internal var protocols: List<Protocol> = OkHttpClient.DEFAULT_PROTOCOLS
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    internal var callTimeout = 0
    // 链接超时时间
    internal var connectTimeout = 10_000
    // 读超时时间
    internal var readTimeout = 10_000
    // 写超时时间
    internal var writeTimeout = 10_000
    internal var pingInterval = 0

1.2 Request

Request只能通过Request.Builder创建,默认get请求。可自定义请求方式、url、header、tag等。Request不能被OkHttpClient直接使用,需要包装成Call。

Call是interface,我们来看一下Call定义了哪些属性和方法。

interface Call : Cloneable {
       // 返回原始Request对象
       fun request(): Request
       // 同步执行请求,直接返回响应
       @Throws(IOException::class)
       fun execute(): Response
       // 异步执行请求,把请求加入队列等待执行,通过Callback返回响应
       fun enqueue(responseCallback: Callback)
       // 请求可取消,已经执行过的不可以取消
       fun cancel()
       // 是否执行了 execute或enqueue
       fun isExecuted(): Boolean
       fun isCanceled(): Boolean
       // 返回跨越整个调用的Timeout,包括解析DNS、连接、写入请求体、服务器处理和读取响应体过程。如果调用需要重定向或重试,所有操作都必须在一个超时周期内完成。
       fun timeout(): Timeout
       // 复制当前Call,因为一个Call只可以发起一次,想重复发起相同相求需复制一个新的Call
       public override fun clone(): Call
       // Call工厂interface
       interface Factory {
              fun newCall(request: Request): Call
       }
}

RealCall是Call的实现类,原始请求request先包装成RealCall,后续还会包装成RealCall的内部类AsnyCall,这里先不说。OkHttpClient实现了Call.Factory,所以前边说OkHttpClient是http请求(Call)的工厂。

RealCall对象是通过okHttpClient.newCall(post)创建的。

override fun newCall(request: Request): Call {
       return RealCall.newRealCall(this, request,forWebSocket = false)
}

RealCall构造方法私有,只能通过RealCall的newRealCall(client: OkHttpClient,originalRequest: Request,forWebSocket: Boolean)构建实例。

companion object {
       fun newRealCall(
              client: OkHttpClient,
              originalRequest: Request,
              forWebSocket: Boolean
       ): RealCall {
       // Safely publish the Call instance to the EventListener.
       return RealCall(client, originalRequest, forWebSocket).apply {
              transmitter = Transmitter(client,this)
              }
       }
}

1.3 发送异步请求

平时应用比较多的是异步请求,接下来我们看一下okHttp异步请求的执行过程。RealCall的enqueue方法:

override fun enqueue(responseCallback: Callback) {
       // 加锁用标记位判断当前对象是否以执行,执行过抛异常"Already Executed",没执行过则执行。
       synchronized(this){
              check(!executed){ "Already Executed" }
              executed =true
       }
       // 记录堆栈跟踪,告诉eventListener请求开始
       transmitter.callStart()
       // 通过dispatcher进行排队
       client.dispatcher.enqueue(AsyncCall(responseCallback))
}

2 Dispatcher和AsyncCall

Dispatcher 调度器,用于决定何时执行异步请求。可在创建OkHttpClient时自定义,一般没必要自定义,okhttp有很好的默认实现。说到Dispatcher就必须提到AsyncCall,AsyncCall是RealCall的一个内部类,是为了配合dispatcher进行线程调度的Runnable。
我们先看一下Dispatcher的部分源码。

  // 并发执行的请求的最大数目
  @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }

  // 同一host请求并发执行的请求的最大数目。WebSocket 不受此限制
  @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }

  // 线程池
  private var executorServiceOrNull: ExecutorService? = null
  // 线程池懒汉式初始化
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  // 准备执行的AsyncCall列表
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  // 正在执行的AsyncCall列表
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  // 请求入队
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      // 把call加入准备执行的AsyncCall列表
      readyAsyncCalls.add(call)

      // 相同host的call共享同一AtomicInteger 
      // 这里我个人认为把 readyAsyncCalls.add(call)放到if语句后更好
      if (!call.get().forWebSocket) {
        // 在执行中call列表和准备执行call列表寻找相同host的call
        val existingCall = findExistingCallWithHost(call.host())
        // 存在相同host的call,则把找到的call的AtomicInteger赋值给当前call
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    // 推动执行
    promoteAndExecute()
  }

  /**
   * 在执行中call列表和准备执行call列表寻找相同host的call
   */
  private fun findExistingCallWithHost(host: String): AsyncCall? {
    for (existingCall in runningAsyncCalls) {
      if (existingCall.host() == host) return existingCall
    }
    /* 如果执行到这里肯定能找到相同host的call的,
    因为当前call已经被加入到readyAsyncCalls,
    当遍历到最后一个call就是当前call本身,
    肯定符合existingCall.host() == host,然后返回的call实际就是当前call。*/
    for (existingCall in readyAsyncCalls) {
      if (existingCall.host() == host) return existingCall
    }
    return null
  }

  /**
   * 筛选合适的待执行列表的call到执行中列表。
   * 并在线程池executorService中执行
   */
  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    // 可拓展列表,用于存放待执行列表中合适移动到执行中列表的call
    val executableCalls = mutableListOf<AsyncCall>()
    // 当前是否有请求正在执行
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      // 遍历待执行call列表
      while (i.hasNext()) {
        val asyncCall = i.next()
        // 如果当前正在执行的call数量超过了调度器允许的最大值,则终止操作
        if (runningAsyncCalls.size >= this.maxRequests) break
        // 如果与当前call相同host的call数量超过调度器允许的同一个host的最大值,则跳过当前call,继续判断下一个call是否合适。
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue 

        // 当前call合适
        // 移出待执行列表
        i.remove()
        // 同host请求计数加1
        asyncCall.callsPerHost().incrementAndGet()
        // 把当前call添加到可执行列表,当循环完成此列表中包含此次筛选的所有合适的call
        executableCalls.add(asyncCall)
        // 把当前call添加到执行中列表,此时此列表中包含之前正在执行的call和此次筛选出的合适执行但未执行的call
        runningAsyncCalls.add(asyncCall)
      }
      // 判断是否有正在执行的(包含合适执行但未执行的)请求
      isRunning = runningCallsCount() > 0
    }

    // 把此次循环筛选出来的call放到线程池中执行,for结束runningAsyncCalls中的所有call都处于执行中状态
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

这部分代码比较长,但是只做了一件事:把请求放到队列中合理调度执行。程序的入口是enqueue(call: AsyncCall)方法,操作一共分3步

  1. 把call放到待执行列表。
  2. 相同host的call共用同一AtomicInteger。AtomicInteger用于统计当前host正在执行的请求数量。
  3. 筛选合适的待执行call,从待执行列表取出放到执行中列表,然后在线程池中执行。

我们在看一下AsyncCall的源码

  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    // 多线程操作可见,且操作保证原子性
    @Volatile private var callsPerHost = AtomicInteger(0)
    fun callsPerHost(): AtomicInteger = callsPerHost
    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }
    // 原始请求的host
    fun host(): String = originalRequest.url.host
    // 原始请求
    fun request(): Request = originalRequest
    // 当前RealCall对象,持有此内部类的外部类对象
    fun get(): RealCall = this@RealCall

    // 执行当前AsnyCall
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher))
      var success = false
      try {
        // 把当前call(Runnable对象)加入dispatcher的线程池中执行
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        // 执行结束,尝试释放connection,并通知eventListener
        transmitter.noMoreExchanges(ioException)
        // 失败回调,把当前call和异常传回
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          // 执行结束,操作调度器,在call列表删除当前call;如队列有等待call则唤醒执行;如调度器空闲则通知调度器空闲监听器。
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    // 当前Runnable的run方法
    override fun run() {
      // 当前是在子线程
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        // 开始计时
        transmitter.timeoutEnter()
        try {
          // 真正的网络请求操作,response是网络请求结果
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          // 回调 请求成功
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", INFO, e)
          } else {
            // 回调 请求失败
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          // 停止计时 即调用:transmitter.cancel() 
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            // 回调 请求失败
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          // 执行结束,操作调度器,在call列表删除当前call;如队列有等待call则唤醒执行;如调度器空闲则通知调度器空闲监听器。
          client.dispatcher.finished(this)
        }
      }
    }
  }

Asnycall的代码非常简单,只有2个重点方法:
fun executeOn(executorService: ExecutorService) 和
Runnable的抽象方法override fun run()。
executeOn(executorService: ExecutorService)是提供给dispatcher当请求排队到可执行队列中时操作请求执行的入口,整个方法除了异常处理和回调处理,只有一句executorService.execute(this),executorService是从dispatcher持有的所有call共享的线程池,okhttp通过此线程池实现线程的回收利用。
run()方法就是网络请求的真正入口,即:val response = getResponseWithInterceptorChain()获取网络请求结果,然后通过回调把结果传给调用者。至此,okhttp的线程调度完成,Dispatcher和Asnycall的任务完成。

3 责任链

接下来我们看一下Asnycall的run()里用来获取网络响应的getResponseWithInterceptorChain(),它拉开了okhttp的一大亮点的序幕:责任链模式。

  @Throws(IOException::class)
  fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors. 需要留意任务链中拦截器的顺序
    val interceptors = mutableListOf<Interceptor>()
    // 用户自定义拦截器,最先执行的拦截器
    interceptors += client.interceptors
    // 失败重试和重定向拦截器,如果请求取消则可能抛出IOException
    interceptors += RetryAndFollowUpInterceptor(client)
    // 桥接拦截器
    interceptors += BridgeInterceptor(client.cookieJar)
    // 缓存拦截器
    interceptors += CacheInterceptor(client.cache)
    // 链接拦截器,用于打开目标服务器的链接
    interceptors += ConnectInterceptor
    // 用户自定义的网络拦截器
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    // 真正对服务器进行网络调用的拦截器,任务链最后一个拦截器
    interceptors += CallServerInterceptor(forWebSocket)

    // 创建任务链,注意第四个参数index的值是0
    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

    var calledNoMoreExchanges = false
    try {
      // 执行任务链下一个
      val response = chain.proceed(originalRequest)
      // 以下都是异常处理
      if (transmitter.isCanceled) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }

任务链的执行入口是val response = chain.proceed(originalRequest),我们点进去看一下,这里确实不太好理解,所以源码中有很多注释。

override fun proceed(request: Request): Response {
    return proceed(request, transmitter, exchange)
  }

  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    calls++

    // If we already have a stream, confirm that the incoming request will use it.
    check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    check(this.exchange == null || calls <= 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // Call the next interceptor in the chain. 注意这里第四个参数index+1了
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    // 取出当前index对应的interceptor
    val interceptor = interceptors[index]

    // 当前拦截器执行拦截操作
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")

    // Confirm that the next interceptor made its required call to chain.proceed().
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"}

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

这里的重点同样只有一行val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null"),我们点开看一下intercept(next) ,这是接口Interceptor的方法,我们随便找一个实现类看,这里用的是简单好理解的LoggingInterceptor。

private static class LoggingInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
            // 操作request
            long t1 = System.nanoTime();
            Request request = chain.request();
            logger.info(String.format("Sending request %s on %s%n%s",
                    request.url(), chain.connection(), request.headers()));

            // 执行任务链下一个
            Response response = chain.proceed(request);

            // 操作response
            long t2 = System.nanoTime();
            logger.info(String.format("Received response for %s in %.1fms%n%s",
                    request.url(), (t2 - t1) / 1e6d, response.headers()));
            return response;
        }
    }

拦截器的 intercept(Chain chain)方法基本分为3部分

  1. 处理request;
  2. 执行任务链下一个,获取response;
  3. 处理response,并返回给上级调用者。

责任链的工作流程


未命名文件-6.png
上一篇 下一篇

猜你喜欢

热点阅读