go客户端并发http请求
2021-01-25 本文已影响0人
Best博客
http客户端并发访问一个服务端
当你要通过http请求群发100万邮件(发送邮件的服务器不需要你考虑资源消耗),越快发送完越好,很自然你会通过调节http.client的pool资源池的大小,从而单位时间内发送更多的请求出去,这里便是值得注意的地方,你要向同一个目标服务器发送100万请求,哪怕你把pool调大到500,想的是顺时hi并发100个请求发送出去,但是其实只有2个,因为http.client中MaxIdleConnsPerHost默认值为2,它的作用是像同一个目标机器发送请求的最大并发量。
![](https://img.haomeiwen.com/i5746597/274bfff39657f966.png)
以下实战demo
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/360EntSecGroup-Skylar/excelize/v2"
"github.com/go-resty/resty/v2"
"net/http"
"os"
"strings"
"sync"
"time"
)
var client = resty.New().SetDebug(false).SetHeader("Content-Type", "application/json").SetTimeout(10 * time.Second)
func init() {
t2 := http.DefaultTransport.(*http.Transport).Clone()
t2.MaxIdleConns = 100
t2.MaxConnsPerHost = 100
t2.MaxIdleConnsPerHost = 100 //无论你连接池多大,但是如果你是并发一个服务器,请调大次参数,它是允许你客户端向同一个服务端发送并发请求个数的控制参数,连接池100,是针对100个不同服务器的并发个数控制
client.SetTransport(t2)
}
type Task struct {
textgoApi string
filePath string
sheets map[string][]int
result chan string
result2 chan string
resultPath string
resultPath2 string
}
func main() {
task := Task{
filePath: "",
sheets: map[string][]int{},
result: make(chan string, 100),
result2: make(chan string, 100),
textgoApi: "",
resultPath: "",
resultPath2: "",
}
fmt.Println("start run***")
task.run()
fmt.Println("start end***")
}
func (t *Task) run() {
f, err := excelize.OpenFile(t.filePath)
if err != nil {
fmt.Println(err, "run openfile发生了错误")
return
}
go func() {
defer func() {
close(t.result)
close(t.result2)
}()
for key, val := range t.sheets {
fmt.Println(key, val)
t.handle(f, key, val)
}
}()
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
n := 0
for res := range t.result2 {
n++
t.appendWrite(t.resultPath2, res)
}
t.appendWrite(t.resultPath2, fmt.Sprintf("总共没违规合计:%d", n))
}()
go func() {
defer wg.Done()
num := 0
for res := range t.result {
num++
t.appendWrite(t.resultPath, res)
}
t.appendWrite(t.resultPath, fmt.Sprintf("总共违规合计:%d", num))
}()
wg.Wait()
}
func (t *Task) handle(file *excelize.File, sheet string, columns []int) {
rows, err := file.Rows(sheet)
if err != nil {
fmt.Println(err, "file.Rows(sheet)")
return
}
for rows.Next() {
data, err := rows.Columns()
if err != nil {
fmt.Println(err, "rows.Columns()")
continue
}
t.call(data, columns)
}
}
func (t *Task) call(data []string, columns []int) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err, "recover---")
}
}()
for k, v := range data {
if t.exitsElem(columns, k+1) {
strs := strings.Split(v, ";")
isVio, s := t.isViolation(strs)
if isVio {
//违规
t.result <- fmt.Sprintf("违规类容:%s |--|--| 原始类容: %s", s, strings.Join(data, ""))
return
}
}
}
t.result2 <- fmt.Sprintf("没有违规类容:%s |--|--| ", strings.Join(data, ""))
}
func (t *Task) isViolation(text []string) (b bool, s string) {
for _, v := range text {
if v == "" {
continue
}
resp := t.isVoi(v)
if resp {
s = v
b = true
return
}
}
return
}
type RequestParam struct {
Text string `json:"text"`
}
type Resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
OutputInfo []RespData `json:"output_info"`
} `json:"data"`
}
type RespData struct {
IsIllegal int `json:"is_illegal"`
IllegalType string `json:"illegal_type"`
Label int `json:"label"`
Score float64 `json:"score"`
}
//{
// "code": 0,
// "msg": "success",
// "data": {
// "output_info": [
// {
// "is_illegal": 1,
// "illegal_type": "涉政",
// "label": 3,
// "score": 0.99994314
// }
// ]
// }
//}
func (t *Task) isVoi(text string) (b bool) {
texts := []string{}
byteText := []rune(text)
by := bytes.Buffer{}
strText := ""
for _, v := range byteText {
by.WriteRune(v)
strText = by.String()
if len(strText) > 90 {
texts = append(texts, strText)
by = bytes.Buffer{}
}
}
texts = append(texts, strText)
for _, ttt := range texts {
b = t.post(ttt)
if b {
return
}
}
return
}
func (t *Task) post(ttt string) (b bool) {
data := Resp{}
for i := 0; i < 3; i++ {
reqParam := RequestParam{Text: ttt}
req, _ := json.Marshal(reqParam)
resp, err := client.R().SetBody(string(req)).Post(t.textgoApi)
if err != nil {
fmt.Println(err, "= client.R().SetBody(string(req)).Post(t.textgoApi)")
return
}
err = json.Unmarshal(resp.Body(), &data)
if err != nil {
return
}
if data.Code != 0 {
time.Sleep(time.Millisecond * 100)
fmt.Println("我是3次都失败了的数据", data)
continue
}
if len(data.Data.OutputInfo) <= 0 {
return
}
if data.Data.OutputInfo[0].IsIllegal == 1 {
b = true
return
}
return
}
fmt.Println("我是3次都失败了的数据", data, ttt)
return
}
func (t *Task) exitsElem(data []int, i int) (b bool) {
for _, v := range data {
if v == i {
return true
}
}
return
}
//追加写
func (t *Task) appendWrite(path string, content string) (err error) {
f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return
}
defer f.Close()
_, err = fmt.Fprintln(f, content)
return
}
// Pool Goroutine Pool
type Pool struct {
queue chan int
wg *sync.WaitGroup
}
// newPoll 新建一个协程池
func newPoll(size int) *Pool {
if size <= 0 {
size = 1
}
return &Pool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}
// Add 新增一个执行
func (p *Pool) Add(delta int) {
// delta为正数就添加
for i := 0; i < delta; i++ {
p.queue <- 1
}
// delta为负数就减少
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}
// Done 执行完成减一
func (p *Pool) Done() {
<-p.queue
p.wg.Done()
}
// Wait 等待Goroutine执行完毕
func (p *Pool) Wait() {
p.wg.Wait()
}