区块链系统教程我爱编程

Ethereum源码阅读笔记-whisper

2018-06-14  本文已影响14人  豆瓣奶茶

go-ethereum/whisper

先从whipserv6看起。

Whisper定义

Whisper代表了在以太坊网络中的一个隐秘(dark)通信接口,使用的是以太坊自有的P2P通信层。所谓dark,意思是没有可靠的方法可以来追踪数据包(见于Specs

// whisper/whisperv6/whisper.go
type Whisper struct {
    // 协议描述和参数
    protocol p2p.Protocol
    // 订阅功能安装的消息过滤器
    filters  *Filters
    // 私钥存储
    privateKeys map[string]*ecdsa.PrivateKey
    // 对称密钥存储
    symKeys     map[string][]byte
    // 和密钥存储相关的互斥锁           
    keyMu       sync.RWMutex
    // 互斥地同步消息和过期池
    poolMu      sync.RWMutex
    // 当前由此节点跟踪的信封(envelopes)池
    envelopes   map[common.Hash]*Envelope
    // 消息过期池
    expirations map[uint32]*set.SetNonTS
    
    // 互斥地同步活跃节点集合
    peerMu sync.RWMutex
    // 当前活跃节点的集合
    peers  map[*Peer]struct{}
    // 普通whisper消息的消息队列
    messageQueue chan *Envelope
    // 点对点消息的消息队列(不会再被做任何转发的消息)
    p2pMsgQueue  chan *Envelope
    // 用于温和的退出的channel
    quit         chan struct{}
    // 存储了配置项的设置信息,以便可以动态修改。
    settings syncmap.Map
    // 允许处理whisper相关消息的最大时长(以秒为单位)
    syncAllowance int
    // 指示这个节点是否是纯粹的轻客户端(不转发任何消息)
    lightClient bool
    // 用于保护stats
    statsMu sync.Mutex
    // whisper节点的统计信息
    stats   Statistics
    // MailServer接口
    mailServer MailServer
}

Whisper节点的配置Config

由于新建一个Whisper实例,需要接收Config参数,所以先来看看Config的定义,见于config.go文件中。

// whisper/whisperv6/config.go
// Config描述了一个whisper节点的配置状态
type Config struct {
    // 最大消息长度
    MaxMessageSize     uint32  `toml:",omitempty"`
    // 接受工作量证明的最小值
    MinimumAcceptedPOW float64 `toml:",omitempty"`
}

这里面提到了一个MinimumAcceptedPOW,是关于工作量证明的,我在Proof of Work找到了这个设计的初衷。

之所以用到POW,主要是为了防止垃圾消息,也同样是为了缓解网络的压力。计算POW的成本可以理解为“你想让网络将你的消息存储一段时间(即TTL时间段),因此需要分配资源,那么你就需要为此支付价格”。所需的POW应该与消息大小和TTL成正比。

代码中还提供了一个默认配置样例DefaultConfig

// whisper/whisperv6/config.go
var DefaultConfig = Config{
    MaxMessageSize:     DefaultMaxMessageSize, // uint32(1024 * 1024)
    MinimumAcceptedPOW: DefaultMinimumPoW, // 0.2
}

新建一个Whisper客户端

搞明白了Config,开始新建一个Whisper客户端,whisper.go中的New()方法会新建一个Whisper客户端,该客户端可以在以太坊P2P网络中进行通信。

// whisper/whisperv6/whisper.go
func New(cfg *Config) *Whisper {
    // 如果传入的cfg是nil,则使用上面提到的DefaultConfig
    if cfg == nil {
        cfg = &DefaultConfig
    }
    // 初始化Whisper结构体
    whisper := &Whisper{
        privateKeys:   make(map[string]*ecdsa.PrivateKey),
        symKeys:       make(map[string][]byte),
        envelopes:     make(map[common.Hash]*Envelope),
        expirations:   make(map[uint32]*set.SetNonTS),
        peers:         make(map[*Peer]struct{}),
        messageQueue:  make(chan *Envelope, messageQueueLimit),
        p2pMsgQueue:   make(chan *Envelope, messageQueueLimit),
        quit:          make(chan struct{}),
        // 最长处理时间默认为10s
        syncAllowance: DefaultSyncAllowance,
    }
    // 使用NewFilters为这个whisper新建过滤器
    whisper.filters = NewFilters(whisper)
    
    // 将<minPowIdx, cfg.MinimumAcceptedPOW>放入settings内存中
    whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
    // 将<maxMsgSizeIdx, cfg.MaxMessageSize>放入settings内存中
    whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
    // 指示消息队列是否溢出
    whisper.settings.Store(overflowIdx, false)
    // p2p whisper子协议规则
    whisper.protocol = p2p.Protocol{
        Name:    ProtocolName, // "shh"
        Version: uint(ProtocolVersion), // uint(uint64(6))
        Length:  NumberOfMessageCodes, // 128
        // 启动运行
        Run:     whisper.HandlePeer,
        // 节点信息
        NodeInfo: func() interface{} {
            return map[string]interface{}{
                "version":        ProtocolVersionStr, // "6.0"
                // 读取whisper.settings[maxMsgSizeIdx]
                "maxMessageSize": whisper.MaxMessageSize(),
                // 读取whisper.settings[minPowIdx]
                // 如果为空或出错,则返回DefaultMinimumPoW,即0.2
                "minimumPoW":     whisper.MinPow(),
            }
        },
    }
    return whisper
}

总体来说,New()主要是创建了一个空的whisper{}结构体,并尽力初始化,比较重要的地方,就是初始化whisper.protocol子协议字段,这里面涉及到很多魔数。

关于版本号6wiki上有这么一段话:

Tor系统有一个协议,可以在两个节点之间建立连接,但它们互相并不知道对方在哪里,这是用于隐藏服务的Rendezvous协议。隐藏的服务(相当于TCP/IP中一个监听端口的服务)会选择随机数量的”介绍人”节点(我想,这个随机数量一般都是6)。为了做到这一点,它会和每一个介绍人都建立标准的3跳链(3-hop chain),当一个用户想和一个隐藏服务建立连接时,它们就会传播一个请求去连接与特定公钥相关的隐藏服务。

关于消息代号数量128p2p.Protocol结构体中的Lenght字段表示某一子协议中消息代号的总数量,而在whisper/whisperv6/doc.go的常量表中可以看到根据EIP-627提案,whisper协议的消息代号共有128,故为128

HandlePeer:发现新节点后做什么

在新建whisper客户端时,whisper.protocol结构体用HandlerPeer()填充了Run字段,当和一个节点协商该协议时,会在一个新的goroutine上调用Run代表的方法。在whisper这里,当whisper子协议连接被协商的时候,下面的P2P层会调用HandlePeer

// whisper/whisperv6/whisper.go
func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
    // 新建一个whisper peer实例,接下来开始追踪它
    whisperPeer := newPeer(whisper, peer, rw)
    
    // 同步的将这个peer实例添加到追踪列表中
    whisper.peerMu.Lock()
    whisper.peers[whisperPeer] = struct{}{}
    whisper.peerMu.Unlock()
    // 当退出时,将该peer从追踪列表中删除
    defer func() {
        whisper.peerMu.Lock()
        delete(whisper.peers, whisperPeer)
        whisper.peerMu.Unlock()
    }()
    // 握手,状态更新
    if err := whisperPeer.handshake(); err != nil {
        return err
    }
    whisperPeer.start() // 启动通信
    defer whisperPeer.stop() // 退出时停止通信
    return whisper.runMessageLoop(whisperPeer, rw) // 开始消息循环处理
}

这里主要是在发现一个新的节点时,whisper协议所做的操作,大部分操作都是和Peer有关。下面就来看看whisperPeer

whisper中的Peer

peer.go中有关于Peer的定义,Peer代表了在whisper协议中的一个节点连接。

// whisper/whisperv6/peer.go
type Peer struct {
    // 本地whisper节点
    host *Whisper 
    // 远程whisper节点
    peer *p2p.Peer
    // 消息读写句柄
    ws   p2p.MsgReadWriter
    // 远程节点是否可信
    trusted        bool
    // 远程节点要求的POW值
    powRequirement float64
    // 布隆过滤器锁
    bloomMu        sync.Mutex
    // 布隆过滤器
    bloomFilter    []byte
    // 是否是全节点过滤器
    fullNode       bool
    // 存储该节点已知的消息,以避免浪费带宽
    known *set.Set // Messages already known by the peer to avoid wasting bandwidth
    // 优雅的退出连接
    quit chan struct{}
}

新建一个远程节点的本地代理Peer

// whisper/whisperv6/peer.go
// 新建一个whisper协议下的peer对象,但这个方法并不会去握手。
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
    return &Peer{
        host:           host,
        peer:           remote,
        ws:             rw,
        trusted:        false, // 默认不可信
        powRequirement: 0.0, // 初始化为0
        known:          set.New(),
        quit:           make(chan struct{}),
        bloomFilter:    MakeFullNodeBloom(), //创建一个长度为64的布隆过滤器,并且所有位初始化为0xFF
        fullNode:       true, // 已经初始化为全节点过滤器
    }
}

和远程节点握手:handshake()

handshake()向远程节点发送协议初始化状态信息,同时也会验证远程节点的状态。状态消息有以下几点要素:

消息代号为statusCode,即0
消息的payload是一个列表:[whisper协议版本号(即6), 本地节点需要的最小POW值, 本地节点感兴趣的消息过滤器]
只有版本号是强制验证的,后续2个参数都是可选的,即本地节点要求的最小POW和本地节点感兴趣的消息过滤器

// whisper/whisperv6/peer.go
func (peer *Peer) handshake() error {
    
    // 异步地发送握手状态消息
    errc := make(chan error, 1) // error channel
    go func() {
        pow := peer.host.MinPow() // 获取本地节点(自己)的POW最小需求值
        powConverted := math.Float64bits(pow) // 将float64转化为uint64
        // BloomFilter()为本地节点所有感兴趣的话题返回一个集成的布隆过滤器
        // 要求远程节点只能发送被通告的布隆过滤器中匹配的消息
        // 如果不匹配,则被认为是垃圾消息,并将与该远程节点断开连接。
        bloom := peer.host.BloomFilter()
        // 通过读写句柄向远程节点发送状态消息(状态码为0,表示这是一个状态消息)
        // 这个消息包含了[whisper的协议版本号,POW值,消息相关的布隆过滤器]
        errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
    }()
    
    // 获取远程节点状态数据,并且验证协议是否匹配
    packet, err := peer.ws.ReadMsg() // 从读写句柄中读取远程消息
    if err != nil {
        return err
    }
    // 正常情况下,第一个数据包的代号应该是状态码,否则就报错
    if packet.Code != statusCode {
        return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
    }
    // 数据包是rlp序列化格式,需要进行解码
    s := rlp.NewStream(packet.Payload, uint64(packet.Size))
    _, err = s.List()
    if err != nil {
        return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
    }
    // 读取前8个字节,作为协议版本号
    //(上面也看到了,状态消息的格式为一个列表:[whisper的协议版本号,POW值,消息相关的布隆过滤器])
    peerVersion, err := s.Uint()
    if err != nil {
        return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
    }
    // 如果获取到的协议版本号不是uint64(6),那么说明协议不匹配。
    if peerVersion != ProtocolVersion {
        return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
    }
    // 只有版本号是强制要求的,后续的参数都是可选的
    powRaw, err := s.Uint() // 继续读取8个字节,作为POW值
    // 如果没出错,则验证pow和bloomfilter;如果出错,也没什么,继续往下走,即后续参数是可选的
    if err == nil {
        pow := math.Float64frombits(powRaw) // uint64 -> float64
        // pow无穷大,不是数字,小于0,都会报错
        if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
            return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
        }
        // 用pow更新peer.powRequirement
        peer.powRequirement = pow
        var bloom []byte
        err = s.Decode(&bloom) //将剩余的部分全部解码,并存至bloom[]中
        if err == nil {
            sz := len(bloom)
            // 初始化时,布隆过滤器的长度都是BloomFilterSize,即64,否则就验证报错。
            if sz != BloomFilterSize && sz != 0 {
                return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
            }
            // 用解码后的bloom更新peer.bloomFilter,以及peer.fullNode
            peer.setBloomFilter(bloom)
        }
    }
    
    // 阻塞等待通道是否返回错误,如果出错,则报错。
    if err := <-errc; err != nil {
        return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
    }
    return nil
}

开始通信:start()

start()初始化

原文链接:http://www.huamo.online/2018/04/11/Ethereum%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-whisper/

上一篇下一篇

猜你喜欢

热点阅读