nsq源码解读之nsqd
2018-04-07 本文已影响0人
高稚商de菌
nsqd是一个守护进程,用来接收和转发消息。和前文提到的nsqdlookup类似,它同样使用go-svc来管理进程。而在启动服务的时候,不仅支持tcp和http,还支持https。本文主要分析nsqd源码中值得借鉴的点。
1. 加载数据文件
在nsqd的start和stop函数中,程序除了读取配置以后,还涉及数据文件的加载和写入。数据文件中主要包含当前nsqd的topic和channel信息。
- 数据是以json格式存放的,格式定义为meta。包含topic和channel的信息,以及是否paused
- 数据存在两个文件中,一个代表当前的,另一个带ID的是用来作回滚的(从注释上看是这样的,但是这个文件可以是符号链接,具体应用场景未知)
有以下几点是可以学习的:
- 使用atomic原子操作来处理一些线程间共享的数据,避免使用锁,可以简化代码,降低开销
- 写文件的时候,可以先写一个tmp文件,写成功了再rename为正式的问题。
- 写文件时,将文件fsync落盘用sync()
- windows建立symlink需要管理员权限,所以需要重新写一份,而linux可以通过os.Symlink直接建立符号连接,不用写两份文件。
// nsqd/nsqd.go
type meta struct {
Topics []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
} `json:"channels"`
} `json:"topics"`
}
func writeSyncFile(fn string, data []byte) error {
f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
_, err = f.Write(data)
if err == nil {
err = f.Sync()
}
f.Close()
return err
}
func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())
// ......
// 此处省略代码为读取文件的过程
// ......
var m meta
err = json.Unmarshal(data, &m)
// ......
// 此处省略代码为恢复数据过程
// ......
return nil
}
func (n *NSQD) PersistMetadata() error {
// ......
// 此处省略代码为获取数据过程
// ......
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
err = writeSyncFile(tmpFileName, data)
if err != nil {
return err
}
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}
// technically should fsync DataPath here
stat, err := os.Lstat(fileNameID)
if err == nil && stat.Mode()&os.ModeSymlink != 0 {
return nil
}
// if no symlink (yet), race condition:
// crash right here may cause next startup to see metadata conflict and abort
tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())
if runtime.GOOS != "windows" {
err = os.Symlink(fileName, tmpFileNameID)
} else {
// on Windows need Administrator privs to Symlink
// instead write copy every time
err = writeSyncFile(tmpFileNameID, data)
}
if err != nil {
return err
}
err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here
return nil
}
2. 在用flag模块读取命令行配置时,可以使用flag.Var,读取指定类型的配置。
type Value interface {
String() string
Set(string) error
}
func (f *FlagSet) Var(value Value, name string, usage string)
Var方法使用指定的名字、使用信息注册一个flag。该flag的类型和值由第一个参数表示,该参数应实现了Value接口。例如:
// nsqd/nsqd.go
type tlsMinVersionOption uint16
func (t *tlsMinVersionOption) Set(s string) error {
s = strings.ToLower(s)
switch s {
case "":
return nil
case "ssl3.0":
*t = tls.VersionSSL30
case "tls1.0":
*t = tls.VersionTLS10
case "tls1.1":
*t = tls.VersionTLS11
case "tls1.2":
*t = tls.VersionTLS12
default:
return fmt.Errorf("unknown tlsVersionOption %q", s)
}
return nil
}
func (t *tlsMinVersionOption) Get() interface{} { return uint16(*t) }
func (t *tlsMinVersionOption) String() string {
return strconv.FormatInt(int64(*t), 10)
}
......
......
tlsRequired := tlsRequiredOption(opts.TLSRequired)
tlsMinVersion := tlsMinVersionOption(opts.TLSMinVersion)
3. 可以使用一个chan来结束一个线程。原理是close chan之后,chan会输出该类型的零值。
// 结束的时候
// nsqd/nsqd.go
func (n *NSQD) Exit() {
......
close(n.exitChan)
......
}
func (n *NSQD) queueScanLoop() {
......
for {
select {
......
case <-n.exitChan:
goto exit
......
}
}
......
exit:
n.logf("QUEUESCAN: closing")
......
}
// nsqd/lookup.go
func (n *NSQD) lookupLoop() {
......
for {
select {
......
case <-n.exitChan:
goto exit
......
}
}
......
}