Kotlin之协程(四)——Channel

2023-07-21  本文已影响0人  Deck方

Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信

简单构建一个生产者消费者例子

fun testChannel() = runBlocking<Unit> {
        val channel = Channel<Int>()

        val producer = GlobalScope.launch {
            var i = 0
            while (i <= 3) {
                delay(1000)
                println("send $i")
                channel.send(i++)
            }
        }
        val consumer = GlobalScope.launch {
            while (true) {
                val j = channel.receive()
                println("receiver+$j")
                if (j > 3) {
                    break
                }

            }
        }

        joinAll(producer,consumer)
    }

一、Channel的容量

Channel实际上是一个队列,队列一定存在缓冲区,那么一旦这个缓冲区满了,并且一直没有人调用receive并取走函数,send就需要挂起。故意让接收端的节奏方慢,发现send总是会挂起,直到receive之后才会继续往下执行,默认容量是0。

  //Channel的迭代
  fun testIterator() = runBlocking<Unit> {
        val channel = Channel<Int>(Channel.Factory.UNLIMITED)
        val producer = GlobalScope.launch {
            var i = 0
            while (i <= 5) {
                println("send $i")
                channel.send(i++)
            }
        }
        val consumer = GlobalScope.launch {
           val iterator =  channel.iterator()
            while (iterator.hasNext()) {
                delay(1000)
                val element = iterator.next()
                println("receiver $element")
                if (element > 5) {
                    return@launch
                }
            }
        }
        joinAll(producer,consumer)
    }

二、producer与actor,便捷的构建生产和消费

fun testProduce() = runBlocking<Unit> {
        val receiverChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
            repeat(5) {
                send(it)
            }
        }
        val consumer = GlobalScope.launch {
            for (i in receiverChannel) {
                println("$i")
            }
        }
        consumer.join()
    }
fun testActor() = runBlocking<Unit> {
        val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
            while (true){
                val result=   receive()
                println(result)
            }
        }
      val producer =   GlobalScope.launch {
            for (i in 1..3) {
              sendChannel.send(i)
            }
        }
        producer.join()
    }

三、Channel的关闭

fun testChannelClose() = runBlocking<Unit> {
        val channel = Channel<Int>(3)
        val producer = GlobalScope.launch {
            List(3) {
                channel.send(it)
                println("send $it")
            }
            channel.close()
            println("send ${channel.isClosedForSend}--${channel.isClosedForReceive}")
        }
        val consumer = GlobalScope.launch {
            for (element in channel){
                println("receive $element")
                delay(100)
            }
            println("send ${channel.isClosedForSend}--${channel.isClosedForReceive}")
        }
        joinAll(producer,consumer)
    }
//print result
//send 0
//send 1
//send 2
//receive 0
//send true--false
//receive 1
//receive 2
//send true--true

四、BoradcastCahnnel

发送端和接收端存在一对多的情况,广播可以实现多个接收端且不互斥的行为。

fun testBroadcastChannel() = runBlocking<Unit> {
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val producer = GlobalScope.launch {
            List(3) {
                broadcastChannel.send(it)
                println("send $it")
                delay(100)
            }
        }
        List(3) {index->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (element in receiveChannel) {
                    println("#$index received $element")
                    delay(100)
                }
            }
        }.joinAll()
    }
//print result
//send 0
//send 1
//#0 received 1
//#2 received 1
//#1 received 1
//send 2
//#1 received 2
//#2 received 2
//#0 received 2

五、channel的多路复用

//test for onWait
 fun testSelectOnWait() = runBlocking<Unit> {
        val localUser = async {
            delay(100)
            User("Bob", 25)
        }
        val remoteUser = async {
            delay(200)
            User("Jack", 28)
        }
        GlobalScope.launch {
          val responseUser =   select<Response<User>> {
                localUser.onAwait { Response<User>(it, true) }
                remoteUser.onAwait { Response<User>(it, false) }
            }
            responseUser.value?.let {
                println("response $it")
            }
        }.join()
    }
//print result
//response User(name=Bob, age=25)
//test onReceive
fun testSelectChannel() = runBlocking<Unit> {
        val channels = listOf<Channel<Int>>(Channel<Int> {}, Channel<Int> { })
        GlobalScope.launch {
            delay(50)
            channels[0].send(1)
        }
        GlobalScope.launch {
            delay(100)
            channels[1].send(2)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive {
                    it

                }
            }
        }
        println(result)
        delay(1000)
    }
//print result
// 1

六、SelectClause

七、Flow的多路复用

八、协程的并发安全

fun testUnSafe() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch {
                count++
            }
        }.joinAll()
        println(count)
    }
//print result
//963
//正常结果应该是1000,但是因为没有实现并发安全,造成了结果的错误

协程框架提供了一些并发安全的工具:

fun testSafeMutex() = runBlocking<Unit> {
        var count = 0
        val mutex = Mutex()
        List(1000) {
            GlobalScope.launch {
               mutex.withLock {
                   count++
               }
            }
        }.joinAll()
        println(count)
    }


 //semaphore
fun testSafeSeMaphore() = runBlocking<Unit> {
        var count = 0
        val semaphore = Semaphore(1)
        List(1000) {
            GlobalScope.launch {
                semaphore.withPermit {
                   count++
               }
            }
        }.joinAll()
        println(count)
    }

上一篇 下一篇

猜你喜欢

热点阅读