go并发编程案例解析2[添加监控]

2019-01-05  本文已影响0人  百炼

date[2019-01-5]

package main

import (
    "bufio"
    "encoding/json"
    "flag"
    "fmt"
    "github.com/influxdata/influxdb/client/v2"
    "io"
    "log"
    "math/rand"
    "net/http"
    "net/url"
    "os"
    "regexp"
    "strconv"
    "strings"
    "time"
)

type LogProcess struct {
    rc chan []byte
    wc chan *Message

    read  Reader
    write Writer
}

type Reader interface {
    Read(rc chan []byte)
}

type Writer interface {
    Write(wc chan *Message)
}

type ReadDataFromFile struct {
    path string
}
type WriteDateToInfluxDb struct {
    influxDBua string
}

type Message struct {
    TimeLocal                    time.Time
    BytesSent                    int
    Path, Method, Scheme, Status string
    UpstreamTime, RequestTime    float64
}

const (
    TYPE_HANDLE_LINE = 0
    TYPE_ERR_NUM     = 1
)

var TypeMonitorChan = make(chan int, 200)
//系统状态监控
type SystemInfo struct {
    HandleLine   int     `json:"handleLine"`   //总处理日志行数
    Tps          float64 `json:"tps"`          //系统吞吐量
    ReadChanLen  int     `json:"readChanLen"`  //read channel长度
    WriteChanLen int     `json:"writeChanLen"` //write channel长度
    RunTime      string  `json:"runTime"`      //运行总时间
    ErrNum       int     `json:"errNum"`       //错误数
}

type Monitor struct {
    startTime time.Time
    data      SystemInfo
    tpsSli    []int
}

func (m *Monitor) start(lp *LogProcess) {
    go func() {
        for n := range TypeMonitorChan {
            switch n {
            case TYPE_ERR_NUM:
                m.data.ErrNum += 1
            case TYPE_HANDLE_LINE:
                m.data.HandleLine += 1
            }
        }
    }()

    ticker := time.NewTicker(time.Second * 5)
    go func() {
        for {
            <-ticker.C
            m.tpsSli = append(m.tpsSli, m.data.HandleLine)
            if len(m.tpsSli) > 2 {
                m.tpsSli = m.tpsSli[1:]
            }
        }
    }()
    http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
        m.data.RunTime = time.Now().Sub(m.startTime).String()
        m.data.ReadChanLen = len(lp.rc)
        m.data.WriteChanLen = len(lp.wc)

        if len(m.tpsSli) >= 2 {
            m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
        }

        ret, _ := json.MarshalIndent(m.data, "", "\t")
        io.WriteString(writer, string(ret))
    })

    http.ListenAndServe(":9193", nil)
}

func (w *WriteDateToInfluxDb) Write(wc chan *Message) {
    //初始化influxdb client
    //从Write Channel读取数
    //Tags:Path,Method,Scheme,Status
    //Fiedls:
    //Time:
    //写入模块
    infSlic := strings.Split(w.influxDBua, "@")

    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     infSlic[0],
        Username: infSlic[1],
        Password: infSlic[2],
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    for v := range wc {
        // Create a new point batch
        bp, err := client.NewBatchPoints(client.BatchPointsConfig{
            Database:  infSlic[3],
            Precision: infSlic[4],
        })
        if err != nil {
            log.Fatal(err)
        }

        // Create a point and add to batch
        //Tags:Path Method Scheme Status

        tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status,}
        fields := map[string]interface{}{
            "BytesSent":    v.BytesSent,
            "UpstreamTime": v.UpstreamTime,
            "RequestTime":  v.RequestTime,
        }

        pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)

        // Write the batch
        if err := c.Write(bp); err != nil {
            log.Fatal(err)
        }

        // Close client resources
        if err := c.Close(); err != nil {
            log.Fatal(err)
        }
        log.Println("Write Success!")
    }
}

func (r *ReadDataFromFile) Read(rc chan []byte) {
    //读取模块
    //打开文件
    f, err := os.Open(r.path)
    if err != nil {
        panic(fmt.Sprintf("open file eror :%s", err.Error()))
    }

    //从文件末尾读取
    f.Seek(0, 2)
    rd := bufio.NewReader(f)
    for {
        line, err := rd.ReadBytes('\n')
        if err == io.EOF {
            TypeMonitorChan <- TYPE_ERR_NUM
            time.Sleep(500 * time.Millisecond)
            continue
        } else if err != nil {
            TypeMonitorChan <- TYPE_ERR_NUM
            panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
        }
        TypeMonitorChan <- TYPE_HANDLE_LINE
        rc <- line[:len(line)-1]
    }
}

func (lp *LogProcess) ProcessData() {
    //处理模块
    r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"`)
    rd := rand.New(rand.NewSource(time.Now().UnixNano()))
    loc, _ := time.LoadLocation("Asia/Shanghai")
    for v := range lp.rc {
        fmt.Println(string(v))
        ret := r.FindStringSubmatch(string(v))
        if len(ret) != 10 {
            TypeMonitorChan <- TYPE_ERR_NUM
            log.Println("FindStringSubmatch fail:", string(v))
            continue
        }
        message := &Message{}
        location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[4], loc)
        if err != nil {
            TypeMonitorChan <- TYPE_ERR_NUM
            log.Println("ParseInLocation fail:", err.Error(), string(ret[4]))

        }
        message.TimeLocal = location
        byteSent, _ := strconv.Atoi(ret[8])
        message.BytesSent = byteSent

        //GET /foo?query=t HTTP/1.0
        reqSli := strings.Split(ret[5], " ")
        if len(reqSli) != 3 {
            TypeMonitorChan <- TYPE_ERR_NUM
            log.Println("strings.Split Fail", ret[5])
            continue
        }

        message.Method = reqSli[1]
        message.Scheme = reqSli[1]
        u, err := url.Parse(reqSli[2])
        if err != nil {
            TypeMonitorChan <- TYPE_ERR_NUM
            log.Println("url parse fail:", err)
        }
        message.Path = u.Path
        message.Status = ret[6]

        message.UpstreamTime = rd.Float64() * 4
        message.RequestTime = rd.Float64() * 4
        //message.UpstreamTime, _ = strconv.ParseFloat(ret[12], 64)
        //message.RequestTime, _ = strconv.ParseFloat(ret[13], 64)
        lp.wc <- message
    }
}

func main() {
    var path, influxDsn string
    flag.StringVar(&path, "path", "C:/soft/nginx-1.15.8/logs/access.log", "read file path")
    flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@imooc@s", "read influxdb datasource")

    r := &ReadDataFromFile{
        path: path,
    }
    w := &WriteDateToInfluxDb{
        influxDBua: influxDsn,
    }
    lp := &LogProcess{
        rc:    make(chan []byte, 200),
        wc:    make(chan *Message, 200),
        read:  r,
        write: w,
    }

    go lp.read.Read(lp.rc)
    for i := 0; i < 2; i++ {
        go lp.ProcessData()
    }

    for i := 0; i < 4; i++ {
        go lp.write.Write(lp.wc)
    }

    m := Monitor{
        startTime: time.Now(),
        data:      SystemInfo{},
    }
    m.start(lp)
    time.Sleep(time.Duration(30000000) * time.Second)
}

结果

运行起来,在浏览器查看
http://127.0.0.1:9193/monitor

{
    "handleLine": 0,
    "tps": 0,
    "readChanLen": 0,
    "writeChanLen": 0,
    "runTime": "15.4403878s",
    "errNum": 31
}
上一篇 下一篇

猜你喜欢

热点阅读