以太坊挖矿源码分析 miner.go worker.go
2018-10-10 本文已影响106人
hasika
miner.go是挖矿的入口,真正实现类是worker.go。在miner的New方法中初始化了worker.
worker中主要执行了四个循环函数, 出块的主要功能都是在这四个循环中完成的。
涉及到的代码比较多,我先一步一步讲解,读者可以对照后面的源码查看
- newWorker中向startCh管道插入数据
- newWorkLoop方法的for循环从startCh管道中取出数据
- case <-w.startCh分支中执行两块逻辑:1. 调用clearPending删除过期的老任务,即任务区块高度加上staleThreshold(7)后仍不超过当前链头高度的任务;2. 调用commit方法提交一个新的任务。commit的第一个参数noempty为false,后面会用到。
- commit方法向w.newWorkCh管道中插入newWorkReq
- mainLoop方法中的w.newWorkCh拿到数据,调用w.commitNewWork方法
- w.commitNewWork方法中做了以下事情
- 创建区块头
- 新区块头的number等于父区块number+1
- 调用core.CalcGasLimit方法,计算gas限额,代码比较简单,总的思路是如果父区块使用的gas大于父区块gasLimit的2/3,那么当前区块的gasLimit就会增加。
- 设置区块头的Coinbase,为出块奖励矿工做准备
- 设置出块难度
- calcDifficultyHomestead 出块难度中的“难度炸弹”部分以每10万个区块为一个周期,呈指数级增加
- 设置是否支持DAO事件,header.Extra
- 设置叔块makeCurrent
- 创建env
- 设置祖先和家族,这里还没有设置该区块的叔块
- 出块 w.commit
- w.engine.Finalize
- accumulateRewards 计算出块奖励,包括当前块和叔块,并写入到世界状态中
- state.IntermediateRoot 设置最终状态header.Root
- 创建Block,设置Block的交易信息、叔块信息,以及header中的TxHash、ReceiptHash、UncleHash
- w.taskCh <-&task{receipts: receipts, state: s, block: block, createdAt: time.Now()}: 向taskCh中添加任务
- w.engine.Finalize
- 创建区块头
- taskLoop中的task := <-w.taskCh拿到数据,处理逻辑
- w.engine.SealHash(task.block.Header()) 计算header数据的RLP hash值,判断是否有相同的块已经在挖矿中了
- w.engine.Seal(w.chain, task.block, w.resultCh, stopCh) 传入w.resultCh管道,如果校验没问题,会向w.resultCh中插入数据
- ethash.mine(block, id, nonce, abort, locals) 挖矿,挖矿成功后,会向参数中的locals管道中写入数据
- hashimotoFull调用这个方法计算hash值,每次循环nonce++
- 当取到合法的hash值时,重新组装区块,并插入到found也就是locals管道中
- result = <-locals 读取locals管道中的数据,再把它插入到Seal的参数results管道(即调用时传入的w.resultCh)中
- ethash.mine(block, id, nonce, abort, locals) 挖矿,挖矿成功后,会向参数中的locals管道中写入数据
- resultLoop中的block := <-w.resultCh:拿到数据
- stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)将区块写到区块链中
- w.mux.Post(core.NewMinedBlockEvent{Block: block}) 向其他节点广播区块
- w.chain.PostChainEvents(events, logs) 将本次写入区块产生的链事件和日志投递给本地的事件订阅者(注意:这一步是节点内部的事件通知,并不是向其他节点广播区块头)
// newWorker (excerpt; parameters and setup are elided in this listing).
// The visible part launches the worker's four loops as goroutines and then
// sends a value on startCh, which newWorkLoop receives to submit the first
// piece of work.
func newWorker(...) {
...
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
// Kick off block production.
worker.startCh <- struct{}{}
}
// newWorkLoop (excerpt; parts are elided in this listing). On every signal
// from w.startCh it drops stale pending tasks and submits a new work request.
func (w *worker) newWorkLoop(recommit time.Duration) {
// commit submits a new work request on w.newWorkCh.
commit := func(noempty bool, s int32) {
...
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
...
}
// clearPending removes stale pending tasks: every task whose block number
// plus staleThreshold is less than or equal to number. The original note
// gives staleThreshold = 7 (defined outside this excerpt).
clearPending := func(number uint64) {
w.pendingMu.Lock()
for h, t := range w.pendingTasks {
if t.block.NumberU64()+staleThreshold <= number {
delete(w.pendingTasks, h)
}
}
w.pendingMu.Unlock()
}
for {
select {
case <-w.startCh:
// Drop tasks that are stale relative to the current chain head.
clearPending(w.chain.CurrentBlock().NumberU64())
timestamp = time.Now().Unix()
// Submit a new work request; noempty is false on this path.
commit(false, commitInterruptNewHead)
...
}
}
// mainLoop (excerpt; other select cases are elided in this listing).
// It receives work requests from w.newWorkCh and passes their fields to
// commitNewWork.
func (w *worker) mainLoop() {
...
for {
select {
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
...
}
}
// commitNewWork generates several new sealing tasks based on the parent block.
//
// Excerpt: the start of the function (where parent and tstart are defined) is
// elided in this listing. The visible part builds the header for the next
// block, prepares it through the consensus engine, creates the mining
// environment, selects up to two uncles, optionally commits an empty block
// (when noempty is false), then applies local and remote pending transactions
// and commits the full block.
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
...
// Build the block header; its number is the parent's number plus one.
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent, w.gasFloor, w.gasCeil),
Extra: w.extra,
Time: big.NewInt(timestamp),
}
// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
if w.isRunning() {
if w.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
// Set the header's coinbase so the block reward can be credited to the miner.
header.Coinbase = w.coinbase
}
// Let the consensus engine prepare the header. Per the original note this
// is where the block difficulty is set (Prepare is not shown here).
if err := w.engine.Prepare(w.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
// If we care about TheDAO hard-fork, check whether to override the extra-data or not
// (i.e. whether the DAO hard-fork is configured).
if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
// Check whether the block is among the fork extra-override range
limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
// Depending whether we support or oppose the fork, override differently
if w.config.DAOForkSupport {
header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
}
}
}
// Could potentially happen if starting to mine in an odd state.
err := w.makeCurrent(parent, header)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
}
// Create the current work task and check any fork transitions needed
env := w.current
if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(env.state)
}
// Accumulate the uncles for the current block
// First drop stale uncle candidates: those whose number plus staleThreshold
// is less than or equal to the new block's number.
for hash, uncle := range w.possibleUncles {
if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
delete(w.possibleUncles, hash)
}
}
// Then pick at most two of the remaining candidates.
uncles := make([]*types.Header, 0, 2)
for hash, uncle := range w.possibleUncles {
if len(uncles) == 2 {
break
}
// Try to commit the uncle; commitUncle returns an error when it rejects it.
if err := w.commitUncle(env, uncle.Header()); err != nil {
log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
} else {
log.Debug("Committing new uncle to block", "hash", hash)
uncles = append(uncles, uncle.Header())
}
}
// Taken on the startCh path, where the request is submitted with noempty=false.
if !noempty {
// Create an empty block based on temporary copied state for sealing in advance without waiting block
// execution finished.
// Commit the empty block (no hook, no snapshot update).
w.commit(uncles, nil, false, tstart)
}
// Fill the block with all available pending transactions.
pending, err := w.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
// Short circuit if there is no available pending transactions
if len(pending) == 0 {
w.updateSnapshot()
return
}
// Split the pending transactions into locals and remotes
// (remoteTxs aliases pending; local accounts are moved out of it).
localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
delete(remoteTxs, account)
localTxs[account] = txs
}
}
// Local transactions are applied before remote ones. A true result from
// commitTransactions ends this round without committing the full block.
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
if w.commitTransactions(txs, w.coinbase, interrupt) {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
if w.commitTransactions(txs, w.coinbase, interrupt) {
return
}
}
// Commit the full block and refresh the pending snapshot.
w.commit(uncles, w.fullTaskHook, true, tstart)
}
// makeCurrent creates a new environment for the current cycle.
//
// Excerpt: the tail of the function is elided in this listing. The visible
// part loads the state at the parent's root (returning the error if that
// fails), builds an environment with empty ancestors/family/uncles sets, and
// fills ancestors and family from recent blocks.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
state, err := w.chain.StateAt(parent.Root())
if err != nil {
return err
}
env := &environment{
signer: types.NewEIP155Signer(w.config.ChainID),
state: state,
ancestors: mapset.NewSet(),
family: mapset.NewSet(),
uncles: mapset.NewSet(),
header: header,
}
// when 08 is processed ancestors contain 07 (quick block)
// Go through the blocks returned for (parent hash, 7) — per the original
// note, the last 7 ancestors.
for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
for _, uncle := range ancestor.Uncles() {
// Record each ancestor's uncles in family.
env.family.Add(uncle.Hash())
}
// Record the ancestor itself in family...
env.family.Add(ancestor.Hash())
// ...and in ancestors.
env.ancestors.Add(ancestor.Hash())
}
...
}
// commit finalizes a block from the current mining environment and, when the
// worker is running, queues it on w.taskCh for sealing.
//
// uncles are the uncle headers to include in the block. interval, when
// non-nil, is invoked right before the task is queued. update requests a call
// to w.updateSnapshot at the end. start is only used to report the elapsed
// time in the log line. The only error returned is the one reported by the
// engine's Finalize.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
	// Give the task its own copy of every receipt struct so that different
	// tasks do not interact through shared receipts.
	receipts := make([]*types.Receipt, 0, len(w.current.receipts))
	for _, src := range w.current.receipts {
		dup := *src
		receipts = append(receipts, &dup)
	}
	stateCopy := w.current.state.Copy()

	// Produce the block.
	block, err := w.engine.Finalize(w.chain, w.current.header, stateCopy, w.current.txs, uncles, w.current.receipts)
	if err != nil {
		return err
	}

	if w.isRunning() {
		if interval != nil {
			interval()
		}
		pending := &task{receipts: receipts, state: stateCopy, block: block, createdAt: time.Now()}
		select {
		case w.taskCh <- pending:
			w.unconfirmed.Shift(block.NumberU64() - 1)

			// Total fees in wei: sum of gasUsed * gasPrice over all transactions.
			totalFees := new(big.Int)
			for idx, tx := range block.Transactions() {
				fee := new(big.Int).SetUint64(receipts[idx].GasUsed)
				totalFees.Add(totalFees, fee.Mul(fee, tx.GasPrice()))
			}
			weiPerEther := new(big.Float).SetInt(big.NewInt(params.Ether))
			feesEth := new(big.Float).Quo(new(big.Float).SetInt(totalFees), weiPerEther)

			log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
				"uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))

		case <-w.exitCh:
			log.Info("Worker has exited")
		}
	}

	if update {
		w.updateSnapshot()
	}
	return nil
}
// Finalize applies the block and uncle rewards to state, stores the resulting
// intermediate state root in header.Root and assembles a block from header,
// txs, uncles and receipts. The returned error is always nil.
func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) {
	// Credit the block reward and the uncle rewards in the world state first...
	accumulateRewards(chain.Config(), state, header, uncles)

	// ...then take the state root, which therefore includes those rewards.
	eip158 := chain.Config().IsEIP158(header.Number)
	header.Root = state.IntermediateRoot(eip158)

	// Header seems complete, assemble into a block and return.
	block := types.NewBlock(header, txs, uncles, receipts)
	return block, nil
}
// accumulateRewards credits the mining reward to header.Coinbase and an uncle
// reward to each uncle's coinbase by adding to their balances in state.
//
// The base reward is FrontierBlockReward, or ByzantiumBlockReward when
// config.IsByzantium(header.Number) reports true. Each uncle receives
// (uncle.Number + big8 - header.Number) * baseReward / big8, and the miner
// receives baseReward plus baseReward / big32 for every included uncle.
func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header *types.Header, uncles []*types.Header) {
	// Select the correct block reward based on chain progression.
	baseReward := FrontierBlockReward
	if config.IsByzantium(header.Number) {
		baseReward = ByzantiumBlockReward
	}

	minerReward := new(big.Int).Set(baseReward)
	scratch := new(big.Int) // reused for every intermediate amount

	for i := range uncles {
		uncle := uncles[i]

		// Uncle reward, credited to the uncle's own coinbase.
		scratch.Add(uncle.Number, big8).Sub(scratch, header.Number).Mul(scratch, baseReward).Div(scratch, big8)
		state.AddBalance(uncle.Coinbase, scratch)

		// Bonus for the miner of this block for including the uncle.
		minerReward.Add(minerReward, scratch.Div(baseReward, big32))
	}

	state.AddBalance(header.Coinbase, minerReward)
}
// NewBlock builds a block around a copy of header and fills in the header
// fields derived from the other arguments:
//
//   - TxHash from txs (EmptyRootHash when txs is empty); txs are copied into the block.
//   - ReceiptHash and Bloom from receipts (ReceiptHash is EmptyRootHash when
//     receipts is empty; Bloom is left untouched in that case).
//   - UncleHash from uncles (EmptyUncleHash when uncles is empty); each uncle
//     header is copied into the block.
func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*Receipt) *Block {
	blk := &Block{header: CopyHeader(header), td: new(big.Int)}

	// TODO: panic if len(txs) != len(receipts)
	if n := len(txs); n > 0 {
		blk.header.TxHash = DeriveSha(Transactions(txs))
		blk.transactions = make(Transactions, n)
		copy(blk.transactions, txs)
	} else {
		blk.header.TxHash = EmptyRootHash
	}

	if len(receipts) > 0 {
		blk.header.ReceiptHash = DeriveSha(Receipts(receipts))
		blk.header.Bloom = CreateBloom(receipts)
	} else {
		blk.header.ReceiptHash = EmptyRootHash
	}

	if n := len(uncles); n > 0 {
		blk.header.UncleHash = CalcUncleHash(uncles)
		blk.uncles = make([]*Header, 0, n)
		for _, u := range uncles {
			blk.uncles = append(blk.uncles, CopyHeader(u))
		}
	} else {
		blk.header.UncleHash = EmptyUncleHash
	}

	return blk
}
// taskLoop (excerpt; setup and parts of the loop body are elided in this
// listing, including the definitions of prev, stopCh and interrupt).
// It takes tasks from w.taskCh, skips a task whose seal hash equals prev,
// and otherwise asks the consensus engine to seal the task's block, with
// results delivered on w.resultCh. It returns when w.exitCh fires.
func (w *worker) taskLoop() {
...
for {
select {
case task := <-w.taskCh:
if w.newTaskHook != nil {
w.newTaskHook(task)
}
// Reject duplicate sealing work due to resubmitting.
// The seal hash of the header identifies the work, so an identical
// resubmission is skipped.
sealHash := w.engine.SealHash(task.block.Header())
if sealHash == prev {
continue
}
// Start sealing (mining) the block; a failure is only logged.
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
log.Warn("Block sealing failed", "err", err)
}
case <-w.exitCh:
interrupt()
return
}
}
}
// SealHash returns the hash of a block prior to it being sealed: the
// Keccak-256 hash of the RLP encoding of the header fields listed below.
// The header's Nonce and MixDigest are not part of the input.
func (ethash *Ethash) SealHash(header *types.Header) (hash common.Hash) {
	// Field order matters: it defines the encoding that gets hashed.
	fields := []interface{}{
		header.ParentHash,
		header.UncleHash,
		header.Coinbase,
		header.Root,
		header.TxHash,
		header.ReceiptHash,
		header.Bloom,
		header.Difficulty,
		header.Number,
		header.GasLimit,
		header.GasUsed,
		header.Time,
		header.Extra,
	}

	hasher := sha3.NewKeccak256()
	rlp.Encode(hasher, fields)
	// Write the digest directly into the named result.
	hasher.Sum(hash[:0])
	return hash
}
// Seal (excerpt; the beginning is elided in this listing, including the
// definitions of threads and abort). The visible part starts one mining
// goroutine per thread plus a supervising goroutine that either forwards the
// first sealed block to results or aborts the miners when stop or
// ethash.update fires. The visible portion returns nil without waiting for a
// result; sealing continues in the background.
func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
...
var (
pend sync.WaitGroup
locals = make(chan *types.Block)
)
// Start several mining goroutines, each with its own random starting nonce.
for i := 0; i < threads; i++ {
pend.Add(1)
go func(id int, nonce uint64) {
defer pend.Done()
// Run the nonce search; a sealed block is reported on the locals channel.
ethash.mine(block, id, nonce, abort, locals)
}(i, uint64(ethash.rand.Int63()))
}
// Wait until sealing is terminated or a nonce is found
go func() {
var result *types.Block
select {
case <-stop:
// Outside abort, stop all miner threads
close(abort)
// A nonce was found: mining succeeded.
case result = <-locals:
// One of the threads found a block, abort all others
select {
// Forward the sealed block to the results channel without blocking;
// if nobody is receiving, the result is dropped with a warning.
case results <- result:
default:
log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header()))
}
close(abort)
case <-ethash.update:
// Thread count was changed on user request, restart
close(abort)
if err := ethash.Seal(chain, block, results, stop); err != nil {
log.Error("Failed to restart sealing after update", "err", err)
}
}
// Wait for all miners to terminate and return the block
pend.Wait()
}()
return nil
}
// mine (excerpt; parts of the search loop are elided in this listing).
// Starting at seed and incrementing by one per iteration, it looks for a nonce
// whose hashimotoFull result, read as a big integer, is at most
// target = two256 / header.Difficulty. On success it sends the block rebuilt
// with the sealed header on found, unless abort fires first, and stops
// searching.
func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
// Extract some data from the header
var (
header = block.Header()
hash = ethash.SealHash(header).Bytes()
target = new(big.Int).Div(two256, header.Difficulty)
number = header.Number.Uint64()
dataset = ethash.dataset(number, false)
)
// Start generating random nonces until we abort or find a good one
var (
attempts = int64(0)
nonce = seed
)
logger := log.New("miner", id)
logger.Trace("Started ethash search for new nonces", "seed", seed)
search:
for {
...
default:
...
digest, result := hashimotoFull(dataset.dataset, hash, nonce)
// A valid hash has been found when result <= target.
if new(big.Int).SetBytes(result).Cmp(target) <= 0 {
// Correct nonce found, create a new header with it
header = types.CopyHeader(header)
header.Nonce = types.EncodeNonce(nonce)
header.MixDigest = common.BytesToHash(digest)
// Seal and return a block (if still needed)
select {
// Rebuild the block with the sealed header and report it.
case found <- block.WithSeal(header):
logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
case <-abort:
logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
}
break search
}
// Try the next nonce on every iteration.
nonce++
}
}
// Datasets are unmapped in a finalizer. Ensure that the dataset stays live
// during sealing so it's not unmapped while being read.
runtime.KeepAlive(dataset)
}
// resultLoop (excerpt; most of the body is elided in this listing, including
// where receipts, task, events and logs come from). For every sealed block
// received on w.resultCh it writes the block to the chain and then posts
// events about it.
func (w *worker) resultLoop() {
for {
select {
case block := <-w.resultCh:
...
// Write the block, with its receipts and state, into the blockchain.
stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
// Announce the newly mined block on the event mux. Per the original note
// this leads to the block being broadcast to other nodes.
w.mux.Post(core.NewMinedBlockEvent{Block: block})
// Post the chain events and logs.
// NOTE(review): the original note says this broadcasts lightweight
// header information to other nodes; PostChainEvents looks like local
// event dispatch to subscribers — confirm against core.BlockChain.
w.chain.PostChainEvents(events, logs)
}
}
我的星球.jpg