go等待多线程全部返回结果几种方式

2023-08-21  本文已影响0人  yichen_china

方法一


package main
 
import (
    "fmt"
    "time"
)
 
func request(index int,ch chan<- string)  {
    time.Sleep(time.Duration(index)*time.Second)
    s := fmt.Sprintf("编号%d完成",index)
    ch <- s
}
 
func main() {
    ch := make(chan string, 10)
 
    for i := 0; i < 4; i++ {
        go request(i, ch)
    }
 
    for {
        select {
        case i := <-ch: // select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句
            println(i)
        default:
            time.Sleep(time.Second)
            fmt.Println("无数据")
        }
    }
}

解决方式: 即我们在生成完4个goroutine后对data channel进行关闭,这样通过for range从通道循环取出全部值,通道关闭就会退出for range循环。

具体实现:可以利用sync.WaitGroup解决,在所有的 data channel 的输入处理之前,wg.Wait()这个goroutine会处于等待状态(wg.Wait()源码就是for循环)。当执行方法处理完后(wg.Done),wg.Wait()就会放开执行,执行后面的close(ch)。

方法二

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
var wg sync.WaitGroup
 
func request(index int,ch chan<- string)  {
    time.Sleep(time.Duration(index)*time.Second)
    s := fmt.Sprintf("编号%d完成",index)
    ch <- s
    defer wg.Done()
}
 
func main() {
    ch := make(chan string, 10)
 
    go func() {
        wg.Wait()
        close(ch)
    }()
 
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go request(i, ch)
    }
    
    for ret := range ch{
        fmt.Println(len(ch))
        fmt.Println(ret)
    }
}
package main
 
import (
    "fmt"
    "time"
)
 
func request(index int,ch chan<- string)  {
    time.Sleep(time.Duration(index)*time.Second)
    s := fmt.Sprintf("编号%d完成",index)
    ch <- s
}
 
func main() {
    ch := make(chan string, 10)
 
    for i := 0; i < 4; i++ {
        go request(i, ch)
    }
 
    for {
        select {
        case i := <-ch: // select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句
            println(i)
        default:
            time.Sleep(time.Second)
            fmt.Println("无数据")
        }
    }
}   

上面这种方式获取,通过select case + default的方式也可以完美避免阻塞死锁报错!但是适用于通道不关闭,需要时刻循环执行数据并且处理的情境下。

由此,引入了select多路复用的使用

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

一定留意,default的作用很大! 是避免阻塞的核心。

使用select语句能提高代码的可读性。

可处理一个或多个channel的发送/接收操作。
如果多个case同时满足,select会随机选择一个。
对于没有case的select{}会一直等待,可用于阻塞main函数。
5、实际项目中goroutine+channel+select的使用

如下,使用于 项目监听终端中断信号操作:

srv := http.Server{
        Addr:    setting.AppConf.Http.Addr,
        Handler: routers.SetupRouter(setting.AppConf),
    }
 
    go func() {
        // 开启一个goroutine启动服务
        if err := srv.ListenAndServe(); err != nil {
            zap.S().Errorf("listen finish err: %s addr: %s", err, setting.AppConf.Http.Addr)
        }
    }()
 
    // 等待中断信号来优雅地关闭服务器,为关闭服务器操作设置一个5秒的超时
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 
    for {
        select {
        case s := <-sig:
            zap.S().Infof("recv exit signal: %s", s.String())
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            // 5秒内优雅关闭服务(将未处理完的请求处理完再关闭服务),超过5秒就超时退出
            if err := srv.Shutdown(ctx); err != nil {
                zap.S().Fatal("Server Shutdown err: ", err)
            }
            zap.S().Info("Server Shutdown Success")
            return
        }
    }

如下,使用于 项目通过通道来进行数据处理、数据发送接收等操作:

日志文件,收集日志

package taillog
 
// 专门从日志文件,收集日志
import (
    "context"
    "fmt"
    "github.com/hpcloud/tail"
    "logagent/kafka"
)
//var (
//  tailObj *tail.Tail
//)
 
//TailTask 一个日志收集的任务
type TailTask struct {
    path string
    topic string
    instance *tail.Tail
    //为了能实现退出t.run
    ctx context.Context
    cancelFunc context.CancelFunc
}
 
func NewTailTask(path,topic string) (tailObj *TailTask)  {
    ctx,cancel := context.WithCancel(context.Background())
    tailObj = &TailTask{
        path:path,
        topic:topic,
        ctx:ctx,
        cancelFunc:cancel,
    }
    tailObj.init() //根据路径去打开对应的日志
    return
}
 
func (t *TailTask)init()  {
    config := tail.Config{
        ReOpen:    true, //重新打开
        Follow:    true, //是否跟随
        Location:  &tail.SeekInfo{Offset:0,Whence:2}, //从文件哪个地方开始读
        MustExist: false, //文件不存在不报错
        Poll:      true,
    }
    var err error
    t.instance, err = tail.TailFile(t.path, config)
    if err != nil {
        fmt.Println("tail file failed,err:",err)
    }
    // 当goroutine执行的函数退出的时候,goroutine结束
    go t.run() //直接去采集日志,发送到kafka
}
 
func (t *TailTask)run()  {
    for{
        select {
        case <- t.ctx.Done():
            fmt.Printf("tail task:%s_%s 结束了\n",t.path,t.topic)
            return
        case line := <- t.instance.Lines: //从tailObj一行行读取数据
            //发往kafka
            //kafka.SendToKafka(t.topic,line.Text) //函数调用函数
 
            // 优化,先把日志数据发送到一个通道中
            // kafka包中有单独的goroutine去取日志发送到kafka
            kafka.SendToChan(t.topic,line.Text)
        }
    }
}

从kafka写日志

package kafka
 
//专门从kafka写日志
import (
    "fmt"
    "github.com/Shopify/sarama"
    "time"
)
 
type logData struct {
    topic string
    data string
}
 
var (
    client sarama.SyncProducer //声明一个全局连接kafka的生产者client
    logDataChan chan *logData
)
 
// 初始化client
func Init(address []string, maxSize int)(err error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要leader和follow都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
    config.Producer.Return.Successes = true //成功交付的消息将在success channel 返回
 
    //连接kafka
    client,err = sarama.NewSyncProducer(address,config)
    if err != nil {
        fmt.Println("producer closed,err:",err)
        return
    }
    // 初始化logDataChan
    logDataChan = make(chan *logData,maxSize)
    // 开启后台的goroutine从通道取数据,发送kafka
    go sendToKafka()
    return
}
 
// 给外部暴漏一个函数,该函数只把日志数据发送到一个内部chan中
func SendToChan(topic,data string)  {
    msg := &logData{
        topic: topic,
        data:  data,
    }
    logDataChan <- msg
}
 
 
//真正往kafka发送日志的函数
func sendToKafka()  {
    for{
        select {
        case ld := <- logDataChan:
            // 构造一个消息
            msg := &sarama.ProducerMessage{}
            msg.Topic = ld.topic
            msg.Value = sarama.StringEncoder(ld.data)
            // 发送到kafka
            pid,offset,err := client.SendMessage(msg)
            if err != nil {
                fmt.Println("send msg failed,err:",err)
                return
            }
            fmt.Printf("pid:%v,offset:%v\n",pid,offset)
        default:
            time.Sleep(time.Microsecond*50)
        }
    }
}

整理比较随性,有点混乱,后续如果再碰到坑继续整理,继续踩坑优化~

上一篇 下一篇

猜你喜欢

热点阅读