区块链技术探索

以太坊挖矿源码分析 miner.go worker.go

2018-10-10  本文已影响106人  hasika

miner.go是挖矿的入口,真正实现类是worker.go。在miner的New方法中初始化了worker.

worker中主要执行了四个循环函数, 出块的主要功能都是在这四个循环中完成的。

涉及到的代码比较多,我先一步一步讲解,读者可以对照后面的源码查看

func newWorker(...) {
    ...
    go worker.mainLoop()
    go worker.newWorkLoop(recommit)
    go worker.resultLoop()
    go worker.taskLoop()
    //开始出块
    worker.startCh <- struct{}{}
}

func (w *worker) newWorkLoop(recommit time.Duration) {
    //提交一个新的任务
    commit := func(noempty bool, s int32) {
        ...
        w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp:                                   timestamp}
        ...
    }
    //删除老的任务,以任务中的块高以标准,staleThreshold = 7
    clearPending := func(number uint64) {
        w.pendingMu.Lock()
        for h, t := range w.pendingTasks {
            if t.block.NumberU64()+staleThreshold <= number {
                delete(w.pendingTasks, h)
            }
        }
        w.pendingMu.Unlock()
    }
    
    for {
        select {
        case <-w.startCh:
            //清除老的块
            clearPending(w.chain.CurrentBlock().NumberU64())
            timestamp = time.Now().Unix()
            //提交一个新的任务
            commit(false, commitInterruptNewHead)
            ...
    }
}
func (w *worker) mainLoop() {
    ...

   for {
      select {
      case req := <-w.newWorkCh:
         w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
        ...
      }
}
// commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
    ...
   //创建区块头
   num := parent.Number()
   header := &types.Header{
      ParentHash: parent.Hash(),
      Number:     num.Add(num, common.Big1),
      GasLimit:   core.CalcGasLimit(parent, w.gasFloor, w.gasCeil),
      Extra:      w.extra,
      Time:       big.NewInt(timestamp),
   }
   // Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
   if w.isRunning() {
      if w.coinbase == (common.Address{}) {
         log.Error("Refusing to mine without etherbase")
         return
      }
       // 设置区块头的Coinbase,为出块奖励矿工做准备
      header.Coinbase = w.coinbase
   }
   //设置出块难度
   if err := w.engine.Prepare(w.chain, header); err != nil {
      log.Error("Failed to prepare header for mining", "err", err)
      return
   }
   // If we are care about TheDAO hard-fork check whether to override the extra-data or not
    //是否支持DAO事件硬分叉
   if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
      // Check whether the block is among the fork extra-override range
      limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
      if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
         // Depending whether we support or oppose the fork, override differently
         if w.config.DAOForkSupport {
            header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
         } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
            header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
         }
      }
   }
   // Could potentially happen if starting to mine in an odd state.
   err := w.makeCurrent(parent, header)
   if err != nil {
      log.Error("Failed to create mining context", "err", err)
      return
   }
   // Create the current work task and check any fork transitions needed
   env := w.current
   if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 {
      misc.ApplyDAOHardFork(env.state)
   }
   // Accumulate the uncles for the current block
   //删除老的块
   for hash, uncle := range w.possibleUncles {
      if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
         delete(w.possibleUncles, hash)
      }
   }
   uncles := make([]*types.Header, 0, 2)
   for hash, uncle := range w.possibleUncles {
      if len(uncles) == 2 {
         break
      }
      //校验一些参数,提交叔块
      if err := w.commitUncle(env, uncle.Header()); err != nil {
         log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
      } else {
         log.Debug("Committing new uncle to block", "hash", hash)
         uncles = append(uncles, uncle.Header())
      }
   }
    //可以执行 noempty=false
   if !noempty {
      // Create an empty block based on temporary copied state for sealing in advance without waiting block
      // execution finished.
      //出块
      w.commit(uncles, nil, false, tstart)
   }

   // Fill the block with all available pending transactions.
   pending, err := w.eth.TxPool().Pending()
   if err != nil {
      log.Error("Failed to fetch pending transactions", "err", err)
      return
   }
   // Short circuit if there is no available pending transactions
   if len(pending) == 0 {
      w.updateSnapshot()
      return
   }
   // Split the pending transactions into locals and remotes
   localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
   for _, account := range w.eth.TxPool().Locals() {
      if txs := remoteTxs[account]; len(txs) > 0 {
         delete(remoteTxs, account)
         localTxs[account] = txs
      }
   }
   if len(localTxs) > 0 {
      txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
      if w.commitTransactions(txs, w.coinbase, interrupt) {
         return
      }
   }
   if len(remoteTxs) > 0 {
      txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
      if w.commitTransactions(txs, w.coinbase, interrupt) {
         return
      }
   }
   w.commit(uncles, w.fullTaskHook, true, tstart)
}
// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
   state, err := w.chain.StateAt(parent.Root())
   if err != nil {
      return err
   }
   env := &environment{
      signer:    types.NewEIP155Signer(w.config.ChainID),
      state:     state,
      ancestors: mapset.NewSet(),
      family:    mapset.NewSet(),
      uncles:    mapset.NewSet(),
      header:    header,
   }

   // when 08 is processed ancestors contain 07 (quick block)
   //数7个祖先
   for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
      for _, uncle := range ancestor.Uncles() {
         //将叔块添加到family中
         env.family.Add(uncle.Hash())
      }
      //将祖先添加到family中
      env.family.Add(ancestor.Hash())
      //将祖先添加到祖先中
      env.ancestors.Add(ancestor.Hash())
   }
    ...
}
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
   // Deep copy receipts here to avoid interaction between different tasks.
   // 深度复制收据
   receipts := make([]*types.Receipt, len(w.current.receipts))
   for i, l := range w.current.receipts {
      receipts[i] = new(types.Receipt)
      *receipts[i] = *l
   }
   s := w.current.state.Copy()
   //出块
   block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
   if err != nil {
      return err
   }
   if w.isRunning() {
      if interval != nil {
         interval()
      }
      select {
      case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
         w.unconfirmed.Shift(block.NumberU64() - 1)

         feesWei := new(big.Int)
         for i, tx := range block.Transactions() {
            feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
         }
         feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))

         log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
            "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))

      case <-w.exitCh:
         log.Info("Worker has exited")
      }
   }
   if update {
      w.updateSnapshot()
   }
   return nil
}
func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) {
   // Accumulate any block and uncle rewards and commit the final state root
    //计算出块奖励,包括当前块和叔块,并写入到世界状态中
   accumulateRewards(chain.Config(), state, header, uncles)
    
   header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))

   // Header seems complete, assemble into a block and return
   return types.NewBlock(header, txs, uncles, receipts), nil
}
func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header *types.Header, uncles []*types.Header) {
   // Select the correct block reward based on chain progression
   blockReward := FrontierBlockReward
   if config.IsByzantium(header.Number) {
      blockReward = ByzantiumBlockReward
   }
   // Accumulate the rewards for the miner and any included uncles
   reward := new(big.Int).Set(blockReward)
   r := new(big.Int)
   //计算每个叔块的奖励并将奖励添加到叔块的账户中
   for _, uncle := range uncles {
      r.Add(uncle.Number, big8)
      r.Sub(r, header.Number)
      r.Mul(r, blockReward)
      r.Div(r, big8)
      state.AddBalance(uncle.Coinbase, r)
    
      //将叔块的一部分奖励添加到矿工账号中
      r.Div(blockReward, big32)
      reward.Add(reward, r)
   }
   state.AddBalance(header.Coinbase, reward)
}
func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*Receipt) *Block {
   b := &Block{header: CopyHeader(header), td: new(big.Int)}

   // TODO: panic if len(txs) != len(receipts)
   if len(txs) == 0 {
      b.header.TxHash = EmptyRootHash
   } else {
      b.header.TxHash = DeriveSha(Transactions(txs))
      b.transactions = make(Transactions, len(txs))
      copy(b.transactions, txs)
   }

   if len(receipts) == 0 {
      b.header.ReceiptHash = EmptyRootHash
   } else {
      b.header.ReceiptHash = DeriveSha(Receipts(receipts))
      b.header.Bloom = CreateBloom(receipts)
   }

   if len(uncles) == 0 {
      b.header.UncleHash = EmptyUncleHash
   } else {
      b.header.UncleHash = CalcUncleHash(uncles)
      b.uncles = make([]*Header, len(uncles))
      for i := range uncles {
         b.uncles[i] = CopyHeader(uncles[i])
      }
   }

   return b
}
func (w *worker) taskLoop() {
    ...
   for {
      select {
      case task := <-w.taskCh:
         if w.newTaskHook != nil {
            w.newTaskHook(task)
         }
         // Reject duplicate sealing work due to resubmitting.
         // 计算header的RLP编码的hash,防止重复提交
         sealHash := w.engine.SealHash(task.block.Header())
         if sealHash == prev {
            continue
         }
         // 出块
         if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
            log.Warn("Block sealing failed", "err", err)
         }
      case <-w.exitCh:
         interrupt()
         return
      }
   }
}
// SealHash returns the hash of a block prior to it being sealed.
func (ethash *Ethash) SealHash(header *types.Header) (hash common.Hash) {
    hasher := sha3.NewKeccak256()
    rlp.Encode(hasher, []interface{}{
    header.ParentHash,
    header.UncleHash,
    header.Coinbase,
    header.Root,
    header.TxHash,
    header.ReceiptHash,
    header.Bloom,
    header.Difficulty,
    header.Number,
    header.GasLimit,
    header.GasUsed,
    header.Time,
    header.Extra,
})
hasher.Sum(hash[:0])
return hash
}
func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
  ...
    var (
        pend   sync.WaitGroup
        locals = make(chan *types.Block)
    )
   //开多个线程进行挖矿
   for i := 0; i < threads; i++ {
      pend.Add(1)
      go func(id int, nonce uint64) {
         defer pend.Done()
         //挖矿代码,传入了一个管道locals
         ethash.mine(block, id, nonce, abort, locals)
      }(i, uint64(ethash.rand.Int63()))
   }
   // Wait until sealing is terminated or a nonce is found
   go func() {
      var result *types.Block
      select {
      case <-stop:
         // Outside abort, stop all miner threads
         close(abort)
         //找到nonce值,挖矿成功
      case result = <-locals:
         // One of the threads found a block, abort all others
         select {
         //将区块插入到results管道中
         case results <- result:
         default:
            log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header()))
         }
         close(abort)
      case <-ethash.update:
         // Thread count was changed on user request, restart
         close(abort)
         if err := ethash.Seal(chain, block, results, stop); err != nil {
            log.Error("Failed to restart sealing after update", "err", err)
         }
      }
      // Wait for all miners to terminate and return the block
      pend.Wait()
   }()
   return nil
}
func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
   // Extract some data from the header
   var (
      header  = block.Header()
      hash    = ethash.SealHash(header).Bytes()
      target  = new(big.Int).Div(two256, header.Difficulty)
      number  = header.Number.Uint64()
      dataset = ethash.dataset(number, false)
   )
   // Start generating random nonces until we abort or find a good one
   var (
      attempts = int64(0)
      nonce    = seed
   )
   logger := log.New("miner", id)
   logger.Trace("Started ethash search for new nonces", "seed", seed)
search:
   for {
      ...
      default:
        ...
         
         digest, result := hashimotoFull(dataset.dataset, hash, nonce)
         //取到合法的hash值时
         if new(big.Int).SetBytes(result).Cmp(target) <= 0 {
            // Correct nonce found, create a new header with it
            header = types.CopyHeader(header)
            header.Nonce = types.EncodeNonce(nonce)
            header.MixDigest = common.BytesToHash(digest)

            // Seal and return a block (if still needed)
            select {
            //根据nonce值重新组装区块
            case found <- block.WithSeal(header):
               logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
            case <-abort:
               logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
            }
            break search
         }
         //每次循环nonce+1
         nonce++
      }
   }
   // Datasets are unmapped in a finalizer. Ensure that the dataset stays live
   // during sealing so it's not unmapped while being read.
   runtime.KeepAlive(dataset)
}
func (w *worker) resultLoop() {
   for {
      select {
      case block := <-w.resultCh:
         ...

         //将区块写入到区块链中
         stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)

         //向其他节点广播区块
         w.mux.Post(core.NewMinedBlockEvent{Block: block})
        // 为了节约资源,向其他节点广播区块头等简单信息
         w.chain.PostChainEvents(events, logs)
   }
}

https://t.zsxq.com/iiMvfea

我的星球.jpg
上一篇下一篇

猜你喜欢

热点阅读