golang学习篇章

go客户端并发http请求

2021-01-25  本文已影响0人  Best博客

http客户端并发访问一个服务端

当你要通过http请求群发100万邮件(发送邮件的服务器不需要你考虑资源消耗),越快发送完越好,很自然你会通过调节http.client的pool资源池的大小,从而单位时间内发送更多的请求出去,这里便是值得注意的地方,你要向同一个目标服务器发送100万请求,哪怕你把pool调大到500,想的是顺时hi并发100个请求发送出去,但是其实只有2个,因为http.client中MaxIdleConnsPerHost默认值为2,它的作用是像同一个目标机器发送请求的最大并发量。

image.png

以下实战demo

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "github.com/360EntSecGroup-Skylar/excelize/v2"
    "github.com/go-resty/resty/v2"
    "net/http"
    "os"
    "strings"
    "sync"
    "time"
)

var client = resty.New().SetDebug(false).SetHeader("Content-Type", "application/json").SetTimeout(10 * time.Second)

func init() {
    t2 := http.DefaultTransport.(*http.Transport).Clone()
    t2.MaxIdleConns = 100
    t2.MaxConnsPerHost = 100
    t2.MaxIdleConnsPerHost = 100  //无论你连接池多大,但是如果你是并发一个服务器,请调大次参数,它是允许你客户端向同一个服务端发送并发请求个数的控制参数,连接池100,是针对100个不同服务器的并发个数控制
    client.SetTransport(t2)
}

type Task struct {
    textgoApi   string
    filePath    string
    sheets      map[string][]int
    result      chan string
    result2     chan string
    resultPath  string
    resultPath2 string
}

func main() {
    task := Task{
        filePath:    "",
        sheets:      map[string][]int{},
        result:      make(chan string, 100),
        result2:     make(chan string, 100),
        textgoApi:   "",
        resultPath:  "",
        resultPath2: "",
    }
    fmt.Println("start run***")
    task.run()
    fmt.Println("start end***")
}

func (t *Task) run() {

    f, err := excelize.OpenFile(t.filePath)
    if err != nil {
        fmt.Println(err, "run openfile发生了错误")
        return
    }
    go func() {
        defer func() {
            close(t.result)
            close(t.result2)
        }()

        for key, val := range t.sheets {
            fmt.Println(key, val)
            t.handle(f, key, val)
        }
    }()

    wg := &sync.WaitGroup{}

    wg.Add(2)
    go func() {
        defer wg.Done()

        n := 0
        for res := range t.result2 {
            n++
            t.appendWrite(t.resultPath2, res)
        }
        t.appendWrite(t.resultPath2, fmt.Sprintf("总共没违规合计:%d", n))
    }()

    go func() {
        defer wg.Done()

        num := 0
        for res := range t.result {
            num++
            t.appendWrite(t.resultPath, res)
        }
        t.appendWrite(t.resultPath, fmt.Sprintf("总共违规合计:%d", num))
    }()

    wg.Wait()

}

func (t *Task) handle(file *excelize.File, sheet string, columns []int) {
    rows, err := file.Rows(sheet)
    if err != nil {
        fmt.Println(err, "file.Rows(sheet)")
        return
    }
    for rows.Next() {
        data, err := rows.Columns()
        if err != nil {
            fmt.Println(err, "rows.Columns()")
            continue
        }
        t.call(data, columns)
    }
}

func (t *Task) call(data []string, columns []int) {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println(err, "recover---")
        }
    }()
    for k, v := range data {
        if t.exitsElem(columns, k+1) {

            strs := strings.Split(v, ";")

            isVio, s := t.isViolation(strs)

            if isVio {
                //违规
                t.result <- fmt.Sprintf("违规类容:%s           |--|--|          原始类容:  %s", s, strings.Join(data, ""))
                return
            }

        }
    }
    t.result2 <- fmt.Sprintf("没有违规类容:%s           |--|--|  ", strings.Join(data, ""))

}

func (t *Task) isViolation(text []string) (b bool, s string) {
    for _, v := range text {
        if v == "" {
            continue
        }
        resp := t.isVoi(v)
        if resp {
            s = v
            b = true
            return
        }
    }
    return
}

type RequestParam struct {
    Text string `json:"text"`
}

type Resp struct {
    Code int    `json:"code"`
    Msg  string `json:"msg"`
    Data struct {
        OutputInfo []RespData `json:"output_info"`
    } `json:"data"`
}

type RespData struct {
    IsIllegal   int     `json:"is_illegal"`
    IllegalType string  `json:"illegal_type"`
    Label       int     `json:"label"`
    Score       float64 `json:"score"`
}

//{
//    "code": 0,
//    "msg": "success",
//    "data": {
//        "output_info": [
//            {
//                "is_illegal": 1,
//                "illegal_type": "涉政",
//                "label": 3,
//                "score": 0.99994314
//            }
//        ]
//    }
//}
func (t *Task) isVoi(text string) (b bool) {
    texts := []string{}
    byteText := []rune(text)
    by := bytes.Buffer{}
    strText := ""
    for _, v := range byteText {
        by.WriteRune(v)
        strText = by.String()
        if len(strText) > 90 {
            texts = append(texts, strText)
            by = bytes.Buffer{}
        }
    }
    texts = append(texts, strText)

    for _, ttt := range texts {
        b = t.post(ttt)
        if b {
            return
        }
    }
    return
}

func (t *Task) post(ttt string) (b bool) {
    data := Resp{}
    for i := 0; i < 3; i++ {
        reqParam := RequestParam{Text: ttt}
        req, _ := json.Marshal(reqParam)

        resp, err := client.R().SetBody(string(req)).Post(t.textgoApi)
        if err != nil {
            fmt.Println(err, "= client.R().SetBody(string(req)).Post(t.textgoApi)")
            return
        }
        err = json.Unmarshal(resp.Body(), &data)
        if err != nil {
            return
        }
        if data.Code != 0 {
            time.Sleep(time.Millisecond * 100)
            fmt.Println("我是3次都失败了的数据", data)
            continue
        }

        if len(data.Data.OutputInfo) <= 0 {
            return
        }

        if data.Data.OutputInfo[0].IsIllegal == 1 {
            b = true
            return
        }
        return
    }
    fmt.Println("我是3次都失败了的数据", data, ttt)
    return
}

func (t *Task) exitsElem(data []int, i int) (b bool) {
    for _, v := range data {
        if v == i {
            return true
        }
    }
    return
}

//追加写
func (t *Task) appendWrite(path string, content string) (err error) {
    f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0644)
    if err != nil {
        return
    }
    defer f.Close()

    _, err = fmt.Fprintln(f, content)
    return
}

// Pool Goroutine Pool
type Pool struct {
    queue chan int
    wg    *sync.WaitGroup
}

// newPoll 新建一个协程池
func newPoll(size int) *Pool {
    if size <= 0 {
        size = 1
    }
    return &Pool{
        queue: make(chan int, size),
        wg:    &sync.WaitGroup{},
    }
}

// Add 新增一个执行
func (p *Pool) Add(delta int) {
    // delta为正数就添加
    for i := 0; i < delta; i++ {
        p.queue <- 1
    }
    // delta为负数就减少
    for i := 0; i > delta; i-- {
        <-p.queue
    }
    p.wg.Add(delta)
}

// Done 执行完成减一
func (p *Pool) Done() {
    <-p.queue
    p.wg.Done()
}

// Wait 等待Goroutine执行完毕
func (p *Pool) Wait() {
    p.wg.Wait()
}


参考文献
原来这样使用 Go HTTP 客户端才能获取更高性能

上一篇 下一篇

猜你喜欢

热点阅读