Kotlin高级进阶

Kotlin/Native 异步并发模型(1)—— Worker

2020-01-03  本文已影响0人  _Preacher_

Kotlin/Native 现状的一些讨论

Kotlin/Native 编写的程序作为一种原生二进制程序,没有强大的运行时虚拟机来提供各种运行时的保障,
因此它需要重新思考一套自己的异步并发模型。实际上 JVM 这一套机制是 C/C++
这种传统命令式编程语言的线程同步机制的延续,但 Kotlin 在编程范式上吸收了部分函数式编程的特性,因此 Kotlin/Native
的同步方案从设计思想上向函数式编程靠拢,即对象不可变,其宗旨就是如果对象本身不可变,那就不存在线程安全的问题。

Kotlin/Native 中,我们能实现的异步和并发方案有好几种,甚是混乱。第一种方式是,我们可以直接使用相关操作系统平台提供的 API
来自己开启线程,例如在 Linux 上,我们就可以像写 C 语言程序一样,自己手动调用 pthread_create
来创建线程,但是这样写出来的代码就违反了平台通用性的原则,例如如果你要将你的程序移植到 Windows 上,那异步并发方式就得全部改用
Windows 平台的机制,可移植性太差,在编写多平台程序的时候这种方式就很丑陋。

Kotlin/Native 自身提供给了我们两套异步并发的 API,首先是协程,但 Kotlin/Native 的协程与 Kotlin/JVM
的协程区别很大,Kotlin/Native 的协程是单线程的,也就是说它只能用来执行一些不占用 CPU 资源的并发任务,例如网络请求,如果要利用 CPU
多核的能力来进行并行计算,Native 版的协程就失去了作用,当然,官方说了要尽快解决这个问题,并且前几天(2019 年 12
月底)我发现官方已经发布了 Native 多线程版协程的预览版本,这个会在后文详细讨论。因为当前主分支版本的协程不能并行计算,因此官方在 Kotlin/Native
诞生之初就已经提供了另一套专门做并行任务的工具,即 WorkerWorker 与 Kotlin/Native 的异步并发模型紧密相连,做到了既能利用 CPU
多核能力,又能保障线程安全(虽然做法很粗暴)。这篇文章我们会讨论 Worker 与 Kotlin/Native 异步并发机制,而协程将在下一篇讨论。

Worker 初步使用

首先用 Intellij IDEA 创建一个基本的 Kotlin/Native 工程。我当前电脑的操作系统版本是
macOS 10.15.1,因此后面的一些示例和测试方案都基于该系统,作为类 Unix 系统,Linux 上的对应行为可能也相差无几,
但是这些示例不保证在 Windows 等系统上也全部可用,或行为完全一致。

先来看看 Worker 怎么用。然后我们在 main 函数中编写以下代码:

fun main() {
    val worker = Worker.start(true, "worker1")  // 1
    worker.execute(TransferMode.SAFE, { 2 + 1 }) {
        (it + 100).toString()
    }.consume {
        println(it)
    }
}

使用 Worker.start 函数我们就可以创建一个新的 Worker,然后调用 Workerexecute
函数就可以在别的线程执行任务了。这个函数接收三个参数,第一个我们先不看,第二个参数,即示例中的 { 2 + 1 }
将扮演一个生产者的角色(为了简便,后文我们使用源码中的命名 producer 来称呼它),它会在外面的线程执行,producer
的返回值将在第三个参数(也是个 lambda 表达式,同样,后文我们用源码中的命名 job 来称呼它)中作为参数来提供。
而 job 中的代码会在别的线程中执行。
最后 execute 函数的返回结果是一个 Future<T> 类型的对象,调用它的成员函数 consume
即可获得在 job 执行的结果。运行代码验证一下,结果如下:

103

现在还要验证一个问题,producer 与 job 还有 consume 到底在哪个线程执行,虽然官方文档肯定不会骗我们,但是我们自己要掌握验证的方法:

fun main() {
    val worker = Worker.start(true, "worker1")
    println("位置 1 的线程 id:${pthread_self()!!.rawValue.toLong()}")
    worker.execute(TransferMode.SAFE, {
        println("位置 2 的线程 id:${pthread_self()!!.rawValue.toLong()}")
        2 + 1
    }) {
        println("位置 3 的线程 id:${pthread_self()!!.rawValue.toLong()}")
        (it + 100).toString()
    }.consume {
        println("位置 4 的线程 id:${pthread_self()!!.rawValue.toLong()}")
        // println(it)
    }
}

我们在 3 个位置上都使用 pthread_self() 函数来打印当前线程 id,输出如下:

位置 1 的线程 id:4484095424
位置 2 的线程 id:4484095424
位置 3 的线程 id:123145437896704
位置 4 的线程 id:4484095424

果然,官方文档诚不欺我(手动狗头)。

有了直观的认识之后,我们会发现 Worker 用起来和协程中的 async/await 有点像。但是我们发现它比 async/await
要麻烦,同样,我们先不看 execute 函数的第一个参数,我们可能会觉得 producer 有点多此一举,为什么在其他线程执行的 job
必须使用 producer 传递过来的参数,它直接捕捉上下文的变量不行吗?为了验证这一点,于是就有了如下代码:

fun main() {
    val worker = Worker.start(true, "worker1")
    val a = "第二个参数是干啥用的?"
    worker.execute(TransferMode.SAFE, { 2 + 1 }) {
        println(a)
        (it + 100).toString()
    }.consume {
        println(it)
    }
}

重新运行程序,直接编译报错:

e: kotlin.native.concurrent.Worker.execute must take an unbound, non-capturing function or lambda

为了让信息简洁一点,上面复制过来的报错信息省略了报错的文件以及行数。我们可以看到报错信息中说,在 Worker
中执行的函数或 lambda 表达式不能有变量捕捉。于是,这就代表着,producer 是 job 与外界线程进行数据传递的唯一入口,job
无法通过变量捕捉自由访问外界线程的对象。这么看起来 Worker 的实际太粗暴了,如果我要一次传递两个对象怎么办?用
Pair 包装一下,那一次要传递三个对象呢?用 Triple!四个呢?呃……F**k。

对象的传递

现在,我们在主线程创建了一个对象,我们想把它传递到 Worker 中,由于 producer 是在外部线程中运行的,
且对外部的对象进行变量捕捉不会失败,因此我们自然而然可能会写出如下代码。

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        it
    }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

然后理所当然的运行报错:

Uncaught Kotlin exception: kotlin.IllegalStateException: Illegal transfer state

然后我们去看看 execute 的第一个参数 TransferMode,这是一个枚举类型,共有两个枚举值,
我们去源码注释中看看这两个值的区别:

……

不复制粘贴了,有点长,大意就是:在 SAFE 模式下,如果传递到 Worker 的对象可被别的线程或 Worker 引用到,则直接抛出异常,而在
UNSAFE 模式下,不做检查,而是直接把对象传递过去,但是有可能会造成程序崩溃。接下来我们要验证两个事情:

第一,当主线程把对象传递给 Worker 后就不再持有对该对象的引用,SAFE 模式是否可以正常工作:

fun main() {
    val worker = Worker.start(true, "worker1")
    var testData: TestData? = TestData()
    val future = worker.execute(TransferMode.SAFE, {
        val data = testData!!
        testData = null
        data
    }) { data ->
        repeat(20000) { data.index++ }
        data
    }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

程序正常打印输出 20000。这样来看 SAFE 模式这样设计的确是合理的,如果主线程将对象传递给 Worker
之后仍然可以继续访问对象,那就可能发生线程安全问题,因此 SAFE 模式直接拒绝了这种事情的发生而抛出异常,但是这样的写法太丑陋了,
如果要实现更优雅的写法,唯一的办法就是让 testData 的引用范围不超出 produce,也就是说把数据产生的过程都写到 produce
里面,虽然这样也没有那么优雅,但是还能接受。

官方提供了一套理论来解释上面示例程序所表现出来的行为:被 producer 传递的对象会被包装一个叫做对象子图(object
subgraph)的东西,对象子图生成之后,原线程就不能再访问对象子图,如果是在 SAFE
模式,就会使用图遍历算法检查对象子图的访问。以上都是目前官方文档的阐述,
关于 Worker 的更多资料我觉得官方在日后还会有更多补充,等到那时再详细分析。

再来看看 UNSAFE 模式:

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.UNSAFE, { testData }) { data ->
        repeat(20000) { data.index++ }
        data
    }
    repeat(20000) { testData.index++ }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

如果线程访问是安全的,应该输出 40000,但是你每次运行这段代码得到的结果都会不同,反正都小于 40000。所以,果然 UNSAFE
模式简单粗暴,直接撒手不管了,我最初的预测是,当两个线程真正发生同一时刻访问同一个变量的时候会发生崩溃,
而在其他情况下,程序照常运行,就像源码注释里说的那样。但事实并非如此,所以我建议,千万不要靠"人"来保障线程安全,在
99.99% 的情况下都应该使用 SAFE 模式,如果使用 UNSAFE 模式,风险将直接暴露出来,且 Kotlin/Native
没有线程锁来帮你兜底。

对象子图冻结、全局变量以及单例

上面已经讨论了很多情况,但是跨线程访问都是在函数内部,也就是局部变量的跨线程访问。但如果是全局变量、
单例这种在多个函数内都可以访问的变量,情况则会有所不同。

先阐述一个对象子图冻结的概念,对于某些变量,我们确切知道其一定不可变,那对于这种变量,无论在多少个线程中同时访问它都是安全的,
既然如此,那 Kotlin/Native 也没必要对这种变量在访问的时候做子图校验,对于这样的变量,我们就可以称其为被冻结的变量,
官方文档关于这个地方有些前后矛盾,
先说冻结的变量只有枚举一种,但后面又阐述了两种变量冻结的情况(后文会介绍)。还有一种情况,也有可能一个变量一开始是非冻结的,
后面又被冻结了,但是有一点是不变的,那就是已冻结的对象不可解冻。关于在多个 Worker
中访问枚举变量的情况这里也就不举例了,很简单。

下面讲讲几个重要的注解和几种重要的情况

访问全局变量

val abc = "abc"

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(abc)
    }
    future.consume { println(abc) }
}

程序正常运行,打印输出:

abc
abc

这很奇怪,官方文档说全局变量(没有特殊标记)
只能在主线程访问,但是我们明明在子线程访问了它,程序却正常运行。我们把修饰变量 abcval 改成
var 再试一试,程序果然抛出异常:IncorrectDereferenceException

那如果是非 String 的引用类型呢?

val testData = TestData()

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(testData)
    }
    future.consume { println(testData) }
}

class TestData

程序抛出异常:IncorrectDereferenceException,多测试几次后,基本可以得出一个结论:

这条结论是官方文档中没有提到的,也算是踩坑的一个收获。

在这里有个插曲,既然 val 修饰的基本类型与 String 一定是不可变的,那对于局部变量这个结论是否也成立?
我们把对象的传递小节中的第一个示例修改一下:

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = "abc"
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        println(it)
        it
    }
    future.consume { println(it) }
}

最主要的变化就是把 testData 换成了一个 String,程序正常,多测试几次,对原生类型也是成立的,因此结论 1对局部变量也成立。
其实仔细思考一下,对于 val 修饰的原生类型与 String,从逻辑上确实可以证明它们一定是不可变。

@ThreadLocal 与 @SharedImmutable 以及单例

修改上面的示例:

@ThreadLocal
val testData = TestData()

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(++testData.index)
    }
    future.consume { println(testData.index) }
}

data class TestData(var index: Int = 0)

输出如下:

1
0

结果与官方的相同,如果全局变量使用 @ThreadLocal 修饰,则该变量在每个线程都有不同的副本,即使修改,也在线程之间不可见。

再修改示例,仅仅把上一个示例中的 @ThreadLocal 改成 @SharedImmutable,然后程序抛出异常;再把 println(++testData.index)
改成 println(testData.index) 程序运行正常,根据官方的说法 @SharedImmutable 的作用是将变量冻结,这样的话该变量就可以共享了,
但它毕竟只是一个注解,如果你编写了修改该变量的代码,也只能在运行时才能发现问题。

最后看看单例:

object A {
    var index = 1
}

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(A.index)
    }
    future.consume { println(A.index) }
}

如果运行程序,我们就发现 object 修饰的单例与使用 @SharedImmutable 修饰的全局变量行为是一致的,不过,
单例也可以使用 @ThreadLocal 来修饰,这也就不多说了。

总结以及其他

如果说还有什么是我没有提到的,那应该就是对象子图分离和原始共享内存,不过这两部分内容主要是用于 C
程序与 Kotlin/Native 交互的情况,例如将 Kotlin/Native 对象保存到 C 结构体中,在真实的用例中,我们使用 Kotlin/Native
调用 C 代码的情况应该占绝大多数,而使用 C 调用 Kotlin/Native 应该极少发生,因此以后有机会再探讨这部分内容。

开篇讲过 Worker 是目前 Kotlin/Native 实现并行计算的主要工具,不过 Native 版的协程最近也推出了多线程版本的预览版,
关于这部分内容将是下一篇文章要重点探讨的。

上一篇 下一篇

猜你喜欢

热点阅读