Kotlin(十九)协程初探究<2>
共享资源的控制
共享资源可以是共享变量,也可是数据库的共享数据,如何保证共享资源的正确性至关重要。
1. 锁模式
类似前面的例子一个商品库存在一个抢购商品,活动中高并发可能出现超卖的情况,所以要对商品库商品共享资源进行加锁,保证同一个时刻只有一个线程对其进行读写。
java中有syncronized关键字,但是kotlin没有,取而代之使用了@syncronized注解和syncronized()来实现相同效果。
除了支持syncronized这种并发原语,还支持java.util.concurrent.* 下面的并发工具,比如volatile关键字在Kotlin变成了注解:
@Volatile private var running = false
看下syncronized的例子
class Shop {
val goods = hashMapOf<Long,Int>()
init {
goods.put(1,10)
goods.put(2,15)
}
@Synchronized fun buyGoods1(id: Long) {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
fun buyGoods2(id: Long) {
synchronized(this) {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
}
}
对比Java 可以syncronized的方式进行同步加锁,还可以用Lock的方式来对代码进行加锁。改造上面的方法:
val lock = ReentrantLock()
fun buyGoods3(id: Long) {
lock.lock()
try {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
} catch (ex: Exception) {
println("[Exception] is $ex")
} finally {
lock.unlock()
}
}
这种写法的缺点:
- 若是在同一个类内有多个同步方法,将会竞争同一把锁
- 在加锁之后,编码人员容易忘记解锁操作
- 重复的模板代码
对这个方法进行改进,提高这个方法的抽象程度:
fun <T> withLock(lock: Lock, action: () -> T): T {
lock.lock()
try {
return action()
} finally {
lock.unlock()
}
}
withLock 传入一个lock对象的lambda表达式,我们现在可以不用关心对buyGoods进行加锁,只需要在调用时候传入一个lock对象就可以。
fun <T> withLock(lock: Lock, action: () -> T): T {
lock.lock()
try {
return action()
} finally {
lock.unlock()
}
}
val lock = ReentrantLock()
fun buyGoods(id: Long) {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
fun buyGoods4(id: Long) {
withLock(lock, {buyGoods(4)})
}
不同的商家库存之间商品库存并不会发生并发冲突,A商家购买衣服和B商家购买鞋子是可以同时进行的,但是我们现在给商品的buyGoods加锁,不满足这个实际场景。因为同一个时刻只能被一个线程调用,从而导致锁竞争激烈,引发崩溃。
解决这个问题核心:如何对并发时最会发生冲突的部分进行加锁。那么我们对上架buyGoods进行加锁。
class Shop (private var goods: HashMap<Long, Int>) {
private val lock: Lock = ReentrantLock()
fun buyGoods(id: Long) {
lock.withLock {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
}
}
class ShopApi {
private val A_goods = hashMapOf<Long,Int>()
private val B_goods = hashMapOf<Long,Int>()
private var shopA: Shop
private var shopB: Shop
init {
A_goods.put(1,10)
A_goods.put(2,15)
B_goods.put(1,20)
B_goods.put(2,10)
shopA = Shop(A_goods)
shopB = Shop(B_goods)
}
fun buyGoods(shopName: String, id: Long) {
when (shopName) {
"A" -> shopA.buyGoods(id)
"B" -> shopB.buyGoods(id)
else -> {}
}
}
}
fun main(args: Array<String>) {
val shopApi = ShopApi()
shopApi.buyGoods("A",1)
shopApi.buyGoods("A",2)
shopApi.buyGoods("B",2)
}
这样需要初始化多个Shop.(或者运行时初始化,不过就需要考虑线程安全问题)。需要成千上万商家,使用when来匹配是一个灾难。这种方式无法支持异步操作。
改善需要下面几点:
- 独立的一个单元,可以有状态,可以处理逻辑(比如Shop类)
- 每个单元有独特的标识,且系统最多只有一个实例
- 每个单元可以顺序处理逻辑,不会有并发问题,方法同步是一种方案,线程安全的消息队列也是一种方案。
- 最好能支持异步操作,处理成功后有返回值。
2. 有状态的并行计算单元Actor
最先应用在Erlang中应用,而且现在Actor模型已经被应用在生产环境中,比如Akka(一个基于Actor模型的并发框架)。很多语言也支持Actor模型,比如Scala,Java,包括Kotlin页内置Actor模型。Actor模型要做的事:
- 另一种思维来解决并发问题,而不是只有
共享内存
这一种方式; - 提高锁抽象的程度,尽量不再业务中出现锁,减少因为使用锁出现的问题,比如死锁;
- 为解决分布式并发问题提供一个更好的思路
举个例子,两个人只知道对方的地址,他们交流,给对方传递消息,不能用网络电话类通信,只通过信件传递消息。很像现实中邮政系统,只需要把信投递到相应的信箱中,不需要知道如何具体处理送达过程,也可能收到回复,相当消息反馈。把这个例子的信件相当与Actor中的消息,Actor与Actor之间只能通过消息通信。

Actor模式不同的人之间信息是不可变的,你不能发出这封信件后又去修改它的内容,同时接收信件的人是从他的信箱里有序的处理消息,Actor内部状态将不再有线程安全问题。
实现一个简化版方案例子,将购物消息发给商家Actor,商家Actor进行减库存操作,并返回一个唯一的订单号,支持查询当前库存。
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.UntypedAbstractActor
import akka.pattern.Patterns
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.*
class ShopActor(val stocks: HashMap<Long, Int>) : UntypedAbstractActor() {
var orderNumber = 1L
override fun onReceive(message: Any?) {
when (message) {
is Action.Buy -> {
val stock = stocks.getValue(message.id)
if (stock > message.amount) {
stocks.plus(Pair(message.id, stock - message.amount))
sender.tell(orderNumber, self)
orderNumber++
} else {
sender.tell("low stocks", self)
}
}
is Action.GetStock -> {
sender.tell(stocks.get(message.id), self)
}
}
}
}
sealed class Action {
data class BuyOrInit(val id: Long, val userId: Long, val amount: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
data class Buy(val id: Long, val userId: Long, val amount: Long) : Action()
data class GetStock(val id: Long) : Action()
data class GetStockOrInit(val id: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
}
class ManageActor : UntypedAbstractActor() {
override fun onReceive(message: Any?) {
when (message) {
is Action.BuyOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.Buy(message.id, message.userId, message.amount), context)
is Action.GetStockOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.GetStock(message.id), context)
}
}
fun getOrInit(shopName: String, stocks: Map<Long, Int>): ActorRef {
return context.findChild("shop-actor-${shopName}").orElseGet { context.actorOf(Props.create(ShopActor::class.java, stocks), "shop-actor-${shopName}") }
}
}
fun main(args: Array<String>) {
val stocksA = hashMapOf(Pair(1L, 10), Pair(2L, 5), Pair(3L, 20))
val stocksB = hashMapOf(Pair(1L, 15), Pair(2L, 8), Pair(3L, 30))
val actorSystem = ActorSystem.apply("shop-system") //
val manageActor = actorSystem.actorOf(Props.create(ManageActor::class.java), "manage-actor")
val timeout = Timeout(Duration.create(3, "seconds"))
val resA = Patterns.ask(manageActor, Action.GetStockOrInit(1L, "A", stocksA), timeout)
val stock = Await.result(resA, timeout.duration())
println("the stock is ${stock}")
val resB = Patterns.ask(manageActor, Action.BuyOrInit(2L, 1L, 1,"B", stocksB), timeout)
val orderNumber = Await.result(resB, timeout.duration())
println("the orderNumber is ${orderNumber}")
}
shopActor 内部有两个状态,stocks 和 orderNumber 分别代表库存和订单号.
定义sealed class 表示用户请求行为,ShopActor内部有onReceive方法,根据用户不同请求做不同处理。
ManagerActor负责管理和初始化ShopActor。
下面是这个例子的理解图例
