nsq源码解读之nsqd

2018-04-07  本文已影响0人  高稚商de菌

nsqd是一个守护进程,用来接收和转发消息。和前文提到的nsqdlookup类似,它同样使用go-svc来管理进程。而在启动服务的时候,不仅支持tcp和http,还支持https。本文主要分析nsqd源码中值得借鉴的点。

1. 加载数据文件

在nsqd的start和stop函数中,程序除了读取配置以后,还涉及数据文件的加载和写入。数据文件中主要包含当前nsqd的topic和channel信息。

有以下几点是可以学习的:

// nsqd/nsqd.go
type meta struct {
    Topics []struct {
        Name     string `json:"name"`
        Paused   bool   `json:"paused"`
        Channels []struct {
            Name   string `json:"name"`
            Paused bool   `json:"paused"`
        } `json:"channels"`
    } `json:"topics"`
}

func writeSyncFile(fn string, data []byte) error {
    f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
    if err != nil {
        return err
    }

    _, err = f.Write(data)
    if err == nil {
        err = f.Sync()
    }
    f.Close()
    return err
}

func (n *NSQD) LoadMetadata() error {
    atomic.StoreInt32(&n.isLoading, 1)
    defer atomic.StoreInt32(&n.isLoading, 0)

    fn := newMetadataFile(n.getOpts())
    // old metadata filename with ID, maintained in parallel to enable roll-back
    fnID := oldMetadataFile(n.getOpts())

    // ......
    // 此处省略代码为读取文件的过程
    // ......

    var m meta
    err = json.Unmarshal(data, &m)


    // ......
    // 此处省略代码为恢复数据过程
    // ......

    return nil
}

func (n *NSQD) PersistMetadata() error {

    // ......
    // 此处省略代码为获取数据过程
    // ......
    
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

    err = writeSyncFile(tmpFileName, data)
    if err != nil {
        return err
    }
    err = os.Rename(tmpFileName, fileName)
    if err != nil {
        return err
    }
    // technically should fsync DataPath here

    stat, err := os.Lstat(fileNameID)
    if err == nil && stat.Mode()&os.ModeSymlink != 0 {
        return nil
    }

    // if no symlink (yet), race condition:
    // crash right here may cause next startup to see metadata conflict and abort

    tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

    if runtime.GOOS != "windows" {
        err = os.Symlink(fileName, tmpFileNameID)
    } else {
        // on Windows need Administrator privs to Symlink
        // instead write copy every time
        err = writeSyncFile(tmpFileNameID, data)
    }
    if err != nil {
        return err
    }

    err = os.Rename(tmpFileNameID, fileNameID)
    if err != nil {
        return err
    }
    // technically should fsync DataPath here

    return nil
}
2. 在用flag模块读取命令行配置时,可以使用flag.Var,读取指定类型的配置。
type Value interface {
    String() string
    Set(string) error
}

func (f *FlagSet) Var(value Value, name string, usage string)

Var方法使用指定的名字、使用信息注册一个flag。该flag的类型和值由第一个参数表示,该参数应实现了Value接口。例如:

// nsqd/nsqd.go
type tlsMinVersionOption uint16

func (t *tlsMinVersionOption) Set(s string) error {
    s = strings.ToLower(s)
    switch s {
    case "":
        return nil
    case "ssl3.0":
        *t = tls.VersionSSL30
    case "tls1.0":
        *t = tls.VersionTLS10
    case "tls1.1":
        *t = tls.VersionTLS11
    case "tls1.2":
        *t = tls.VersionTLS12
    default:
        return fmt.Errorf("unknown tlsVersionOption %q", s)
    }
    return nil
}

func (t *tlsMinVersionOption) Get() interface{} { return uint16(*t) }

func (t *tlsMinVersionOption) String() string {
    return strconv.FormatInt(int64(*t), 10)
}

......
......

    tlsRequired := tlsRequiredOption(opts.TLSRequired)
    tlsMinVersion := tlsMinVersionOption(opts.TLSMinVersion)
3. 可以使用一个chan来结束一个线程。原理是close chan之后,chan会输出该类型的零值。
// 结束的时候
// nsqd/nsqd.go
func (n *NSQD) Exit() {
    ......
    close(n.exitChan)
    ......
}

func (n *NSQD) queueScanLoop() {
    ......
    for {
        select {
        ......
        case <-n.exitChan:
            goto exit
        ......
        }
    }
    ......
exit:
    n.logf("QUEUESCAN: closing")
    ......
}

// nsqd/lookup.go
func (n *NSQD) lookupLoop() {
    ......
    for {
        select {
        ......
        case <-n.exitChan:
            goto exit
        ......
        }
    }
    ......
}
4. 最关键的topic的创建和消息的发布,将在接下来的文章中详细分析。
上一篇 下一篇

猜你喜欢

热点阅读