以太坊 miner worker 导读
代码版本:1.8.27,此版本的Miner模块有较大改变,取消了原来的agent模块以及work对象,但是基本逻辑还是一样的。Miner模块的主要执行部分在worker中,Miner对象及其方法主要控制着模块的开关和外部接口。
Miner模块
// Miner creates blocks and searches for proof-of-work values.
type Miner struct {
mux *event.TypeMux
worker *worker
coinbase common.Address
eth Backend
engine consensus.Engine
exitCh chan struct{}
startCh chan common.Address
stopCh chan struct{}
}
miner.update()方法
监听downloader事件,控制着canStart和shouldStart这两个开关,用于抵挡DOS攻击。
1、当监听到downloader的StartEvent事件时,canStart设置为0,表示downloader同步时不可进行挖矿,如果正在挖矿(miner.mining == ture),停止挖矿,同时将shouldStart设置为1,以便下次直接开始挖矿;
2、当监听到downloader的DoneEvent事件或者FailedEvent事件,判断shouldStart是否打开。如果是打开的,则再打开canStart,将shouldStart关闭。此时,将挖矿的控制权完全交给miner.Start()方法。
miner.start()方法很简单,打开shouldstart,设置coinbase,然后启动worker
func (miner *Miner) Start(coinbase common.Address) {
miner.startCh <- coinbase
}
func (miner *Miner) update() {
...
case addr := <-miner.startCh:
miner.SetEtherbase(addr)
if canStart {
miner.worker.start()
}
shouldStart = true
...
}
Worker模块
下图是miner主要的流程图,清晰的说明了worker的工作原理
minerWorker的数据结构如下,比较重要的已经注释:
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine //共识引擎
eth Backend //以太坊终端
chain *core.BlockChain //区块链对象
// Feeds
pendingLogsFeed event.Feed
// Subscriptions
mux *event.TypeMux
txsCh chan core.NewTxsEvent //交易池更新事件
txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent //区块头更新事件
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent //区块链分叉事件
chainSideSub event.Subscription
// Channels
newWorkCh chan *newWorkReq
taskCh chan *task
resultCh chan *types.Block
startCh chan struct{}
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
current *environment // An environment for current running cycle.当前挖矿生命周期的执行环境
localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. 本地分叉区块作为潜在叔块
remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. 分叉区块链中潜在的叔块
unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. 本地产生但尚未被确认的区块
mu sync.RWMutex // The lock used to protect the coinbase and extra fields
coinbase common.Address
extra []byte
pendingMu sync.RWMutex
pendingTasks map[common.Hash]*task //挖矿任务map
snapshotMu sync.RWMutex // The lock used to protect the snapshots below
snapshotBlock *types.Block//快照的区块
snapshotReceipts types.Receipts
snapshotState *state.StateDB//快照的状态
// atomic status counters
running int32 // The indicator whether the consensus engine is running or not. 判断引擎是否启动
newTxs int32 // New arrival transaction count since last sealing work submitting. 记录上次递交任务后新来的区块数量
// noempty is the flag used to control whether the feature of pre-seal empty
// block is enabled. The default value is false(pre-seal is enabled by default).
// But in some special scenario the consensus engine will seal blocks instantaneously,
// in this case this feature will add all empty blocks into canonical chain
// non-stop and no real transaction will be included.
noempty uint32
// External functions
isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.
// Test hooks
newTaskHook func(*task) // Method to call upon receiving a new sealing task.
skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
fullTaskHook func() // Method to call before pushing the full sealing task.
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
}
在初始化miner的时候,会新建worker,调用newWorker( ),该方法首先配置了worker对象,然后订阅了交易池事件、规范链更新事件和分叉事件,启动4个goroutine,最后通过向startCh中传入一个struct{}{},直接进入newWorkerLoop的逻辑。
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
worker := &worker{
config: config,
chainConfig: chainConfig,
engine: engine,
eth: eth,
mux: mux,
chain: eth.BlockChain(),
isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
// Submit first work to initialize pending state.
if init {
worker.startCh <- struct{}{}
}
return worker
}
NewWorkLoop
newWorkLoop主要监听两个重要的通道,一个是startCh通道,一个是chainHeadCh,这两个通道均用于清理特定父区块的pengding tasks列表,然后递交基于父区块的挖矿task。区别在于startCh通道启动是基于当前的currentBlock,而chainHeadCh是基于新传来的区块头。
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
...
for {
select {
case <-w.startCh:
clearPending(w.chain.CurrentBlock().NumberU64())
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)
case head := <-w.chainHeadCh:
clearPending(head.Block.NumberU64())
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)
case <-timer.C: ...
case interval := <-w.resubmitIntervalCh: ...
case adjust := <-w.resubmitAdjustCh: ...
case <-w.exitCh:
return
}
}
}
清理残留挖矿任务后,构建新的挖矿任务,这时候调用commit函数,构建一个newWorkReq对象,传入newWorkCh通道,进入MainLoop协程。MainLoop()监听三个重要的通道,newWorkCh(新work请求通道)、txsCh(交易池更新事件通道)以及chainSideCh(区块链分叉事件通道)
MainLoop
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
for {
select {
//task1:直接启动commitNetwork, 进一步提交挖矿task
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
// task2:出现分叉后,处理叔块
case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks 检验该hash的区块是否已经被当作潜在叔块,如果是,则忽略
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
continue
}
if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
continue
}
// Add side block to possible uncle block set depending on the author.
//将该区块作为潜在叔块加入叔块map,key为该区块的矿工地址
if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
w.localUncles[ev.Block.Hash()] = ev.Block
} else {
w.remoteUncles[ev.Block.Hash()] = ev.Block
}
// If our mining block contains less than 2 uncle blocks,
// add the new uncle block if valid and regenerate a mining block.
// 如果我们正在mining的区块少于两个叔块,则添加新的叔块并从新生成mining blocks
if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
...
}
//task3: 交易池更新后
case ev := <-w.txsCh: ...
// System stopped
case <-w.exitCh:
return
case <-w.txsSub.Err():
return
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
return
}
}
}
接着上面的的流程,newWorkCh通道传出req后,直接启动commitNewWork()函数。
commitNewWork()
方法主要的功能是递交一个新的task:
- 初始化一个新区块头给待挖矿的区块;
- 为当前挖矿周期初始化一个工作环境work;
- 获取交易池中每个账户地址的交易列表中的第一个交易后排序,然后应用这些交易;
- 获取两个叔块;
- 将区块递交给commit,用于生成task;
- 更新状态快照,供前端查询;
最后是commit方法计算挖矿奖励,更新block,将上面生成的block递交到一个挖矿task,最后将task传入taskCh通道。
// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
// Deep copy receipts here to avoid interaction between different tasks.
receipts := copyReceipts(w.current.receipts)
s := w.current.state.Copy()
// 计算挖矿奖励(包括叔块奖励)
block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
if err != nil {
return err
}
if w.isRunning() {
if interval != nil {
interval()
}
select {
// 生成task,传入taskCh通道:
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1)
...
case <-w.exitCh:
log.Info("Worker has exited")
}
}
if update {
w.updateSnapshot()
}
return nil
}
TaskLoop
task进入taskLoop后,被加入pendingTasks列表:
for {
select {
case task := <-w.taskCh:
if w.newTaskHook != nil {
w.newTaskHook(task)
}
// Reject duplicate sealing work due to resubmitting.
// 计算header数据的RLP hash值,判断是否有相同的块已经在挖矿中了,如果是则放弃,否则终止之前的挖矿
sealHash := w.engine.SealHash(task.block.Header())
if sealHash == prev {
continue
}
// Interrupt previous sealing operation
interrupt()
stopCh, prev = make(chan struct{}), sealHash
if w.skipSealHook != nil && w.skipSealHook(task) {
continue
}
w.pendingMu.Lock()
w.pendingTasks[sealHash] = task
w.pendingMu.Unlock()
// 最后执行挖矿,结果会通过resuletCh传入resultLoop
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
log.Warn("Block sealing failed", "err", err)
}
case <-w.exitCh:
interrupt()
return
}
}
resultLoop
最后是resultLoop,挖矿结果传入resultLoop,先从pengdingTasks列表中取出刚执行挖矿的task,更新收据日志中的blockHash,然后将区块存入数据库,最后将区块广播出去。
commitTransaction() 交易执行
- 设置gaspool:
- 进入交易执行循环
在for循环中,会有三种情况会被打断:a、交易还在执行,但是新的区块已经经过广播到达本地,interrupt信号为1;b、worker start 或者restart,interrupt信号为1;c、worker重新构造区块,包含了新到的交易,interrupt信号为2。
对于前两种,worker的本次执行就终止,当对于第三种情况,本次执行依然会被提交到consensus engine
- 如果区块工作环境剩余gas小于21000,则推出循环,否则从排好序的列表离取出交易;
- 执行交易并处理错误:w.commitTransaction()
// Start executing the transaction
// 首先准备当前的世界状态
w.current.state.Prepare(tx.Hash(), w.current.tcount)
// 调用交易执行的方法,core.ApplyTransaction,得到收据并放入当前的执行环境
logs, err := w.commitTransaction(tx, coinbase)
switch {
case errors.Is(err, core.ErrGasLimitReached):
// gasPool不够执行交易,则当前交易从trxs中移除
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop()
case errors.Is(err, core.ErrNonceTooLow):
// 交易nonce太低,则取下一个交易替换处理列表中的第一个交易
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txs.Shift()
case errors.Is(err, core.ErrNonceTooHigh):
// 交易nonce太高,则将当前交易从trxs列表中移除
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()
case errors.Is(err, nil):
// 一切正常,收集日志,统计执行成功的交易计数
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
w.current.tcount++
txs.Shift()
case errors.Is(err, core.ErrTxTypeNotSupported):
// Pop the unsupported transaction without shifting in the next from the account
log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
txs.Pop()
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txs.Shift()
}
Core.ApplyTransaction 执行交易的入口,将交易送入太坊虚拟机执行
imageApplyTransaction函数
该函数的调用有两种情况:
-
是在将区块插入区块链前需要验证区块合法性
bc.insertChain-->bc.processor.Process-->stateProcessor.Process -->ApplyTransaction
-
是worker挖矿过程中执行交易时
Worker.commitTransaction ——> ApplyTransaction
主要功能是:将交易转化成Message,创建EVM对象,调用ApplyMessage执行交易,生成日志对象;
- 将交易转换成Message;
- 初始化一个EVM的执行环境;
- 执行交易,改变stateDB世界状态,然后生成收据;
func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) {
// Create a new context to be used in the EVM environment.
txContext := NewEVMTxContext(msg)
evm.Reset(txContext, statedb)
// Apply the transaction to the current state (included in the env).
result, err := ApplyMessage(evm, msg, gp)
if err != nil {
return nil, err
}
// Update the state with pending changes.
var root []byte
if config.IsByzantium(blockNumber) {
statedb.Finalise(true)
} else {
root = statedb.IntermediateRoot(config.IsEIP158(blockNumber)).Bytes()
}
*usedGas += result.UsedGas
// Create a new receipt for the transaction, storing the intermediate root and gas used
// by the tx.
receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
if result.Failed() {
receipt.Status = types.ReceiptStatusFailed
} else {
receipt.Status = types.ReceiptStatusSuccessful
}
receipt.TxHash = tx.Hash()
receipt.GasUsed = result.UsedGas
// If the transaction created a contract, store the creation address in the receipt.
if msg.To() == nil {
receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
}
// Set the receipt logs and create the bloom filter.
receipt.Logs = statedb.GetLogs(tx.Hash(), blockHash)
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
receipt.BlockHash = blockHash
receipt.BlockNumber = blockNumber
receipt.TransactionIndex = uint(statedb.TxIndex())
return receipt, err
}
ethash 挖矿
POW的本质是基于算力解决一个数学上困难的问题,解决问题的关键点除了暴力枚举,没有任何办法可以找到我们所需要的nonce值,但对于验证输出的结果是非常简单容易的。
经典的比特币POW的算法原理是对block的header加上循环更新的nonce去进行hash运算,运算的target是hash值的前n位为0,这个计算只能通过暴力枚举来进行,验证也很容易,只要使用最终的nonce打入header按照之前的算法验证即可。
以太坊采用的ethash算法与比特币不同,但基本类似,都是找到一个nonce值输入到算法中,得到的结果低于一个基于特定困难值的阈值。
RAND(h,n) <= M/d
RAND 是一系列复杂的运算
h:header 不饱和nonce
n:nonce
M:2^256
d: 难度值 ,该难度值给予父区块的时间戳和难度而得到
如上所示,我们用header和nonce经过复杂的计算,如果得到的结果小于或者等于M/d,该nonce就是可用的,意味着挖矿成功。
下图是ethash算法在以太坊源码中的实现
aa
Eth.Seal方法
主要任务是:
- 获得种子seed;
- 基于seed获得Rand对象,rand值将作为初始化nonce进行挖矿;
- 启动mine方法,执行挖矿;
// Seal implements consensus.Engine, attempting to find a nonce that satisfies
// the block's difficulty requirements.
func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
// If we're running a fake PoW, simply return a 0 nonce immediately
if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake {
header := block.Header()
header.Nonce, header.MixDigest = types.BlockNonce{}, common.Hash{}
select {
case results <- block.WithSeal(header):
default:
ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "fake", "sealhash", ethash.SealHash(block.Header()))
}
return nil
}
// If we're running a shared PoW, delegate sealing to it
if ethash.shared != nil {
return ethash.shared.Seal(chain, block, results, stop)
}
// Create a runner and the multiple search threads it directs
abort := make(chan struct{})
ethash.lock.Lock()
// 挖矿的线程
threads := ethash.threads
if ethash.rand == nil {
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
ethash.lock.Unlock()
return err
}
// 生成rand对象
ethash.rand = rand.New(rand.NewSource(seed.Int64()))
}
ethash.lock.Unlock()
if threads == 0 {
threads = runtime.NumCPU()
}
if threads < 0 {
threads = 0 // Allows disabling local mining without extra logic around local/remote
}
// Push new work to remote sealer
if ethash.remote != nil {
ethash.remote.workCh <- &sealTask{block: block, results: results}
}
var (
pend sync.WaitGroup
locals = make(chan *types.Block)
)
// 闭包多线程处理
for i := 0; i < threads; i++ {
pend.Add(1)
go func(id int, nonce uint64) {
defer pend.Done()
ethash.mine(block, id, nonce, abort, locals)
}(i, uint64(ethash.rand.Int63()))
}
// Wait until sealing is terminated or a nonce is found
go func() {
var result *types.Block
select {
case <-stop:
// Outside abort, stop all miner threads
close(abort)
case result = <-locals:
// One of the threads found a block, abort all others
select {
case results <- result:
default:
ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header()))
}
close(abort)
case <-ethash.update:
// Thread count was changed on user request, restart
close(abort)
if err := ethash.Seal(chain, block, results, stop); err != nil {
ethash.config.Log.Error("Failed to restart sealing after update", "err", err)
}
}
// Wait for all miners to terminate and return the block
pend.Wait()
}()
return nil
}
mine方法
主要任务:
- 取出block的header;
- 取出没有nonce时的区块hash;
- 设置目标target,M/td;
- 获得dataset数据集;
- 开启无限循环,计算每一轮的nonce值的POW结果,直到获得满足条件的解;
补充:DAG和epoch
- 上面的dataset就来自内存中的一组数据或者硬盘里的DAG。
- DAG是有向无环图,以太坊的DAG是基于区块高度生成的。
- 以太坊中每3万个块会生成一代DAG,这一代就成称为一个epoch。
- 挖矿的时候需要从DAG中随机选取dataset,所以挖矿工作只能在现世DAG创建以后才能开始。
参考:
1.以太坊源码解读
2.go-ethereum-code-analysis