
MPT(Merkle Patricia Trie)

MPT这种数据结构实际上是一种Trie树变种,是以太坊中一种非常重要的数据结构,用来存储用户账户的状态及其变更(状态树)、交易信息(交易树)、交易的收据信息(收据树)。MPT实际上是两种数据结构的组合,分别是Merkle树 和 Patricia 树。由于Merkle树前一篇文档已有较详细的描述,本节将简单介绍Patricia 树的前身Trie树和它本身,接下来引出本文重点Merkle Patricia Trie, 分析一下MPT之于在以太坊区块的作用,以及相关实现细节。



position in the tree defines the key with which it is associated





[i0, i1 ... in, value]





If you want to store just one (path,value) binding where the path is (in the case of the ethereum state trie), 64 characters long (number of nibbles in bytes32), you will need over a kilobyte of extra space to store one level per character, and each lookup or delete will take the full 64 steps.



针对于Trie树存储不平衡的缺陷,PatriciaTrie,又被称为 RadixTree 或紧凑前缀树 (compact prefix tree),提出了一种优化 Trie空间使用率的idea:如果存在一个父节点只有一个子节点,那么这个父节点将与其子节点合并。这样可以缩短 Trie 中不必要的深度,大大加快搜索节点速度。


Merkle Patricia Trie

MPT(Merkle Patricia Trie)则是在Patricia树的基础上,添加了加密安全功能:引用为下一个节点的hash值,Merkle 树的特性带来的;其次引入了很多节点类型来提高效率:Null Node(空节点),Extension Node(拓展节点),Leaf Node(叶子节点),Branch Node(分支节点)。


以太坊区块结构 ———MPT实际应用场景



每个矿工在把交易打包成块的时候,会组织三棵树Merkle Patricia Trie:



One particular limitation(of Bitcoin) is that, while they can prove the inclusion of transactions, they cannot prove anything about the current state (eg. digital asset holdings, name registrations, the status of financial contracts, etc).


三棵树求取根哈希,可以得到 区块头中的StateRoot,TransactionsRoot,ReceiptsRoot三个字段。这样就建立了交易和区块头字段的映射。当其他用户收到块,根据块里的交易可以计算出收据和状态,计算三个根哈希后和区块头的三个字段进行验证,判断这是否为合法的块。




node 接口族担当整个 MPT 中的各种节点,node 接口分四种实现: fullNode,shortNode,valueNode,hashNode。


图取自《以太坊技术详解与实战》 以太坊状态树


type node interface {
    fstring(string) string
    cache() (hashNode, bool)
    canUnload(cachegen, cachelimit uint16) bool

type (
    fullNode struct {
        Children [17]node // Actual trie node data to encode/decode (needs custom encoder)
        flags    nodeFlag
    shortNode struct {
        Key   []byte
        Val   node
        flags nodeFlag
    hashNode  []byte
    valueNode []byte



func keybytesToHex(str []byte) []byte {
    l := len(str)*2 + 1
    var nibbles = make([]byte, l)
    for i, b := range str {
        nibbles[i*2] = b / 16
        nibbles[i*2+1] = b % 16
    nibbles[l-1] = 16
    return nibbles

*比如叶子节点key字段剩余的路径=>"bob",b的ASCII十六进制编码为0x62,o的ASCII十六进制编码为0x6f,分解成高四位和第四位,16表示终结 0x10,最终编码结果为[6 2 6 15 6 2 16],*

> all data is stored in bytes format, it is not possible to differentiate between, for instance, the nibble 1, and the nibbles 01 (both must be stored as <01>). To specify odd length, the partial path is prefixed with a flag.
> 所有信息按字节存储,所以对于半字节的信息,需要一种没有二义性的方式存:比如,1和01按字节存储结果都是<01>,HP就能解决

节点·序列化编码:RLP 编码




图中 · 表示concat连接


RLP 编码函数接受一个item(也就是以上公式中s(x))。item定义如下:








def rlp_encode(input):
    if isinstance(input,str):
        if len(input) == 1 and chr(input) < 128: return input
        else: return encode_length(len(input),128) + input
    elif isinstance(input,list):
        output = ''
        for item in input: output += rlp_encode(item)
        return encode_length(len(output),192) + output

def encode_length(L,offset):
    if L < 56:
         return chr(L + offset)
    elif L < 256**8:
         BL = to_binary(L)
         return chr(len(BL) + offset + 55) + BL
         raise Exception("input too long")

def to_binary(x):
    return '' if x == 0 else to_binary(int(x / 256)) + chr(x % 256)

Example MPT(逻辑梳理)


('do', 'verb'), ('dog', 'puppy'), ('doge', 'coin'), ('horse', 'stallion')


<64 6f> : 'verb'
<64 6f 67> : 'puppy'
<64 6f 67 65> : 'coin'
<68 6f 72 73 65> : 'stallion'




rootHash: [ <16>, hashA ]
hashA:    [ <>, <>, <>, <>, hashB, <>, <>, <>, hashC, <>, <>, <>, <>, <>, <>, <>, <> ]
hashC:    [ <20 6f 72 73 65>, 'stallion' ]
hashB:    [ <00 6f>, hashD ]
hashD:    [ <>, <>, <>, <>, <>, <>, hashE, <>, <>, <>, <>, <>, <>, <>, <>, <>, 'verb' ]
hashE:    [ <17>, hashF ]
hashF:    [ <>, <>, <>, <>, <>, <>, hashG, <>, <>, <>, <>, <>, <>, <>, <>, <>, 'puppy' ]
hashG:    [ <35>, 'coin' ]





H(rlp.encode(x)) = sha3(rlp.encode(x)) if len(rlp.encode(x)) >= 32 else rlp.encode(x),值得注意的是当MPT新出现一个len(rlp.encode(x))>32的节点时,为了反序列化,数据库需要额外存储键值对(sha3(rlp.encode(x)), rlp.encode(x)),因为hash函数的单向性,若只知道sha3(x)无法推导出x。



1. 如何根据某一个hash值在DB中找到整棵MPT?(Trie的反序列化过程)


// Trie is a Merkle Patricia Trie.
// The zero value is an empty trie with no database.
// Use New to create a trie that sits on top of a database.
// Trie is not safe for concurrent use.
type Trie struct {
    root         node
    db           Database
    originalRoot common.Hash

    // Cache generation values.
    // cachegen increases by one with each commit operation.
    // new nodes are tagged with the current generation and unloaded
    // when their generation is older than than cachegen-cachelimit.
    cachegen, cachelimit uint16

包含了当前的root节点, db是levelDB(键值对都是二进制的高效数据库,以太坊使用它主要考虑到它的写优化的特点),trie的结构最终都是需要通过键值对的形式(key为hash值,value为[]bytes)存储到数据库里面去,然后启动的时候是需要从数据库里面加载的。 originalRoot 启动加载的时候的hash值,通过这个hash值可以在数据库里面恢复出整颗的trie树。


func (t *Trie) newFlag() nodeFlag {
    return nodeFlag{dirty: true, gen: t.cachegen}




func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) {
    cacheMissCounter.Inc(1) //每执行一次cache计数器+1
    hash := common.BytesToHash(n)
    // 通过hash从DB中取出node的RLP编码内容(如果node存在,否则返回空)
    enc, err := t.db.Node(hash)
    if err != nil || enc == nil {
        return nil, &MissingNodeError{NodeHash: hash, Path: prefix}
    return mustDecodeNode(n, enc, t.cachegen), nil


func decodeNode(hash, buf []byte, cachegen uint16) (node, error) {
    if len(buf) == 0 {
        return nil, io.ErrUnexpectedEOF
    elems, _, err := rlp.SplitList(buf) //将buf拆分为列表的内容以及列表后的任何剩余字节。
    if err != nil {
        return nil, fmt.Errorf("decode error: %v", err)
    switch c, _ := rlp.CountValues(elems); c {
    case 2:
        n, err := decodeShort(hash, elems, cachegen) //decode shortNode
        return n, wrapError(err, "short")
    case 17:
        n, err := decodeFull(hash, elems, cachegen) //decode fullNode
        return n, wrapError(err, "full")
        return nil, fmt.Errorf("invalid number of list elements: %v", c)
func decodeShort(hash, elems []byte, cachegen uint16) (node, error) {
    kbuf, rest, err := rlp.SplitString(elems) 
    if err != nil {
        return nil, err
    flag := nodeFlag{hash: hash, gen: cachegen}
    key := compactToHex(kbuf)
    if hasTerm(key) {//是否有结束标志符(也就是判断是否为叶子节点)
        // 叶子节点处理
        val, _, err := rlp.SplitString(rest)
        if err != nil {
            return nil, fmt.Errorf("invalid value node: %v", err)
        return &shortNode{key, append(valueNode{}, val...), flag}, nil
    r, _, err := decodeRef(rest, cachegen)
    if err != nil {
        return nil, wrapError(err, "val")
    return &shortNode{key, r, flag}, nil


func decodeRef(buf []byte, cachegen uint16) (node, []byte, error) {
    kind, val, rest, err := rlp.Split(buf)
    if err != nil {
        return nil, buf, err
    switch {
    case kind == rlp.List:
        // 'embedded' node reference. The encoding must be smaller
        // than a hash in order to be valid.
        if size := len(buf) - len(rest); size > hashLen {
            err := fmt.Errorf("oversized embedded node (size is %d bytes, want size < %d)", size, hashLen)
            return nil, buf, err
        n, err := decodeNode(nil, buf, cachegen)
        return n, rest, err
    case kind == rlp.String && len(val) == 0:
        // empty node
        return nil, rest, nil
    case kind == rlp.String && len(val) == 32:
        return append(hashNode{}, val...), rest, nil
        return nil, nil, fmt.Errorf("invalid RLP string size %d (want 0 or 32)", len(val))


func decodeFull(hash, elems []byte, cachegen uint16) (*fullNode, error) {
    n := &fullNode{flags: nodeFlag{hash: hash, gen: cachegen}}
    for i := 0; i < 16; i++ {
        cld, rest, err := decodeRef(elems, cachegen)
        if err != nil {
            return n, wrapError(err, fmt.Sprintf("[%d]", i))
        n.Children[i], elems = cld, rest
    val, _, err := rlp.SplitString(elems)
    if err != nil {
        return n, err
    if len(val) > 0 {
        n.Children[16] = append(valueNode{}, val...)
    return n, nil


2. MPT存在性证明&不存在性证明

if the root hash of a given trie is publicly known, then anyone can provide a proof that the trie has a given value at a specific path by providing the nodes going up each step of the way. It is impossible for an attacker to provide a proof of a (path, value) pair that does not exist since the root hash is ultimately based on all hashes below it, so any modification would change the root hash.

如果已知RootHash,在已知整棵MPT的情况下,任何人都可以提供某个value存在性证明,只需要从Root节点沿着key到该节点所经过的节点即可,如果该树不存在这个value,攻击者是无法提供一个 (path, value)通过存在性证明生成与已知的RootHash相同的hash值,从而保证了MPT的安全性


Prove constructs a merkle proof for key. The result contains all encoded nodes on the path to the value at key. The value itself is also included in the last node and can be retrieved by verifying the proof.


If the trie does not contain a value for key, the returned proof contains all nodes of the longest existing prefix of the key (at least the root node), ending with the node that proves the absence of the key.

总结一下,以root节点为开端,沿着SHA3 (ethereumAddress)(状态树)or rlp(transactionIndex)(交易树或者收据树)作为key的路径导向所经历的所有节点,到目标节点(存在性证明)或者拥有与目标节点最多共同前缀的节点(不存在性证明)。

proof.go Prove用于生成(不)存在性证明。

func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.Putter) error {
    // Collect all nodes on the path to key.
    key = keybytesToHex(key)
    nodes := []node{}
    tn := t.root
    for len(key) > 0 && tn != nil {
        switch n := tn.(type) {
        case *shortNode:
            if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
                // The trie doesn't contain the key.
                tn = nil
            } else {
                tn = n.Val
                key = key[len(n.Key):]
            nodes = append(nodes, n)
        case *fullNode:
            tn = n.Children[key[0]]
            key = key[1:]
            nodes = append(nodes, n)
        case hashNode:
            var err error
            tn, err = t.resolveHash(n, nil)
            if err != nil {
                log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
                return err
            panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
    hasher := newHasher(0, 0, nil)
    defer returnHasherToPool(hasher)

    for i, n := range nodes {
        // Don't bother checking for errors here since hasher panics
        // if encoding doesn't work and we're not writing to any database.
        n, _, _ = hasher.hashChildren(n, nil)
        hn, _ := hasher.store(n, nil, false)
        if hash, ok := hn.(hashNode); ok || i == 0 {
            // If the node's database encoding is a hash (or is the
            // root node), it becomes a proof element.
            //当ok为false的时候,意味着node<32 bytes,并没有经过任何sha3,
            if fromLevel > 0 {
            } else {
                enc, _ := rlp.EncodeToBytes(n)
                if !ok {
                    hash = crypto.Keccak256(enc)
                proofDb.Put(hash, enc)
    return nil


func VerifyProof(rootHash common.Hash, key []byte, proofDb DatabaseReader) (value []byte, nodes int, err error) {
    key = keybytesToHex(key)
    wantHash := rootHash
    for i := 0; ; i++ {
        //从DB(sha3 hash,node)中根据hash取出node的值(这些node>32byte
        buf, _ := proofDb.Get(wantHash[:])
        if buf == nil {
            return nil, i, fmt.Errorf("proof node %d (hash %064x) missing", i, wantHash)
        //解析 RLP 编码
        n, err := decodeNode(wantHash[:], buf, 0)
        if err != nil {
            return nil, i, fmt.Errorf("bad proof node %d: %v", i, err)
        //1.对node根据类型做prove路径选择,遇到sha3 hash值就返回剩余路径和对应的hash值
        //2.对于node<32 byte的节点,直接对其本身数据进行解析比对
        keyrest, cld := get(n, key)
        switch cld := cld.(type) {
        case nil:
            // The trie doesn't contain the key.
            return nil, i, nil
        case hashNode:
            key = keyrest
            copy(wantHash[:], cld)
        case valueNode:
            return cld, i + 1, nil

func get(tn node, key []byte) ([]byte, node) {
    for {
        switch n := tn.(type) {
        case *shortNode:
            if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
                return nil, nil
            tn = n.Val
            key = key[len(n.Key):]
        case *fullNode:
            tn = n.Children[key[0]]
            key = key[1:]
        case hashNode:
            return key, n
        case nil:
            return key, nil
        case valueNode:
            return nil, n
            panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))

3. MPT Get & Insert & Update & Delete & Commit操作?

值得注意的是:Trie树的插入,查找和删除 Trie树的初始化调用New函数,函数接受一个hash值和一个Database参数,如果hash值不是空值的化,就说明是从数据库加载一个已经存在的Trie树, 就调用trei.resolveHash方法来加载整颗Trie树,上文已介绍。

func (t *Trie) TryGet(key []byte) ([]byte, error) {
    key = keybytesToHex(key)
    value, newroot, didResolve, err := t.tryGet(t.root, key, 0)
    if err == nil && didResolve {
        t.root = newroot
    return value, err

func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
    switch n := (origNode).(type) {
    case nil:
        return nil, nil, false, nil
    case valueNode:
        // 就是要查找的叶子节点数据
        return n, n, false, nil
    case *shortNode:
        if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
            // key not found in trie
            return nil, n, false, nil
        value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
        if err == nil && didResolve {
            n = n.copy()
            n.Val = newnode
            n.flags.gen = t.cachegen
        return value, n, didResolve, err
    case *fullNode:
        value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
        if err == nil && didResolve {
            n = n.copy()
            n.flags.gen = t.cachegen
            n.Children[key[pos]] = newnode
        return value, n, didResolve, err
    case hashNode:
        // hashNodes时候需要去db中获取
        child, err := t.resolveHash(n, key[:pos])
        if err != nil {
            return nil, n, true, err
        value, newnode, _, err := t.tryGet(child, key, pos)
        return value, newnode, true, err
        panic(fmt.Sprintf("%T: invalid node: %v", origNode, origNode))

    func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
    if len(key) == 0 {
        if v, ok := n.(valueNode); ok {
            return !bytes.Equal(v, value.(valueNode)), value, nil
        return true, value, nil
    switch n := n.(type) {
    case *shortNode:
        matchlen := prefixLen(key, n.Key)
        // If the whole key matches, keep this short node as is
        // and only update the value.
        if matchlen == len(n.Key) {
            dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
            if !dirty || err != nil {
                return false, n, err
            return true, &shortNode{n.Key, nn, t.newFlag()}, nil
        // Otherwise branch out at the index where they differ.
        branch := &fullNode{flags: t.newFlag()}
        var err error
        _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
        if err != nil {
            return false, nil, err
        _, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
        if err != nil {
            return false, nil, err
        // Replace this shortNode with the branch if it occurs at index 0.
        if matchlen == 0 {
            return true, branch, nil
        // Otherwise, replace it with a short node leading up to the branch.
        return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil

    case *fullNode:
        dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
        if !dirty || err != nil {
            return false, n, err
        n = n.copy()
        n.flags = t.newFlag()
        n.Children[key[0]] = nn
        return true, n, nil

    case nil:
        return true, &shortNode{key, value, t.newFlag()}, nil

    case hashNode:
        // We've hit a part of the trie that isn't loaded yet. Load
        // the node and insert into it. This leaves all child nodes on
        // the path to the value in the trie.
        rn, err := t.resolveHash(n, prefix)
        if err != nil {
            return false, nil, err
        dirty, nn, err := t.insert(rn, prefix, key, value)
        if !dirty || err != nil {
            return false, rn, err
        return true, nn, nil

        panic(fmt.Sprintf("%T: invalid node: %v", n, n))



在 trie.Trie 结构体的 insert(),delete()等函数实现中,可以看到除了新创建的 fullNode、shortNode,那些子结构有所改变的 fullNode、shortNode 的 nodeFlag 成员也会被重设,hashNode 会被清空。在下次 trie.Commit()调用时,整个 MPT 自底向上的遍历过程中,所有清空的 hashNode 会被重新赋值。这样 trie.Hash()结束后,我们可以得到一个根节点 root 的 hashNode,它就是此时此刻这个 MPT 结构的哈希值。


func (t *Trie) Commit() (root common.Hash, err error) {
    if t.db == nil {
        panic("Commit called on trie with nil database")
    return t.CommitTo(t.db)
// CommitTo writes all nodes to the given database.
// Nodes are stored with their sha3 hash as the key.
// Committing flushes nodes from memory. Subsequent Get calls will
// load nodes from the trie's database. Calling code must ensure that
// the changes made to db are written back to the trie's attached
// database before using the trie.
func (t *Trie) CommitTo(db DatabaseWriter) (root common.Hash, err error) {
    hash, cached, err := t.hashRoot(db)
    if err != nil {
        return (common.Hash{}), err
    t.root = cached
    return common.BytesToHash(hash.(hashNode)), nil

func (t *Trie) hashRoot(db DatabaseWriter) (node, node, error) {
    if t.root == nil {
        return hashNode(emptyRoot.Bytes()), nil, nil
    h := newHasher(t.cachegen, t.cachelimit)
    defer returnHasherToPool(h)
    return h.hash(t.root, db, true)


// hash collapses a node down into a hash node, also returning a copy of the
// original node initialized with the computed hash to replace the original one.
func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
    // If we're not storing the node, just hashing, use available cached data
    if hash, dirty := n.cache(); hash != nil {
        if db == nil {
            return hash, n, nil
        if n.canUnload(h.cachegen, h.cachelimit) {
            // Unload the node from cache. All of its subnodes will have a lower or equal
            // cache generation number.
            return hash, hash, nil
        if !dirty {
            return hash, n, nil
    // Trie not processed yet or needs storage, walk the children
    collapsed, cached, err := h.hashChildren(n, db)
    if err != nil {
        return hashNode{}, n, err
    hashed, err := h.store(collapsed, db, force)
    if err != nil {
        return hashNode{}, n, err
    // Cache the hash of the node for later reuse and remove
    // the dirty flag in commit mode. It's fine to assign these values directly
    // without copying the node first because hashChildren copies it.
    cachedHash, _ := hashed.(hashNode)
    switch cn := cached.(type) {
    case *shortNode:
        cn.flags.hash = cachedHash
        if db != nil {
            cn.flags.dirty = false
    case *fullNode:
        cn.flags.hash = cachedHash
        if db != nil {
            cn.flags.dirty = false
    return hashed, cached, nil

hasher的hash方法主要做了两个操作。 一个是保留原有的树形结构,并用cache变量中, 另一个是计算原有树形结构的hash并把hash值存放到cache变量中保存下来。

首先要理解的是:MPT的hash计算也是从根节点开始,依次向上递归调用,直至树根。计算原有hash值的主要流程是首先调用h.hashChildren(n,db)把所有的子节点的hash值求出来,把原有的子节点替换成子节点的hash值。 这是一个递归调用的过程,会从树叶依次往上计算直到树根。然后调用store方法计算当前节点的hash值,并把当前节点的hash值放入cache节点,设置dirty参数为false(新创建的节点的dirty值是为true的),然后返回。

// hashChildren replaces the children of a node with their hashes if the encoded
// size of the child is larger than a hash, returning the collapsed node as well
// as a replacement for the original node with the child hashes cached in.
func (h *hasher) hashChildren(original node, db *Database) (node, node, error) {
    var err error

    switch n := original.(type) {
    case *shortNode:
        // Hash the short node's child, caching the newly hashed subtree
        collapsed, cached := n.copy(), n.copy()
        collapsed.Key = hexToCompact(n.Key)
        cached.Key = common.CopyBytes(n.Key)
        //如果这个节点的value是hashNode,节点为extension node,则“深入”子节点去递归计算其hash值,并返回放到collapsed.Val中。其二值得注意的是:force设为了“false”
        if _, ok := n.Val.(valueNode); !ok {
            collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
            if err != nil {
                return original, original, err
        return collapsed, cached, nil

    case *fullNode:
        // Hash the full node's children, caching the newly hashed subtrees
        collapsed, cached := n.copy(), n.copy()
        for i := 0; i < 16; i++ {
            if n.Children[i] != nil {
                collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
                if err != nil {
                    return original, original, err
        cached.Children[16] = n.Children[16]
        return collapsed, cached, nil
        // Value and hash nodes don't have children so they're left as were
        return n, original, nil


如果当前节点是shortNode, 首先把collapsed.Key从Hex Encoding 替换成 Compact Encoding, 然后递归调用hash方法计算子节点的hash和cache,这样就把子节点替换成了子节点的hash值,
如果当前节点是fullNode, 那么遍历每个子节点,把子节点替换成子节点的Hash值,

执行完hashChildren方法之后,会执行store方法。如果一个node的所有子节点都替换成了子节点的hash值,那么直接调用rlp.Encode方法对这个节点进行编码,如果编码后的值小于32,并且这个节点不是根节点,那么就把他们直接存储在他们的父节点里面,否者调用h.sha.Write方法进行hash计算, 然后把hash值和编码后的数据存储到数据库里面,然后返回hash值。

func (h *hasher) store(n node, db DatabaseWriter, force bool) (node, error) {
    // Don't store hashes or empty nodes.
    if _, isHash := n.(hashNode); n == nil || isHash {
        return n, nil
    // Generate the RLP encoding of the node
    if err := rlp.Encode(h.tmp, n); err != nil {
        panic("encode error: " + err.Error())

    if h.tmp.Len() < 32 && !force {
        return n, nil // Nodes smaller than 32 bytes are stored inside their parent
    // Larger nodes are replaced by their hash and stored in the database.
    hash, _ := n.cache()
    if hash == nil {
        hash = hashNode(h.sha.Sum(nil))
    if db != nil {
        // We are pooling the trie nodes into an intermediate memory cache
        hash := common.BytesToHash(hash)

        db.insert(hash, h.tmp, n)
        // Track external references from account->storage trie
        if h.onleaf != nil {
            switch n := n.(type) {
            case *shortNode:
                if child, ok := n.Val.(valueNode); ok {
                    h.onleaf(child, hash)
            case *fullNode:
                for i := 0; i < 16; i++ {
                    if child, ok := n.Children[i].(valueNode); ok {
                        h.onleaf(child, hash)

    return hash, nil


了解以上流程之后针对Example MPT的PoC代码如下:

func TestElementProofsy(t *testing.T) {
    trie := newEmpty()
    updateString(trie, "do", "verb")
    root, err :=trie.Commit(nil)

    if err != nil {
        t.Fatal("commit error")



func (h *hasher) store(n node, db *Database, force bool) (node, error) {
    // Don't store hashes or empty nodes.
    if _, isHash := n.(hashNode); n == nil || isHash {
        return n, nil
    // Generate the RLP encoding of the node
    if err := rlp.Encode(&h.tmp, n); err != nil {
        panic("encode error: " + err.Error())
    if len(h.tmp) < 32 && !force {
        println("... self hash: "+hex.EncodeToString(h.tmp))
        return n, nil // Nodes smaller than 32 bytes are stored inside their parent
    // Larger nodes are replaced by their hash and stored in the database.
    hash, _ := n.cache()
    if hash == nil {
        hash = h.makeHashNode(h.tmp)

    if db != nil {

        println("... sha3 hash: "+hex.EncodeToString(hash))

        // We are pooling the trie nodes into an intermediate memory cache
        hash := common.BytesToHash(hash)

        db.insert(hash, h.tmp, n)
    return hash, nil




func TestElementProofsy(t *testing.T) {
    trie := newEmpty()
    updateString(trie, "do", "verb")
    root, err :=trie.Commit(nil)

    if err != nil {
        t.Fatal("commit error")
    println("root : ",hex.EncodeToString(root.Bytes()))

    for i, prover := range makeProvers(trie) {
        proof := prover([]byte("doge"))
        if proof == nil {
            t.Fatalf("prover %d: nil proof", i)
        println("\n***Verify Proof now!")

        val, _, err := VerifyProof(trie.Hash(), []byte("doge"), proof)
        if err != nil {
            t.Fatalf("prover %d: failed to verify proof: %v\nraw proof: %x", i, err, proof)
        if !bytes.Equal(val, []byte("coin")) {
            t.Fatalf("prover %d: verified value mismatch: have %x, want 'coin'", i, val)

        println("\n***prover verified successful")

func makeProvers(trie *Trie) []func(key []byte) *ethdb.MemDatabase {
    var provers []func(key []byte) *ethdb.MemDatabase

    // Create a direct trie based Merkle prover
    provers = append(provers, func(key []byte) *ethdb.MemDatabase {
        proof := ethdb.NewMemDatabase()
        println("\n***Create Merkle trie provers")
        trie.Prove(key, 0, proof)
        return proof
    return provers

LevelDB 写优化==》mLSM 跟老船长讨论TODO

security_trie 的idea


// SecureTrie is not safe for concurrent use.



