Polygon zkEVM Node Walkthrough

2023-03-26  雪落无留痕

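jsonrpc module

Users broadcast transactions through the jsonrpc interface. A Non-Sequencer node relays the transaction to the Sequencer, while a Sequencer node adds it to the transaction pool.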

// SendRawTransaction has two different ways to handle new transactions:
// - for Sequencer nodes it tries to add the tx to the pool
// - for Non-Sequencer nodes it relays the Tx to the Sequencer node
func (e *EthEndpoints) SendRawTransaction(input string) (interface{}, rpcError) {
    if e.cfg.SequencerNodeURI != "" {
        return e.relayTxToSequencerNode(input)
    } else {
        return e.tryToAddTxToPool(input)
    }
}
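
To make the relay path more concrete, here is a minimal sketch of the JSON-RPC 2.0 payload a Non-Sequencer node could forward. The `eth_sendRawTransaction` method name is the standard Ethereum one; the `relayRequest` type and the `buildRelayRequest` helper are hypothetical names used only for illustration and are not taken from zkevm-node.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// relayRequest is a hypothetical JSON-RPC 2.0 request envelope.
type relayRequest struct {
	JSONRPC string   `json:"jsonrpc"`
	ID      int      `json:"id"`
	Method  string   `json:"method"`
	Params  []string `json:"params"`
}

// buildRelayRequest wraps a raw signed transaction (hex string) into an
// eth_sendRawTransaction call that could be POSTed to the Sequencer URI.
func buildRelayRequest(id int, rawTx string) ([]byte, error) {
	return json.Marshal(relayRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "eth_sendRawTransaction",
		Params:  []string{rawTx},
	})
}

func main() {
	body, err := buildRelayRequest(1, "0x01")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

The Sequencer's response would be passed back to the caller unchanged, so from the user's point of view both node types behave the same.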

pool module

The pool module validates the transaction and sets its status to pending:

// AddTx adds a transaction to the pool with the pending state
func (p *Pool) AddTx(ctx context.Context, tx types.Transaction) error {
    if err := p.validateTx(ctx, tx); err != nil {
        return err
    }

    poolTx := Transaction{
        Transaction: tx,
        Status:      TxStatusPending,
        IsClaims:    false,
        ReceivedAt:  time.Now(),
    }
  
    // Check whether this is a bridge claim transaction
    poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)

    // Execute transaction to calculate its zkCounters
    zkCounters, err := p.PreExecuteTx(ctx, tx)
    if err == nil {
        poolTx.ZKCounters = zkCounters
    }

    if executor.IsExecutorOutOfCountersError(executor.ExecutorErrorCode(err)) {
        return err
    }

    return p.storage.AddTx(ctx, poolTx)
}

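After pre-executing the transaction through the executor, mainly to compute its ZKCounters, the pool saves it to the pool.transaction table. The transaction structure is: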

type Transaction struct {
    types.Transaction
    Status   TxStatus
    IsClaims bool
    state.ZKCounters
    FailedCounter         uint64
    ReceivedAt            time.Time
    PreprocessedStateRoot common.Hash
}

Sequencer module

dbManager module

It mainly starts two goroutines:

// Start starts the dbManager routines
func (d *dbManager) Start() {
    go d.loadFromPool()
    go d.storeProcessedTxAndDeleteFromPool()
}

loadFromPool reads pending transactions from pool.transaction and adds them to the worker.

func (d *dbManager) addTxToWorker(tx pool.Transaction, isClaim bool) error {
   txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters)
   if err != nil {
       return err
   }
   d.worker.AddTx(d.ctx, txTracker)
   return d.txPool.UpdateTxStatus(d.ctx, tx.Hash(), pool.TxStatusWIP)  // set the transaction status to WIP
}

It first builds a TxTracker, whose structure is:

// TxTracker is a struct that contains all the tx data needed to be managed by the worker
type TxTracker struct {
    Hash           common.Hash
    HashStr        string
    From           common.Address
    FromStr        string
    Nonce          uint64
    Gas            uint64 // To check if it fits into a batch
    GasPrice       *big.Int
    Cost           *big.Int       // Cost = Amount + Benefit
    Benefit        *big.Int       // GasLimit * GasPrice
    IsClaim        bool           // Needed to calculate efficiency
    BatchResources batchResources // To check if it fits into a batch
    Efficiency     float64   // tx efficiency
    RawTx          []byte
}

The transaction is finally added to the efficiencyList, which is sorted by efficiency:

// add adds a tx to the efficiencyList
func (e *efficiencyList) add(tx *TxTracker) bool {
    e.mutex.Lock()
    defer e.mutex.Unlock()

    if _, found := e.list[tx.HashStr]; !found {
        e.list[tx.HashStr] = tx
        e.addSort(tx)
        return true
    }
    return false
}

The efficiencyList structure is:

// efficiencyList represents a list of tx sorted by efficiency
type efficiencyList struct {
    list   map[string]*TxTracker
    sorted []*TxTracker
    mutex  sync.Mutex
}
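
The excerpt does not show addSort, so the following is only a sketch of one way to keep such a list ordered: a binary search for the insertion point followed by an in-place shift. It assumes descending order (most efficient first), which may differ from the real implementation, and it omits the mutex for brevity. The `tracker` and `effList` types are reduced, hypothetical versions of TxTracker and efficiencyList.

```go
package main

import (
	"fmt"
	"sort"
)

// tracker keeps only the fields needed for ordering (hypothetical).
type tracker struct {
	HashStr    string
	Efficiency float64
}

// effList pairs a lookup map with a slice kept in sorted order.
type effList struct {
	list   map[string]*tracker
	sorted []*tracker
}

func newEffList() *effList { return &effList{list: map[string]*tracker{}} }

// add inserts tx once, keeping sorted in descending efficiency order.
func (e *effList) add(tx *tracker) bool {
	if _, found := e.list[tx.HashStr]; found {
		return false
	}
	e.list[tx.HashStr] = tx
	// First index whose efficiency is strictly lower than tx's.
	i := sort.Search(len(e.sorted), func(i int) bool {
		return e.sorted[i].Efficiency < tx.Efficiency
	})
	e.sorted = append(e.sorted, nil)   // grow by one slot
	copy(e.sorted[i+1:], e.sorted[i:]) // shift the tail right
	e.sorted[i] = tx
	return true
}

func main() {
	l := newEffList()
	l.add(&tracker{HashStr: "a", Efficiency: 1.0})
	l.add(&tracker{HashStr: "b", Efficiency: 3.0})
	l.add(&tracker{HashStr: "c", Efficiency: 2.0})
	for _, t := range l.sorted {
		fmt.Println(t.HashStr, t.Efficiency)
	}
}
```

Keeping both a map and a slice trades some memory for constant-time duplicate checks and logarithmic-time lookup of the insertion point.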

storeProcessedTxAndDeleteFromPool receives transactions from the txStore channel, adds them to the batch, and sets their status in the pool to Selected.
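
The channel-consumer pattern described above can be sketched as follows. This is an illustration under simplifying assumptions, not the zkevm-node code: `processedTx`, `drainTxStore`, the slice used as a batch, and the status strings are hypothetical, and the real routine writes to the state and pool databases instead of in-memory structures.

```go
package main

import "fmt"

// processedTx is a hypothetical message placed on the txStore channel.
type processedTx struct{ Hash string }

// drainTxStore consumes processed transactions until the channel is closed,
// appends each one to the batch, and marks it as selected in the pool.
func drainTxStore(txStore <-chan processedTx, batch *[]string, status map[string]string) {
	for tx := range txStore {
		*batch = append(*batch, tx.Hash)
		status[tx.Hash] = "selected"
	}
}

func main() {
	txStore := make(chan processedTx, 2)
	txStore <- processedTx{Hash: "0x01"}
	txStore <- processedTx{Hash: "0x02"}
	close(txStore)

	var batch []string
	status := map[string]string{"0x01": "wip", "0x02": "wip"}
	drainTxStore(txStore, &batch, status)
	fmt.Println(batch, status)
}
```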

finalizer

The finalizer mainly fetches transactions and builds the batch.

// Start starts the finalizer.
func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *state.ProcessRequest) {
   var err error
   if batch != nil {
      f.batch = batch
   } else {
      f.batch, err = f.dbManager.GetWIPBatch(ctx)
      if err != nil {
         log.Fatalf("failed to get work-in-progress batch from DB, Err: %s", err)
      }
   }

   if processingReq != nil {
      f.processRequest = *processingReq
   } else {
      f.processRequest, err = f.prepareProcessRequestFromState(ctx, false)
      if err != nil {
         log.Errorf("failed to prepare process request from state, Err: %s", err)
      }
   }

   // Closing signals receiver
   go f.listenForClosingSignals(ctx)

   // Processing transactions and finalizing batches
   f.finalizeBatches(ctx)     // finalize the batch
}
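
finalizeBatches itself is not shown in this excerpt. As a rough mental model only, batch building can be pictured as spending a fixed resource budget on the transactions that still fit. The sketch below assumes the candidates arrive already sorted by efficiency and tracks just two resources; `resources`, `candidate`, and `fillBatch` are hypothetical names, and the real finalizer also handles closing signals, re-execution, and many more counters.

```go
package main

import "fmt"

// resources is a hypothetical, reduced stand-in for the batch budget.
type resources struct {
	Gas   uint64
	Bytes uint64
}

// candidate is a transaction together with the resources it needs.
type candidate struct {
	Hash string
	Need resources
}

// fillBatch walks candidates in the given order, keeps every tx that still
// fits into the budget, and returns the selected hashes plus what is left.
func fillBatch(budget resources, candidates []candidate) ([]string, resources) {
	var selected []string
	for _, c := range candidates {
		if c.Need.Gas > budget.Gas || c.Need.Bytes > budget.Bytes {
			continue // does not fit; a smaller tx later in the list may
		}
		budget.Gas -= c.Need.Gas
		budget.Bytes -= c.Need.Bytes
		selected = append(selected, c.Hash)
	}
	return selected, budget
}

func main() {
	selected, left := fillBatch(resources{Gas: 100, Bytes: 50}, []candidate{
		{Hash: "a", Need: resources{Gas: 60, Bytes: 10}},
		{Hash: "b", Need: resources{Gas: 60, Bytes: 10}},
		{Hash: "c", Need: resources{Gas: 30, Bytes: 30}},
	})
	fmt.Println(selected, left)
}
```

Once no remaining transaction fits, or a closing signal arrives, the batch would be closed and a new one opened.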

tryToSendSequence

It aggregates multiple generated batches into sequences:

    go func() {
        for {
            s.tryToSendSequence(ctx, tickerSendSequence)
        }
    }()

It then builds a monitored tx and saves it in the state.monitored_txs table with status MonitoredTxStatusCreated.
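
The aggregation step can be illustrated with a greedy grouping of consecutive closed batches under a size limit. This is a sketch under stated assumptions: the byte limit, the `closedBatch` type, and `groupIntoSequences` are hypothetical, and the actual criteria zkevm-node uses to decide what goes into one L1 transaction are not shown in this article.

```go
package main

import "fmt"

// closedBatch is a hypothetical summary of a finished batch.
type closedBatch struct {
	Number uint64
	Size   int // encoded size in bytes (assumed metric)
}

// groupIntoSequences packs consecutive batches into groups whose total size
// stays within maxBytes. A batch larger than maxBytes gets a group of its own.
func groupIntoSequences(batches []closedBatch, maxBytes int) [][]uint64 {
	var groups [][]uint64
	var current []uint64
	size := 0
	for _, b := range batches {
		if len(current) > 0 && size+b.Size > maxBytes {
			groups = append(groups, current) // flush the full group
			current, size = nil, 0
		}
		current = append(current, b.Number)
		size += b.Size
	}
	if len(current) > 0 {
		groups = append(groups, current)
	}
	return groups
}

func main() {
	batches := []closedBatch{{1, 40}, {2, 40}, {3, 40}, {4, 100}, {5, 10}}
	fmt.Println(groupIntoSequences(batches, 100))
}
```

Batch order is preserved, which matters because batches must reach L1 in sequence.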

References

https://github.com/0xPolygonHermez/zkevm-node
