nsq源码剖析-nsqd(1)
1. 开始
开篇罗嗦了一大堆,终于开始进入正题了。golang的优秀源码很多,比如杀手级应用docker,google的Kubernetes等,但都太大了,这种能不能读?能。但不适合小白读,等你成长成老鸟再读吧。现在开始nsq源码的阅读,nsq是一个分布式消息队列,源码短小精悍,其实我阅读它还有个人的一个原因就是想从中提取出他处理tcp连接的框架。看看最后能不能实现这个愿望。
源码获取:
nsq官网:http://nsq.io/,github地址:https://github.com/nsqio/nsq
2. nsq功能概述
在阅读源码前大概了解一下它的功能对理解源码很有帮助,在官网的文档中有快速开始,可以快速了解一下他的功能。nsq有3个守护程序组成:nsqd,nsqlookup,nsqadmin。
nsqd负责从客户端接收,排队和传送消息。
nsqlookupd负责监控信息。客户端通过nsqlookupd发现nsqd的消费者的一个topic,nsqd节点把信息广播到nsqlookupd。
nsqadmin是一个web ui管理平台各种任务和实时展示聚合信息。
3. nsqd
废话不多说直接找到nsqd的main函数,源代码路径 /nsqio/nsq/apps/nsqd/nsqgd.go,它的外部依赖是比较少的,编译前请自行下载。先看一下他的main函数:
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
nsqd为了优雅的关闭退出,使用了svc包来管理程序的运行。先不用管svc,svc.run之后会执行Init和Start函数。
func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}
func (p *program) Start() error {
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
rand.Seed(time.Now().UTC().UnixNano())
//not get it
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
cfg.Validate()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
//not get it
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
p.nsqd = nsqd
return nil
}
Init函数没什么好说的,Start中有几个作用:初始化nsqd的参数,nsqd := nsqd.New(opts)根据参数创建nsqd实例,nsqd.LoadMetadata()从本地加载上次的运行数据,err = nsqd.PersistMetadata()不知道做什么先不用管,nsqd.Main()主程序运行。这个Start函数主要是调用了nsqd.Main()函数,其他都可先忽略。接着看最重要的nsqd.Main()。
4.nsqd.Main()
func (n *NSQD) Main() {
var httpListener net.Listener
var httpsListener net.Listener
//like relative with NSQD
ctx := &context{n}
tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.Lock()
n.tcpListener = tcpListener
n.Unlock()
//tcpServer not get it
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
n.Lock()
n.httpsListener = httpsListener
n.Unlock()
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
})
}
httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
n.Lock()
n.httpListener = httpListener
n.Unlock()
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
})
n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(func() { n.statsdLoop() })
}
}
main中主要有5个函数如下:
微信图片_20180406183619.jpg
5.protocol.TCPServer
路径:nsqio\nsq\internal\protocol\tcp_server.go
看路径就知道这是内部公共函数,代码如下:
package protocol
import (
"net"
"runtime"
"strings"
"github.com/nsqio/nsq/internal/lg"
)
type TCPHandler interface {
Handle(net.Conn)
}
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "listener.Accept() - %s", err)
}
break
}
go handler.Handle(clientConn)
}
logf(lg.INFO, "TCP: closing %s", listener.Addr())
}
TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc)看到这个函数,我们就要知道main函数给它传得是那三个参数是什么,看main中的代码:
ctx := &context{n}
tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.Lock()
n.tcpListener = tcpListener
n.Unlock()
//tcpServer not get it
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})
由代码可知n.tcpListener是连接监听器,tcpServer是tcpServer类型的变量,n.logf是日志相关参数可先不用管。这里有个参数要注意一下就是第二个参数 TCPHandler是个接口,传入tcpServer等于实现了这个接口,对go接口不熟悉的同学可能在这里就晕了。这个函数中有几个结构需要注意一下,ctx := &context{n} 和 tcpServer := &tcpServer{ctx: ctx},这里为什么要这样的结构封装,为什么用接口TCPHandler作为参数?这些疑问暂且记下。
TCPServer函数主要流程如下:
每接收到一个连接,就创建一个这个连接的handle协程,那么现在的关键是看这个协程是怎么样的,我们知道handler是一个接口类型,我们要找到Handle函数就要找参数tcpServer对应的类型,type tcpServer struct是在tcp.go中定义的,那么在tcp.go中就可以找到Handle函数的原型。下面就分析Handle函数。
6.(p *tcpServer) Handle(clientConn net.Conn)
路径:nsqio\nsq\nsqd\tcp.go
type tcpServer struct {
ctx *context
}
func (p *tcpServer) Handle(clientConn net.Conn) {
p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
// The client should initialize itself by sending a 4 byte sequence indicating
// the version of the protocol that it intends to communicate, this will allow us
// to gracefully upgrade the protocol away from text/line oriented to whatever...
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
return
}
protocolMagic := string(buf)
p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}
微信图片_20180406183619.jpg
客户端一旦连接上,就要发送4个字节" V2"否则服务端就断开连接。收到协议protocolMagic后执行prot.IOLoop(clientConn),从名字就能看出这是处理协议的地方。 prot 是个接口,通过prot = &protocolV2{ctx: p.ctx}可以实例化接口,再次自问为什么用接口?
7.(p *protocolV2) IOLoop(conn net.Conn)
路径:nsqio\nsq\nsqd\protocol_v2.go
type protocolV2 struct {
ctx *context
}
func (p *protocolV2) IOLoop(conn net.Conn) error {
var err error
var line []byte
var zeroTime time.Time
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
// synchronize the startup of messagePump in order
// to guarantee that it gets a chance to initialize
// goroutine local state derived from client attributes
// and avoid a potential race with IDENTIFY (where a client
// could have changed or disabled said attributes)
messagePumpStartedChan := make(chan bool)
//not get it
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
for {
//not get it
if client.HeartbeatInterval > 0 {
client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
} else {
client.SetReadDeadline(zeroTime)
}
// ReadSlice does not allocate new space for the data each request
// ie. the returned slice is only valid until the next call to it
line, err = client.Reader.ReadSlice('\n')
if err != nil {
if err == io.EOF {
err = nil
} else {
err = fmt.Errorf("failed to read command - %s", err)
}
break
}
// trim the '\n'
line = line[:len(line)-1]
// optionally trim the '\r'
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes)
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
var response []byte
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
}
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
conn.Close()
close(client.ExitChan)
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
return err
}
微信图片_20180406183619.jpg
8.(p *protocolV2) Exec(client *clientV2, params [][]byte)
路径:nsqio\nsq\nsqd\protocol_v2.go
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("RDY")):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte("REQ")):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte("NOP")):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte("TOUCH")):
return p.TOUCH(client, params)
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
case bytes.Equal(params[0], []byte("CLS")):
return p.CLS(client, params)
case bytes.Equal(params[0], []byte("AUTH")):
return p.AUTH(client, params)
}
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
以上就是协议里定义的各种功能函数,到此我们粗略的过了一遍tcpserver服务。
9.总结
把上面的流程串一下,主要是理清了主要流程,一些分支还需要再次阅读源码。一些结构体字段的意义和作用也没有介绍,还有为什么handle和ioloop流程用接口实现,这里先不做解释,我们先了解一下nsqd中tcpserver服务的主要脉络,而且我们要不断自问什么要这样设计,如果是我,我会如何做呢?
微信图片_20180406183619.jpg