geth的启动之整体及p2p服务的启动
2018-04-16 本文已影响36人
古则
1.入口
geth命令的实现在文件
go-ethereum/cmd/geth/main.go中
调用关系
main=>app.run()
其中app在main.go的全局变量中初始化
//go-ethereum/cmd/geth/main.go
app = utils.NewApp(gitCommit, "the go-ethereum command line interface")
//go-ethereum/cmd/utils/flags.go
func NewApp(gitCommit, usage string) *cli.App {
app := cli.NewApp()
app.Name = filepath.Base(os.Args[0])
app.Author = ""
//app.Authors = nil
app.Email = ""
app.Version = params.Version
if len(gitCommit) >= 8 {
app.Version += "-" + gitCommit[:8]
}
app.Usage = usage
return app
}
cli的实现采用的是
https://godoc.org/gopkg.in/urfave/cli.v1 库 该库的用法待学习
2.init()
func init() {
// Initialize the CLI app and start Geth
app.Action = geth
app.HideVersion = true // we have a command to print the version
app.Copyright = "Copyright 2013-2017 The go-ethereum Authors"
app.Commands = []cli.Command{
// See chaincmd.go:
initCommand,
importCommand,
exportCommand,
copydbCommand,
removedbCommand,
dumpCommand,
// See monitorcmd.go:
monitorCommand,
// See accountcmd.go:
accountCommand,
walletCommand,
// See consolecmd.go:
consoleCommand,
attachCommand,
javascriptCommand,
// See misccmd.go:
makecacheCommand,
makedagCommand,
versionCommand,
bugCommand,
licenseCommand,
// See config.go
dumpConfigCommand,
}
sort.Sort(cli.CommandsByName(app.Commands))
app.Flags = append(app.Flags, nodeFlags...)
app.Flags = append(app.Flags, rpcFlags...)
app.Flags = append(app.Flags, consoleFlags...)
app.Flags = append(app.Flags, debug.Flags...)
app.Flags = append(app.Flags, whisperFlags...)
app.Before = func(ctx *cli.Context) error {
runtime.GOMAXPROCS(runtime.NumCPU())
if err := debug.Setup(ctx); err != nil {
return err
}
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)
utils.SetupNetwork(ctx)
return nil
}
app.After = func(ctx *cli.Context) error {
debug.Exit()
console.Stdin.Close() // Resets terminal mode.
return nil
}
}
3.geth(ctx *cli.Context)
// geth is the main entry point into the system if no special subcommand is ran.
// It creates a default node based on the command line arguments and runs it in
// blocking mode, waiting for it to be shut down.
func geth(ctx *cli.Context) error {
node := makeFullNode(ctx)
startNode(ctx, node)
node.Wait()
return nil
}
- makeFullNode 生成(node/node.go) node节点,并注册服务,见4-6
- startNode(node/node.go所有服务的启动),启动各种服务,如p2p,rpc,http,数据同步等等见7
4.makeFullNode(ctx *cli.Context) *node.Node
//cmd/geth/config.go:156
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx)
//注册服务
utils.RegisterEthService(stack, &cfg.Eth)
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
if shhEnabled || shhAutoEnabled {
if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
utils.RegisterShhService(stack, &cfg.Shh)
}
// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}
return stack
}
5.RegisterEthService(stack *node.Node, cfg *eth.Config)
./cmd/utils/flags.go:1132
// RegisterEthService adds an Ethereum client to the stack.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}
6. eth.new
//eth/backend.go:104
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
}
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
}
stopDbUpgrade := upgradeDeduplicateData(chainDb)
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
shutdownChan: make(chan bool),
stopDbUpgrade: stopDbUpgrade,
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
bcVersion := core.GetBlockChainVersion(chainDb)
if bcVersion != core.BlockChainVersion && bcVersion != 0 {
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
}
core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
}
var (
vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
eth.ApiBackend = &EthApiBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
}
eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams)
return eth, nil
}
7.node及startnode
node中包含了rpc,ipc,http,ws,p2p等各种服务及在5中所注册的服务
//node/node.go
// Node is a container on which services can be registered.
type Node struct {
eventmux *event.TypeMux // Event multiplexer used between the services of a stack
config *Config
accman *accounts.Manager
ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
instanceDirLock flock.Releaser // prevents concurrent use of instance directory
serverConfig p2p.Config
server *p2p.Server // Currently running P2P networking layer
//注册的服务,注册在func (n *Node) Register(constructor ServiceConstructor) error 中完成
serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
services map[reflect.Type]Service // Currently running services
rpcAPIs []rpc.API // List of APIs currently provided by the node
inprocHandler *rpc.Server // In-process RPC request handler to process the API requests
ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled)
ipcListener net.Listener // IPC RPC listener socket to serve API requests
ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
httpWhitelist []string // HTTP RPC modules to allow through this endpoint
httpListener net.Listener // HTTP RPC listener socket to server API requests
httpHandler *rpc.Server // HTTP RPC request handler to process the API requests
wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled)
wsListener net.Listener // Websocket RPC listener socket to server API requests
wsHandler *rpc.Server // Websocket RPC request handler to process the API requests
stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex
log log.Logger
}
startNode如下
// Start create a live P2P node and starts running it.
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
// Short circuit if the node's already running
if n.server != nil {
return ErrNodeRunning
}
if err := n.openDataDir(); err != nil {
return err
}
// Initialize the p2p server. This creates the node key and
// discovery databases.
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
n.serverConfig.Logger = n.log
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
if n.serverConfig.TrustedNodes == nil {
n.serverConfig.TrustedNodes = n.config.TrustedNodes()
}
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
for _, constructor := range n.serviceFuncs {
// Create a new context for the particular service
ctx := &ServiceContext{
config: n.config,
services: make(map[reflect.Type]Service),
EventMux: n.eventmux,
AccountManager: n.accman,
}
for kind, s := range services { // copy needed for threaded access
ctx.services[kind] = s
}
// Construct and save the service
service, err := constructor(ctx)
if err != nil {
return err
}
kind := reflect.TypeOf(service)
if _, exists := services[kind]; exists {
return &DuplicateServiceError{Kind: kind}
}
services[kind] = service
}
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
// Finish initializing the startup
n.services = services
n.server = running
n.stop = make(chan struct{})
return nil
}