移动 前端 Python Android JavaKotlin专题禅与计算机程序设计艺术

Kotlin(十九)协程初探究<2>

2021-03-03  本文已影响0人  zcwfeng

共享资源的控制

共享资源可以是共享变量,也可是数据库的共享数据,如何保证共享资源的正确性至关重要。

1. 锁模式

类似前面的例子一个商品库存在一个抢购商品,活动中高并发可能出现超卖的情况,所以要对商品库商品共享资源进行加锁,保证同一个时刻只有一个线程对其进行读写。
java中有syncronized关键字,但是kotlin没有,取而代之使用了@syncronized注解和syncronized()来实现相同效果。

除了支持syncronized这种并发原语,还支持java.util.concurrent.* 下面的并发工具,比如volatile关键字在Kotlin变成了注解:

@Volatile private var running = false

看下syncronized的例子

class Shop {
    val goods = hashMapOf<Long,Int>()

    init {
        goods.put(1,10)
        goods.put(2,15)
    }

    @Synchronized fun buyGoods1(id: Long) {
        val stock = goods.getValue(id)
        goods.put(id, stock - 1)
    }

    fun buyGoods2(id: Long) {
        synchronized(this) {
            val stock = goods.getValue(id)
            goods.put(id, stock - 1)
        }
    }
}

对比Java 可以syncronized的方式进行同步加锁,还可以用Lock的方式来对代码进行加锁。改造上面的方法:

    val lock = ReentrantLock()

    fun buyGoods3(id: Long) {
        lock.lock()
        try {
            val stock = goods.getValue(id)
            goods.put(id, stock - 1)
        } catch (ex: Exception) {
            println("[Exception] is $ex")
        } finally {
            lock.unlock()
        }
    }

这种写法的缺点:

对这个方法进行改进,提高这个方法的抽象程度:

fun <T> withLock(lock: Lock, action: () -> T): T {
    lock.lock()
    try {
        return action()
    } finally {
        lock.unlock()
    }
}

withLock 传入一个lock对象的lambda表达式,我们现在可以不用关心对buyGoods进行加锁,只需要在调用时候传入一个lock对象就可以。

fun <T> withLock(lock: Lock, action: () -> T): T {
    lock.lock()
    try {
        return action()
    } finally {
        lock.unlock()
    }
}

val lock = ReentrantLock()

fun buyGoods(id: Long) {
        val stock = goods.getValue(id)
        goods.put(id, stock - 1)
}

fun buyGoods4(id: Long) {
        withLock(lock, {buyGoods(4)})
}

不同的商家库存之间商品库存并不会发生并发冲突,A商家购买衣服和B商家购买鞋子是可以同时进行的,但是我们现在给商品的buyGoods加锁,不满足这个实际场景。因为同一个时刻只能被一个线程调用,从而导致锁竞争激烈,引发崩溃。

解决这个问题核心:如何对并发时最会发生冲突的部分进行加锁。那么我们对上架buyGoods进行加锁。

class Shop (private var goods: HashMap<Long, Int>) {
    private val lock: Lock = ReentrantLock()

    fun buyGoods(id: Long) {
        lock.withLock {
            val stock = goods.getValue(id)
            goods.put(id, stock - 1)
        }
    }
}

class ShopApi {
    private val A_goods = hashMapOf<Long,Int>()
    private val B_goods = hashMapOf<Long,Int>()

    private var shopA: Shop
    private var shopB: Shop

    init {
        A_goods.put(1,10)
        A_goods.put(2,15)
        B_goods.put(1,20)
        B_goods.put(2,10)
        shopA = Shop(A_goods)
        shopB = Shop(B_goods)
    }

    fun buyGoods(shopName: String, id: Long) {
        when (shopName) {
            "A" -> shopA.buyGoods(id)
            "B" -> shopB.buyGoods(id)
            else -> {}
        }
    }
}

fun main(args: Array<String>) {
    val shopApi = ShopApi()
    shopApi.buyGoods("A",1)
    shopApi.buyGoods("A",2)
    shopApi.buyGoods("B",2)
}

这样需要初始化多个Shop.(或者运行时初始化,不过就需要考虑线程安全问题)。需要成千上万商家,使用when来匹配是一个灾难。这种方式无法支持异步操作。

改善需要下面几点:

2. 有状态的并行计算单元Actor

最先应用在Erlang中应用,而且现在Actor模型已经被应用在生产环境中,比如Akka(一个基于Actor模型的并发框架)。很多语言也支持Actor模型,比如Scala,Java,包括Kotlin页内置Actor模型。Actor模型要做的事:

举个例子,两个人只知道对方的地址,他们交流,给对方传递消息,不能用网络电话类通信,只通过信件传递消息。很像现实中邮政系统,只需要把信投递到相应的信箱中,不需要知道如何具体处理送达过程,也可能收到回复,相当消息反馈。把这个例子的信件相当与Actor中的消息,Actor与Actor之间只能通过消息通信。

截屏2021-03-01 11.53.16.png

Actor模式不同的人之间信息是不可变的,你不能发出这封信件后又去修改它的内容,同时接收信件的人是从他的信箱里有序的处理消息,Actor内部状态将不再有线程安全问题。

实现一个简化版方案例子,将购物消息发给商家Actor,商家Actor进行减库存操作,并返回一个唯一的订单号,支持查询当前库存。

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.UntypedAbstractActor
import akka.pattern.Patterns
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.*

class ShopActor(val stocks: HashMap<Long, Int>) : UntypedAbstractActor() {
    var orderNumber = 1L
    override fun onReceive(message: Any?) {
        when (message) {
            is Action.Buy -> {
                val stock = stocks.getValue(message.id)
                if (stock > message.amount) {
                    stocks.plus(Pair(message.id, stock - message.amount))
                    sender.tell(orderNumber, self)
                    orderNumber++
                } else {
                    sender.tell("low stocks", self)
                }
            }
            is Action.GetStock -> {
                sender.tell(stocks.get(message.id), self)
            }
        }
    }
}

sealed class Action {
    data class BuyOrInit(val id: Long, val userId: Long, val amount: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
    data class Buy(val id: Long, val userId: Long, val amount: Long) : Action()
    data class GetStock(val id: Long) : Action()
    data class GetStockOrInit(val id: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
}

class ManageActor : UntypedAbstractActor() {
    override fun onReceive(message: Any?) {
        when (message) {
            is Action.BuyOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.Buy(message.id, message.userId, message.amount), context)
            is Action.GetStockOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.GetStock(message.id), context)
        }
    }

    fun getOrInit(shopName: String, stocks: Map<Long, Int>): ActorRef {
        return context.findChild("shop-actor-${shopName}").orElseGet { context.actorOf(Props.create(ShopActor::class.java, stocks), "shop-actor-${shopName}") }
    }

}

fun main(args: Array<String>) {
    val stocksA = hashMapOf(Pair(1L, 10), Pair(2L, 5), Pair(3L, 20))
    val stocksB = hashMapOf(Pair(1L, 15), Pair(2L, 8), Pair(3L, 30))
    val actorSystem = ActorSystem.apply("shop-system") //
    val manageActor = actorSystem.actorOf(Props.create(ManageActor::class.java), "manage-actor")
    val timeout = Timeout(Duration.create(3, "seconds"))

    val resA = Patterns.ask(manageActor, Action.GetStockOrInit(1L, "A", stocksA), timeout)
    val stock = Await.result(resA, timeout.duration())
    println("the stock is ${stock}")

    val resB = Patterns.ask(manageActor, Action.BuyOrInit(2L, 1L, 1,"B", stocksB), timeout)
    val orderNumber = Await.result(resB, timeout.duration())
    println("the orderNumber is ${orderNumber}")


}

shopActor 内部有两个状态,stocks 和 orderNumber 分别代表库存和订单号.
定义sealed class 表示用户请求行为,ShopActor内部有onReceive方法,根据用户不同请求做不同处理。
ManagerActor负责管理和初始化ShopActor。

相关CQRS架构和一些理解参考

下面是这个例子的理解图例

截屏2021-03-01 12.45.54.png
上一篇 下一篇

猜你喜欢

热点阅读