OkHttp 流程分析

2023-09-01  本文已影响0人  blossom_6694

1、创建Client
Builder 是OkHttpClient的一个内部类,使用的是构建者模式

// Build the client through its Builder, setting a 5000 ms connect timeout.
val client = OkHttpClient.Builder()
    .connectTimeout(5000, TimeUnit.MILLISECONDS)
    .build()

// Builder
// Excerpt of the builder class; the `...` below stands for members omitted from this excerpt.
class Builder constructor() {
  internal var dispatcher: Dispatcher = Dispatcher() // Creates the dispatcher instance
  ...
}

2、构建Request
也就是构建请求报文信息,请求的url,header等信息,也是用的Builder模式

// Describe the request (here only its URL) through Request.Builder.
val request = Request.Builder()
    .url(url)
    .build()

3、创建Call 对象
可以把Call 对象当成Request 和 Response 之间的桥梁。Call 是一个接口,真正的实现类是RealCall。

// Ask the client for a Call that wraps the request.
val call = client.newCall(request)

// OkHttpClient.newCall
/** Creates the [Call] for [request] by delegating to [RealCall.newRealCall] with web sockets disabled. */
override fun newCall(request: Request): Call =
  RealCall.newRealCall(this, request, forWebSocket = false)

// RealCall.newRealCall
// Declared inside RealCall's companion object, which is why callers invoke it as
// `RealCall.newRealCall(...)`.
companion object {
  /**
   * Creates a [RealCall] and attaches its [Transmitter].
   *
   * @param client the client the call is created for
   * @param originalRequest the request the call will carry
   * @param forWebSocket whether the call is for a web socket
   * @return the new call with `transmitter` already assigned
   */
  fun newRealCall(
    client: OkHttpClient,
    originalRequest: Request,
    forWebSocket: Boolean
  ): RealCall {
    // Safely publish the Call instance to the EventListener.
    return RealCall(client, originalRequest, forWebSocket).apply {
      transmitter = Transmitter(client, this)
    }
  }
}

同步请求:

// Run the call and keep the Response that execute() returns.
val response = call.execute()

// RealCall.execute
/**
 * Runs this call and returns the response produced by the interceptor chain.
 *
 * @throws IllegalStateException if this call has already been executed
 */
override fun execute(): Response {
  // A RealCall is single-use: flip the flag under the lock so a second attempt fails.
  synchronized(this) {
    check(!executed) { "Already Executed" }
    executed = true
  }
  transmitter.timeoutEnter()
  transmitter.callStart()
  return try {
    // Register with the dispatcher, then obtain the Response from the interceptor chain.
    client.dispatcher.executed(this)
    getResponseWithInterceptorChain()
  } finally {
    // Always report completion to the dispatcher, whether or not the chain threw.
    client.dispatcher.finished(this)
  }
}

// Dispatcher.executed
/** Records a synchronous call by adding it to the runningSyncCalls queue. */
@Synchronized internal fun executed(call: RealCall) {
  runningSyncCalls.add(call)
}

// Dispatcher.finished
/**
 * Completion hook for an asynchronous call: decrements the call's per-host counter, then
 * removes the call from runningAsyncCalls through the generic overload.
 *
 * NOTE(review): RealCall.execute passes a RealCall to `dispatcher.finished`, so the
 * synchronous path presumably uses a separate overload that is not shown here.
 */
internal fun finished(call: AsyncCall) {
  call.callsPerHost().decrementAndGet()
  finished(runningAsyncCalls, call)
}

/**
 * Removes [call] from [calls], promotes any waiting calls, and runs the idle callback when
 * nothing is left running.
 *
 * @throws AssertionError if [call] was not present in [calls]
 */
private fun <T> finished(calls: Deque<T>, call: T) {
  // Under the lock: drop the call from the queue that was passed in and snapshot the callback.
  val onIdle: Runnable? = synchronized(this) {
    if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
    this.idleCallback
  }

  // Outside the lock: start any calls that can now run.
  val stillRunning = promoteAndExecute()
  if (stillRunning) return

  onIdle?.run()
}

异步请求:

// enqueue() returns no value, so there is nothing to assign: the outcome is delivered to
// exactly one of the two Callback methods below.
call.enqueue(object : Callback {
    // Invoked with the IOException when the call fails.
    override fun onFailure(call: Call, e: IOException) {
        TODO("Not yet implemented")
    }

    // Invoked with the Response when the call produces one.
    override fun onResponse(call: Call, response: Response) {
        TODO("Not yet implemented")
    }
})


// RealCall.enqueue
/**
 * Schedules this call for asynchronous execution; the result is reported to [responseCallback].
 *
 * @throws IllegalStateException if this call has already been executed
 */
override fun enqueue(responseCallback: Callback) {
  synchronized(this) {
    check(!executed) { "Already Executed" }
    executed = true
  }
  transmitter.callStart()
  // Wrap the callback in an AsyncCall and hand it to the dispatcher.
  val asyncCall = AsyncCall(responseCallback)
  client.dispatcher.enqueue(asyncCall)
}

// Dispatcher.enqueue
/** Queues [call] as ready and then tries to promote waiting calls to running. */
internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    // Append the AsyncCall to the waiting queue.
    readyAsyncCalls.add(call)

    // For non-web-socket calls, reuse the per-host counter of an existing call to the same host.
    if (!call.get().forWebSocket) {
      findExistingCallWithHost(call.host())?.let { sameHostCall ->
        call.reuseCallsPerHostFrom(sameHostCall)
      }
    }
  }
  promoteAndExecute()
}

// Dispatcher.promoteAndExecute
/**
 * Moves eligible calls from readyAsyncCalls to runningAsyncCalls and submits each promoted
 * call to the executor service.
 *
 * @return true if `runningCallsCount()` was greater than zero after the promotion pass
 */
private fun promoteAndExecute(): Boolean {
  // Must be entered without this dispatcher's monitor held; the lock is taken just below.
  assert(!Thread.holdsLock(this))

  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()
      
      // Two independent limits: stop promoting once the running-async queue has reached
      // maxRequests; skip (but keep scanning) a call whose host has reached maxRequestsPerHost.
      // The original note cites 64 and 5 for these limits; the values are not set in this excerpt.
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

      i.remove()
      asyncCall.callsPerHost().incrementAndGet()
      // Both limits passed: move the call out of readyAsyncCalls into executableCalls and runningAsyncCalls.
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  
  // After releasing the lock, run every call that was just promoted on the executor service.
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }

  return isRunning
}

// Dispatcher.executorService
/**
 * Thread pool for asynchronous calls, created lazily on first access.
 *
 * Configuration: zero core threads, so every thread can be reclaimed when there is no work;
 * a nominal maximum of Int.MAX_VALUE threads; a 60 second keep-alive for idle threads; and a
 * SynchronousQueue as the work queue. The pool is not really unbounded in practice, because
 * promoteAndExecute only submits calls while runningAsyncCalls is below maxRequests.
 *
 * The getter is synchronized because it is read outside the dispatcher lock: without that,
 * two threads could both see null on first access and each create a separate pool.
 */
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
  get() = executorServiceOrNull ?: ThreadPoolExecutor(
      0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
      SynchronousQueue(), threadFactory("OkHttp Dispatcher", false)
  ).also { executorServiceOrNull = it }
  
  // AsyncCall.executeOn
  /**
   * Submits this AsyncCall to [executorService]. If the submission does not succeed, the
   * dispatcher is told the call has finished; a rejection is also reported to the callback.
   */
  fun executeOn(executorService: ExecutorService) {
    assert(!Thread.holdsLock(client.dispatcher))
    var submitted = false
    try {
      // The executor will invoke this AsyncCall's run() method.
      executorService.execute(this)
      submitted = true
    } catch (rejected: RejectedExecutionException) {
      val failure = InterruptedIOException("executor rejected").apply { initCause(rejected) }
      transmitter.noMoreExchanges(failure)
      responseCallback.onFailure(this@RealCall, failure)
    } finally {
      // This call is no longer running!
      if (!submitted) client.dispatcher.finished(this)
    }
  }

// AsyncCall.run
/**
 * Body of the asynchronous call: obtains the response from the interceptor chain, reports it
 * (or the IOException) to the callback, and always notifies the dispatcher at the end.
 */
override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var callbackNotified = false
    transmitter.timeoutEnter()
    try {
      // Delegate to RealCall.getResponseWithInterceptorChain for the actual result.
      val result = getResponseWithInterceptorChain()
      callbackNotified = true
      responseCallback.onResponse(this@RealCall, result)
    } catch (failure: IOException) {
      if (!callbackNotified) {
        responseCallback.onFailure(this@RealCall, failure)
      } else {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", failure)
      }
    } finally {
      client.dispatcher.finished(this)
    }
  }
}

同步请求和异步请求都是从拦截器链中获取结果,采用的是责任链模式:

/**
 * Builds the interceptor list, runs the original request through it, and returns the response.
 *
 * @throws IOException if the chain fails or the call was canceled
 */
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
  // Assemble the interceptors in order: the client's own interceptors, retry/follow-up,
  // bridge, cache, connect, network interceptors (skipped for web sockets), then call-server.
  val pipeline = mutableListOf<Interceptor>()
  pipeline.addAll(client.interceptors)
  pipeline.add(RetryAndFollowUpInterceptor(client))
  pipeline.add(BridgeInterceptor(client.cookieJar))
  pipeline.add(CacheInterceptor(client.cache))
  pipeline.add(ConnectInterceptor)
  if (!forWebSocket) pipeline.addAll(client.networkInterceptors)
  pipeline.add(CallServerInterceptor(forWebSocket))

  val firstLink = RealInterceptorChain(pipeline, transmitter, null, 0, originalRequest, this,
      client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

  // Tracks whether noMoreExchanges was already called from the failure path.
  var exchangesReleased = false
  try {
    val result = firstLink.proceed(originalRequest)
    if (!transmitter.isCanceled) return result
    result.closeQuietly()
    throw IOException("Canceled")
  } catch (failure: IOException) {
    exchangesReleased = true
    throw transmitter.noMoreExchanges(failure) as Throwable
  } finally {
    if (!exchangesReleased) transmitter.noMoreExchanges(null)
  }
}
上一篇 下一篇

猜你喜欢

热点阅读