Go并发编程
Go天生为并发而生,使用go 关键字快捷启动线程(goroutine)轻量级线程也叫协程,一个goroutine大概占2KB内存。
简单例子:
func run(msg string){
for i:=0; i < 5; i++ {
fmt.Println(msg,i)
}
}
func main() {
run("Direct Run:")
go run("Thread Run:") //go 关键字,启动goroutine
run("After Run:")
time.Sleep(30 *time.Second)
fmt.Println("done")
}
MPG 模型
MPG是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
MPG.png
- M代表一个操作系统内核线程,也可以称为一个工作线程,默认最大10000
- G协程(goroutine),最终是要放到M上执行的
- P代表着处理器(processor)维护一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界等等),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了,会从从全局队列拿一些到自己队列上,或者从其它P队列偷一些到自己队列上,或者。P的数量通过runtime.GOMAXPROCS()设置,默认值是cpu核数,go 1.5之前默认值是1,P的队列的G数量不超过256个。
执行步骤
- go 关键字创建协程,优先加入某个P的本地队列,满了加入全局队列
- P 需要持有或者绑定一个M,而M会启动一个线程,不断从P的本地队列获取G执行
- 当P的本地队列执行完成以后,会从全局队列取G,如果全局队列没有G,再从其它P队列窃取G到本地队列
- 重复执行以上步骤,直至所有G执行完毕
调度策略
复用线程:避免频繁的创建、销毁线程,而是对线程的复用
- Work-Stealing (任务窃取)
当M无可运行的G时,尝试从全局队列获取,没有从网络轮询器获取,再没有从其它处理器的本地队列窃取 - Hand Off 机制,当M因为G进行系统调用阻塞时,M会与P解绑,同时带走G,把P转移给其它空闲的线程执行
Go语言相比起其他语言的优势在于goroutine则是由Go运行时(runtime)调度器自己调度的,而其它语言是OS调度。特点是goroutine的调度是在用户态下完成的,不涉及内核态与用户态之间的频繁切换,调度成本比调度OS线程低很多, 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上(调度m个goroutine到n个OS线程),再加上本身goroutine的超轻量(占内存2KB),有效保证了Go的并发性能。
并发模型
Go 支持两种并发模型:CSP模型和共享内存模型
- CSP模型:communicating sequential processes 通信顺序进程
核心概率:奉行通过通道来共享内存,而不是共享内存来通信 - 共享内存模型:通过共享内存来+锁机制实现
通道Chan
通道(channel)是用来传递数据的一种特殊类型,常用于两个协程通讯和同步运行(自带锁机制)。通道就像一个传送带或者队列,遵循先入先出,保证收发数据的顺序,声明的时候要指定数据类型。
通道可以分为无缓冲通道和缓冲通道
通道根据读写分为:"chan" 可读可写, "chan<-" 仅可写, "<-chan" 仅可读
简单来说,通道是一个线程安全共享池
无缓冲通道
采用 make(chan 元素类型) 创建无缓冲通道,使用无缓冲通道进行通信将导致发送和接收的goroutine同步化,所以也叫同步通道
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 把 sum 发送到通道 c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int) //int 类型 通道
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x := <-c
y := <-c // 从通道 c 中接收
fmt.Println(x, y, x+y)
}
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
缓冲通道
采用make(chan 元素类型, 缓冲大小) 创建缓冲通道
利用缓冲通道实现生产者消费者例子
type apple struct{
name string
}
func produce(bufChan chan apple){
index := 0
for {
time.Sleep(100*time.Millisecond)
index ++
bufChan <- apple{"apple" + strconv.Itoa(index)}
fmt.Println("produce apple", index)
}
}
func consume(bufchan chan apple){
for {
time.Sleep(120*time.Millisecond)
result := <- bufchan
fmt.Println("consume ", result)
}
}
func main() {
bufList := make(chan apple, 10) //容量为10,cap()
go produce(bufList)
go consume(bufList)
select {
}
}
1.缓冲通道读写不需要在两个不同协程里面
2.可以只有写入操作;但是不能只有读操作,运行时会报deadlock错误
3.当通道满了以后,写入会阻塞,当通道为空,读取堵塞
4.当通道不使用时,要记得关闭通道
关闭通道
func main() {
ch1 := make(chan int, 5)
ch2 := make(chan int, 5)
go func() {
for i := 0; i < 20; i++ {
ch1 <- i
}
close(ch1) //使用完关闭通道,通知读取协程
}()
go func() {
for {
if result,ok :=<-ch1; ok{
ch2 <- result* result
}else{ //ch1通道关闭,取完值 ok=false
break
}
}
close(ch2)
}()
for result := range ch2{ //ch2 通道关闭,取完值range循环自动退出
fmt.Println(result)
}
}
1.对一个关闭的通道进行接收会一直获取值直到通道为空
2.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
3.对一个关闭的通道再发送值就会导致panic
4.关闭一个已经关闭的通道会导致panic
判断通道关闭,通常采用for range 通道的方式
select
select 是go的一个控制结构,类似switch语句,只是每个case都必须是一个通讯操作(chan读写)
超时控制例子
func main() {
chan1 := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case <- time.After(5*time.Second):
fmt.Println("timeout...")
quit <- true
case result,_:= <-chan1:
fmt.Println(result)
}
}
}()
for i:= 0; i < 5; i++ {
chan1 <- i
time.Sleep(time.Second)
}
<- quit
fmt.Println("done...")
}
定时任务例子:
func main() {
tickTimer := time.NewTicker(1 * time.Second)
tickTimer1 := time.NewTicker(20 * time.Second)
startTime:= time.Now().Add(60*time.Second)
for {
select {
case <-tickTimer.C:
fmt.Println("tickTime:",time.Now())
case <-tickTimer1.C:
fmt.Println("tickTime1:", time.Now())
}
if time.Now().After(startTime) {
fmt.Println("exit:",time.Now())
break
}
}
}
1.select里面case操作必须是通讯IO操作(channel的收发)
2.select中如果没有default语句,会阻塞等待任意一个case,满足则执行
3.select中如果没有default语句,多个case同时满足条件,随机选择一个case执行(case执行顺序是完全随机)
4.select中default语句,没有满足的case条件时执行
5.当select没有任何语句的时候,会一直阻塞
并发同步
sync 是go语言并发同步的包
sync.WaitGroup
可以使用sync.WaitGroup来实现并发任务的同步,而不是使用time.Sleep
func DoSomething(wg * sync.WaitGroup) {
defer wg.Done()
fmt.Println("do something...")
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go DoSomething(&wg)
go DoSomething(&wg)
wg.Wait()
fmt.Println("done...")
}
sync.WaitGroup 是结构体,参数传递要传递结构体指针
sync.Once
确保某些操作高并发的场景下执行一次
func Init(){
fmt.Println("init")
}
func main() {
var once sync.Once
go once.Do(Init)
go once.Do(Init)
time.Sleep(5*time.Second)
}
sync.Once是结构体,参数传递要传递结构体指针
互斥锁
var num = 0
var lock sync.Mutex
var wg sync.WaitGroup
func increment(){
lock.Lock()
num = num + 1
lock.Unlock()
wg.Done()
}
func main() {
wg.Add(300)
for i:=0; i < 300; i++ {
go increment()
}
wg.Wait()
fmt.Println(num)
}
读写锁
var (
num = 0
rw sync.RWMutex
wg sync.WaitGroup
)
func Write(){
rw.Lock()
num = num+1
time.Sleep(10*time.Millisecond)
rw.Unlock()
wg.Done()
}
func Read(){
rw.RLock()
time.Sleep(time.Millisecond)
rw.RUnlock()
wg.Done()
}
func main() {
for i:=0;i < 100; i++ {
wg.Add(1)
go Write()
}
for i:=0;i < 300; i++ {
wg.Add(1)
go Read()
}
wg.Wait()
fmt.Println("result:",num)
}
原子操作
sync/atomic包提供,主要如下操作:
方法 | 说明 |
---|---|
func LoadInt32(addr *int32) (val int32) | 读取操作 |
func StoreInt32(addr *int32, val int32) | 写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) | 修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) | 交换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) | 比较并交换操作 |
sync.Once 源码如下:
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}