kubernetes leader election 源码分析

最近在写一个 kubernetes 的 addon,原理与 controller-manager 差不多。考虑到 addon 的高可用性需要将 kubernetes 源码包中的 leaderelection 包(k8s.io/kubernetes/pkg/client/leaderelection)移植到 client-go 中供调用,于是顺带分析了下这部分源码。

源码版本为 kubernetes release-1.5

controller-manager 与 scheduler 在启动的时候均可以添加--leader-elect参数以实现高可用性。原理大致与 HDFS 选主类似:抢占锁节点成为 leader,定时心跳保持 alive,leader 故障触发锁节点失效条件,non-leader 发起新一轮 leader election。

对于 HDFS 选主来说,锁节点指的是 Zookeeper 特定位置创建的一个临时节点ActiveStandbyElectorLock。任一 NameNode 率先创建该节点并写入自身信息即成为 Active NameNode(Zookeeper 的 API 与一致性保证了同一时间只会有一个 NameNode 创建该节点成功),剩余的 NameNode 成为 Standby NameNode,并在该临时节点上注册 Watcher。当 Active NameNode 因为某些原因跪了之后,考虑到临时 session 节点的特性,该节点会被删除,Standby NameNode 通过 Watcher 接收到节点删除事件,发起新一轮选举。

对于 k8s 的选主来说,锁节点指的是 kube-system 命名空间下的同名 endpoint。任一 goroutine 如果能成功在该 ep 的 annotation 中留下自身记号即成为 leader。成为 leader 后会定时续约,更新 annotation 中的相关过期时间戳。non-leader 们会定时去获取该 ep 的 annotation,若发现过期等情况则进行抢占。

leader 凭证主要有锁和租约两种。以我的理解,k8s 的 leader 凭证算是锁这一类的,强调主动性,主动去抢占写 ep 的 annotation。而 lease 租约更多是一种由某个机构主动颁发 lease,candidate 被动获得租约并成为 leader 的过程。

client 启动参数

controller-manager 及 scheduler 中与 leader election 相关的启动参数有以下四个:

  • leader-elect: 是否开启选举功能
  • leader-elect-lease-duration: 锁的失效时间,类似于 session-timeout
  • leader-elect-renew-deadline: leader 的心跳间隔,必须小于等于 lease-duration
  • leader-elect-retry-period: non-leader 每隔 retry-period 尝试获取锁

ResourceLock 锁结构

1
2
3
4
5
6
7
8
9
10
11
12
13
// k8s.io/kubernetes/pkg/client/leaderelection/resourcelock/interface.go
const LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
type LeaderElectionRecord struct {
// leader 标识,通常为 hostname
HolderIdentity string `json:"holderIdentity"`
// 同启动参数 --leader-elect-lease-duration
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
// Leader 第一次成功获得租约时的时间戳
AcquireTime unversioned.Time `json:"acquireTime"`
// leader 定时 renew 的时间戳
RenewTime unversioned.Time `json:"renewTime"`
LeaderTransitions int `json:"leaderTransitions"`
}

leader 会在 kube-system namespace 下的同名 endpoint 的 annotations 中写入 key 值为LeaderElectionRecordAnnotationKey,value 为LeaderElectionRecord类型的记录,表明自己身份的同时会根据 –leader-elect-renew-deadline 参数定期去更新记录中的 RenewTime 字段(续约,合同年限为 LeaseDurationSeconds)。

lease 租约在分布式系统中的很多地方都有应用,如分布式锁、一致性问题等。租约机制可以一定程度上避免双主问题,确保同一时刻最多只有一个 leader。

1
2
3
4
5
6
7
8
9
10
11
// k8s.io/kubernetes/pkg/client/leaderelection/resourcelock/interface.go
type Interface interface {
// 获取、创建、更新 annotations 中的选举记录
Get() (*LeaderElectionRecord, error)
Create(ler LeaderElectionRecord) error
Update(ler LeaderElectionRecord) error
RecordEvent(string)
Identity() string
// endpoint namespace/name
Describe() string
}

k8s 中的选举锁需实现 resourcelock.Interface 接口,1.5 版本中实现的只有 endspointslock,不过貌似最近有 configmaplock 的 PR。endpointslock 实现该接口以实现 annotations 中 LeaderElectionRecord 的获取、创建、更新等功能,代码实现较为简单,不作详述。

LeaderElection 流程

LeaderElector

1
2
3
4
5
6
7
8
9
10
// k8s.io/kubernetes/pkg/client/leaderelection/leaderelection.go
type LeaderElector struct {
// 配置,基本同启动参数
config LeaderElectionConfig
// 租约缓存
observedRecord rl.LeaderElectionRecord
// 观察到租约缓存时的时间戳,用以判断租约是否到期
observedTime time.Time
...
}

Leader Election Run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// k8s.io/kubernetes/pkg/client/leaderelection/leaderelection.go
func (le *LeaderElector) Run() {
defer func() {
runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading()
}()
// 获取租约,若获取不到则陷入定时循环获取租约,不执行下一条语句
le.acquire()
stop := make(chan struct{})
// 成功获得租约,调用回调函数执行 leader goroutine 要干的活儿
go le.config.Callbacks.OnStartedLeading(stop)
// 定时循环刷新租约,若刷新失败则说明被抢占了 leader,退出循环
le.renew()
// 已不再是 leader,关闭 stop chan,停止干活儿
close(stop)
}

acruire && renew

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// k8s.io/kubernetes/pkg/client/leaderelection/leaderelection.go
// 定时尝试获取租约,成功获得租约则立即退出函数
func (le *LeaderElector) acquire() {
// 创建 stop chan,若成功获得租约,则关闭 stop chan,退出循环
stop := make(chan struct{})
wait.JitterUntil(func() {
succeeded := le.tryAcquireOrRenew()
le.maybeReportTransition()
desc := le.config.Lock.Describe()
// 未能成功获得租约,进入下一定时循环
if !succeeded {
glog.V(4).Infof("failed to renew lease %v", desc)
return
}
le.config.Lock.RecordEvent("became leader")
glog.Infof("sucessfully acquired lease %v", desc)
close(stop)
}, le.config.RetryPeriod, JitterFactor, true, stop)
}

// 定时尝试续约,续约失败则立即退出函数
func (le *LeaderElector) renew() {
stop := make(chan struct{})
wait.Until(func() {
err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
return le.tryAcquireOrRenew(), nil
})
le.maybeReportTransition()
desc := le.config.Lock.Describe()
// 续约成功,进入下一定时循环
if err == nil {
glog.V(4).Infof("succesfully renewed lease %v", desc)
return
}
le.config.Lock.RecordEvent("stopped leading")
// 续约失败,关闭 stop chan,退出循环
glog.Infof("failed to renew lease %v", desc)
close(stop)
}, 0, stop)
}

tryAcquireOrRenew

tryAcquireOrRenew 函数尝试获取租约,如果获取不到或者得到的租约已过期则尝试抢占,否则 leader 不变。函数返回 True 说明本 goroutine 已成功抢占到锁,获得租约合同,成为 leader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// k8s.io/kubernetes/pkg/client/leaderelection/leaderelection.go
func (le *LeaderElector) tryAcquireOrRenew() bool {
// 创建 leader election 租约
now := unversioned.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}

// 1. 从 endpointslock 上获取 leader election 租约(kube-system 下同名 endpoint 的 annotation)
oldLeaderElectionRecord, err := le.config.Lock.Get()
if err != nil {
// 发生 error,且 error 是除租约不存在以外的其它错误
if !errors.IsNotFound(err) {
glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
// 租约存在
// 于是将函数一开始创建的 leader election 租约放入同名 endpoint 的 annotation 中
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
// 创建失败,函数返回 false
glog.Errorf("error initially creating leader election record: %v", err)
return false
}
// 创建成功,成为 leader,函数返回 true
le.observedRecord = leaderElectionRecord
le.observedTime = time.Now()
return true
}

// 2. 更新本地缓存的租约,并更新观察时间戳,用来判断租约是否到期
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
le.observedRecord = *oldLeaderElectionRecord
le.observedTime = time.Now()
}
// leader 的租约尚未到期,自己暂时不能抢占它,函数返回 false
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
glog.Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}

// 3. 租约到期,而 leader 身份不变,因此当年获得租约的时间戳 AcquireTime 保持不变
if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
} else {
// 租约到期,leader 易主,transtions+1 说明 leader 更替了
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}

// 尝试去更新租约记录
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
// 更新失败,函数返回 false
glog.Errorf("Failed to update lock: %v", err)
return false
}
// 更新成功,函数返回 true
le.observedRecord = leaderElectionRecord
le.observedTime = time.Now()
return true
}

kube-controller-manager 选举相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// 启动时在 kube-system namespace 下创建一个同名的 endpoint 作为 endpointslock
rl := resourcelock.EndpointsLock{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-controller-manager",
},
Client: leaderElectionClient,
LockConfig: resourcelock.ResourceLockConfig{
// id = os.Hostname()
Identity: id,
EventRecorder: recorder,
},
}
// 开启选举流程
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
// 成为 leader 后调用 run 函数启动各 controller goroutine
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")

kube-scheduler 的相关源码基本类似,仅仅是 endpointslock 的名字改成 kube-scheduler,Callbacks 中的 run 函数改成了 scheduler 的启动函数 scheduler.Run(),启动 goroutine 执行调度流程。

从 leaderelection.go 源代码文件最上面的注释可以看到,这种选举的实现并不能保证不会出现脑裂的情况,即可能在某一时间点会有多个 client 同时扮演 leader 角色。client 是利用本地的时间戳去推测选举情况的,可以通过适当调整 RenewDeadline 与 LeaseDuration 或者架设 ntp 来解决。