[k8s source code analysis][client-go] Leader election in Kubernetes
1. Introduction
If you repost this article, please credit the original source and respect the author's work!
This article analyzes the leaderelection package used to make kube-scheduler and kube-controller-manager highly available. It is essentially a distributed resource lock.
Source code: https://github.com/nicktming/client-go
Branch: tming-v1.13 (based on v1.13)
2. Example
architecture.png
The example starts three processes that compete for leadership.
2.1 Create a namespace
[root@master kubectl]# ./kubectl get ns
NAME STATUS AGE
default Active 21h
kube-public Active 21h
kube-system Active 21h
[root@master kubectl]# ./kubectl create ns nicktming
namespace/nicktming created
[root@master kubectl]#
[root@master kubectl]# ./kubectl get ns
NAME STATUS AGE
default Active 21h
kube-public Active 21h
kube-system Active 21h
nicktming Active 3s
[root@master kubectl]# ./kubectl get endpoints -n nicktming
No resources found.
[root@master kubectl]#
2.2 Code
package main

import (
    "context"
    "flag"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
)

func main() {
    klog.InitFlags(nil)
    var endpointLockName string
    var endpointLockNamespace string
    var id string
    flag.StringVar(&id, "id", "", "the holder identity name")
    flag.StringVar(&endpointLockName, "lease-lock-name", "example", "the lease lock resource name")
    flag.StringVar(&endpointLockNamespace, "lease-lock-namespace", "nicktming", "the lease lock resource namespace")
    flag.Parse()
    if id == "" {
        klog.Fatal("unable to get id (missing id flag).")
    }
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    lock := &resourcelock.EndpointsLock{
        EndpointsMeta: metav1.ObjectMeta{
            Name:      endpointLockName,
            Namespace: endpointLockNamespace,
        },
        Client: client.CoreV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }
    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // use a client that will stop allowing new requests once the context ends
    // config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        log.Printf("Received termination, signaling shutdown")
        cancel()
    }()
    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        // ReleaseOnCancel: true,
        LeaseDuration: 60 * time.Second,
        RenewDeadline: 15 * time.Second,
        RetryPeriod:   5 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // we're notified when we start - this is where you would
                // usually put your code
                klog.Infof("%s: leading", id)
            },
            OnStoppedLeading: func() {
                // we can do cleanup here, or after the RunOrDie method
                // returns
                klog.Infof("%s: lost", id)
            },
            OnNewLeader: func(identity string) {
                // we're notified when new leader elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %v", identity)
            },
        },
    })
    // because the context is closed, the client should report errors
    _, err := client.CoreV1().Endpoints(endpointLockNamespace).Get(endpointLockName, metav1.GetOptions{})
    if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") {
        log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err)
    }
    // we no longer hold the lease, so perform any cleanup and then
    // exit
    log.Printf("%s: done", id)
}
Then start three processes that compete for the same resource.
go run test.go --id=1
go run test.go --id=2
go run test.go --id=3
After the processes are running, inspect the nicktming namespace.
[root@master kubectl]# ./kubectl get endpoints -n nicktming
NAME ENDPOINTS AGE
example <none> 5s
[root@master kubectl]# ./kubectl get endpoints -o yaml -n nicktming
apiVersion: v1
items:
- apiVersion: v1
  kind: Endpoints
  metadata:
    annotations:
      control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"1","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:15:07Z","renewTime":"2019-10-16T13:23:22Z","leaderTransitions":0}'
    creationTimestamp: "2019-10-16T13:15:07Z"
    name: example
    namespace: nicktming
    resourceVersion: "45526"
    selfLink: /api/v1/namespaces/nicktming/endpoints/example
    uid: f988a420-f016-11e9-a4ad-525400d54f7e
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
[root@master kubectl]#
The annotations of this endpoint hold the resource that all processes compete for, together with who the current leader is.
The process with id=1 has acquired the resource and started its own workload, while the processes with id=2 and id=3 keep waiting. Their logs confirm this:
// process 1
[root@worker leaderelection]# go run test.go --id=1
1: leading
// process 2
[root@worker leaderelection]# go run test.go --id=2
new leader elected: 1
// process 3
[root@worker leaderelection]# go run test.go --id=3
new leader elected: 1
Now kill process 1, then look at the logs of process 2 and process 3 respectively.
// process 1
[root@worker leaderelection]# go run test.go --id=1
1: leading
^CReceived termination, signaling shutdown
1: lost
1: expected to get an error when trying to make a client call: <nil>
[root@worker leaderelection]#
// process 2
[root@worker leaderelection]# go run test.go --id=2
new leader elected: 1
new leader elected: 3
// process 3
[root@worker leaderelection]# go run test.go --id=3
new leader elected: 1
3: leading
Then check the resource again:
[root@master kubectl]# ./kubectl get endpoints -n nicktming
NAME ENDPOINTS AGE
example <none> 14m
[root@master kubectl]# ./kubectl get endpoints -o yaml -n nicktming
apiVersion: v1
items:
- apiVersion: v1
  kind: Endpoints
  metadata:
    annotations:
      control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"3","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:27:16Z","renewTime":"2019-10-16T13:29:56Z","leaderTransitions":1}'
    creationTimestamp: "2019-10-16T13:15:07Z"
    name: example
    namespace: nicktming
    resourceVersion: "46112"
    selfLink: /api/v1/namespaces/nicktming/endpoints/example
    uid: f988a420-f016-11e9-a4ad-525400d54f7e
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
The lock is now held by process 3. If process 1 is restarted at this point, it can only keep waiting as well.
3. Source code analysis
The code lives under client-go/tools/leaderelection.
leaderElection.png
LeaderElectionRecord: stores the leader's information.
Interface: the client that manipulates the record.
LeaderCallbacks: the callback functions.
LeaderElectionConfig: defines the parameters used when competing for the resource.
LeaderElector: defines the methods used to compete for the resource.
3.1 Interface
LeaderElectionRecord: describes the holder of the resource. In the example above, in control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"3","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:27:16Z","renewTime":"2019-10-16T13:29:56Z","leaderTransitions":1}', the annotation key control-plane.alpha.kubernetes.io/leader is the resource, and the holder's information is its value, i.e. a LeaderElectionRecord.
Interface: defines a set of methods for creating, updating and getting a LeaderElectionRecord. Put simply, it is a client, and every client instance must have an id that is unique across the distributed system.
// tools/leaderelection/resourcelock/interface.go
type LeaderElectionRecord struct {
    // identity of the holder, i.e. the leader's id
    HolderIdentity string `json:"holderIdentity"`
    // how long one lease lasts
    LeaseDurationSeconds int `json:"leaseDurationSeconds"`
    // when leadership was acquired
    AcquireTime metav1.Time `json:"acquireTime"`
    // when the lease was last renewed
    RenewTime metav1.Time `json:"renewTime"`
    // how many times the leader has changed
    LeaderTransitions int `json:"leaderTransitions"`
}

type Interface interface {
    // Get returns the current LeaderElectionRecord of the resource
    Get() (*LeaderElectionRecord, error)
    // Create creates a LeaderElectionRecord for the resource
    Create(ler LeaderElectionRecord) error
    // Update updates the resource
    Update(ler LeaderElectionRecord) error
    // RecordEvent records an event
    RecordEvent(string)
    // Identity returns this client's id
    Identity() string
    // Describe returns a description string
    Describe() string
}
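To connect this back to section 2, here is a minimal standalone sketch (a hypothetical throwaway program, not part of client-go) that decodes the annotation value observed earlier into a LeaderElectionRecord:

package main

import (
    "encoding/json"
    "fmt"

    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
    // the value of control-plane.alpha.kubernetes.io/leader from section 2
    raw := `{"holderIdentity":"1","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:15:07Z","renewTime":"2019-10-16T13:23:22Z","leaderTransitions":0}`
    var record resourcelock.LeaderElectionRecord
    if err := json.Unmarshal([]byte(raw), &record); err != nil {
        panic(err)
    }
    fmt.Println(record.HolderIdentity, record.LeaseDurationSeconds) // prints: 1 60
}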
It has three implementations, EndpointsLock, ConfigMapLock and LeaseLock, which operate on the Kubernetes endpoints, configmap and lease resources respectively; in other words, any of these three resource types can back the lock. EndpointsLock is used as the example here.
// tools/leaderelection/resourcelock/endpointslock.go
type EndpointsLock struct {
    // must contain a namespace and a name
    EndpointsMeta metav1.ObjectMeta
    // client used to talk to the api-server
    Client corev1client.EndpointsGetter
    // carries the distributed, unique identity of this EndpointsLock
    LockConfig ResourceLockConfig
    // the endpoint currently being operated on
    e *v1.Endpoints
}

// tools/leaderelection/resourcelock/interface.go
type ResourceLockConfig struct {
    // distributed, unique id
    Identity      string
    EventRecorder EventRecorder
}
The Create, Update and Get methods all use the client to access the Kubernetes api-server. Reading them makes things clearer: they manipulate the control-plane.alpha.kubernetes.io/leader key in EndpointsLock.e.Annotations, which is exactly the annotation we saw in the example above.
// tools/leaderelection/resourcelock/endpointslock.go
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
    var record LeaderElectionRecord
    var err error
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    if el.e.Annotations == nil {
        el.e.Annotations = make(map[string]string)
    }
    if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found {
        if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
            return nil, err
        }
    }
    return &record, nil
}

// Create attempts to create a LeaderElectionRecord annotation
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
    recordBytes, err := json.Marshal(ler)
    if err != nil {
        return err
    }
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{
            Name:      el.EndpointsMeta.Name,
            Namespace: el.EndpointsMeta.Namespace,
            Annotations: map[string]string{
                LeaderElectionRecordAnnotationKey: string(recordBytes),
            },
        },
    })
    return err
}
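For completeness, Update follows the same pattern as Create: it marshals the record and writes it back into the annotation. Roughly (a paraphrased sketch; see endpointslock.go on the branch above for the exact code):

func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
    // the endpoint must already have been fetched or created
    if el.e == nil {
        return errors.New("endpoint not initialized, call get or create first")
    }
    recordBytes, err := json.Marshal(ler)
    if err != nil {
        return err
    }
    // overwrite the annotation and write the endpoint back to the api-server
    el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(el.e)
    return err
}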
The resourcelock package also provides a New function that builds each of these implementations from a lock type string:
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     coreClient,
            LockConfig: rlc,
        }, nil
    ...
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}
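As a usage sketch, the lock built in the section 2 example could go through this factory instead of constructing EndpointsLock directly (client, flags and id as in that example):

// build an endpoints-backed lock via the factory
lock, err := resourcelock.New(
    resourcelock.EndpointsResourceLock,
    endpointLockNamespace, // "nicktming"
    endpointLockName,      // "example"
    client.CoreV1(),
    client.CoordinationV1(),
    resourcelock.ResourceLockConfig{Identity: id},
)
if err != nil {
    klog.Fatal(err)
}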
3.2 LeaderElector
LeaderElector is the entity that competes for the resource.
type LeaderElectionConfig struct {
    // the client (resource lock)
    Lock rl.Interface
    LeaseDuration time.Duration
    RenewDeadline time.Duration
    RetryPeriod time.Duration
    // callbacks that the user has to configure
    Callbacks LeaderCallbacks
    WatchDog *HealthzAdaptor
    // whether to release the lock on cancel if we are currently the leader
    ReleaseOnCancel bool
    Name string
}
LeaderElectionConfig holds an Interface object, plus the LeaderCallbacks object that the user has to configure.
LeaseDuration, RenewDeadline and RetryPeriod are explained along with the methods below.
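As a side note, NewLeaderElector validates these durations when the LeaderElector is created; roughly, LeaseDuration must be greater than RenewDeadline and RenewDeadline must be greater than JitterFactor*RetryPeriod, so the values used in section 2 (60s / 15s / 5s) satisfy the constraints.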
type LeaderElector struct {
    // configuration of this client, including its id and so on
    config LeaderElectionConfig
    // the record fetched from the remote resource (not necessarily our own);
    // every candidate competing for this resource sees the same record
    observedRecord rl.LeaderElectionRecord
    // when the record was fetched
    observedTime time.Time
    reportedLeader string
    clock clock.Clock
    metrics leaderMetricsAdapter
    name string
}
The following fields deserve special attention:
config: the LeaderElectionConfig holding this client's configuration, including its unique id.
observedRecord: the LeaderElectionRecord fetched from the api-server, i.e. the leader's information.
observedTime: when it was fetched.
Clearly, to decide whether the current process is the leader, it is enough to check whether the identity in config matches the one in observedRecord.
func (le *LeaderElector) GetLeader() string {
    return le.observedRecord.HolderIdentity
}

// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
    return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
}
3.3 Run
func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        runtime.HandleCrash()
        le.config.Callbacks.OnStoppedLeading()
    }()
    // acquire only fails when the ctx is signalled done;
    // otherwise this client keeps trying to win the leader position
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    // once leadership is acquired, run the user's own logic (OnStartedLeading)
    // as a goroutine via the callback
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    // keep renewing the lease in a loop;
    // renew only returns once leadership is lost,
    // and when it returns the whole Run method returns
    le.renew(ctx)
}
1. The client (the le instance) first calls acquire and keeps trying to win leadership. (If it fails it keeps competing and does not reach step 2; once it succeeds it moves on to step 2.)
2. The user's own logic (OnStartedLeading) is started asynchronously as a goroutine via the callback. Move on to step 3.
3. The client keeps renewing its leadership by calling renew. While renewal succeeds it keeps renewing; once renewal fails, the whole Run method returns.
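RunOrDie, which the example in section 2 calls, is only a thin wrapper around all of this: it builds a LeaderElector from the config and calls Run. Roughly (a paraphrased sketch of leaderelection.go):

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    // construct a LeaderElector (this is where the duration checks happen)
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}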
3.3.1 acquire
func (le *LeaderElector) maybeReportTransition() {
    // nothing changed, no update needed
    if le.observedRecord.HolderIdentity == le.reportedLeader {
        return
    }
    // update reportedLeader to the id of the latest leader
    le.reportedLeader = le.observedRecord.HolderIdentity
    if le.config.Callbacks.OnNewLeader != nil {
        // invoke this client's OnNewLeader callback to report that a new leader was elected
        go le.config.Callbacks.OnNewLeader(le.reportedLeader)
    }
}

// returns true as soon as leadership is acquired
// the only case in which it returns false is when ctx signals done
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    wait.JitterUntil(func() {
        // try to acquire or renew the resource
        succeeded = le.tryAcquireOrRenew()
        // a new leader may have been elected,
        // so call maybeReportTransition to check whether it needs to be broadcast
        le.maybeReportTransition()
        if !succeeded {
            // failed to acquire leadership: return and keep competing
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        // we became the leader:
        // call cancel to exit JitterUntil and therefore return from acquire
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}
acquire does the following:
1. As soon as leadership is acquired, it returns true. Otherwise it tries again every RetryPeriod.
2. As soon as ctx signals done, it returns false.
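Strictly speaking, wait.JitterUntil jitters the interval, so the attempts are spaced between RetryPeriod and roughly JitterFactor times RetryPeriod (the package sets JitterFactor to 1.2 at the time of writing), which keeps the candidates from hitting the api-server in lockstep.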
The logic here is fairly simple; the main work happens in tryAcquireOrRenew.
3.3.2 renew
// e.g. RenewDeadline=10s RetryPeriod=2s
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // the function below runs every RetryPeriod and only stops once cancel() is called
    wait.Until(func() {
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        // call tryAcquireOrRenew every RetryPeriod until it returns true;
        // if the timeout is hit, the poll also exits and err carries the error
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            done := make(chan bool, 1)
            go func() {
                defer close(done)
                done <- le.tryAcquireOrRenew()
            }()
            select {
            case <-timeoutCtx.Done():
                return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
            case result := <-done:
                return result, nil
            }
        }, timeoutCtx.Done())
        // a new leader may have been elected; if so, broadcast it
        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            // err == nil means PollImmediateUntil above returned true:
            // renewal succeeded and we are still the leader,
            // so return and let wait.Until run the loop again
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        // err != nil means the total time spent retrying exceeded RenewDeadline:
        // renewal failed and we lost the leader position,
        // so call cancel to exit wait.Until
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())
    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}
Note that the precondition here is that this client is currently the leader; renew then keeps renewing the lease.
A word on how RenewDeadline and RetryPeriod interact: every RetryPeriod the client renews through tryAcquireOrRenew; if a renewal attempt fails it simply retries, until the total time spent trying exceeds RenewDeadline, at which point the client loses its leadership.
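As a worked example with the values from section 2 (RenewDeadline=15s, RetryPeriod=5s): within one iteration of wait.Until, tryAcquireOrRenew is attempted at roughly t=0s, 5s and 10s; if none of those attempts succeeds before the 15s timeout expires, PollImmediateUntil returns an error, the client records "stopped leading", and renew (and therefore Run) returns.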
3.3.3 tryAcquireOrRenew
// compete for or renew leadership
// returns true on success, false on failure
func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }
    // 1. obtain or create the ElectionRecord
    // fetch the ElectionRecord through the lock client
    oldLeaderElectionRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            // any error other than NotFound: give up immediately
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        // nothing there yet, so create a new record
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        // then set observedRecord to the leaderElectionRecord we just wrote
        le.observedRecord = leaderElectionRecord
        le.observedTime = le.clock.Now()
        return true
    }
    // 2. Record obtained, check the Identity & Time
    // the record fetched from the remote side is stored in oldLeaderElectionRecord;
    // if it differs from observedRecord, update observedRecord,
    // because observedRecord must mirror the record that exists remotely.
    // Note that every client competes for leadership while the leader keeps renewing,
    // and the leader updates its RenewTime field on every renewal,
    // so whenever the leader renews successfully, every non-leader candidate
    // has to refresh its observedTime and observedRecord.
    if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
        le.observedRecord = *oldLeaderElectionRecord
        le.observedTime = le.clock.Now()
    }
    // if the lock is already held by someone else and the lease has not expired yet,
    // return false right away: the lock cannot be taken over before it expires
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }
    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        // this client was already the holder: keep AcquireTime and LeaderTransitions
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        // this client was not the previous holder: increment LeaderTransitions
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }
    // update the lock itself
    // this client now holds the resource and becomes the leader
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    le.observedRecord = leaderElectionRecord
    le.observedTime = le.clock.Now()
    return true
}
Note that when the current client is not the leader, it has to decide whether the existing leader has already expired. That check is:
le.observedTime.Add(le.config.LeaseDuration).After(now.Time)
le.observedTime: the time at which the leader's record (the leader's most recent renewal, as far as this client has seen) was observed.
le.config.LeaseDuration: how long this candidate (the current client) must wait before it may acquire leadership.
le.observedTime.Add(le.config.LeaseDuration): the earliest time at which this candidate (the current client) would be allowed to take over leadership.
If le.observedTime.Add(le.config.LeaseDuration).Before(now.Time) is true, the leader has expired. In plain words: more than le.config.LeaseDuration has passed since the leader last renewed, so the leader is considered expired. With Before replaced by After, the expression instead means the lease has not expired yet.
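As a worked example using the first annotation from section 2: renewTime is 2019-10-16T13:23:22Z and leaseDurationSeconds is 60, so assuming the other candidates observed that renewal right away, they may only take over the lock after roughly 13:24:22 if the leader stops renewing; before that, tryAcquireOrRenew simply returns false for them.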