设计模式

手撸golang GO与微服务 ChatServer之3 压测与诊断

2021-03-09  本文已影响0人  老罗话编程

缘起

最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之

案例需求(聊天服务器)

目标(Day 3)

过程

单元测试

package chat_server

import (
    "fmt"
    cs "learning/gooop/chat_server"
    "math/rand"
    "os"
    "runtime"
    "runtime/pprof"
    "strings"
    "sync"
    "testing"
    "time"
)

// Test_ChatServer is a load test for the chat server. It opens a server on a
// local port, connects clientCount clients, lets every client send its name
// followed by 11 chat messages at a random 1-3 second interval, and then
// closes the clients and the server. While running it records a CPU profile,
// and after shutdown it dumps heap and goroutine profiles so leaked goroutines
// can be inspected with `go tool pprof`. Finally it asserts that the server
// log contains the expected connect/close entries for every client.
func Test_ChatServer(t *testing.T) {
    fnAssertTrue := func(b bool, msg string) {
        t.Helper()
        if !b {
            t.Fatal(msg)
        }
    }

    // create dump files; each one is closed when the test returns so the
    // profile data is flushed and the descriptors are not leaked
    memProfile, err := os.Create("/home/ioly/chat_server_mem.profile")
    if err != nil {
        t.Fatal(err)
    }
    defer memProfile.Close()

    cpuProfile, err := os.Create("/home/ioly/chat_server_cpu.profile")
    if err != nil {
        t.Fatal(err)
    }
    defer cpuProfile.Close()

    rtnProfile, err := os.Create("/home/ioly/chat_server_routine.profile")
    if err != nil {
        t.Fatal(err)
    }
    defer rtnProfile.Close()

    err = pprof.StartCPUProfile(cpuProfile)
    if err != nil {
        t.Fatal(err)
    }
    // deferred after cpuProfile.Close, so it runs before the file is closed
    defer pprof.StopCPUProfile()

    port := 3333
    server := cs.NewChatServer()
    err = server.Open(port)
    if err != nil {
        t.Fatal(err)
    }

    clientCount := 500
    wg := &sync.WaitGroup{}

    rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
    address := fmt.Sprintf("localhost:%v", port)
    for i := 0; i < clientCount; i++ {
        err, client := cs.DialChatClient(address)
        if err != nil {
            t.Fatal(err)
        }

        id := fmt.Sprintf("c%02d", i)
        //client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
        //  t.Logf("%v recv: %v\n", id, msg)
        //})

        // A *rand.Rand created with rand.New is not safe for concurrent use,
        // so the interval is drawn here instead of inside the goroutine.
        interval := time.Duration(rnd.Intn(3)+1) * time.Second

        wg.Add(1)
        go func() {
            // deferred calls run in reverse order: stop ticker, close client, wg.Done
            defer wg.Done()
            defer client.Close()

            client.SetName(id)
            client.Send(&cs.NameMsg{id})

            // An explicit ticker (instead of time.Tick) can be stopped, so it
            // is no longer active when the profiles are dumped.
            ticker := time.NewTicker(interval)
            defer ticker.Stop()

            // one message per tick, numbered 0..10
            for n := 0; n <= 10; n++ {
                <-ticker.C
                client.Send(&cs.ChatMsg{id, fmt.Sprintf("msg %02d from %v", n, id)})
            }
        }()
    }

    // wait for all clients; closing the channel publishes completion without
    // the data race a shared bool flag would have
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    // print passed seconds until every client has finished
    progress := time.NewTicker(time.Second)
    defer progress.Stop()
    passedSeconds := 0
waiting:
    for {
        select {
        case <-done:
            break waiting
        case <-progress.C:
            passedSeconds++
            t.Logf("%v seconds passed", passedSeconds)
        }
    }

    // give the server side some time to observe the closed connections
    time.Sleep(5 * time.Second)
    server.Close()

    // dump heap file
    runtime.GC()
    err = pprof.Lookup("heap").WriteTo(memProfile, 0)
    if err != nil {
        t.Fatal(err)
    }
    // dump goroutine file (used to diagnose goroutine leaks)
    err = pprof.Lookup("goroutine").WriteTo(rtnProfile, 0)
    if err != nil {
        t.Fatal(err)
    }

    // check server logs
    logs := cs.Logging.AllLogs()
    fnHasLog := func(log string) bool {
        for _, it := range logs {
            if strings.Contains(it, log) {
                return true
            }
        }
        return false
    }
    for i := 0; i < clientCount; i++ {
        msg := fmt.Sprintf("tChatClient.postConnClosed, c%02d, serverFlag=false", i)
        fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

        msg = fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i+1)
        fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

        msg = fmt.Sprintf("tChatClient.postConnClosed, c%02d, serverFlag=true", i)
        fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

        msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
        fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)
    }
}

测试输出

并发500, 仅末尾部分

tChatClient.beginWrite, write error, c496, serverFlag=true
tChatClient.beginWrite, <- closeChan, c496, serverFlag=true
tChatClient.postConnClosed, c496, serverFlag=true
tChatServer.handleClientClosed, c496
tChatServer.handleClientClosed, c496, clientCount=1
tChatClient.closeConn, c498, serverFlag=false
tChatClient.beginWrite, <- closeChan, c498, serverFlag=false
tChatClient.beginRead, read error, c498, serverFlag=false
tChatClient.postConnClosed, c498, serverFlag=false
tChatClient.beginRead, read error, c498, serverFlag=true
tChatClient.closeConn, c498, serverFlag=true
tChatClient.beginWrite, <- closeChan, c498, serverFlag=true
tChatClient.postConnClosed, c498, serverFlag=true
tChatServer.handleClientClosed, c498
tChatServer.handleClientClosed, c498, clientCount=0
    ChatServer_test.go:97: 34 seconds passed
--- PASS: Test_ChatServer (39.50s)
PASS
ok      command-line-arguments  39.535s

routine泄漏诊断(修复前)

有泄漏时的诊断日志(并发50)

$ go tool pprof ~/chat_server_routine.profile 
File: chat_server.test
Type: goroutine
Time: Mar 9, 2021 at 9:12am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 4, 100% of 4 total
Showing top 10 nodes out of 21
      flat  flat%   sum%        cum   cum%
         3 75.00% 75.00%          3 75.00%  runtime.gopark
         1 25.00%   100%          1 25.00%  runtime/pprof.runtime_goroutineProfileWithLabels
         0     0%   100%          1 25.00%  command-line-arguments.Test_ChatServer
         0     0%   100%          1 25.00%  learning/gooop/chat_server.(*tChatClient).beginWrite
         0     0%   100%          1 25.00%  learning/gooop/chat_server.(*tChatClient).closeConn
         0     0%   100%          1 25.00%  main.main
         0     0%   100%          1 25.00%  runtime.chanrecv
         0     0%   100%          1 25.00%  runtime.chanrecv1
         0     0%   100%          1 25.00%  runtime.chansend
         0     0%   100%          1 25.00%  runtime.chansend1
(pprof) 

可以看到closeConn函数阻塞

// closeConn flags the client as closed and signals closeChan.
// Only the first call gets through: the compare-and-swap on closeFlag
// (0 -> 1) fails for every later call, which then does nothing.
func (me *tChatClient) closeConn() {
    firstCall := atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1)
    if firstCall {
        Logging.Logf("tChatClient.closeConn, %v, serverFlag=%v", me.name, me.serverFlag)
        // This send blocks until a receiver takes the value or the channel
        // has free buffer space.
        me.closeChan <- true
    }
}

从代码看, closeConn函数阻塞只有一种可能, 就是向closeChan写入时阻塞.
这个原因乍看有点诡异, 因为已经加了atomic.CompareAndSwapInt32保护, 但CAS只能保证写入最多执行一次, 并不能保证此时有接收方在等待.
closeChan是无缓冲通道, 并发环境下如果读取方尚未开始读取, 写入就会一直阻塞, 对应的routine也就泄漏了.

修复routine泄漏

就是给导致写阻塞的closeChan加上缓冲, 大小等于closeConn函数的可能调用入口数量

// openChatClient wraps conn in a new tChatClient, opens it and returns it.
// serverFlag is stored on the client as given. openFlag and closeFlag start
// at their zero value.
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
    client := new(tChatClient)

    client.conn = conn
    client.serverFlag = serverFlag
    client.name = "anonymous"

    // closeChan is buffered so that the send in closeConn does not block when
    // nothing is receiving yet.
    client.closeChan = make(chan bool, 3)
    client.sendChan = make(chan IMsg, gMaxPendingSend)

    // message logs start out empty but non-nil
    client.sendLogs = []IMsg{}
    client.dropLogs = []IMsg{}
    client.recvLogs = []IMsg{}

    client.open()
    return client
}

routine泄漏诊断(修复后)

修复后, 加大并发到500, 未发现明显routine泄漏(不存在业务代码导致挂起的routine)

$ go tool pprof ~/chat_server_routine.profile 
File: chat_server.test
Type: goroutine
Time: Mar 9, 2021 at 9:28am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 3, 100% of 3 total
Showing top 10 nodes out of 17
      flat  flat%   sum%        cum   cum%
         2 66.67% 66.67%          2 66.67%  runtime.gopark
         1 33.33%   100%          1 33.33%  runtime/pprof.runtime_goroutineProfileWithLabels
         0     0%   100%          1 33.33%  command-line-arguments.Test_ChatServer
         0     0%   100%          1 33.33%  main.main
         0     0%   100%          1 33.33%  runtime.chanrecv
         0     0%   100%          1 33.33%  runtime.chanrecv1
         0     0%   100%          1 33.33%  runtime.main
         0     0%   100%          1 33.33%  runtime/pprof.(*Profile).WriteTo
         0     0%   100%          1 33.33%  runtime/pprof.profileWriter
         0     0%   100%          1 33.33%  runtime/pprof.writeGoroutine
(pprof) 

(未完待续)

上一篇下一篇

猜你喜欢

热点阅读