解析Golang sync.Cond 条件变量源码

2023-02-22  本文已影响0人  robertzhai

使用场景

一个 goroutine G1 拿到锁并执行操作,其它多个 goroutine 等待条件满足。
G1 完成后,需要通知这些等待中的 goroutine;它们被唤醒后竞争获取锁,拿到锁的 goroutine 继续执行。
类似 Java 里的 wait / notify / notifyAll。

源码位置

src/sync/cond.go


image.png

Wait 等待
Signal 唤醒一个等待的goroutine
Broadcast 唤醒所有等待的goroutine


// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
    // NOTE(review): presumably a marker that lets tooling flag copies of a
    // Cond — confirm against the definition of noCopy.
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    // notify is the list passed to the runtime_notifyList* calls made by
    // Wait, Signal and Broadcast.
    notify  notifyList
    // checker.check() is invoked at the start of Wait, Signal and Broadcast.
    checker copyChecker
}

// NewCond allocates a Cond, associates it with Locker l, and returns
// a pointer to it.
func NewCond(l Locker) *Cond {
    c := new(Cond)
    c.L = l
    return c
}

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// In short: Wait first releases the lock and then parks the goroutine.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait() // re-check the condition after every wakeup
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
// Put differently: the lock is only re-acquired after the wakeup, just
// before Wait returns, so the condition may no longer hold by then.
func (c *Cond) Wait() {
    c.checker.check()
    // Register on the notify list while c.L is still held; t is the value
    // later handed to runtime_notifyListWait.
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // Block here with c.L released.
    runtime_notifyListWait(&c.notify, t)
    // Re-acquire c.L before returning to the caller.
    c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
// At most one waiting goroutine is woken per call.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
// Every goroutine currently waiting is woken, not just one.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

Wait 的使用方式

        c.L.Lock() // take the lock before inspecting the condition
    for !done {
        c.Wait() // unlocks c.L while blocked and locks it again before returning
    }
    // done is true here and c.L is held.
    log.Println(name, "starts reading")
    c.L.Unlock()
image.png

1写3读

package main

import (
    "log"
    "sync"
    "time"
)

// done is the condition the readers wait on; it starts out false and is
// set to true by write while holding the Cond's lock.
var done bool

// read blocks until done becomes true and then logs that the named reader
// has started. c.L is held for the whole body except while the goroutine
// is parked inside c.Wait.
func read(name string, c *sync.Cond) {
    c.L.Lock()
    defer c.L.Unlock()

    // A wakeup does not guarantee the condition holds, so check it again
    // each time Wait returns.
    for {
        if done {
            break
        }
        c.Wait()
    }
    log.Println(name, "starts reading")
}

// write logs that the named writer has started, sleeps for one second,
// sets done under c.L, and then wakes every goroutine waiting on c.
func write(name string, c *sync.Cond) {
    log.Println(name, "starts writing")
    time.Sleep(time.Second)

    // Flip the condition inside a scoped critical section.
    func() {
        c.L.Lock()
        defer c.L.Unlock()
        done = true
    }()

    // The lock has already been released when Broadcast is called.
    log.Println(name, "wakes all")
    c.Broadcast()
}

// main starts three readers that wait on a shared Cond, runs the writer,
// and returns once every reader has finished.
//
// A WaitGroup replaces the former fixed three-second sleep: the sleep
// neither guaranteed that the readers had finished nor let the program
// exit as soon as they had.
func main() {
    cond := sync.NewCond(&sync.Mutex{})

    var wg sync.WaitGroup
    for _, name := range []string{"reader1", "reader2", "reader3"} {
        wg.Add(1)
        // name is passed as an argument so each goroutine gets its own copy
        // regardless of the Go version's loop-variable semantics.
        go func(name string) {
            defer wg.Done()
            read(name, cond)
        }(name)
    }

    write("writer", cond)

    // Block until all readers have observed done and returned.
    wg.Wait()
}

output

writer starts writing
writer wakes all
reader3 starts reading
reader2 starts reading
reader1 starts reading

一个生产者写消息到队列,2个消费者消费队列数据

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    // cond coordinates the producer and the consumers; its Locker guards queue.
    cond = sync.NewCond(new(sync.Mutex))

    // queue holds the produced values that have not been consumed yet.
    queue []int
)

// producer appends an increasing integer to queue once per second, forever.
// Each append happens under cond.L; one waiter is signalled after the lock
// has been released.
func producer() {
    for next := 0; ; next++ {
        cond.L.Lock()
        queue = append(queue, next)
        cond.L.Unlock()

        cond.Signal()
        time.Sleep(time.Second)
    }
}

// consumer loops forever: it waits until queue is non-empty, prints the
// head element prefixed with consumerName, and removes it from queue.
// All queue access happens while cond.L is held.
func consumer(consumerName string) {
    for {
        cond.L.Lock()

        // Wait releases cond.L while blocked; re-check emptiness on each wakeup.
        for len(queue) == 0 {
            cond.Wait()
        }

        head := queue[0]
        fmt.Println(consumerName, head)
        queue = queue[1:]

        cond.L.Unlock()
    }
}

// main starts one producer and two consumers, then keeps the main
// goroutine alive indefinitely so the workers can keep running.
func main() {
    // Start the single producer.
    go producer()

    // Start the two consumers.
    for _, name := range []string{"consumer-1", "consumer-2"} {
        go consumer(name)
    }

    // Never return: sleep in one-minute steps.
    for {
        time.Sleep(time.Minute)
    }
}

output

consumer-2 0
consumer-2 1
consumer-1 2
consumer-2 3
consumer-1 4

ref

上一篇下一篇

猜你喜欢

热点阅读