庖丁解牛,一文搞懂Kotlin协程的线程池

2023-09-02  本文已影响0人  Android程序员老鸦

上两篇文章梳理了协程的运行原理,因为线程池相对于协程实现来说是可以单独拿出来讲的,所以分析到线程池的时候没有继续深入,现在就单独来看看协程线程池的实现。
协程线程池是由分发器Dispatchers来维护的,主要是Dispatchers.DEFAULT和Dispatchers.IO两个分发器。

Dispatchers.DEFAULT

Dispatchers.DEFAULT它持有的线程池是CoroutineScheduler:

  【SchedulerCoroutineDispatcher】
    internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
   private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) // 线程池分发

    ...略
}

  【CoroutineScheduler】
   internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int, // 核心线程数
    @JvmField val maxPoolSize: Int, // 最大线程数
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, // 空闲线程存活时间
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME // 线程池名称"CoroutineScheduler"
) : Executor, Closeable {
    // 用于存储全局的纯CPU(不阻塞)任务
    @JvmField
    val globalCpuQueue = GlobalQueue()

    // 用于存储全局的执行非纯CPU(可能阻塞)任务
    @JvmField
    val globalBlockingQueue = GlobalQueue()

    // 用于记录当前处于Parked状态(一段时间后自动终止)的线程的数量
    private val parkedWorkersStack = atomic(0L)

    // 用于保存当前线程池中的线程
    // workers[0]永远为null,作为哨兵位
    // index从1到maxPoolSize为有效线程
    @JvmField
    val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)

    // 控制状态
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
    // 表示已经创建的线程的数量
    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    // 表示可以获取的CPU令牌数量,初始值为线程池核心线程数量
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    // 获取指定的状态的已经创建的线程的数量
    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    // 获取指定的状态的执行阻塞任务的数量
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    // 获取指定的状态的CPU令牌数量
    public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // 当前已经创建的线程数量加1
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    // 当前已经创建的线程数量减1
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
    // 当前执行阻塞任务的线程数量加1
    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
    // 当前执行阻塞任务的线程数量减1
    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    // 尝试获取CPU令牌
    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }
    // 释放CPU令牌
    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // 表示当前线程池是否关闭
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

companion object {
    // 用于标记一个线程是否在parkedWorkersStack中(处于Parked状态)
    @JvmField
    val NOT_IN_STACK = Symbol("NOT_IN_STACK")

    // 线程的三个状态
    // CLAIMED表示线程可以执行任务
    // PARKED表示线程暂停执行任务,一段时间后会自动进入终止状态
    // TERMINATED表示线程处于终止状态
    private const val PARKED = -1
    private const val CLAIMED = 0
    private const val TERMINATED = 1

    // 以下五个常量为掩码
    private const val BLOCKING_SHIFT = 21 // 2x1024x1024
    // 1-21位
    private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
    // 22-42位
    private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
    // 42
    private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
    // 43-63位
    private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

    // 以下两个常量用于require中参数判断
    internal const val MIN_SUPPORTED_POOL_SIZE = 1
    // 2x1024x1024-2
    internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

    // parkedWorkersStack的掩码
    private const val PARKED_INDEX_MASK = CREATED_MASK
    // inv表示01反转
    private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
    private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
}

       
    

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        // 传入ruanable和taskContext构建一个Task实例,taskContext决定线程是cpu密集型还是阻塞型,通过上面Dispatchers.DEFAULT的代码可以知
        // 道,它分发任务的时候是创建的NonBlockingContext,也就是非阻塞型的
        val task = createTask(block, taskContext) 
        // 判断当前线程是否运行在当前线程池
        val currentWorker = currentWorker()
        // 尝试加入本地队列,注意这个方法是Woker的扩展方法,这个本地队列是Woker的变量,
        // Woker是对Thread的一层封装,专门用于协程线程池里用的
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        // notAdded不为null,代表加入本地任务队列失败,也代表此时的线程不是运行在线程池
        if (notAdded != null) {
            // 尝试加入全局任务队列,这里的全局指的是线程池里维护的两个变量
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        // 如果任务是非阻塞任务,则唤醒cpu线程
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // 否则就唤醒阻塞线程
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
    // currentWorker方法
    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    // Worker的扩展函数submitToLocalQueue
    private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
      // Worker 为空,直接返回任务本身
      if (this == null) return task
      // 非阻塞的任务且此时Worker处于阻塞状态,则直接返回
      if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
        return task
      }
      //表示本地队列里存有任务了
      mayHaveLocalTasks = true
      //加入到本地队列里
      //localQueue 为Worker的成员变量
      return localQueue.add(task, fair = tailDispatch)
    }
    // addToGlobalQueue方法
    private fun addToGlobalQueue(task: Task): Boolean {
      return if (task.isBlocking) {
        //加入到全局阻塞队列
        globalBlockingQueue.addLast(task)
    } else {
        //加入到全局cpu队列
        globalCpuQueue.addLast(task)
      }
    }
    fun signalCpuWork() {
     //尝试去唤醒正在挂起的线程,若是有线程可以被唤醒,则无需创建新线程
      if (tryUnpark()) return
      //若唤醒不成功,则需要尝试创建线程
      if (tryCreateWorker()) return
      //再试一次,边界条件
      tryUnpark()
    }
    // 尝试创建线程的方法
    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
     //获取当前已经创建的线程数
      val created = createdWorkers(state)
      //获取当前阻塞的任务数
      val blocking = blockingTasks(state)
      //已创建的线程数-阻塞的任务数=非阻塞的线程数
      //coerceAtLeast(0) 表示结果至少是0
      val cpuWorkers = (created - blocking).coerceAtLeast(0)
      //如果非阻塞数小于核心线程数
    // 现在若是已经创建了5个线程,而这几个线程都在执行IO任务,此时就需要再创建新的线程来执行任务,因为此时CPU是空闲的。
     //只要非阻塞任务的个数小于核心线程数,那么就需要创建新的线程,目的是为了充分利用CPU。
      if (cpuWorkers < corePoolSize) {
          //创建线程
          val newCpuWorkers = createNewWorker()
          //如果当前只有一个非阻塞线程并且核心线程数>1,那么再创建一个线程
          //目的是为了方便"偷"任务...
          if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
          //创建成功
          if (newCpuWorkers > 0) return true
      }
      return false
    }
    // 创建线程的方法
    //workers 为Worker 数组,因为需要对数组进行add 操作,因此需要同步访问
    private fun createNewWorker(): Int {
      synchronized(workers) {
        if (isTerminated) return -1
        val state = controlState.value
        //获取已创建的线程数
        val created = createdWorkers(state)
        //阻塞的任务数
        val blocking = blockingTasks(state)
        //非阻塞的线程数
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        //非阻塞的线程数不能超过核心线程数
        if (cpuWorkers >= corePoolSize) return 0
        //已创建的线程数不能大于最大线程数
        if (created >= maxPoolSize) return 0
        val newIndex = createdWorkers + 1
        require(newIndex > 0 && workers[newIndex] == null)
        //构造线程
        val worker = Worker(newIndex)
        //记录到数组里
        workers[newIndex] = worker
        //记录创建的线程数
        require(newIndex == incrementCreatedWorkers())
        //开启线程
        worker.start()
        //当前非阻塞线程数
        return cpuWorkers + 1
    }
}

    ...略
}

通过以上分析,知道有三个任务队列,这里要理清一下这些任务队列的关系:


线程池dispatch的操作可以大概做个总结:
1.先把分发下来的runable任务封装成Task,并标记它是阻塞型的还是非阻塞型的
2.判断当前线程是否是运行在当前线程池里的线程,是的话就把传进来的Task直接加入当前线程的任务队列中
3.否则根据task类型加入到线程池的相应任务队列中
4.尝试唤醒相应类型的线程,没有的话就创建线程来执行工作
线程池里的Worker

看看真正执行任务的地方,重点看Worker里的runWorker方法

// Worker内部类
    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
        }

        // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
        @Volatile // volatile for push/pop operation into parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()
        // 线程状态
        var state = WorkerState.DORMANT
        

        override fun run() = runWorker()

        private fun runWorker() {
            var rescanned = false
          //一直查找任务去执行,除非worker终止了
          while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
            //从队列里寻找任务
            //mayHaveLocalTasks:本地队列里是否有任务
            val task = findTask(mayHaveLocalTasks)
            if (task != null) {
                rescanned = false
                minDelayUntilStealableTaskNs = 0L
                //任务获取到后,执行任务
                executeTask(task)
                //任务执行完毕,继续循环查找任务
                continue
            } else {
                mayHaveLocalTasks = false
            }
            if (minDelayUntilStealableTaskNs != 0L) {
                // 这个rescanned控制下面的分支代码的执行
                if (!rescanned) {
                    rescanned = true
                } else {
                    //挂起一段时间再去偷
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L   // 只执行一次,下次循环不会再命中              
                }  
                continue
            }
          //尝试挂起
          tryPark()
        }
        //释放token
        tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
      }

      fun findTask(scanLocalQueue: Boolean): Task? {
          //尝试获取cpu 许可
          //若是拿到cpu 许可,则可以执行任何任务
           // 它和核心线程数相关,假设我们是8核CPU,那么同一时间最多只能有8个线程在CPU上执行。因此,若是其它线程想
          // 要执行非阻塞任务(占用CPU),需要申请许可(token),申请成功说明有CPU空闲,此时该线程可以执行非阻塞任务。否则,只能执行阻塞任务。
          if (tryAcquireCpuPermit())
               return findAnyTask(scanLocalQueue)
           //拿不到,若是本地队列有任务,则从本地取,否则从全局阻塞队列取
           val task = if (scanLocalQueue) {
               localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
           } else {
               globalBlockingQueue.removeFirstOrNull()
           }
           //都拿不到,则偷别人的
            // 当从本地队列、全局队列里都没找出任务时,当前的Worker打起了别个Woker的主意。我们知道全局队列是所有Worker共
          // 享,而本地队列是每个Worker私有的。因此,当前Worker发现自己没任务可以执行的时候会去看看其它Worker的本地队列里
          // 是否有可以执行的任务,若是有就可以偷过来用。
           return task ?: trySteal(blockingOnly = true)
       }

       private fun findAnyTask(scanLocalQueue: Boolean): Task? {
           if (scanLocalQueue) {
               //可以从本地队列找
               val globalFirst = nextInt(2 * corePoolSize) == 0
               if (globalFirst) pollGlobalQueues()?.let { return it }
               localQueue.poll()?.let { return it }
               if (!globalFirst) pollGlobalQueues()?.let { return it }
           } else {
               //从全局队列找
               pollGlobalQueues()?.let { return it }
           }
           //偷别人的
           return trySteal(blockingOnly = false)
       }
// 从全局队列获取任务
private fun pollGlobalQueues(): Task? {
    // 随机获取CPU任务或者非CPU任务
    if (nextInt(2) == 0) {
        // 优先获取CPU任务
        globalCpuQueue.removeFirstOrNull()?.let { return it }
        return globalBlockingQueue.removeFirstOrNull()
    } else {
        // 优先获取非CPU任务
        globalBlockingQueue.removeFirstOrNull()?.let { return it }
        return globalCpuQueue.removeFirstOrNull()
    }
}
————————————————
版权声明:本文为CSDN博主「LeeDuo.」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126492774


      // 挂起函数,这是针对woker的状态
      private fun tryPark() {
          //没有在挂起栈里
        if (!inStack()) {
            //将worker放入挂起栈里
            parkedWorkersStackPush(this)
            return
        }
        while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
            if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
            //真正挂起(不是实时,会暂时挂起一段时间idleWorkerKeepAliveNs,线程空闲时间),并标记worker state 状态,会修改state = WorkerState.TERMINATED,runWorker循环里会判断该标记,若是终止了,则循环停止,整个线程执行结束。
            park()
        }

        ...略
    }

做个小总结:
1.线程执行的时候从全局队列、本地队列里查找任务。
2.若是没找到,则尝试从别的Worker 本地队列里偷取任务。
3.能够找到任务则最终会执行协程体里的代码。
4.若是没有任务,则根据策略挂起一段时间或是最终退出线程的执行。

Dispatchers.IO:
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
    override fun dispatch(context: CoroutineContext, block: Runnable) {
            default.dispatch(context, block)
    }
    ...略
}

// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
    }
     // 最终调用了DefaultScheduler的分分发方法
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}
// UnlimitedIoScheduler父类CoroutineDispatcher的方法
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        // 返回一个UnlimitedIoScheduler的代理Dispatcher
        return LimitedDispatcher(this, parallelism)
    }

通过以上代码可以看到Dispatchers.IO最终调用到的线程池分发方法是DEFAULT里的,而DEFAULT是个单例,所以两者其实共享了线程池CoroutineScheduler.
但是随着对代理类LimitedDispatcher的深入研究发现Dispatchers.IO策略上有所不同。

// 因为Dispatchers.IO是单例的,所以内部的这个LimitedDispatcher也是单例的,先看名字,这是一个受限制的分发器,
// 限制啥?限制的是最大并行数量,由系统属性设定的值或 CPU 核心数的最大值决定,系统属性值一般设置的是 64,也就是说,一般来说,该调度器可能会创建 64 个线程来执行任务
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

    @Volatile
    private var runningWorkers = 0

    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }

    // 核心方法,分发任务
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 先执行dispatchInternal
        dispatchInternal(block) {
            // 再把自己作为runnable参数给到代理的dispatcher
            dispatcher.dispatch(this, this)
        }
    }
  
    private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
        // 添加任务到队列,如果此时最大并行任务超过了限制就直接return,暂时不执行dispatch方法
        if (addAndTryDispatching(block)) return
        // 统计当前的并行任务,超过了就直接return,暂时不执行dispatch方法
        if (!tryAllocateWorker()) return
        // 做完了添加操作执行dispatch(),此时才会真正分发到协程线程池CoroutineSchduler
        dispatch()
    }
    // 添加任务到队列并尝试分发
     private fun addAndTryDispatching(block: Runnable): Boolean {
         // 添加本地任务队列,添加不需要条件 来了就添加
        queue.addLast(block)
        // 判断正在执行的任务的数量是否大于最大并行线程数量,正是在这里做到了限制最大并行IO线程的作用
        return runningWorkers >= parallelism 
    }
    // 尝试分配线程,其实不会新建线程,只是统计一下当前的并行线程数量,在这里仍然会先判断是否大于了最大并行限制
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            if (runningWorkers >= parallelism) return false
            ++runningWorkers
            return true
        }
    }

       // 注意他自己实现了Runnable,最终他是把自己送到CoroutineSchduler去执行的,协程传进来的任务在这里被包装了执行
    override fun run() {
        var fairnessCounter = 0
        // 别被这个循环体迷惑,就以为线程都是串行执行的,实际每次新协程创建任务都会执行run方法,然后最终包装到协程线程池去并发执行
        // 这个循环是为了执行超过最大并发数的时候,那些只添加到了队列但是没有立马执行的任务
        while (true) {
            val task = queue.removeFirstOrNull()
            if (task != null) {
                try {
                    task.run()
                } catch (e: Throwable) {
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                //  这里比较有意思,当有大量的并发任务(比如短时间添加了200个任务),那么这个方法有可能会长时间的执行下去,所以
              // 这里为了公平起见不长期霸占资源,当超过执行了16个任务后就重新分发一次,这样就能短暂的让出cpu让别的线程执行(不知道理解的对不对)
                if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                    // Do "yield" to let other views to execute their runnable as well
                    // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
                    dispatcher.dispatch(this, this)
                    return
                }
                continue
            }

            synchronized(workerAllocationLock) {
                --runningWorkers
                if (queue.size == 0) return
                ++runningWorkers
                fairnessCounter = 0
            }
        }
    }
}

由上可以看出Dispatchers.IO 任务分发是借助于DefaultScheduler,也就是Dispatchers.Default的能力,因此两者是共用一个线程池。
只是Dispatchers.IO 比较特殊,它有个队列,该队列作用:
当IO 任务分派个数超过设定的并行数时,不会直接进行分发,而是先存放在队列里。

到此协程线程池基本原理分析完毕,下篇打算探讨一些协程的常用案例,看看哪些适合应用在实际开发中以及注意事项。

上一篇下一篇

猜你喜欢

热点阅读