Golang

nsq源码(10) nsqlookupd 注册nsqd

2019-02-13  本文已影响1人  Linrundong

nsqlookupd 的 TCP 协议总共支持 4 种指令:PING、IDENTIFY、REGISTER、UNREGISTER。下面是 nsqd 连接 nsqlookupd 并完成注册时,nsqlookupd 输出的日志:

[nsqlookupd] 2019/02/12 10:28:34.012898 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2019/02/12 10:28:34.012898 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/02/12 10:28:36.930898 INFO: TCP: new client(127.0.0.1:62523)
[nsqlookupd] 2019/02/12 10:28:36.930898 INFO: CLIENT(127.0.0.1:62523): desired protocol magic '  V1'
[nsqlookupd] 2019/02/12 10:28:36.930898 INFO: CLIENT(127.0.0.1:62523): IDENTIFY Address:sz-linrundong TCP:4150 HTTP:4151 Version:1.1.0
[nsqlookupd] 2019/02/12 10:28:36.930898 INFO: DB: client(127.0.0.1:62523) REGISTER category:client key: subkey:
[nsqlookupd] 2019/02/12 10:28:36.931898 INFO: DB: client(127.0.0.1:62523) REGISTER category:channel key:test subkey:testchan
[nsqlookupd] 2019/02/12 10:28:36.931898 INFO: DB: client(127.0.0.1:62523) REGISTER category:topic key:test subkey:

nsqlookupd 的指令分发(Exec)与注册指令(REGISTER)的处理代码:

// Exec dispatches one parsed command to its handler.
//
// params[0] is the command name and is indexed without a length check, so
// params must be non-empty. PING receives the complete params slice; IDENTIFY,
// REGISTER and UNREGISTER receive params with the command name stripped.
// Any other command name yields a fatal client error with code E_INVALID.
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    command := params[0]

    // PING is the only handler that is handed the command name as well.
    if command == "PING" {
        return p.PING(client, params)
    }

    args := params[1:]
    switch command {
    case "IDENTIFY":
        return p.IDENTIFY(client, reader, args)
    case "REGISTER":
        return p.REGISTER(client, reader, args)
    case "UNREGISTER":
        return p.UNREGISTER(client, reader, args)
    default:
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", command))
    }
}
// REGISTER records the client as a producer for the topic, and for the channel
// when one is supplied, named in params.
//
// It returns a fatal E_INVALID client error when client.peerInfo is nil (the
// client has not sent IDENTIFY), returns any error from getTopicChan
// unchanged, and otherwise replies with "OK". The reader argument is not used.
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    if client.peerInfo == nil {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
    }

    // Extract the topic and channel names from the command parameters.
    topic, channel, err := getTopicChan("REGISTER", params)
    if err != nil {
        return nil, err
    }

    // A channel registration is recorded only when a channel name was given.
    if channel != "" {
        channelReg := Registration{"channel", topic, channel}
        added := p.ctx.nsqlookupd.DB.AddProducer(channelReg, &Producer{peerInfo: client.peerInfo})
        // Log only when AddProducer reports true (presumably a new entry; confirm in AddProducer).
        if added {
            p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
                client, "channel", topic, channel)
        }
    }

    // The topic registration is always recorded, with an empty subkey.
    topicReg := Registration{"topic", topic, ""}
    added := p.ctx.nsqlookupd.DB.AddProducer(topicReg, &Producer{peerInfo: client.peerInfo})
    if added {
        p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
            client, "topic", topic, "")
    }

    return []byte("OK"), nil
}
上一篇 下一篇

猜你喜欢

热点阅读