以太坊 ethereum

以太坊 miner worker 导读

2021-08-29  本文已影响0人  walker_1992

代码版本:1.8.27,此版本的Miner模块有较大改变,取消了原来的agent模块以及work对象,但是基本逻辑还是一样的。Miner模块的主要执行部分在worker中,Miner对象及其方法主要控制着模块的开关和外部接口。

Miner模块

// Miner creates blocks and searches for proof-of-work values.
type Miner struct {
    mux      *event.TypeMux
    worker   *worker
    coinbase common.Address
    eth      Backend
    engine   consensus.Engine
    exitCh   chan struct{}
    startCh  chan common.Address
    stopCh   chan struct{}
}

miner.update()方法

监听downloader事件,控制着canStart和shouldStart这两个开关,用于抵挡DOS攻击。

1、当监听到downloader的StartEvent事件时,canStart设置为0,表示downloader同步时不可进行挖矿,如果正在挖矿(miner.mining == ture),停止挖矿,同时将shouldStart设置为1,以便下次直接开始挖矿;

2、当监听到downloader的DoneEvent事件或者FailedEvent事件,判断shouldStart是否打开。如果是打开的,则再打开canStart,将shouldStart关闭。此时,将挖矿的控制权完全交给miner.Start()方法。

miner.start()方法很简单,打开shouldstart,设置coinbase,然后启动worker

func (miner *Miner) Start(coinbase common.Address) {
    miner.startCh <- coinbase
}

func (miner *Miner) update() {
...
        case addr := <-miner.startCh:
            miner.SetEtherbase(addr)
            if canStart {
                miner.worker.start()
            }
            shouldStart = true
...
}

Worker模块

下图是miner主要的流程图,清晰的说明了worker的工作原理

miner

Worker的数据结构如下,比较重要的已经注释:

// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
    config      *Config
    chainConfig *params.ChainConfig
    engine      consensus.Engine //共识引擎
    eth         Backend                    //以太坊终端
    chain       *core.BlockChain   //区块链对象

    // Feeds
    pendingLogsFeed event.Feed

    // Subscriptions
    mux          *event.TypeMux
    txsCh        chan core.NewTxsEvent //交易池更新事件
    txsSub       event.Subscription
    chainHeadCh  chan core.ChainHeadEvent //区块头更新事件
    chainHeadSub event.Subscription
    chainSideCh  chan core.ChainSideEvent //区块链分叉事件
    chainSideSub event.Subscription

    // Channels
    newWorkCh          chan *newWorkReq
    taskCh             chan *task
    resultCh           chan *types.Block
    startCh            chan struct{}
    exitCh             chan struct{}
    resubmitIntervalCh chan time.Duration
    resubmitAdjustCh   chan *intervalAdjust

    current      *environment                 // An environment for current running cycle.当前挖矿生命周期的执行环境
    localUncles  map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. 本地分叉区块作为潜在叔块
    remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. 分叉区块链中潜在的叔块
    unconfirmed  *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations. 本地产生但尚未被确认的区块

    mu       sync.RWMutex // The lock used to protect the coinbase and extra fields
    coinbase common.Address
    extra    []byte

    pendingMu    sync.RWMutex
    pendingTasks map[common.Hash]*task //挖矿任务map

    snapshotMu       sync.RWMutex // The lock used to protect the snapshots below
    snapshotBlock    *types.Block//快照的区块
    snapshotReceipts types.Receipts
    snapshotState    *state.StateDB//快照的状态

    // atomic status counters
    running int32 // The indicator whether the consensus engine is running or not. 判断引擎是否启动
    newTxs  int32 // New arrival transaction count since last sealing work submitting. 记录上次递交任务后新来的区块数量

    // noempty is the flag used to control whether the feature of pre-seal empty
    // block is enabled. The default value is false(pre-seal is enabled by default).
    // But in some special scenario the consensus engine will seal blocks instantaneously,
    // in this case this feature will add all empty blocks into canonical chain
    // non-stop and no real transaction will be included.
    noempty uint32

    // External functions
    isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.

    // Test hooks
    newTaskHook  func(*task)                        // Method to call upon receiving a new sealing task.
    skipSealHook func(*task) bool                   // Method to decide whether skipping the sealing.
    fullTaskHook func()                             // Method to call before pushing the full sealing task.
    resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
}

在初始化miner的时候,会新建worker,调用newWorker( ),该方法首先配置了worker对象,然后订阅了交易池事件、规范链更新事件和分叉事件,启动4个goroutine,最后通过向startCh中传入一个struct{}{},直接进入newWorkerLoop的逻辑。

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
    worker := &worker{
        config:             config,
        chainConfig:        chainConfig,
        engine:             engine,
        eth:                eth,
        mux:                mux,
        chain:              eth.BlockChain(),
        isLocalBlock:       isLocalBlock,
        localUncles:        make(map[common.Hash]*types.Block),
        remoteUncles:       make(map[common.Hash]*types.Block),
        unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
        pendingTasks:       make(map[common.Hash]*task),
        txsCh:              make(chan core.NewTxsEvent, txChanSize),
        chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
        chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
        newWorkCh:          make(chan *newWorkReq),
        taskCh:             make(chan *task),
        resultCh:           make(chan *types.Block, resultQueueSize),
        exitCh:             make(chan struct{}),
        startCh:            make(chan struct{}, 1),
        resubmitIntervalCh: make(chan time.Duration),
        resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
    }
    // Subscribe NewTxsEvent for tx pool
    worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
    // Subscribe events for blockchain
    worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
    worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)

    // Sanitize recommit interval if the user-specified one is too short.
    recommit := worker.config.Recommit
    if recommit < minRecommitInterval {
        log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
        recommit = minRecommitInterval
    }

    go worker.mainLoop()
    go worker.newWorkLoop(recommit)
    go worker.resultLoop()
    go worker.taskLoop()

    // Submit first work to initialize pending state.
    if init {
        worker.startCh <- struct{}{}
    }
    return worker
}

NewWorkLoop

newWorkLoop主要监听两个重要的通道,一个是startCh通道,一个是chainHeadCh,这两个通道均用于清理特定父区块的pengding tasks列表,然后递交基于父区块的挖矿task。区别在于startCh通道启动是基于当前的currentBlock,而chainHeadCh是基于新传来的区块头。

// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
...
    for {
        select {
        case <-w.startCh:
            clearPending(w.chain.CurrentBlock().NumberU64())
            timestamp = time.Now().Unix()
            commit(false, commitInterruptNewHead)

        case head := <-w.chainHeadCh:
            clearPending(head.Block.NumberU64())
            timestamp = time.Now().Unix()
            commit(false, commitInterruptNewHead)

        case <-timer.C: ...

        case interval := <-w.resubmitIntervalCh: ...

        case adjust := <-w.resubmitAdjustCh: ...

        case <-w.exitCh:
            return
        }
    }
}

清理残留挖矿任务后,构建新的挖矿任务,这时候调用commit函数,构建一个newWorkReq对象,传入newWorkCh通道,进入MainLoop协程。MainLoop()监听三个重要的通道,newWorkCh(新work请求通道)、txsCh(交易池更新事件通道)以及chainSideCh(区块链分叉事件通道)

MainLoop

// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
    defer w.txsSub.Unsubscribe()
    defer w.chainHeadSub.Unsubscribe()
    defer w.chainSideSub.Unsubscribe()

    for {
        select {
                //task1:直接启动commitNetwork,  进一步提交挖矿task
        case req := <-w.newWorkCh:
            w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
                // task2:出现分叉后,处理叔块
        case ev := <-w.chainSideCh:
            // Short circuit for duplicate side blocks 检验该hash的区块是否已经被当作潜在叔块,如果是,则忽略
            if _, exist := w.localUncles[ev.Block.Hash()]; exist {
                continue
            }
            if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
                continue
            }
            // Add side block to possible uncle block set depending on the author.
                        //将该区块作为潜在叔块加入叔块map,key为该区块的矿工地址
            if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
                w.localUncles[ev.Block.Hash()] = ev.Block
            } else {
                w.remoteUncles[ev.Block.Hash()] = ev.Block
            }
            // If our mining block contains less than 2 uncle blocks,
            // add the new uncle block if valid and regenerate a mining block.
                        // 如果我们正在mining的区块少于两个叔块,则添加新的叔块并从新生成mining blocks
            if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
...
            }
                //task3: 交易池更新后
        case ev := <-w.txsCh: ...

        // System stopped
        case <-w.exitCh:
            return
        case <-w.txsSub.Err():
            return
        case <-w.chainHeadSub.Err():
            return
        case <-w.chainSideSub.Err():
            return
        }
    }
}

接着上面的的流程,newWorkCh通道传出req后,直接启动commitNewWork()函数。

commitNewWork()

方法主要的功能是递交一个新的task:

  1. 初始化一个新区块头给待挖矿的区块;
  2. 为当前挖矿周期初始化一个工作环境work;
  3. 获取交易池中每个账户地址的交易列表中的第一个交易后排序,然后应用这些交易;
  4. 获取两个叔块;
  5. 将区块递交给commit,用于生成task;
  6. 更新状态快照,供前端查询;

最后是commit方法计算挖矿奖励,更新block,将上面生成的block递交到一个挖矿task,最后将task传入taskCh通道。

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
    // Deep copy receipts here to avoid interaction between different tasks.
    receipts := copyReceipts(w.current.receipts)
    s := w.current.state.Copy()
        // 计算挖矿奖励(包括叔块奖励)
    block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
    if err != nil {
        return err
    }
    if w.isRunning() {
        if interval != nil {
            interval()
        }
        select {
                // 生成task,传入taskCh通道:
        case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
            w.unconfirmed.Shift(block.NumberU64() - 1)
...
        case <-w.exitCh:
            log.Info("Worker has exited")
        }
    }
    if update {
        w.updateSnapshot()
    }
    return nil
}

TaskLoop

task进入taskLoop后,被加入pendingTasks列表:

    for {
        select {
        case task := <-w.taskCh:
            if w.newTaskHook != nil {
                w.newTaskHook(task)
            }
            // Reject duplicate sealing work due to resubmitting.
                        // 计算header数据的RLP hash值,判断是否有相同的块已经在挖矿中了,如果是则放弃,否则终止之前的挖矿
            sealHash := w.engine.SealHash(task.block.Header())
            if sealHash == prev {
                continue
            }
            // Interrupt previous sealing operation
            interrupt()
            stopCh, prev = make(chan struct{}), sealHash

            if w.skipSealHook != nil && w.skipSealHook(task) {
                continue
            }
            w.pendingMu.Lock()
            w.pendingTasks[sealHash] = task
            w.pendingMu.Unlock()
                        // 最后执行挖矿,结果会通过resuletCh传入resultLoop
            if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
                log.Warn("Block sealing failed", "err", err)
            }
        case <-w.exitCh:
            interrupt()
            return
        }
    }

resultLoop

最后是resultLoop,挖矿结果传入resultLoop,先从pengdingTasks列表中取出刚执行挖矿的task,更新收据日志中的blockHash,然后将区块存入数据库,最后将区块广播出去。

commitTransaction() 交易执行

  1. 设置gaspool:
  2. 进入交易执行循环

在for循环中,会有三种情况会被打断:a、交易还在执行,但是新的区块已经经过广播到达本地,interrupt信号为1;b、worker start 或者restart,interrupt信号为1;c、worker重新构造区块,包含了新到的交易,interrupt信号为2。

对于前两种,worker的本次执行就终止,当对于第三种情况,本次执行依然会被提交到consensus engine

  1. 如果区块工作环境剩余gas小于21000,则推出循环,否则从排好序的列表离取出交易;
  2. 执行交易并处理错误:w.commitTransaction()
        // Start executing the transaction
                // 首先准备当前的世界状态
        w.current.state.Prepare(tx.Hash(), w.current.tcount)
                // 调用交易执行的方法,core.ApplyTransaction,得到收据并放入当前的执行环境
        logs, err := w.commitTransaction(tx, coinbase)
        switch {
        case errors.Is(err, core.ErrGasLimitReached):
                        // gasPool不够执行交易,则当前交易从trxs中移除
            // Pop the current out-of-gas transaction without shifting in the next from the account
            log.Trace("Gas limit exceeded for current block", "sender", from)
            txs.Pop()

        case errors.Is(err, core.ErrNonceTooLow):
                        // 交易nonce太低,则取下一个交易替换处理列表中的第一个交易
            // New head notification data race between the transaction pool and miner, shift
            log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
            txs.Shift()

        case errors.Is(err, core.ErrNonceTooHigh):
                        // 交易nonce太高,则将当前交易从trxs列表中移除
            // Reorg notification data race between the transaction pool and miner, skip account =
            log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
            txs.Pop()

        case errors.Is(err, nil):
                        // 一切正常,收集日志,统计执行成功的交易计数
            // Everything ok, collect the logs and shift in the next transaction from the same account
            coalescedLogs = append(coalescedLogs, logs...)
            w.current.tcount++
            txs.Shift()

        case errors.Is(err, core.ErrTxTypeNotSupported):
            // Pop the unsupported transaction without shifting in the next from the account
            log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
            txs.Pop()

        default:
            // Strange error, discard the transaction and get the next in line (note, the
            // nonce-too-high clause will prevent us from executing in vain).
            log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
            txs.Shift()
        }

Core.ApplyTransaction 执行交易的入口,将交易送入太坊虚拟机执行

image

ApplyTransaction函数

该函数的调用有两种情况:

  1. 是在将区块插入区块链前需要验证区块合法性
    bc.insertChain-->bc.processor.Process-->stateProcessor.Process -->ApplyTransaction

  2. 是worker挖矿过程中执行交易时
    Worker.commitTransaction ——> ApplyTransaction

主要功能是:将交易转化成Message,创建EVM对象,调用ApplyMessage执行交易,生成日志对象;

  1. 将交易转换成Message;
  2. 初始化一个EVM的执行环境;
  3. 执行交易,改变stateDB世界状态,然后生成收据;
func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) {
    // Create a new context to be used in the EVM environment.
    txContext := NewEVMTxContext(msg)
    evm.Reset(txContext, statedb)

    // Apply the transaction to the current state (included in the env).
    result, err := ApplyMessage(evm, msg, gp)
    if err != nil {
        return nil, err
    }

    // Update the state with pending changes.
    var root []byte
    if config.IsByzantium(blockNumber) {
        statedb.Finalise(true)
    } else {
        root = statedb.IntermediateRoot(config.IsEIP158(blockNumber)).Bytes()
    }
    *usedGas += result.UsedGas

    // Create a new receipt for the transaction, storing the intermediate root and gas used
    // by the tx.
    receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
    if result.Failed() {
        receipt.Status = types.ReceiptStatusFailed
    } else {
        receipt.Status = types.ReceiptStatusSuccessful
    }
    receipt.TxHash = tx.Hash()
    receipt.GasUsed = result.UsedGas

    // If the transaction created a contract, store the creation address in the receipt.
    if msg.To() == nil {
        receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
    }

    // Set the receipt logs and create the bloom filter.
    receipt.Logs = statedb.GetLogs(tx.Hash(), blockHash)
    receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
    receipt.BlockHash = blockHash
    receipt.BlockNumber = blockNumber
    receipt.TransactionIndex = uint(statedb.TxIndex())
    return receipt, err
}

ethash 挖矿

POW的本质是基于算力解决一个数学上困难的问题,解决问题的关键点除了暴力枚举,没有任何办法可以找到我们所需要的nonce值,但对于验证输出的结果是非常简单容易的。

经典的比特币POW的算法原理是对block的header加上循环更新的nonce去进行hash运算,运算的target是hash值的前n位为0,这个计算只能通过暴力枚举来进行,验证也很容易,只要使用最终的nonce打入header按照之前的算法验证即可。

以太坊采用的ethash算法与比特币不同,但基本类似,都是找到一个nonce值输入到算法中,得到的结果低于一个基于特定困难值的阈值。

RAND(h,n) <= M/d

RAND 是一系列复杂的运算
h:header 不饱和nonce
n:nonce

M:2^256
d: 难度值 ,该难度值给予父区块的时间戳和难度而得到

如上所示,我们用header和nonce经过复杂的计算,如果得到的结果小于或者等于M/d,该nonce就是可用的,意味着挖矿成功。

下图是ethash算法在以太坊源码中的实现


aa

Eth.Seal方法

主要任务是:

  1. 获得种子seed;
  2. 基于seed获得Rand对象,rand值将作为初始化nonce进行挖矿;
  3. 启动mine方法,执行挖矿;
// Seal implements consensus.Engine, attempting to find a nonce that satisfies
// the block's difficulty requirements.
func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
    // If we're running a fake PoW, simply return a 0 nonce immediately
    if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake {
        header := block.Header()
        header.Nonce, header.MixDigest = types.BlockNonce{}, common.Hash{}
        select {
        case results <- block.WithSeal(header):
        default:
            ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "fake", "sealhash", ethash.SealHash(block.Header()))
        }
        return nil
    }
    // If we're running a shared PoW, delegate sealing to it
    if ethash.shared != nil {
        return ethash.shared.Seal(chain, block, results, stop)
    }
    // Create a runner and the multiple search threads it directs
    abort := make(chan struct{})

    ethash.lock.Lock()
        // 挖矿的线程
    threads := ethash.threads
    if ethash.rand == nil {
        seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
        if err != nil {
            ethash.lock.Unlock()
            return err
        }
                // 生成rand对象
        ethash.rand = rand.New(rand.NewSource(seed.Int64()))
    }
    ethash.lock.Unlock()
    if threads == 0 {
        threads = runtime.NumCPU()
    }
    if threads < 0 {
        threads = 0 // Allows disabling local mining without extra logic around local/remote
    }
    // Push new work to remote sealer
    if ethash.remote != nil {
        ethash.remote.workCh <- &sealTask{block: block, results: results}
    }
    var (
        pend   sync.WaitGroup
        locals = make(chan *types.Block)
    )
        // 闭包多线程处理
    for i := 0; i < threads; i++ {
        pend.Add(1)
        go func(id int, nonce uint64) {
            defer pend.Done()
            ethash.mine(block, id, nonce, abort, locals)
        }(i, uint64(ethash.rand.Int63()))
    }
    // Wait until sealing is terminated or a nonce is found
    go func() {
        var result *types.Block
        select {
        case <-stop:
            // Outside abort, stop all miner threads
            close(abort)
        case result = <-locals:
            // One of the threads found a block, abort all others
            select {
            case results <- result:
            default:
                ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header()))
            }
            close(abort)
        case <-ethash.update:
            // Thread count was changed on user request, restart
            close(abort)
            if err := ethash.Seal(chain, block, results, stop); err != nil {
                ethash.config.Log.Error("Failed to restart sealing after update", "err", err)
            }
        }
        // Wait for all miners to terminate and return the block
        pend.Wait()
    }()
    return nil
}

mine方法

主要任务:

  1. 取出block的header;
  2. 取出没有nonce时的区块hash;
  3. 设置目标target,M/td;
  4. 获得dataset数据集;
  5. 开启无限循环,计算每一轮的nonce值的POW结果,直到获得满足条件的解;

补充:DAG和epoch

  1. 上面的dataset就来自内存中的一组数据或者硬盘里的DAG。
  2. DAG是有向无环图,以太坊的DAG是基于区块高度生成的。
  3. 以太坊中每3万个块会生成一代DAG,这一代就成称为一个epoch。
  4. 挖矿的时候需要从DAG中随机选取dataset,所以挖矿工作只能在现世DAG创建以后才能开始。

参考:
1.以太坊源码解读
2.go-ethereum-code-analysis

上一篇下一篇

猜你喜欢

热点阅读