pd调度源码分析

2020-04-25  本文已影响0人  白馨_1114

Placement Driver (后续以 PD 简称) 是 TiDB 里面全局中心总控节点,它负责整个集群的调度,负责全局 ID 的生成,以及全局时间戳 TSO 的生成等。PD 还保存着整个集群 TiKV 的元信息,负责给 client 提供路由功能。

作为中心总控节点,PD 通过集成 etcd ,自动的支持 auto failover,无需担心单点故障问题。同时,PD 也通过 etcd 的 raft,保证了数据的强一致性,不用担心数据丢失的问题。

在架构上面,PD 所有的数据都是通过 TiKV 主动上报获知的。同时,PD 对整个 TiKV 集群的调度等操作,也只会在 TiKV 发送 heartbeat 命令的结果里面返回相关的命令,让 TiKV 自行去处理,而不是主动去给 TiKV 发命令。这样设计上面就非常简单,我们完全可以认为 PD 是一个无状态的服务(当然,PD 仍然会将一些信息持久化到 etcd),所有的操作都是被动触发,即使 PD 挂掉,新选出的 PD leader 也能立刻对外服务,无需考虑任何之前的中间状态。

Pd选举:
pd集成etcd,一个集群至少需要启动3个pd实例服务。
启动pd集群:

集群有两种启动方式:

1.静态方式:
在配置中设置:
initial-cluster = "pd=http://127.0.0.1:2380"
或者启动pd的时候加参数:
./pd-server --name="pd"
--initial-cluster="pd1=http://10.87.109.72:2380,pd2=http://10.87.109.73:2380,pd3=http://10.87.109.74:2380
静态方式,使用的是ectd peer内部通信端口2380.

2.动态方式:
使用 join 参数,将一个新的 PD 服务加入到现有的 PD 集群里面。 如果我们需要添加 pd4,只需要在 --join 参数里面填入当前 PD 集群任意一个 PD 服务的 client url,比如:
./bin/pd-server --name=pd4
--client-urls="http://host4:2379"
--peer-urls="http://host4:2380"
--join="http://host1:2379"

动态方式使用的是etcd 自身提供的 member 相关 API,所以使用 2379 端口。
以上这两种方式是互斥的,只能使用一种方式初始化集群(不知道原因,但混合操作可能会报错)。

启动单个pd代码:

func main() {
   cfg := server.NewConfig()//加载配置
   ......      
   err = server.PrepareJoinCluster(cfg)//初始化上文集群
   ......
   if err := svr.Run(ctx); err != nil { //主程序具体实现见下文
      log.Fatalf("run server failed: %v", fmt.Sprintf("%+v", err))
   }
......
}

func (s *Server) Run(ctx context.Context) error {
   timeMonitorOnce.Do(func() {
      go StartMonitor(time.Now, func() {
         log.Errorf("system time jumps backward")
         timeJumpBackCounter.Inc()
      })
   })

   if err := s.startEtcd(ctx); err != nil {
      return err
   }

   if err := s.startServer(); err != nil {
      return err
   }

   s.startServerLoop()

   return nil
}

一、StartMonitor
启动一个协程监控系统时间,有没有jumps backward
UnixNano()在高并发环境下,在高并发情况下并不安全,所以将jumps backward当作错误日志记录下来。
(代码中,只是记录下来,没有做多余操作,推测应该跟 raft时间有关)
https://blog.csdn.net/fwhezfwhez/article/details/81069249

func StartMonitor(now func() time.Time, systimeErrHandler func()) {
   log.Info("start system time monitor")
   tick := time.NewTicker(100 * time.Millisecond)
   defer tick.Stop()
   for {
      last := now().UnixNano()
      <-tick.C //阻塞100ms,在比较
      if now().UnixNano() < last {
         log.Errorf("syst em time jump backward, last:%v", last)
         systimeErrHandler()
      }
   }
}

二、startEtcd
根据加载的配置,启动etcd服务(etcdCfg详细配置可以参见Server.embed.Config说明)

func (s *Server) startEtcd(ctx context.Context) error {
   ctx, cancel := context.WithTimeout(ctx, etcdStartTimeout) //记录启动初始时间
   etcd, err := embed.StartEtcd(s.etcdCfg) //StartEtcd launches the etcd server and HTTP handlers for client/server communication.
   // Check cluster ID
   urlmap, err := types.NewURLsMap(s.cfg.InitialCluster) //parse --initial-cluster
   tlsConfig, err := s.cfg.Security.ToTLSConfig() //tls初始化

   if err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlmap, tlsConfig); err != nil {
      return err
   } //checks Etcd's cluster ID

   client, err := clientv3.New(clientv3.Config{ //启动客户端,提供对etcd操作,比如member命令
      Endpoints:   endpoints,
      DialTimeout: etcdTimeout,
      TLS:         tlsConfig,
   })
   // update advertise peer urls.
   etcdMembers, err := etcdutil.ListEtcdMembers(client)
   …..
}

三、startServerLoop

func (s *Server) startServerLoop() {
    s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(context.Background())
    s.serverLoopWg.Add(3)
    go s.leaderLoop() //选举pd leader
    go s.etcdLeaderLoop()
    go s.serverMetricsLoop()
}

pd Leader 的选举在leaderloop中实现,不同于raft选举:
保存更新tso

pd与tikv交互:
handleStoreHeartbeat
handleAskSplit

pd与tidb交互:

pd调度策略:
pd调度策略主要是针对tikv的。
如何通过调度策略影响tikv?

1.pd 选举。如果是leader掉线,会重新选举。这个选举时间大概是3s,而且会断开客户端服务?
2.tikv于pd的交互:
每个tikv会定期向pd汇报节点整体信息(根据这个汇报的信息,比如每个region的大小不能超过64m,一旦超过64m。辣么就要分裂成两个group raft)
每个raft group的leader会定期想pd汇报信息
然后pd 不断的通过这两种信息,之后做决策。主动下线,是会马上通知pd 不可用。但是某个store心跳包不可用的时候,其实并不能判断到底是什么情况。
这个默认是等待30分整,确实是相当的长。
Pd通过某个 leader的心跳包发现replica不够,可以add或者remove
保证多个replica不在同一个位置上,而不是同一个节点上。这个位置是一个逻辑的概念,通过location-lables来标志哪些lable是位置标志。???这个是怎么标示的?
Pd不断通过store或者leader的心跳包收集信息。获取整个集群的纤细数据,
然后根据这些详细数据生成操作调度序列。
tikv收到这些信息,如何执行的?在哪些情况下,可能不执行。
3.pd中增加replica ,删除replica ,重现选举replica如何实现的。

上一篇下一篇

猜你喜欢

热点阅读