解析Golang sync.Cond 条件变量源码
2023-02-22 本文已影响0人
robertzhai
使用场景
一个 goroutine(G1)拿到锁后执行操作,其它多个 goroutine 处于等待状态
G1 操作完成后,需要通知其它等待的 goroutine 去竞争这把锁,获取到锁的 goroutine 再继续操作
类似 Java 里面的 wait / notify / notifyAll
源码位置
src/sync/cond.go
image.png
Wait 等待
Signal 唤醒一个等待的goroutine
Broadcast 唤醒所有等待的goroutine
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
// noCopy is a marker field; presumably it lets tooling flag copies of a
// Cond (its type is declared elsewhere — confirm against that definition).
noCopy noCopy
// L is held while observing or changing the condition
L Locker
// notify is the waiter list handed to the runtime_notifyList* hooks by
// Wait, Signal and Broadcast.
notify notifyList
// checker has its check method invoked at the start of Wait, Signal and
// Broadcast; judging by its type name it detects copying — TODO confirm.
checker copyChecker
}
// NewCond allocates a Cond whose Locker field L is set to l and returns a
// pointer to it. All other fields are left at their zero values.
func NewCond(l Locker) *Cond {
	c := new(Cond)
	c.L = l
	return c
}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// In short: Wait first unlocks c.L and then suspends the caller; c.L is
// re-locked only after the wakeup, just before Wait returns.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//	c.L.Lock()
//	for !condition() {
//		c.Wait() // re-check the condition after every wakeup
//	}
//	... make use of condition ...
//	c.L.Unlock()
func (c *Cond) Wait() {
// Run the checker before touching the waiter list.
c.checker.check()
// Register on the notify list before c.L is released; the returned value t
// is passed to runtime_notifyListWait below to identify this registration.
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// Suspend here until woken via Signal or Broadcast (per the doc above).
runtime_notifyListWait(&c.notify, t)
// Re-acquire c.L so the caller gets it back in the locked state.
c.L.Lock()
}
// Signal wakes a single goroutine that is waiting on c, if one exists.
//
// The caller may hold c.L while calling Signal, but is not required to.
func (c *Cond) Signal() {
	c.checker.check()
	waiters := &c.notify
	runtime_notifyListNotifyOne(waiters)
}
// Broadcast wakes every goroutine that is waiting on c.
//
// The caller may hold c.L while calling Broadcast, but is not required to.
func (c *Cond) Broadcast() {
	c.checker.check()
	waiters := &c.notify
	runtime_notifyListNotifyAll(waiters)
}
wait使用
// Take the lock before inspecting the shared flag.
c.L.Lock()
// Use a loop rather than a single check: the condition has to be
// re-evaluated each time Wait returns.
for !done {
c.Wait()
}
// The lock is still held at this point.
log.Println(name, "starts reading")
c.L.Unlock()
image.png
1写3读
package main
import (
"log"
"sync"
"time"
)
// done is the condition the readers wait for. It starts false, is set to
// true by write, and is read and written only while the Cond's lock is held.
var done bool
// read blocks on c until done is true, then logs "<name> starts reading".
// The Cond's lock is held for the whole call, except while parked in Wait.
func read(name string, c *sync.Cond) {
	mu := c.L
	mu.Lock()
	for {
		if done {
			break
		}
		// Not ready yet: release the lock and sleep until woken, then re-check.
		c.Wait()
	}
	log.Println(name, "starts reading")
	mu.Unlock()
}
// write logs that it started, sleeps for one second, sets done to true under
// the Cond's lock, and then wakes every goroutine waiting on c.
func write(name string, c *sync.Cond) {
	log.Println(name, "starts writing")
	time.Sleep(1 * time.Second)

	// Flip the condition inside the critical section.
	markDone := func() {
		c.L.Lock()
		done = true
		c.L.Unlock()
	}
	markDone()

	log.Println(name, "wakes all")
	// Broadcast is issued after the lock has been released.
	c.Broadcast()
}
// main starts three readers that wait on a shared Cond, runs the writer, and
// then waits for every reader to return before the program exits.
func main() {
	cond := sync.NewCond(&sync.Mutex{})

	// Track the readers with a WaitGroup instead of sleeping for a fixed
	// time: main exits as soon as all readers are done, and never earlier.
	var wg sync.WaitGroup
	for _, name := range []string{"reader1", "reader2", "reader3"} {
		wg.Add(1)
		// name is passed as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(name string) {
			defer wg.Done()
			read(name, cond)
		}(name)
	}

	write("writer", cond)
	wg.Wait()
}
output
writer starts writing
writer wakes all
reader3 starts reading
reader2 starts reading
reader1 starts reading
一个生产者写消息到队列,2 个消费者消费队列数据
package main
import (
"fmt"
"sync"
"time"
)
// Shared state for the producer/consumer demo.
var (
	// cond is backed by a fresh sync.Mutex; producer and consumer lock
	// cond.L around every access to queue.
	cond = sync.NewCond(new(sync.Mutex))
	// queue holds the produced integers in FIFO order; it starts out nil.
	queue []int
)
// producer runs forever: once per second it appends the next integer
// (0, 1, 2, ...) to queue under cond.L and signals one waiting goroutine.
func producer() {
	for next := 0; ; next++ {
		cond.L.Lock()
		queue = append(queue, next)
		cond.L.Unlock()

		// Signal is issued after the lock has been released.
		cond.Signal()
		time.Sleep(time.Second)
	}
}
// consumer runs forever: it waits on cond until queue is non-empty, prints
// consumerName followed by the head element, and removes that element.
// All queue access happens while cond.L is held.
func consumer(consumerName string) {
	for {
		cond.L.Lock()
		for {
			if len(queue) > 0 {
				break
			}
			// Nothing to consume: release the lock and sleep until woken.
			cond.Wait()
		}
		head := queue[0]
		queue = queue[1:]
		fmt.Println(consumerName, head)
		cond.L.Unlock()
	}
}
// main launches one producer and two consumers, then blocks forever so the
// background goroutines keep running.
func main() {
	// Start the single producer.
	go producer()

	// Start the two consumers.
	for _, name := range []string{"consumer-1", "consumer-2"} {
		go consumer(name)
	}

	// Keep the main goroutine alive.
	for {
		time.Sleep(time.Minute)
	}
}
output
consumer-2 0
consumer-2 1
consumer-1 2
consumer-2 3
consumer-1 4