etcd Source Code Analysis (1) ------- Startup Walkthrough

2021-08-31

Project source repository

https://github.com/etcd-io/etcd

Source Code Analysis

Tracing the Entry Point

build.sh

etcd_build() {
  out="bin"
  if [[ -n "${BINDIR}" ]]; then out="${BINDIR}"; fi
  toggle_failpoints_default

  run rm -f "${out}/etcd"
  (
    cd ./server
    # Static compilation is useful when etcd is run in a container. $GO_BUILD_FLAGS is OK
    # shellcheck disable=SC2086
    run env "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
      -installsuffix=cgo \
      "-ldflags=${GO_LDFLAGS[*]}" \
      -o="../${out}/etcd" . || return 2
  ) || return 2
.......

From build.sh we can see that the project's entry point lives in the ./server directory.
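
The file there is server/main.go, which (in the 3.5-era tree this post follows) simply hands os.Args over to etcdmain.Main:

package main

import (
    "os"

    "go.etcd.io/etcd/server/v3/etcdmain"
)

func main() {
    etcdmain.Main(os.Args)
}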

Startup Flow

Trivial call-site hops are skipped below; if you are interested, download the source and trace through them yourself.

func Main(args []string) {
    checkSupportArch()

    if len(args) > 1 {
        cmd := args[1]
        switch cmd {
        case "gateway", "grpc-proxy":
            if err := rootCmd.Execute(); err != nil {
                fmt.Fprint(os.Stderr, err)
                os.Exit(1)
            }
            return
        }
    }

    startEtcdOrProxyV2(args)
}

If an argument is passed, etcd can start as a proxy instead: the gateway and grpc-proxy subcommands are dispatched through rootCmd.Execute(). We skip that path for now.

func startEtcdOrProxyV2(args []string) {
    grpc.EnableTracing = false

    cfg := newConfig()
    // initialize logging (the lg used below), cluster info, etc. from the config
    .....

    var stopped <-chan struct{}
    var errc <-chan error
    // classify the data directory; proxy data and member data must not both be present
    which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
    if which != dirEmpty {
        lg.Info(
            "server has been already initialized",
            zap.String("data-dir", cfg.ec.Dir),
            zap.String("dir-type", string(which)),
        )
        switch which {
        case dirMember:
            // start etcd proper
            stopped, errc, err = startEtcd(&cfg.ec)
        case dirProxy:
            err = startProxy(cfg)
        default:
            lg.Panic(
                "unknown directory type",
                zap.String("dir-type", string(which)),
            )
        }
    } else {
        shouldProxy := cfg.isProxy()
        // this branch only runs when starting as a proxy
        ......
        if shouldProxy {
            err = startProxy(cfg)
        }
    }
    // handle errors and interrupt notifications (omitted)
    ......

    osutil.Exit(0)
}
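
For reference, identifyDataDirOrDie just checks which subdirectories exist under the data dir. A minimal sketch of the idea, assuming the member/proxy layout described above (function names here are illustrative, not etcd's; imports: os, path/filepath):

// Sketch only: the real identifyDataDirOrDie logs and exits the process
// when both subdirectories exist.
func identifyDataDir(dir string) string {
    hasMember := dirExists(filepath.Join(dir, "member"))
    hasProxy := dirExists(filepath.Join(dir, "proxy"))
    switch {
    case hasMember && hasProxy:
        return "invalid" // member and proxy data must not coexist
    case hasMember:
        return "member"
    case hasProxy:
        return "proxy"
    default:
        return "empty"
    }
}

func dirExists(p string) bool {
    fi, err := os.Stat(p)
    return err == nil && fi.IsDir()
}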

With the configuration initialized, the formal startup flow begins:

// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
    e, err := embed.StartEtcd(cfg)
    if err != nil {
        return nil, nil, err
    }
    osutil.RegisterInterruptHandler(e.Close)
    select {
    case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
    case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
    }
    return e.Server.StopNotify(), e.Err(), nil
}
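
As an aside, this embed.StartEtcd / ReadyNotify pattern is exactly how you run etcd embedded inside your own process. A minimal sketch (the data dir and the 60-second deadline are arbitrary choices for illustration):

package main

import (
    "log"
    "time"

    "go.etcd.io/etcd/server/v3/embed"
)

func main() {
    cfg := embed.NewConfig()
    cfg.Dir = "default.etcd" // arbitrary data dir for this sketch
    e, err := embed.StartEtcd(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()
    select {
    case <-e.Server.ReadyNotify(): // joined the cluster, ready to serve
        log.Println("server is ready")
    case <-time.After(60 * time.Second):
        e.Server.Stop() // trigger a shutdown
        log.Fatal("server took too long to start")
    }
    log.Fatal(<-e.Err()) // block until the server errors out
}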

Now etcd itself starts:

// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    if err = inCfg.Validate(); err != nil {
        return nil, err
    }
    serving := false
    e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
    cfg := &e.cfg
    defer func() {
        if e == nil || err == nil {
            return
        }
        if !serving {
            // errored before starting gRPC server for serveCtx.serversC
            for _, sctx := range e.sctxs {
                close(sctx.serversC)
            }
        }
        e.Close()
        e = nil
    }()

    if !cfg.SocketOpts.Empty() {
        cfg.logger.Info(
            "configuring socket options",
            zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
            zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
        )
    }
    e.cfg.logger.Info(
        "configuring peer listeners",
        zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
    )
    // start the peer listeners that receive messages from other members (default port 2380)
    if e.Peers, err = configurePeerListeners(cfg); err != nil {
        return e, err
    }

    e.cfg.logger.Info(
        "configuring client listeners",
        zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
    )
    // start the client listeners that serve client requests (default port 2379)
    if e.sctxs, err = configureClientListeners(cfg); err != nil {
        return e, err
    }

    for _, sctx := range e.sctxs {
        e.Clients = append(e.Clients, sctx.l)
    }

    var (
        urlsmap types.URLsMap
        token   string
    )
    memberInitialized := true
    // the existence of WAL files tells us whether this member was initialized before
    if !isMemberInitialized(cfg) {
        memberInitialized = false
        // resolve the cluster peer URL map and the cluster token
        urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
        if err != nil {
            return e, fmt.Errorf("error setting up initial cluster: %v", err)
        }
    }

    // AutoCompactionRetention defaults to "0" if not set.
    if len(cfg.AutoCompactionRetention) == 0 {
        cfg.AutoCompactionRetention = "0"
    }
    // parse the auto-compaction retention; in Revision mode the configured number is taken as-is, in Periodic mode a bare number means that many hours
    autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
    if err != nil {
        return e, err
    }
    // boltdb backend freelist type, array or map; this presumably corresponds to the boltdb freelist optimization Alibaba shared earlier
    backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

    srvcfg := config.ServerConfig{
        .....
    }


    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
        return e, err
    }

    // buffer channel so goroutines on closed connections won't wait forever
    e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

    // newly started member ("memberInitialized==false")
    // does not need corruption check
    if memberInitialized {
        if err = e.Server.CheckInitialHashKV(); err != nil {
            // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
            // (nothing to close since rafthttp transports have not been started)

            e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
            e.Server.Cleanup()
            e.Server = nil
            return e, err
        }
    }
    e.Server.Start()

    if err = e.servePeers(); err != nil {
        return e, err
    }
    if err = e.serveClients(); err != nil {
        return e, err
    }
    if err = e.serveMetrics(); err != nil {
        return e, err
    }

    e.cfg.logger.Info(
        "now serving peer/client/metrics",
        zap.String("local-member-id", e.Server.ID().String()),
        zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
        zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
        zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
        zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
        zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
    )
    serving = true
    return e, nil
}

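To make the Revision/Periodic comment above concrete, here is a condensed sketch of the retention parsing (simplified from embed's parseCompactionRetention; the function name and the trimmed error handling are mine):

// A bare integer means a revision count in Revision mode and a number of
// hours in Periodic mode; anything else is parsed as a duration ("30m").
func parseRetention(mode, retention string) (time.Duration, error) {
    if h, err := strconv.Atoi(retention); err == nil {
        if mode == "revision" {
            return time.Duration(h), nil // interpreted as a revision count
        }
        return time.Duration(h) * time.Hour, nil // periodic: h hours
    }
    return time.ParseDuration(retention)
}
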
StartEtcd converts the configuration and opens the listeners. Next, let's step into etcdserver.NewServer and analyze how the server is created.

NewServer

// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
    b, err := bootstrap(cfg)
    if err != nil {
        return nil, err
    }

    defer func() {
        if err != nil {
            b.be.Close()
        }
    }()

    sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
    lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())

    heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
    srv = &EtcdServer{
        readych:            make(chan struct{}),
        Cfg:                cfg,
        lgMu:               new(sync.RWMutex),
        lg:                 cfg.Logger,
        errorc:             make(chan error, 1),
        v2store:            b.st,
        snapshotter:        b.ss,
        r:                  *b.raft.newRaftNode(b.ss),
        id:                 b.raft.wal.id,
        attributes:         membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
        cluster:            b.raft.cl,
        stats:              sstats,
        lstats:             lstats,
        SyncTicker:         time.NewTicker(500 * time.Millisecond),
        peerRt:             b.prt,
        reqIDGen:           idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
        AccessController:   &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
        consistIndex:       b.ci,
        firstCommitInTermC: make(chan struct{}),
    }
    serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)

    srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

    srv.be = b.be
    srv.beHooks = b.beHooks
    minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

    // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
    // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
    srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
        MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
        CheckpointInterval:         cfg.LeaseCheckpointInterval,
        ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
    })

    tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
        func(index uint64) <-chan struct{} {
            return srv.applyWait.Wait(index)
        },
        time.Duration(cfg.TokenTTL)*time.Second,
    )
    if err != nil {
        cfg.Logger.Warn("failed to create token provider", zap.Error(err))
        return nil, err
    }

    mvccStoreConfig := mvcc.StoreConfig{
        CompactionBatchLimit:    cfg.CompactionBatchLimit,
        CompactionSleepInterval: cfg.CompactionSleepInterval,
    }
    srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

    srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))

    newSrv := srv // since srv == nil in defer if srv is returned as nil
    defer func() {
        // closing backend without first closing kv can cause
        // resumed compactions to fail with closed tx errors
        if err != nil {
            newSrv.kv.Close()
        }
    }()
    if num := cfg.AutoCompactionRetention; num != 0 {
        srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
        if err != nil {
            return nil, err
        }
        srv.compactor.Run()
    }

    srv.applyV3Base = srv.newApplierV3Backend()
    srv.applyV3Internal = srv.newApplierV3Internal()
    if err = srv.restoreAlarms(); err != nil {
        return nil, err
    }

    if srv.Cfg.EnableLeaseCheckpoint {
        // setting checkpointer enables lease checkpoint feature.
        srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
            srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
        })
    }

    // TODO: move transport initialization near the definition of remote
    tr := &rafthttp.Transport{
        Logger:      cfg.Logger,
        TLSInfo:     cfg.PeerTLSInfo,
        DialTimeout: cfg.PeerDialTimeout(),
        ID:          b.raft.wal.id,
        URLs:        cfg.PeerURLs,
        ClusterID:   b.raft.cl.ID(),
        Raft:        srv,
        Snapshotter: b.ss,
        ServerStats: sstats,
        LeaderStats: lstats,
        ErrorC:      srv.errorc,
    }
    if err = tr.Start(); err != nil {
        return nil, err
    }
    // add all remotes into transport
    for _, m := range b.remotes {
        if m.ID != b.raft.wal.id {
            tr.AddRemote(m.ID, m.PeerURLs)
        }
    }
    for _, m := range b.raft.cl.Members() {
        if m.ID != b.raft.wal.id {
            tr.AddPeer(m.ID, m.PeerURLs)
        }
    }
    srv.r.transport = tr

    return srv, nil
}
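
One detail worth pausing on in NewServer is minTTL: it ties the minimum lease TTL to the election timeout, so a lease cannot expire faster than a leader election could complete. A worked example with the default tuning (assumed here: TickMs=100, ElectionTicks=10):

heartbeat := 100 * time.Millisecond           // time.Duration(cfg.TickMs) * time.Millisecond
minTTL := time.Duration((3*10)/2) * heartbeat // (3*cfg.ElectionTicks)/2 ticks = 1.5s
// lease.NewLessor then takes math.Ceil(minTTL.Seconds()),
// so the effective MinLeaseTTL is 2 seconds.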

The rough flow of creating the server is as follows:

bootstrap
// create the data directory, the snapshot directory, and the backend store
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
    st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)

    if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
        cfg.Logger.Warn(
            "exceeded recommended request limit",
            zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
            zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
            zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
            zap.String("recommended-request-size", recommendedMaxRequestBytesString),
        )
    }

    if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
        return nil, fmt.Errorf("cannot access data directory: %v", terr)
    }

    haveWAL := wal.Exist(cfg.WALDir())
    ss := bootstrapSnapshot(cfg)

    be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
    if err != nil {
        return nil, err
    }
    defer func() {
        if err != nil {
            be.Close()
        }
    }()
    // build the HTTP round tripper used to send requests to other peers
    prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
    if err != nil {
        return nil, err
    }

    switch {
    case !haveWAL && !cfg.NewCluster:
        /*
        1. Check that the configured cluster addresses are valid
        2. Build the cluster from the config, adding the members and generating this member's ID
        3. Query the other running etcd servers for their cluster and member information
        4. Cross-check the cluster info from steps 2 and 3 to verify it matches the config
        5. Verify that the other members' etcd versions are compatible with the local version
        6. Create the raft storage
        */
        b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
    case !haveWAL && cfg.NewCluster:
        /*
        1. Check that the configured cluster addresses are valid
        2. Build the cluster from the config, adding the members and generating this member's ID
        3. Verify that no member with the same identity is already running in the cluster
        4. Create the raft storage
        */
        b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
    case haveWAL:
        /*
        1. Verify that the snapshot is usable
        2. Read the snapshot and try to recover etcd from it
        3. Create the raft storage
        */
        b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
    default:
        be.Close()
        return nil, fmt.Errorf("unsupported bootstrap config")
    }
    if err != nil {
        return nil, err
    }

    if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
        return nil, fmt.Errorf("cannot access member directory: %v", terr)
    }
    b.prt = prt
    b.ci = ci
    b.st = st
    b.be = be
    b.ss = ss
    b.beHooks = beHooks
    return b, nil
}
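
Read as a decision table, the switch in bootstrap keys on just two bits: whether WAL files already exist on disk, and whether --initial-cluster-state is "new" (cfg.NewCluster, the default). Condensed, with my own comments:

haveWAL := wal.Exist(cfg.WALDir()) // by default <data-dir>/member/wal
switch {
case !haveWAL && !cfg.NewCluster:
    // first boot of a member joining a cluster that is already running
case !haveWAL && cfg.NewCluster:
    // first boot of a brand-new cluster
case haveWAL:
    // restart: membership and state are recovered from snapshot + WAL
}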