Go语言自习室

第十七章:Go语言goroutine和channel

2019-11-02  本文已影响0人  楚江云
golang-gopher.png

Don't communicate by sharing memory, share memory by communicating.

1. 概述

编发编程 我们从字面上理解就是在同一个时间能执行多个任务就是并发,而并发编程表现就是程序由若干个自主活动单元组成.

1.1 并发和并行的区分

**进程 : ** 进程(process)是程序在操作系统中的一执行过程,是系统进行资源分配和调度的基本单位,或者我们可以将进程视为包含应用程序在运行中需要用到和维护的各种资源的 容器

**线程 : ** 线程(thread)是进程的一个执行实例,一个线程就是一个执行空间,这边执行空间可以被操作系统调度来运行函数中写的代码,每个进程至少包含一个线程,每个进程的初始线程称为 主线程 因为执行这个线程的空间就是应用程序的本身的空间,所以当主线程结束的时候,应用程序也会终止.

**并行 : ** 同一时刻,有多条指令在多个处理器上同时执行,我们可以类比如:火车站开了10个售票窗口,十个窗口在同时售票,

**并发 : ** 同一时刻只能执行一条指令,但是多个进程的指令被快速轮转执行,因为处理的速度太快了,所以宏观感受上多个进程在同时执行的. 我们可以类比于开了一个售票窗口,排了两队(多队)在售票,因为售票窗口处理的速度太快了,感觉排的两个(多个)队伍都在向前推行.

1.2 Go语言的并发概述

Go语言从语言层面就支持并行 ,并发程序的内存管理某些情况下会很复杂,但是GO语言提供了自动垃圾回收机制.这减少了程序员编写并发程序的复杂度.

Go语言里并发是值能让某个函数独立用户其他函数运行的能力,当一个函数被创建为goroutine时,将被视为独立的工作单元 这个独立单元可以被调度到可用的逻辑处理器上执行.Go语言的并发同步模型来自一个叫做 通信顺序进程(Communicating Sequential Processe) 简称 CSP的消息传递模型,其作用是在让让多个goroutine 之间进行通信 ,这里必须明确的是多个goroutine 之间数据同步和传递的数据类型叫做通道(channel) ,channel是一种数据类型记住这一点.

1.3 goroutine是什么?

Goroutine是Go语言在语言级别支持的轻量级线程,我们也将这种微线程简单的称之为协程 ,一个Goroutine 的栈启动很小(2k或者4k)当Goroutine的栈空间不够的时候,会根据需要动态伸缩栈大小(甚至可到到1G),真是因为Goroutine的启动栈很小,所以一个主线程上可以起很多个Goroutine.

Go语言新版本默认是为每个物理处理器分配一个逻辑处理器,Go程序在运行的时候在每个逻辑处理器上调度Goroutine 来执行,这样就充分发挥了当前多核机器的优势.

1.4 goroutine 调度模型介绍

Go语言的线程模型,简单讲是由3个核心元素支撑

  • M : 内核线程(物理线程)
  • P : 执行Go代码片段所必须的资源(我们也称为上下文环境)
  • G : 待执行的Go代码片段,或者理解为一个协程

简单讲,一个G的执行需要P和M的支持,P和M的关联就形成G的有效运行环境(内核线程+上下文环境),每个P都包含一个可以运行的G的队列,队列中的G会被依次传递给与本地相关联的M,并获取运行时机.

1.4.1 MPG的运行模式

下图简单的描述了系统线程,上下文执行环境(逻辑处理器)本地运行队列之间的关系.

我们看一个Goroutine的基本执行中MPG模式是怎么运行的

  1. P和操作系统线程M绑定,形成上下文执行环境(逻辑处理器) ,这个逻辑处理器 是真正执行Goroutine的
  2. 创建一个Goroutine并准备运行,这个Goroutine会被放到Go调度器的全局队列中
  3. 调度器将队列中的Goroutine 分配给一个上下文执行环境,并将Goroutine放到这个上下文执行环境对应的本地队列中
  4. 本地队列中的Goroutine会等待,直到自己被分配到执行环境吧中被执行
mpg1.png
1.4.2 当Goroutine阻塞时

当一个Goroutine执行了一些阻塞的系统调用的时候(例如数据库读取,打开文件,某些网络请求等),此时Go的调度器会将阻塞的线程G1与P1执行环境分离,并创建一个新的线程M2与P1关联构成上下文执行环境 执行对应的本地队列中的待执行的G 4

被剥离的G1会继续阻塞,等待阻塞解除,与此同时新线程M2执行本地队列中Goroutine,一旦G1阻塞结束,对应的G1将放回本地运行队列中

mpg2.png
1.4.3 Goroutine的并发与并行

并行是让不同的代码片段同时在不同的物理处理器(CPU)上执行,并发是同时管理很多事情,想让Goroutine到达真正的并行效果,那就必须使程序运行在多核的机器上.

从理论上讲,有多个逻辑处理器(M和P关联的上下文执行环境) 时,调度器会将Goroutine平分到每个逻辑处理器上,这样Goroutine就在不同的线程上运行了,可是实际是当物理机器为单核(1CPU)的时候,还是为并发 ,可是物理机器为多核(CPU) 那么就会是真正的并行, Goroutine同时在不同的CPU上执行

mpg3

2. goroutine 的基本使用

创建一个并发执行单元,只需要在函数调用语句前天剑关键字 go 就可以创建一个新的goroutine ,一个函数可以被多次创建goroutine ,一个goroutine必须对应一个函数

我们一直都知道Go程序是从main包的main()函数开始的,Go程序在启动的时候会为main()函数创建一个默认的goroutine ,这个就是我们所知的主线程

我们需要注意的是一旦主线程结束,无论其中创建的多少个goroutine 都会立刻结束

2.1 示例1

主线程结束,程序中包含的协程都会立刻结束

package main

import "fmt"

func task1(){
    for c := 'a'; c< 'a'+26;c++{
        fmt.Printf("%c\t",c)
    }
}

func main() {
    fmt.Println("main process start ...")
    // 启动一个goroutine
    go task1()
    fmt.Println("main process finish ...")
}

go run main.go

main process start ...
main process finish ...

为啥tesk1()函数中的没有输出呢?

主要原因是主线程和开启的协程是并发运行了,程序运行得太快了,tesk1()函数中的内容还没输出,主线程就结束,所以没有看到有效的输出

package main

import (
    "fmt"
    "time"
)

func task1(){
    for c := 'a'; c< 'a'+26;c++{
        fmt.Printf("%c\t",c)
    }
}

func main() {
    fmt.Println("main process start ...")
    go task1()
    // 让主线程晚1秒结束,在主线程结束前tesk1()也执行完了
    time.Sleep(time.Second)
    fmt.Println("main process finish ...")
}

go run main.go

main process start ...
a   b   c   d   e   f   g   h   i   j   k   l   m   n   o   p   q   r   s   t   u   v   w   x   y   z   main process finish ...

2.2 示例2

如何证明是在并发执行的呢?

package main

import (
    "fmt"
    "strconv"
    "time"
)

func task1(){
    for i:= 0 ;i<=5;i++{
        fmt.Println("task1() execute ..."+strconv.Itoa(i))
        time.Sleep(time.Second)
    }
}
func task2(){
    for i:= 0 ;i<=5;i++{
        fmt.Println("task2() execute ..."+strconv.Itoa(i))
        time.Sleep(time.Second)
    }
}
func main() {
    go task1()
    go task2()
    for i:= 0 ;i<=5;i++{
        fmt.Println("main execute..."+strconv.Itoa(i))
        time.Sleep(time.Second)
    }
}

go run main.go

task1() execute ...0
task2() execute ...0
main execute...0
task2() execute ...1
task1() execute ...1
main execute...1
task1() execute ...2
main execute...2
task2() execute ...2
task2() execute ...3
main execute...3
task1() execute ...3
task2() execute ...4
main execute...4
task1() execute ...4
main execute...5
task2() execute ...5
task1() execute ...5

Process finished with exit code 0

2.3 示例3

假设我们要进行比较大的运算,这个运算是求出1到500的阶乘是多少分别是多少?

阶乘是 1的阶乘是1,2的阶乘是12 ,3的阶乘是1 * 2 * 3 .....以此类推 500的阶乘就是1 * 2 * 3 * 4 * 5... 500

那我们可以将阶乘数可结果放到一个map中,再打印出来.看程序.

package main

import (
    "fmt"
    "time"
)

var resMap = make(map[int]int,20)
func factorial(x int){
    res := 1
    for i:=1 ;i<=x ; i++{
        res *= i
    }
    resMap[x] = res
}
func main() {
    //开启500个goroutine
    for i:=1;i<=500 ;i++{
        go factorial(i)
    }
    time.Sleep(time.Second*10)

    for i,v := range resMap{
        fmt.Printf("resMap[%d] = %d\n",i,v)
    }
}

go run main.go

fatal error: concurrent map writes

goroutine 22 [running]:
runtime.throw(0x4c9c7c, 0x15)

问题1 : 程序运行直接报错,出现了资源竞争

问题2 : 每次都执行goroutine 都要在主线程中等待,防止主线程结束协程直接结束,这显然是不合理的,每次休眠多久都是预估的,等待时间长了,整个程序就显得慢,等待时间短了,goroutine任务没有执行完,程序就退出了

这该怎么办呢? 这就必须通过channel来很好的解决它

Go语言不推荐共享内存的方式传递数据,而是推荐使用channel(通道) ,channel 主要用于在多个Goroutine之间传递数据,并且保障整个过程的并发安全

3. 通道channel

通过go 关键字我们将函数设置为并发执行,但是单纯的这样做没有意义的,函数之间数据的交换才能体现出并发的意义,通常可以通过共享内存的方式来进行数据交互,但是共享内存再不同的goroutine之间容易出现竞态问题,为了保障数据交换的正确性,那就得使用互斥量对操作的内存进行加锁,这样做就造成了性能问题

**竞态 : ** 如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时
读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。竞争状态
的存在是让并发程序变得复杂的地方,十分容易引起潜在问题。对一个共享资源的读和写操作必
须是原子化的,换句话说,同一时刻只能有一个 goroutine 对共享资源进行读和写操作

上面的那个问题我们使用加锁的方式解决一下

package main

import (
    "fmt"
    "sync"
    "time"
)

var resMap = make(map[int]int,20)
// 声明一个Mutex(互斥锁)类型的变量lock
var lock sync.Mutex
func factorial(x int){
    res := 1
    for i:=1 ;i<=x ; i++{
        res *= i
    }
    // 加锁操作 将resMap锁住,每次只有一个goroutine能对它进行写操作
    lock.Lock()
    resMap[x] = res
    // 解锁操作 解锁之后其他的协程才能继续操作
    lock.Unlock()
}
func main() {
    for i:=1;i<=10 ;i++{
        go factorial(i)
    }
    time.Sleep(time.Second*2)

    for i,v := range resMap{
        fmt.Printf("resMap[%d] = %d\n",i,v)
    }
}

go run main.go

resMap[2] = 2
resMap[7] = 5040
resMap[10] = 3628800
resMap[5] = 120
resMap[6] = 720
resMap[3] = 6
resMap[4] = 24
resMap[8] = 40320
resMap[9] = 362880
resMap[1] = 1

Process finished with exit code 0

Go语言有一句很经典的描述 通过通讯来共享内存,而不是共享内存来通信 ,channel 就是Go语言提倡的通过通讯来共享内存.

  1. channel本质上是一个数据结构
  2. 数据是先进先出(FIFO)
  3. 线程安全,多个goroutine访问时,不需要加锁,也就是说channel本身就是线程安全的
  4. channel是一种数据类型,一个channel类型只能存放指定好的数据类型
  5. channel是引用类型
  6. channel 必须初始化之后才能使用,必须make之后才能使用

3.1 声明通道类型

声明格式如下

var 通道变量 chan 通道元素类型
  • 通道元素类型 : 通道内的数据类型

  • 通道变量 : 保存通道的变量

    package main
    func main(){
      // 声明元素为int类型的通道
      var ic chan int
      // 声明元素为string类型的通道
      var sc chan string
      // 声明元素为bool类型的通道
      var bc chan  bool
      // 声明元素为map[string]string类型的通道
      var mc chan map[string]string
    }
    
    

3.2 实例化通道

channel是引用类型,必须通过make之后才能使用

基本格式如下

make(chan 数据类型)
make(chan 数据类型, capacity)
  • make 是内置函数,此处的用法和新建一个map的情况类似
  • chan 是固定标识
  • 数据类型明确是通道内流动的数据类型
  • capacity 是一个整数,标识这这个通道带的缓冲区大小
package main
func main(){
    // 声明元素为int类型的通道
    var ic chan int
    // 通道使用
    ic = make(chan int)
    // 创建一个带缓冲的的通道实例
    sc:= make(chan string ,5)

}

3.3 有缓冲和无缓冲

内置函数 len() 放回当前通道中的元素个数 cap() 返回缓冲区大小

"无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通
道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine
没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送
和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。"

package main

import (
    "fmt"
)

func main() {
    // 无缓冲channel变量
    // 等价于 ch1 := make(chan int)
    ch1 := make(chan int, 0)
    // 显示当前channel变量的大小,和缓冲区大小
    fmt.Printf("len(ch1) = %d,cap(ch1) = %d\n", len(ch1), cap(ch1))
    go func() {
        defer fmt.Println("coroutine is over")
        for i := 0; i < 3; i++ {
            ch1 <- i
            fmt.Printf("子协程正在运行[%d],len(ch1) = %d cap(ch1) = %d\n",i,len(ch1),cap(ch1))
        }
        close(ch1)
    }()
    for data := range ch1{
        fmt.Println("ch1 = ",data)
    }
    fmt.Println("main process is over")
}

go run main.go

len(ch1) = 0,cap(ch1) = 0
子协程正在运行[0],len(ch1) = 0 cap(ch1) = 0
ch1 =  0
ch1 =  1
子协程正在运行[1],len(ch1) = 0 cap(ch1) = 0
子协程正在运行[2],len(ch1) = 0 cap(ch1) = 0
coroutine is over
ch1 =  2
main process is over

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类
型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的
条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲
区容纳被发送的值时,发送动作才会阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大
的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的
通道没有这种保证。

package main

import (
    "fmt"
)

func main() {
    // 有缓冲channel变量
    ch1 := make(chan int, 5)
    // 显示当前channel变量的大小,和缓冲区大小
    fmt.Printf("len(ch1) = %d,cap(ch1) = %d\n", len(ch1), cap(ch1))
    go func() {
        defer fmt.Println("coroutine is over")
        for i := 0; i < 5; i++ {
            ch1 <- i
            fmt.Printf("子协程正在运行[%d],len(ch1) = %d cap(ch1) = %d\n",i,len(ch1),cap(ch1))
        }
        close(ch1)
    }()
    for data := range ch1{
        fmt.Println("ch1 = ",data)
    }
    fmt.Println("main process is over")
}

go run mian.go

len(ch1) = 0,cap(ch1) = 5
子协程正在运行[0],len(ch1) = 0 cap(ch1) = 5
子协程正在运行[1],len(ch1) = 1 cap(ch1) = 5
子协程正在运行[2],len(ch1) = 2 cap(ch1) = 5
子协程正在运行[3],len(ch1) = 3 cap(ch1) = 5
子协程正在运行[4],len(ch1) = 4 cap(ch1) = 5
coroutine is over
ch1 =  0
ch1 =  1
ch1 =  2
ch1 =  3
ch1 =  4
main process is over

Process finished with exit code 0

3.3 使用通道发送数据

通过特殊操作符 <- 向通道发送数据

格式如下

通道变量或者通道实例 <- 值
  • 值可以是变量,常量,表达式,函数返回值等,值得类型必须和通道允许的元素类型一致
package main
func main(){
    // 声明元素为int类型的通道
    var ic chan int
    // 通道使用 无缓冲通道
    ic = make(chan int)
    ic<-9
}

go run main.go

fatal error: all goroutines are asleep - deadlock!

代码中形成发送和接收对应的代码,造成死锁

package main
func main(){
    // 声明元素为int类型的通道
    var ic chan int
    // 通道使用,设置有10通道元素的缓冲区
    ic = make(chan int,10)
    ic<-9
}

3.4 使用通道接收数据

使用通道接收数的操作符号依旧是 <-

格式如下

<- 通道变量或者通道实例
  1. // 形式1 会阻塞,知道接收到数据赋值给变量data
    data := <- 通道变量

package main

import (
    "fmt"
    "time"
)

func main() {
    start := time.Now()
    ch1 := make(chan string)
    go func() {
        fmt.Println("I am start ")
        // 模拟程序休眠2S
        time.Sleep(time.Second * 2)
        ch1 <- "this is demo"
    }()
    // 未接收到数据的时候回阻塞
    data := <-ch1
    fmt.Println(data)
    fmt.Println("ok is over spend", time.Now().Sub(start))
}

go run main.go

I am start 
this is demo
ok is over 2.0009877s
  1. // 形式2 不会阻塞
    // data 表示接收到的数据,未接收到数据的时候, data的值为零值
    // ok 表示是否接收到数据,为bool类型
    data,ok := <-通道变量

package main

import (
    "fmt"
)

func send(ic chan int) {
    // 向通道中发送数据
    for i := 1; i <= 5; i++ {
        fmt.Println("send :", i)
        ic <- i
    }
    close(ic)
}
func main() {
    // 声明元素为int类型的通道
    var ic chan int
    ic = make(chan int)
    go send(ic)
    for {
        //接收通道中的数据
        data, ok := <-ic
        // 通道中没有数据的时候 (ok值为false)跳出循环
        if !ok{
            break
        }
        //打印接收的数据
        fmt.Println("receive :", data)
    }
    fmt.Println("main process is over")
}

go run main.go

send : 1
send : 2
receive : 1
receive : 2
send : 3
send : 4
receive : 3
receive : 4
send : 5
receive : 5
main process is over
  1. // 形式3 会阻塞 ,直到接收到变量
    // 实际是丢通道内的元素
    <-通道变量

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan rune,1)
    ch1<-'a'
    time.Sleep(time.Second*5)
   // 等于将元素直接丢弃
    <-ch1
    fmt.Println("main process is over")
}

go run main.go

mian process is over

3.5 channel遍历

channel 支持for....range 的方式进行遍历

  • 使用range 遍历channel的时候如果没有关闭channel就会报deallock 的错误
  • 遍历时关闭了channel ,遍历结束会将正常退出循环
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan rune)
    go func() {
        for c := 'a'; c < 'a'+26; c++ {
            ch1 <- c
            time.Sleep(time.Second)
        }
        //关闭channel
        close(ch1)
    }()
    // 遍历通道变量ch1的时候,如果没有关闭通道,那么会阻塞
    for data := range ch1 {
        fmt.Printf("%c", data)

    }
    fmt.Println("\n main process is over")
}

go run main.go

abcdefghijklmnopqrstuvwxyz
 main process is over

3.6 单向channel

默认情况下channel是双向的,也就是既能接收数据也能发送数据。

但是,我们经常见一个通道作为参数进行传递而值希望对方是单向使用的,要么只让它发送
数据,要么只让它接收数据,这时候我们可以指定通道的方向,指定通道方向的channel就是单向channel

单向channel 的声明格式

var 通道变量 chan 通道元素类型 // 默认正常channel是双向的
var 通道变量 chan<- 通道元素类型 // 单向通道 只能写入某类型的元素(向管道中写入数据)
var 通道变量  <- chan 通道元素类型 // 单向通道 只能读取某类型的数据(从管道中读取数据)

可以将默认双向通道转成单向通道(只读或者只写),但是不能将单向通道转成双向通道

package main

import (
    "fmt"
)

func onlyW(w chan<- int) {
    defer close(w)
    for i:=0;i<10;i++ {
        // 只能向w中写入数据
        w<-i
    }
}
func onlyR(r <-chan int){
    for data := range  r{
        fmt.Printf("%d\t",data)
    }
}
func main() {
    // 定义双向channel变量ch1
    ch1 := make(chan int, 10)
    // 隐式转换成单向 只写channel
    var send chan<- int = ch1
    // 隐式转换成单向 只读channel
    var rece <-chan int = ch1
    go onlyW(send)
    onlyR(rece)
    fmt.Println("it is over")
}

go run main.go

0   1   2   3   4   5   6   7   8   9   it is over

3.7 关闭channel

我们创建的channel是可以被关闭的,在一些情况下必须关闭channel,否则会报deallock

通过内置函数close(通道变量) 就能关闭 channel

channel 被关闭之后不能向通道内写入,但是可以从通道内读取数据

当channel 被关闭之后继续写入数据程序会报 panic: send on closed channel

channel关闭之后,如何判断channel是已经关闭了?

我们通常的做法是在读取channel数据时多返回值(格式可以灵活)

data, ok := <- channel

_,ok := <- channel

package main

import "fmt"

func write(ch1 chan int){
    defer close(ch1)
    for i:= 0;i<10;i++{
        ch1<-i
    }
}
func read (ch1 chan int){
    for{
        // 判断channel是否关闭
        data,ok :=<-ch1
        if !ok {
            fmt.Println("channel close")
            break
        }
        fmt.Printf("%d\t",data)
    }
}
func main(){
    ch1 := make(chan int ,10)
    go write(ch1)
    read(ch1)

}

go run main.go

0   1   2   3   4   5   6   7   8   9   channel close

3.8 通道多路复用 select

Go语言提供了一个关键字 select, 可以同时相应多个channel的操作,select 的每个case 都会对应一个通道的收发过程,当收发完成,就会触发case 中的响应语句,多个操作在每次select中挑选一个进行响应
基础格式如下

select {
    case 操作1:
        响应操作1
    case 操作2:
        响应操作2
    case 操作3:
        响应操作3
    default:
        默认操作
    
}
  1. 操作1,操作2,操作3.... 能对应哪些基础操作呢?

    操作 示例语句
    接收任意数据 case <-ch1
    接收并复制给变量 case data:=<-ch2
    发送数据 case ch3<-99
package main

import "fmt"

func main() {
    // 定义一个整型变量i
    var i int = 0
    // 新建一个存放整型的通道ch1
    ch1 := make(chan int, 10)
    // 新建一个存放字符串的通道ch2
    ch2 := make(chan string, 10)
    // 向channel中添加元素
    for i := 0; i < 10; i++ {
        ch1 <- i
    }
    for i := 0; i < 10; i++ {
        ch2 <- "golang" + fmt.Sprintf("%d", i)
    }
    for {
        select {
        // 监听ch1
        case  <-ch1:
            fmt.Printf("%d\n", i)
            i++
        // 监听ch2
        case s := <-ch2:
            fmt.Printf("%s\n", s)
        // 默认操作
        default:
            fmt.Println("no data")
            return
        }
    }
}

go run main.go

golang0
0
1
2
3
4
golang1
golang2
5
golang3
golang4
6
7
8
golang5
golang6
golang7
9
golang8
golang9
no data

示例代码看看如何处理select中如何处理通道超时的

示例代码中涉及到Time包中的After函数

After会在另一线程经过时间段后向返回值发送当时的时间。等价于NewTimer(d).C。

package main

import (
    "errors"
    "fmt"
    "time"
)

// 模拟客户端
func RpcClient(ch chan string, request string) (string, error) {
    ch <- request
    select {
    case ack := <-ch:
        return ack, nil
    // 通道超时处理
    case <-time.After(time.Second):
        return "", errors.New("server response time out")
    }
}

// 模拟服务器端
func RpcServer(ch chan string) {
    for {
        data := <-ch
        fmt.Println("server received :", data)
        // 模拟服务端响应表超时
        time.Sleep(time.Second * 2)
        ch <- "Yes I am"
    }
}
func main() {
    ch := make(chan string)
    go RpcServer(ch)
    re, err := RpcClient(ch, "are you ok ?")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("client received ", re)
    }

}

go run main.go

server received : are you ok ?
server response time out

我们看两个关于time 包中计时器(timer)和打点器(ticker)的代码示例

关于time包中的具体方法和函数 请参考 time

关于time包中AfterFunc()函数的用法

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("program start")
    exit := make(chan bool)

    start := time.Now()
    // AfterFunc()第二个参数是回调函数
    time.AfterFunc(time.Second*2, func() {
        for i := 0; i < 5; i++ {
            fmt.Println("after ", i)
        }
        exit <- true
    })
    <-exit
    fmt.Println("it is over ,spend :",time.Now().Sub(start))
}


go run main.go

program start
after  0
after  1
after  2
after  3
after  4
it is over ,spend : 2.0005113s

关于time包中NewTimer() 和 NewTicker()的简单示例

package main

import (
    "fmt"
    "time"
)

func main(){
    var i int
    // 创建一个新的ticker
    ticker := time.NewTicker(time.Millisecond * 100)
    // 创建一个timer
    timer := time.NewTimer(time.Second*2)
    for{
        select {
            case <-timer.C:
                goto StopHere
            case <-ticker.C:
                i++
                fmt.Printf("%d\t",i)
        }
    }
    StopHere :
        fmt.Println("over")

}

go run main.go

1   2   3   4   5   6   7   8   9   10  11  12  13  14  15  16  17  18  19  20  over

4. 示例

4.1 Telnet 回音服务器

能接收客户端的连接,客户端输入,服务端响应给客户端,并且能执行客户端发出的指令
程序中涉及的 os,net,strings,bufio包 查看 Go语言标准库文档

程序结构

|_DemoServer.go
|_main.go
|_Session.go
|_TelnetCommand.go

DemoServer.go

package main

import (
    "fmt"
    "net"
)

func DemoServer(address string,exitC chan int){

    //在指定的ip和端口上新建一个监听器
    l,err := net.Listen("tcp",address)
    // 如果错误,结束服务
    if err!=nil{
        fmt.Println(err.Error())
        exitC<-1
    }
    // 提示信息
    fmt.Println("Listen : ",address)
    // 关闭监听器
    defer l.Close()
    // 进入循环
    for  {
        // 返回一个客户端的连接,无连接的时候阻塞等待新的连接
        conn,err := l.Accept()
        // 连接出现错误,打印错误,等待新的连接
        if err != nil{
            fmt.Println(err.Error())
            continue
        }
        // 每个客户端连接执行一个会话处理操作,该操作和接收客户端连接逻辑互不干扰
        go HandleSession(conn,exitC,address)
    }
}

main.go

package main

import "os"
const(
    HOST = "127.0.0.1"
    PORt = "7001"
)
func main() {
    address := HOST+":"+PORt
    // 定义一个无缓冲区的channel
    exitC := make(chan int)
    // 开启一协程
    go DemoServer(address, exitC)
    // 阻塞直到channel中有内容写入
    code := <-exitC
    os.Exit(code)
}

Session.go

package main

import (
    "bufio"
    "fmt"
    "net"
    "strings"
)

func HandleSession(conn net.Conn, exitC chan int, address string) {
    // 客户端连接会话提示
    fmt.Println("Session start :")
    // 创建一个网络读取器
    reader := bufio.NewReader(conn)
    // 循环读取数据
    for {
        // 读取字符串,直到遇见 "\n" 回车
        str, err := reader.ReadString('\n')
        // 读取数据正常时
        if err == nil {
            // 去掉读取数据的前后两端空白
            str = strings.TrimSpace(str)
            // 处理Telnet 客户端指令
            if !TelnetCommand(str, exitC) {
                // 关闭连接
                conn.Close()
                break
            }
            // 将数据写回客户端
            conn.Write([]byte(address + " response :" + str + "\r\n"))
        } else {
            // 读取错误,关闭连接,退出连接
            fmt.Println("Session close ", err.Error())
            conn.Close()
            break
        }
    }
}

TelnetCommand.go

package main

import (
    "fmt"
    "strings"
)

func TelnetCommand(s string, exitC chan int) bool {
    // 如果客户字符串前缀有 `@close` 关闭当前连接
    // `@shutdown` 关闭Telnet服务器
    if strings.HasPrefix(s, "@close") {
        // 提示
        fmt.Println("command close the session")
        return false
    } else if strings.HasPrefix(s, "@shutdown") {
        fmt.Println("command shutdown the server")
        exitC <- 0
        return false
    }
    fmt.Println(s)
    return true
}

程序测试

执行 go build

将代码编译一下生成.exe文件

main.exe

执行 main.exe

$ ./main.exe
Listen :  127.0.0.1:7001

客户端使用网络工具 nc

nc 127.0.01 7001
asdasd
127.0.0.1:7001 response :asdasd

4.2 打印出一个数字内有多少个素数

质数是指在大于1的自然数中,除了1和它本身以外不再有其他因数的自然数。

当指定的范围数足够大的时候,我们采用多个协程一起处理

package main

import (
    "fmt"
    "time"
)
// 计算一个数字内包含了多少个素数
var num int
// 开启的协程数量
var routineNum int
// 向通道内写入数据
func DataW(ch1 chan int) {
    // 关闭通道
    defer close(ch1)
    for i := 0; i < num; i++ {
        ch1 <- i
    }
}
// 判断字数内的素数,写入通道内
func checkNnum(input chan int, store chan int, exit chan bool) {
    var check bool
    for {
        // 从通道内读取数据
        data, ok := <-input
        // 没有数据的时候结束循环
        if !ok {
            break
        }
        // 素数判断逻辑
        check = true
        for i := 2; i < data; i++ {
            // 能整除除了1和本身以外的数字,那就不是素数
            if data%i == 0 {
                check = false
                break
            }
        }
        // 将符合的素数写入通道中
        if check {
            store <- data
        }

    }
    // 向终止通道写入数据
    exit <- true
}
func closeStore(exit chan bool, store chan int){
    for i:=0;i<routineNum;i++{
        <-exit
    }
    close(store)
}
func main() {
    // 获取客户端输入
    fmt.Println("请输入需要被计算的数(测试其中的素数) :")
    fmt.Scanln(&num)
    fmt.Println("请输入开启的协程数")
    fmt.Scanln(&routineNum)
    start := time.Now()
    var Res []int
    inputCh := make(chan int, 2000)
    storeCh := make(chan int, 500)
    exitCh := make(chan bool, routineNum)
    go DataW(inputCh)
    for i := 0; i < routineNum; i++ {
        go checkNnum(inputCh,storeCh,exitCh)
    }
    go closeStore(exitCh,storeCh)
    for {
        data ,ok :=<-storeCh
        if !ok{
            break
        }
        Res = append(Res,data)
    }
    for k,v := range Res{
        fmt.Printf("%d\t",v)
        if k%20 ==0 && k != 0{
            fmt.Printf("\n")
        }
    }
    fmt.Println("\nspend : ",time.Now().Sub(start))
}

上一篇下一篇

猜你喜欢

热点阅读