Polygon zkEVM Node Walkthrough
2023-03-26
雪落无留痕
jsonrpc module
Users broadcast transactions through the jsonrpc interface. A Non-Sequencer node forwards the transaction to the Sequencer; a Sequencer node adds it to the transaction pool.
// SendRawTransaction has two different ways to handle new transactions:
// - for Sequencer nodes it tries to add the tx to the pool
// - for Non-Sequencer nodes it relays the Tx to the Sequencer node
func (e *EthEndpoints) SendRawTransaction(input string) (interface{}, rpcError) {
	if e.cfg.SequencerNodeURI != "" {
		return e.relayTxToSequencerNode(input)
	} else {
		return e.tryToAddTxToPool(input)
	}
}
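The routing decision above depends on a single configuration field. The following is a minimal, standalone sketch of that decision, not the node's actual code: the `config` type, the `destination` helper, and the example URL are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// config holds only the field the routing decision reads.
// Assumption taken from the snippet above: the URI is empty on the Sequencer itself.
type config struct {
	SequencerNodeURI string
}

// destination is a hypothetical helper that reports where a raw
// transaction would be sent for a given configuration.
func destination(cfg config) string {
	if cfg.SequencerNodeURI != "" {
		// Non-Sequencer node: relay to the configured Sequencer.
		return "relay to " + cfg.SequencerNodeURI
	}
	// Sequencer node: add to the local pool.
	return "local pool"
}

func main() {
	fmt.Println(destination(config{SequencerNodeURI: "http://sequencer.example:8123"}))
	fmt.Println(destination(config{}))
}
```

A node therefore acts as a Sequencer simply by leaving `SequencerNodeURI` unset.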
pool module
The pool module validates the transaction and sets its status to pending:
// AddTx adds a transaction to the pool with the pending state
func (p *Pool) AddTx(ctx context.Context, tx types.Transaction) error {
	if err := p.validateTx(ctx, tx); err != nil {
		return err
	}
	poolTx := Transaction{
		Transaction: tx,
		Status:      TxStatusPending,
		IsClaims:    false,
		ReceivedAt:  time.Now(),
	}
	// Check whether this is a bridge claim transaction
	poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)
	// Execute transaction to calculate its zkCounters
	zkCounters, err := p.PreExecuteTx(ctx, tx)
	if err == nil {
		poolTx.ZKCounters = zkCounters
	}
	if executor.IsExecutorOutOfCountersError(executor.ExecutorErrorCode(err)) {
		return err
	}
	return p.storage.AddTx(ctx, poolTx)
}
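One detail of AddTx is easy to miss: the counters are recorded only when pre-execution succeeds, the transaction is rejected only on an out-of-counters error, and any other pre-execution error still lets the transaction into the pool. The sketch below isolates that control flow. It is a simplification under stated assumptions: `zkCounters`, `poolTx`, `admit`, and both error values are hypothetical stand-ins, and `errors.Is` replaces the executor's error-code check.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error values standing in for executor error codes.
var (
	errOutOfCounters = errors.New("executor: out of counters")
	errReverted      = errors.New("executor: execution reverted")
)

// zkCounters is a one-field simplification of the real counter set.
type zkCounters struct{ Steps uint32 }

// poolTx keeps a nil Counters pointer when pre-execution produced none.
type poolTx struct {
	Hash     string
	Counters *zkCounters
}

// admit mirrors the error handling in AddTx: store counters on success,
// reject on out-of-counters, and accept the tx on any other error.
func admit(hash string, preExecute func() (zkCounters, error)) (*poolTx, error) {
	tx := &poolTx{Hash: hash}
	counters, err := preExecute()
	if err == nil {
		tx.Counters = &counters
	}
	if errors.Is(err, errOutOfCounters) {
		return nil, err
	}
	return tx, nil
}

func main() {
	cases := []struct {
		name string
		run  func() (zkCounters, error)
	}{
		{"ok", func() (zkCounters, error) { return zkCounters{Steps: 42}, nil }},
		{"reverted", func() (zkCounters, error) { return zkCounters{}, errReverted }},
		{"exhausted", func() (zkCounters, error) { return zkCounters{}, errOutOfCounters }},
	}
	for _, c := range cases {
		tx, err := admit("0x01", c.run)
		fmt.Println(c.name, "admitted:", tx != nil, "err:", err)
	}
}
```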
The pool pre-executes the transaction through the executor, mainly to compute its ZKCounters, and then saves it to pool.transaction. The transaction structure is:
type Transaction struct {
	types.Transaction
	Status   TxStatus
	IsClaims bool
	state.ZKCounters
	FailedCounter         uint64
	ReceivedAt            time.Time
	PreprocessedStateRoot common.Hash
}
Sequencer module
dbManager module
It mainly starts two tasks:
// Start starts the dbManager routines
func (d *dbManager) Start() {
	go d.loadFromPool()
	go d.storeProcessedTxAndDeleteFromPool()
}
loadFromPool
Reads pending transactions from pool.transaction and adds them to the worker.
func (d *dbManager) addTxToWorker(tx pool.Transaction, isClaim bool) error {
	txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters)
	if err != nil {
		return err
	}
	d.worker.AddTx(d.ctx, txTracker)
	return d.txPool.UpdateTxStatus(d.ctx, tx.Hash(), pool.TxStatusWIP) // set the transaction status to WIP
}
A TxTracker is built first. Its structure is:
// TxTracker is a struct that contains all the tx data needed to be managed by the worker
type TxTracker struct {
	Hash           common.Hash
	HashStr        string
	From           common.Address
	FromStr        string
	Nonce          uint64
	Gas            uint64 // To check if it fits into a batch
	GasPrice       *big.Int
	Cost           *big.Int       // Cost = Amount + Benefit
	Benefit        *big.Int       // GasLimit * GasPrice
	IsClaim        bool           // Needed to calculate efficiency
	BatchResources batchResources // To check if it fits into a batch
	Efficiency     float64        // tx efficiency
	RawTx          []byte
}
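The struct comments define two derived fields: Benefit = GasLimit * GasPrice and Cost = Amount + Benefit. A small worked example with `math/big` is sketched below. The helper names `wei`, `benefit`, and `cost` and the sample values are illustrative assumptions; how Efficiency is derived from these fields is not shown in this article, so it is left out.

```go
package main

import (
	"fmt"
	"math/big"
)

// wei is a small convenience constructor for illustrative amounts.
func wei(v uint64) *big.Int {
	return new(big.Int).SetUint64(v)
}

// benefit computes GasLimit * GasPrice, as stated in the TxTracker comment.
func benefit(gasLimit uint64, gasPrice *big.Int) *big.Int {
	return new(big.Int).Mul(wei(gasLimit), gasPrice)
}

// cost computes Amount + Benefit, as stated in the TxTracker comment.
func cost(amount, benefit *big.Int) *big.Int {
	return new(big.Int).Add(amount, benefit)
}

func main() {
	// A plain transfer: 21000 gas at a gas price of 1 gwei, sending 5 wei.
	b := benefit(21000, wei(1_000_000_000))
	c := cost(wei(5), b)
	fmt.Println("benefit:", b) // 21000 * 1e9 = 21000000000000
	fmt.Println("cost:   ", c) // 21000000000000 + 5 = 21000000000005
}
```

Both helpers allocate a new `big.Int` for the result, so the inputs are never mutated.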
The transaction is finally added to the efficiencyList, which is sorted by efficiency:
// add adds a tx to the efficiencyList
func (e *efficiencyList) add(tx *TxTracker) bool {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	if _, found := e.list[tx.HashStr]; !found {
		e.list[tx.HashStr] = tx
		e.addSort(tx)
		return true
	}
	return false
}
The efficiencyList structure is:
// efficiencyList represents a list of tx sorted by efficiency
type efficiencyList struct {
	list   map[string]*TxTracker
	sorted []*TxTracker
	mutex  sync.Mutex
}
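The body of addSort is not shown above. One plausible way to keep the sorted slice ordered is a binary search followed by an in-place insert, sketched below with simplified types. This is an assumption about the approach, not the node's actual implementation: `tx`, `effList`, `newEffList`, and `hashes` are hypothetical names, and descending order (most efficient first) is assumed.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// tx is a reduced TxTracker with only the fields the list needs.
type tx struct {
	HashStr    string
	Efficiency float64
}

// effList pairs a hash index (for deduplication) with a slice kept
// in descending efficiency order.
type effList struct {
	mu     sync.Mutex
	byHash map[string]*tx
	sorted []*tx
}

func newEffList() *effList {
	return &effList{byHash: make(map[string]*tx)}
}

// add inserts t unless its hash is already present. It reports whether
// the tx was inserted.
func (e *effList) add(t *tx) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	if _, found := e.byHash[t.HashStr]; found {
		return false
	}
	e.byHash[t.HashStr] = t
	// Find the first position whose efficiency is lower than the new tx.
	i := sort.Search(len(e.sorted), func(i int) bool {
		return e.sorted[i].Efficiency < t.Efficiency
	})
	// Grow by one, shift the tail right, and place the tx at position i.
	e.sorted = append(e.sorted, nil)
	copy(e.sorted[i+1:], e.sorted[i:])
	e.sorted[i] = t
	return true
}

// hashes returns the tx hashes in their current sorted order.
func (e *effList) hashes() []string {
	e.mu.Lock()
	defer e.mu.Unlock()
	out := make([]string, len(e.sorted))
	for i, t := range e.sorted {
		out[i] = t.HashStr
	}
	return out
}

func main() {
	l := newEffList()
	l.add(&tx{HashStr: "a", Efficiency: 1.0})
	l.add(&tx{HashStr: "b", Efficiency: 3.0})
	l.add(&tx{HashStr: "c", Efficiency: 2.0})
	fmt.Println(l.add(&tx{HashStr: "a", Efficiency: 9.0})) // duplicate hash is rejected
	fmt.Println(l.hashes())
}
```

The map makes the duplicate check constant time, while the slice lets the worker read the best candidate from index 0 without re-sorting.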
storeProcessedTxAndDeleteFromPool
Receives transactions from the txStore channel, adds them to the batch, and sets their status in the pool to Selected.
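Taken together, the sections so far imply a pool status lifecycle of pending, then WIP, then Selected. The consumer side of that lifecycle can be sketched as a goroutine draining a channel. Everything here is a simplified assumption: `processedTx`, `memPool`, `storeLoop`, and `runDemo` are hypothetical, the status strings are placeholders, and an in-memory map stands in for the database.

```go
package main

import (
	"fmt"
	"sync"
)

// TxStatus values are placeholders for the pool's real status constants.
type TxStatus string

const (
	StatusWIP      TxStatus = "wip"
	StatusSelected TxStatus = "selected"
)

// processedTx is what the finalizer would hand over after executing a tx.
type processedTx struct{ Hash string }

// memPool is an in-memory stand-in for the pool storage.
type memPool struct {
	mu     sync.Mutex
	status map[string]TxStatus
}

func (p *memPool) setStatus(hash string, s TxStatus) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.status[hash] = s
}

func (p *memPool) get(hash string) TxStatus {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.status[hash]
}

// storeLoop drains txStore until it is closed, appending each tx to the
// batch and marking it Selected in the pool. It returns the batch contents.
func storeLoop(txStore <-chan processedTx, pool *memPool) []string {
	var batch []string
	for tx := range txStore {
		batch = append(batch, tx.Hash)
		pool.setStatus(tx.Hash, StatusSelected)
	}
	return batch
}

// runDemo wires a producer and the consumer goroutine together.
func runDemo() ([]string, *memPool) {
	pool := &memPool{status: map[string]TxStatus{"0xaa": StatusWIP, "0xbb": StatusWIP}}
	txStore := make(chan processedTx)
	done := make(chan []string)
	go func() { done <- storeLoop(txStore, pool) }()
	txStore <- processedTx{Hash: "0xaa"}
	txStore <- processedTx{Hash: "0xbb"}
	close(txStore)
	return <-done, pool
}

func main() {
	batch, pool := runDemo()
	fmt.Println(batch, pool.get("0xaa"), pool.get("0xbb"))
}
```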
finalizer
The finalizer mainly fetches transactions and builds batches.
// Start starts the finalizer.
func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *state.ProcessRequest) {
	var err error
	if batch != nil {
		f.batch = batch
	} else {
		f.batch, err = f.dbManager.GetWIPBatch(ctx)
		if err != nil {
			log.Fatalf("failed to get work-in-progress batch from DB, Err: %s", err)
		}
	}
	if processingReq != nil {
		f.processRequest = *processingReq
	} else {
		f.processRequest, err = f.prepareProcessRequestFromState(ctx, false)
		if err != nil {
			log.Errorf("failed to prepare process request from state, Err: %s", err)
		}
	}
	// Closing signals receiver
	go f.listenForClosingSignals(ctx)
	// Processing transactions and finalizing batches
	f.finalizeBatches(ctx) // finalize the batch
}
tryToSendSequence
Aggregates multiple generated batches into sequences:
go func() {
	for {
		s.tryToSendSequence(ctx, tickerSendSequence)
	}
}()
It then builds a monitored tx and saves it in the state.monitored_txs table with status MonitoredTxStatusCreated.
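The aggregation step can be pictured as taking closed batches in order until some limit is reached, then recording the result as a monitored tx. The sketch below assumes a byte-size limit purely for illustration; the article does not state which limit the node applies. The `batch`, `monitoredTx`, `buildSequence`, and `newMonitoredTx` names are hypothetical, and the `"created"` string is a placeholder for MonitoredTxStatusCreated.

```go
package main

import "fmt"

// batch carries only what the grouping needs. Size is an assumed
// encoded size in bytes.
type batch struct {
	Number uint64
	Size   int
}

// monitoredTx is a reduced record of what would be stored for tracking.
type monitoredTx struct {
	Status       string
	BatchNumbers []uint64
}

// statusCreated is a placeholder for MonitoredTxStatusCreated.
const statusCreated = "created"

// buildSequence takes batches in order and stops before the first one
// that would push the running total above maxBytes (assumed limit).
func buildSequence(batches []batch, maxBytes int) []batch {
	total := 0
	var seq []batch
	for _, b := range batches {
		if total+b.Size > maxBytes {
			break
		}
		total += b.Size
		seq = append(seq, b)
	}
	return seq
}

// newMonitoredTx records the sequence with the initial "created" status.
func newMonitoredTx(seq []batch) monitoredTx {
	nums := make([]uint64, 0, len(seq))
	for _, b := range seq {
		nums = append(nums, b.Number)
	}
	return monitoredTx{Status: statusCreated, BatchNumbers: nums}
}

func main() {
	closed := []batch{{Number: 1, Size: 40}, {Number: 2, Size: 40}, {Number: 3, Size: 40}}
	seq := buildSequence(closed, 100)
	fmt.Printf("%+v\n", newMonitoredTx(seq))
}
```

With a 100-byte limit and three 40-byte batches, only batches 1 and 2 fit; batch 3 waits for the next attempt.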