Kotlin 的锁和多线程同步
Synchronized.kt 的源码:
/**
* Executes the given function [block] while holding the monitor of the given object [lock].
*/
@kotlin.internal.InlineOnly
public inline fun <R> synchronized(lock: Any, block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
monitorEnter(lock)
try {
return block()
}
finally {
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
monitorExit(lock)
}
}
JvmFlagAnnotations.kt 的源码:
/**
* Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field
* are immediately made visible to other threads.
*/
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Volatile
/**
* Marks the JVM backing field of the annotated property as `transient`, meaning that it is not
* part of the default serialized form of the object.
*/
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Transient
/**
* Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision
* of floating point operations performed inside the method needs to be restricted in order to
* achieve better portability.
*/
@Target(FUNCTION, CONSTRUCTOR, PROPERTY_GETTER, PROPERTY_SETTER, CLASS)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Strictfp
/**
* Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method
* will be protected from concurrent execution by multiple threads by the monitor of the instance (or,
* for static methods, the class) on which the method is defined.
*/
@Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Synchronized
如何使用 Synchronized 同步锁:
在Java中,给一个方法加锁 ,需要给方法加 synchronized 关键字
public synchronized void doSomething() {
}
kotlin 中没有 synchronized 关键之,取而代之的是 @Synchronized 注解
class MyUtil {
@Synchronized
fun doSomething() {
}
}
Kotlin 在方法内,可以使用 block 块
class Util {
val lock = Any()
fun main() {
synchronized(lock) {
}
}
}
Volatile 关键字
有时仅仅为了读写一个或者两个实例域就使用同步的话,显得开销过大;而Volatile关键字为实例域的同步访问提供了免锁的机制。
当一个共享变量被vlatile修饰时,其就具备两个含义:
1、一个线程修改了变量的值,变量的新值对其他线程是立即可见的。
2、禁止使用指令重排序。
什么是指令重排序呢?
重排序通常是编译器或者运行环境为了优化程序性能而采取的对指令重新排序执行的一种手段。重排序分为两类:编译期重排序和运行期重排序,分别对应着编译时和运行时环境。
在 kotlin 中没有 volatile
关键字,但是有 @Volatile
注解
class Util {
@Volatile
var lock = Any()
}
在 kotlin 中的 Any 和 java 中的 Object 相似,每一个类都是从 Any 继承过来的,但是 Any 并没有声明 wait() , notify() 和 notifyAll() 方法,这就意味着,你不能在kotlin类中调用这些方法。但是你仍然能够使用java.lang.Object的实例作为lock,并且调用相关的方法。下面将会展示一个使用 Object 做为 lock 解决生产者和消费者的问题。
private val lock = Object()
fun produce() = synchronized(lock) {
while(items>=maxItems) {
lock.wait()
}
Thread.sleep(Random.nextInt(100).toLong())
items++
println("Produced, count is$items:${Thread.currentThread()}")
lock.notifyAll()
}
fun consume() = synchronized(lock) {
while(items<=0) {
lock.wait()
}
Thread.sleep(Random.nextInt(100).toLong())
items--
println("Consumed, count is$items:${Thread.currentThread()}")
lock.notifyAll()
}
- volatile变量具有可见性
- volatile不保证原子性
- volatile保证有序性
volatile能禁止指令的重排序,因此volatile保证有序性。
volatile禁止指令重排序的含义:当程序执行到volatile变量的时候,在其前面的语句已经全部执行完成,并且结果对后面可见,而且该变量后面的语句都没有执行。
正确使用volatile
synchronized可以防止多个线程同时执行一段代码,这会阻塞一部分线程的执行,这样就会影响程序的执行效率。
而volatile在某些情况下的性能是优于synchronized的。
但是volatile无法替代synchronized,因为volatile无法保证操作的原子性。
通常情况下,我们使用volatile关键字要避开两种场景:
1、对变量的写操作依赖当前值。
2、该变量包含在具有其他变量的不变式中。
使用volatile的场景有很多,这里介绍两种常见的场景:
场景1:状态标志
当多线程执行该类的时候,我们需要对状态标志stop保持可见性,这样我们的运行才能实时保持正确的执行。这种情况如果使用sychronized的话显然要复杂的多。
class TestVolatile1 {
@Volatile
var stop = false;
fun onInit(){
if(stop){
}
}
fun onStop(){
stop = true;
}
}
场景2: 双重检查模式(DCL)
在 Java 中,我们的单例模式经常会这样写,第一次判空是为了不必要的同步操作,第二次判断是只有在MyLock实例==null的时候才会去new一个实例出来,当多线程调用时,当进行这两次判空时,我们需要保证instance的可见性。
public class MyLock {
private static volatile MyLock instance = null;
public static MyLock getInstance() {
if(instance == null){
synchronized (MyLock.class){
if(instance == null){
instance = new MyLock();
}
}
}
return instance;
}
}
在 Kotlin 中实现多线程同步的方式
“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”
在 Kotlin 中我们有多种实现方式:
- Thread.join
- Synchronized
- ReentrantLock
- BlockingQueue
- CountDownLatch
- CyclicBarrier
- CAS
- Future
- CompletableFuture
- Rxjava
- Coroutine
- Flow
我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:
val task1: () -> String = {
sleep(2000)
"Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
sleep(2000)
"World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
sleep(2000)
"$p1 $p2".also { println("task3 finished: $it") }
}
1. Thread.join()
Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Thread 的 join() :
@Test
fun test_join() {
lateinit var s1: String
lateinit var s2: String
val t1 = Thread { s1 = task1() }
val t2 = Thread { s2 = task2() }
t1.start()
t2.start()
t1.join()
t2.join()
task3(s1, s2)
}
2. Synchronized
使用 synchronized 锁进行同步
@Test
fun test_synchrnoized() {
lateinit var s1: String
Thread {
synchronized(Unit) {
s1 = task1()
}
}.start()
val s2: String = task2()
synchronized(Unit) {
task3(s1, s2)
}
}
但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized。
3. ReentrantLock
ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用
@Test
fun test_ReentrantLock() {
lateinit var s1: String
val lock = ReentrantLock()
Thread {
lock.lock()
s1 = task1()
lock.unlock()
}.start()
val s2: String = task2()
lock.lock()
task3(s1, s2)
lock.unlock()
}
ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务。
4. BlockingQueue
阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果
@Test
fun test_blockingQueue() {
lateinit var s1: String
val queue = SynchronousQueue<Unit>()
Thread {
s1 = task1()
queue.put(Unit)
}.start()
val s2: String = task2()
queue.take()
task3(s1, s2)
}
当然,阻塞队列更多是使用在生产/消费场景中的同步。
5. CountDownLatch
UC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:
@Test
fun test_countdownlatch() {
lateinit var s1: String
lateinit var s2: String
val cd = CountDownLatch(2)
Thread() {
s1 = task1()
cd.countDown()
}.start()
Thread() {
s2 = task2()
cd.countDown()
}.start()
cd.await()
task3(s1, s2)
}
共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松。
6. CyclicBarrier
CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。
与 CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用。
@Test
fun test_CyclicBarrier() {
lateinit var s1: String
lateinit var s2: String
val cb = CyclicBarrier(3)
Thread {
s1 = task1()
cb.await()
}.start()
Thread() {
s2 = task1()
cb.await()
}.start()
cb.await()
task3(s1, s2)
}
7. CAS
AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。
@Test
fun test_cas() {
lateinit var s1: String
lateinit var s2: String
val cas = AtomicInteger(2)
Thread {
s1 = task1()
cas.getAndDecrement()
}.start()
Thread {
s2 = task2()
cas.getAndDecrement()
}.start()
while (cas.get() != 0) {}
task3(s1, s2)
}
While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。
8. Future
上面无论有锁操作还是无锁操作,都需要定义两个变量s1、s2记录结果非常不方便。 Java 1.5 开始,提供了 Callable 和 Future ,可以在任务执行结束时返回结果。
@Test
fun test_future() {
val future1 = FutureTask(Callable(task1))
val future2 = FutureTask(Callable(task2))
Executors.newCachedThreadPool().execute(future1)
Executors.newCachedThreadPool().execute(future2)
task3(future1.get(), future2.get())
}
通过 future.get(),可以同步等待结果返回,写起来非常方便。
9. CompletableFuture
future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:
@Test
fun test_CompletableFuture() {
CompletableFuture.supplyAsync(task1)
.thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
task3(p1, p2)
}.join()
}
10. RxJava
RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务
@Test
fun test_Rxjava() {
Observable.zip(
Observable.fromCallable(Callable(task1))
.subscribeOn(Schedulers.newThread()),
Observable.fromCallable(Callable(task2))
.subscribeOn(Schedulers.newThread()),
BiFunction(task3)
).test().await()
}
11. Coroutine
前面那么多方式,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:
@Test
fun test_coroutine() {
runBlocking {
val c1 = async(Dispatchers.IO) {
task1()
}
val c2 = async(Dispatchers.IO) {
task2()
}
task3(c1.await(), c2.await())
}
}
12. Flow
Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:
@Test
fun test_flow() {
val flow1 = flow<String> { emit(task1()) }
val flow2 = flow<String> { emit(task2()) }
runBlocking {
flow1.zip(flow2) { t1, t2 ->
task3(t1, t2)
}.flowOn(Dispatchers.IO).collect()
}
}
FlowOn 使得 Task 在异步计算并发射结果。
总结
作为结论,在 Kotlin 上最好用的线程同步方案首推协程。