Go 并发编程(六)
2018-07-24 本文已影响39人
LL大L
goroutine
在go语言中,我们只需要在需要异步的函数前面加一个go 关键字即可完成异步
func main() {
for i := 0; i < 1000; i++ {
go func(i int) {
for {
fmt.Printf("Hello from" + " goroutine %d\n", i)
}
}(i)
}
time.Sleep(time.Millisecond)
}
定义
- 任何函数只需加上go关键字就能送给调度器运行
- 不需要再定义是区分是否是异步函数
- 调度器在合适的点进行切换
- 使用-race来检测数据访问冲突
goroutine 可能切换的点
- I/O, select
- channel
- 等待锁
- 函数调用(有时)
- runtime.Gosched()
- 只是参考,不能保证切换,不能保证在其他地方不切换
协程 Coroutine
在go中的并发,是使用协程来处理的,这里的协程具有以下几个特点
- 轻量级“线程”
- 非抢占式多任务处理,由协程主动交出控制权
- 编译器/解释器/虚拟机层面的多任务
- 多个协程可能在一个或多个线程上运行
channel
15294004898585.jpg下图为channel和调度器之间的关系
channel 是一等公民
下面我们用一段代码来演示channel在go中是一等公民
func worker(id int, c chan int) {
for n := range c{
fmt.Printf("worker %d received %c \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func chanDemo() {
var channels [10]chan<- int
for i := 0; i < 10; i++ {
channels[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + I
}
time.Sleep(time.Millisecond)
}
buffered channel
我们可以在make 一个channel时,后面跟一个数字来表示这个channel的缓存是多少,这里的d就不会被输出出来
func bufferedChannel() {
c := make(chan int, 3)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
time.Sleep(time.Millisecond)
}
close channel
在worker函数中,使用range来判断channel是否关闭,如果没有关闭则会在此函数的生命周期内一直循环
func channelClose() {
c := make(chan int)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
close(c)
time.Sleep(time.Millisecond)
}
这里的关闭一定是发送方来进行close,如果不使用range来判断,我们还可以用下面的方式来判断range
n, ok := <- c
if ok {
...
}
WaitGroup
这里我们使用WaitGroup 来创建两个并发请求
type worker struct {
in chan int
done func()
}
func doWorker(id int, w worker) {
for n := range w.in{
fmt.Printf("worker %d received %c \n", id, n)
w.done()
}
}
func createWorker(id int, wg *sync.WaitGroup) worker {
w := worker{
in : make(chan int),
done : func() {
wg.Done()
},
}
go doWorker(id, w)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10] worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}
wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + I
}
for i, worker := range workers {
worker.in <- 'A' + I
}
wg.Wait()
}
func main() {
chanDemo()
}
使用Select 来进行调度
下面我们来实现一个非租塞式,10秒钟结束,中间800ms没有操作则输出timeout,如果有操作则输出的一个例子
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("worker %d received %d \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- I
I++
}
}()
return out
}
func main() {
var c1, c2 = generator(), generator()
var worker = createWorker(0)
var values []int
// 计时器
tm := time.After(10 * time.Second)
for {
var activeWorker chan<- int
var activeValue int
if len(values) > 0 {
activeWorker = worker
activeValue = values[0]
}
select {
case n := <-c1:
values = append(values, n)
case n := <-c2:
values = append(values, n)
case activeWorker <- activeValue:
values = values[1:]
case <-time.After(800 * time.Millisecond):
fmt.Println("timeout")
// 10s 后调用
case <-tm:
fmt.Println("bye")
return
}
}
}
atomic 原子操作
这段代码如果不加锁,我们在race时,会告知当前值在读取时,有可能会被写
type atomicInt struct {
value int
lock sync.Mutex
}
func (a *atomicInt) increment() {
// 这样保证defer只在这个匿名函数中执行
func() {
a.lock.Lock()
defer a.lock.Unlock()
a.value++
}()
}
func (a *atomicInt) get() int {
a.lock.Lock()
defer a.lock.Unlock()
return a.value
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a.get())
}