golang知识集

从“CPU 烧开水“到优雅暂停:Go 里 sync.Cond 的

2025-11-13  本文已影响0人  Mgx_无心

目录

引言

你有没有试过让程序"暂停一下"?不是 time.Sleep(1000) 那种傻等,而是真正优雅地挂起,等我喊你再干活?

如果你曾经用 for { if paused { continue } } 把 CPU 烧到冒烟……别担心,你不是一个人。

今天,我们就用一个"发短信"的小例子,带你从"烧开水"走向"禅意暂停",彻底搞懂 Go 里的 sync.Cond

场景:短信发送系统

想象你有一个短信平台,能同时开多个 worker 发短信。但老板突然说:"先暂停!等我喝完这杯咖啡再发!"——你得让所有 worker 立刻暂停,等老板说"继续",再接着干活。

听起来简单?我们来看看三种实现方式,从"灾难"到"优雅"的进化之路。

方案1:忙轮询 — CPU 表示"我快烧开了!"

// 方案1:忙轮询(不推荐)
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type SMSManager1 struct {
    paused int32 // 0 = running, 1 = paused
    tasks  chan string
    wg     sync.WaitGroup
    closed bool
    mu     sync.Mutex
}

func NewSMSManager1() *SMSManager1 {
    return &SMSManager1{
        tasks: make(chan string, 100),
    }
}

func (sm *SMSManager1) SetSpeed(speed int) {
    if speed == 0 {
        atomic.StoreInt32(&sm.paused, 1)
    } else {
        atomic.StoreInt32(&sm.paused, 0)
    }
}

func (sm *SMSManager1) worker(id int) {
    defer sm.wg.Done()
    for {
        sm.mu.Lock()
        if sm.closed {
            sm.mu.Unlock()
            return
        }
        sm.mu.Unlock()

        if atomic.LoadInt32(&sm.paused) == 1 {
            // 忙轮询!CPU 飙升
            continue
        }

        select {
        case task, ok := <-sm.tasks:
            if !ok {
                return
            }
            fmt.Printf("Worker %d: sending %s\n", id, task)
            time.Sleep(50 * time.Millisecond) // 模拟发送
        default:
            time.Sleep(1 * time.Millisecond) // 减轻一点,但仍是轮询
        }
    }
}

func (sm *SMSManager1) StartWorkers(n int) {
    sm.wg.Add(n)
    for i := 0; i < n; i++ {
        go sm.worker(i)
    }
    go sm.producer()
}

func (sm *SMSManager1) producer() {
    taskID := 0
    for {
        sm.mu.Lock()
        if sm.closed {
            sm.mu.Unlock()
            close(sm.tasks)
            return
        }
        paused := atomic.LoadInt32(&sm.paused) == 1
        sm.mu.Unlock()

        if !paused {
            taskID++
            select {
            case sm.tasks <- fmt.Sprintf("Task-%d", taskID):
            default:
                // 丢弃或阻塞,这里丢弃
            }
            time.Sleep(200 * time.Millisecond)
        } else {
            time.Sleep(10 * time.Millisecond)
        }
    }
}

func (sm *SMSManager1) Stop() {
    sm.mu.Lock()
    sm.closed = true
    sm.mu.Unlock()
    close(sm.tasks)
    sm.wg.Wait()
}

func main() {
    fmt.Println("=== 方案1:忙轮询(不推荐)===")
    sm := NewSMSManager1()
    sm.SetSpeed(1)
    sm.StartWorkers(3)

    time.Sleep(2 * time.Second)
    fmt.Println(">>> Pause (speed=0)")
    sm.SetSpeed(0)

    time.Sleep(3 * time.Second)
    fmt.Println(">>> Resume (speed=1)")
    sm.SetSpeed(1)

    time.Sleep(2 * time.Second)
    fmt.Println(">>> Stopping")
    sm.Stop()
}

这就是传说中的 "忙轮询"(Busy Waiting)。

worker 一旦发现暂停,就疯狂 continue,CPU 核心瞬间飙到 100%。你的笔记本风扇开始怒吼,隔壁同事以为你在挖矿。

问题所在

方案2:Sleep 轮询 — "我眯一会儿,你叫我"

// 方案2:sleep 轮询(推荐简单场景)
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type SMSManager2 struct {
    paused int32 // 0 = running, 1 = paused
    tasks  chan string
    wg     sync.WaitGroup
    closed bool
    mu     sync.Mutex
}

func NewSMSManager3() *SMSManager2 {
    return &SMSManager2{
        tasks: make(chan string, 100),
    }
}

func (sm *SMSManager2) SetSpeed(speed int) {
    if speed == 0 {
        atomic.StoreInt32(&sm.paused, 1)
    } else {
        atomic.StoreInt32(&sm.paused, 0)
    }
}

func (sm *SMSManager2) worker(id int) {
    defer sm.wg.Done()
    for {
        // 检查关闭
        sm.mu.Lock()
        if sm.closed {
            sm.mu.Unlock()
            return
        }
        sm.mu.Unlock()

        // 检查暂停
        if atomic.LoadInt32(&sm.paused) == 1 {
            time.Sleep(50 * time.Millisecond) // 低频轮询
            continue
        }

        // 尝试读取任务,带超时
        select {
        case task, ok := <-sm.tasks:
            if !ok {
                return
            }
            fmt.Printf("Worker %d: sending %s\n", id, task)
            time.Sleep(50 * time.Millisecond)
        case <-time.After(100 * time.Millisecond):
            // 超时后重新检查 paused
            continue
        }
    }
}

func (sm *SMSManager2) StartWorkers(n int) {
    sm.wg.Add(n)
    for i := 0; i < n; i++ {
        go sm.worker(i)
    }
    go sm.producer()
}

func (sm *SMSManager2) producer() {
    taskID := 0
    for {
        sm.mu.Lock()
        if sm.closed {
            sm.mu.Unlock()
            close(sm.tasks)
            return
        }
        paused := atomic.LoadInt32(&sm.paused) == 1
        sm.mu.Unlock()

        if paused {
            time.Sleep(50 * time.Millisecond)
            continue
        }

        taskID++
        sm.tasks <- fmt.Sprintf("Task-%d", taskID)
        time.Sleep(200 * time.Millisecond)
    }
}

func (sm *SMSManager2) Stop() {
    sm.mu.Lock()
    sm.closed = true
    sm.mu.Unlock()
    close(sm.tasks)
    sm.wg.Wait()
}

func main() {
    fmt.Println("=== 方案2:sleep 轮询(推荐简单场景)===")
    sm := NewSMSManager3()
    sm.SetSpeed(1)
    sm.StartWorkers(3)

    time.Sleep(2 * time.Second)
    fmt.Println(">>> Pause (speed=0)")
    sm.SetSpeed(0)

    time.Sleep(3 * time.Second)
    fmt.Println(">>> Resume (speed=1)")
    sm.SetSpeed(1)

    time.Sleep(2 * time.Second)
    fmt.Println(">>> Stopping")
    sm.Stop()
}

好一点了!至少不烧 CPU 了。但问题来了:50ms 是拍脑袋定的。

适用场景

简单脚本、临时 demo 可以凑合用,但不是真正的"即时响应"。

方案3:sync.Cond — "你喊我,我才醒"

终于,主角登场:sync.Cond!以下是完整的实现代码:

// 方案3:sync.Cond(推荐高并发场景)
package main

import (
    "fmt"
    "sync"
    "time"
)

type SMSManager struct {
    mu     sync.Mutex
    cond   *sync.Cond
    speed  int          // 控制速度:0 = 暂停,>0 = 运行
    tasks  chan string  // 任务 channel
    wg     sync.WaitGroup
    closed bool
}

func NewSMSManager() *SMSManager {
    sm := &SMSManager{
        tasks: make(chan string, 100),
    }
    sm.cond = sync.NewCond(&sm.mu) // 关联互斥锁
    return sm
}

// 设置速度(线程安全)
func (sm *SMSManager) SetSpeed(speed int) {
    sm.mu.Lock()
    wasPaused := (sm.speed == 0)
    sm.speed = speed
    needWake := (speed > 0 && wasPaused) // 从暂停变为运行
    sm.mu.Unlock()

    if needWake {
        sm.cond.Broadcast() // 唤醒所有等待的 goroutine
    }
}

// 启动工作池
func (sm *SMSManager) StartWorkers(workerCount int) {
    sm.wg.Add(workerCount)
    for i := 0; i < workerCount; i++ {
        go sm.worker(i)
    }

    // 启动生产者(模拟)
    go sm.producer()
}

// 工作者:从 channel 读任务,但受 speed 控制
func (sm *SMSManager) worker(id int) {
    defer sm.wg.Done()

    for {
        // 1. 先检查是否要暂停
        sm.mu.Lock()
        for sm.speed == 0 && !sm.closed {
            sm.cond.Wait() // 挂起,直到被 Broadcast 唤醒
        }
        if sm.closed {
            sm.mu.Unlock()
            return
        }
        sm.mu.Unlock()

        // 2. 尝试读取任务(带超时防永久阻塞)
        select {
        case task, ok := <-sm.tasks:
            if !ok {
                return
            }
            fmt.Printf("Worker %d sending SMS: %s\n", id, task)
            time.Sleep(100 * time.Millisecond) // 模拟发送耗时

        case <-time.After(1 * time.Second):
            // 防止在 tasks 阻塞时无法响应 speed=0
        }
    }
}

// 模拟生产者
func (sm *SMSManager) producer() {
    taskID := 0
    for {
        sm.mu.Lock()
        if sm.closed {
            sm.mu.Unlock()
            close(sm.tasks)
            return
        }
        // 如果暂停,生产者也应暂停
        for sm.speed == 0 {
            sm.cond.Wait()
        }
        sm.mu.Unlock()

        taskID++
        select {
        case sm.tasks <- fmt.Sprintf("Task-%d", taskID):
        case <-time.After(5 * time.Second):
            // 超时退出(仅 demo)
            return
        }
        time.Sleep(time.Duration(200/sm.speed) * time.Millisecond) // 简单限速
    }
}

// 停止整个系统
func (sm *SMSManager) Stop() {
    sm.mu.Lock()
    sm.closed = true
    sm.mu.Unlock()
    sm.cond.Broadcast() // 唤醒所有等待者,让它们退出
    sm.wg.Wait()
}

// ===== 使用示例 =====
func main() {
    sm := NewSMSManager()
    sm.SetSpeed(5) // 初始速度 >0,开始工作
    sm.StartWorkers(3)

    time.Sleep(3 * time.Second)
    fmt.Println(">>> Pausing (speed=0)")
    sm.SetSpeed(0)

    time.Sleep(5 * time.Second)
    fmt.Println(">>> Resuming (speed=10)")
    sm.SetSpeed(10)

    time.Sleep(3 * time.Second)
    fmt.Println(">>> Stopping")
    sm.Stop()
}

当 worker 发现 speed == 0(暂停),它会:

而老板(主线程)只需:

sm.SetSpeed(10) // 内部调用 sm.cond.Broadcast()

所有暂停的 worker 瞬间醒来,继续干活!零延迟,零浪费,优雅得像瑜伽大师。

sync.Cond 的核心概念

sync.Cond 是 Go 提供的条件变量(Condition Variable),用于 "等待某个条件成立" 的场景。

核心三要素

  1. 一把锁(通常是 sync.Mutex 或 sync.RWMutex)
    → cond 必须和这把锁绑定

  2. Wait() 方法
    → 释放锁 + 挂起 goroutine,直到被唤醒

  3. Signal() / Broadcast() 方法

    • Signal():唤醒一个等待的 goroutine
    • Broadcast():唤醒所有等待的 goroutine(我们用这个)

使用模板

// 初始化
mu := &sync.Mutex{}
cond := sync.NewCond(mu)

// 等待方
mu.Lock()
for !condition { // 必须用 for,防止"虚假唤醒"
    cond.Wait()
}
// 条件满足,干活
mu.Unlock()

// 通知方
mu.Lock()
condition = true
cond.Broadcast() // 或 Signal()
mu.Unlock()

重要提醒

方案对比分析

方案 CPU 消耗 响应速度 代码复杂度 适用场景
忙轮询 🔥 极高 ❌ 别用
Sleep 轮询 🟢 低 慢(有延迟) ✅ 简单场景
sync.Cond 🟢 几乎为零 ⚡ 即时 ✅ 高并发、需精确控制

在我们的短信系统中:

总结:何时该用 sync.Cond?

当你遇到以下场景,sync.Cond 就是你的救星:

  1. 需要 "暂停/恢复" 控制(如流控、调试、维护模式)
  2. 多个 goroutine 等待同一条件成立
  3. 不想用 channel(比如条件不是"有数据",而是"状态改变")
  4. 拒绝轮询,追求 零 CPU 浪费 + 即时响应

记住

sync.Cond 不是万能的,但在"状态等待"场景下,它比 channel 更直接,比轮询更优雅。

最后:

优雅的程序,从学会"等待"开始。
别再让 goroutine 在梦里狂奔了,给它一个 sync.Cond,让它安心睡觉,等你一声令下,再奋起直追 💪

往期部分文章列表

上一篇 下一篇

猜你喜欢

热点阅读