概述 🔗
k8s 简化了我们在集群上部署、运维应用的流程。在 k8s 上,我们可以很方便地部署一个分布式应用。以 Deployment 为例,应用就由多个 Pod 组成,基于 Pod 的伸缩能力,应用天然就具备应用的高可用性和可扩展性。但在分布式系统中,通常我们需要指定其中一个 Pod 为 leader,负责协调所有 Pod 或执行特定任务。
一般提到 Leader 选举,我们都会想到利用 ZooKeeper 或 Redis 等软件的分布式锁来实现,所有 Pod 不断争抢锁,抢到锁的 Pod 就成为 Leader,并且 Leader 不断进行 “心跳” 对锁进行续期。其他 Pod 依旧不断强锁,保证 之前 Leader 出现异常后能快速选举出新的 Leader。
但使用 ZooKeeper 或 Redis 等,需要我们额外部署一个软件。那在 k8s 中能不能不使用外部软件,实现 Leader 选举呢?
k8s 本身为了保证资源(如 Endpoint、ConfigMap 等)的一致性,在资源的 metadata 中定义了资源版本,并版本信息维护在了分布式存储 etcd 中,通过乐观锁的方式更新资源,保证并发更新资源时,资源只能被更新一次。因此我们也可以利用这个特点,通过在 Pod 内争抢更新某个资源来实现 Leader 选举。
更方便的是,k8s 的 Client 已经封装了 Leader 选举的逻辑,我们直接使用即可。接下来就以 k8s 的 Java Client ( kubernetes-client/java ) 为例,介绍如何实现 Leader 选举。
代码实现 🔗
首先需要引入 Java Client ,以 maven 为例,在 pom.xml
中添加相关依赖:
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>13.0.0</version>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java-extended</artifactId>
<version>13.0.0</version>
</dependency>
client-java
封装了 k8s 的基础 API,要使用 Leader 选举类,还需要引入 client-java-extended
。
下面是一段 Leader 选举的代码示例:
import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.kubernetes.client.extended.leaderelection.LeaderElector;
import io.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.Config;
import java.net.InetAddress;
import java.time.Duration;
public class LeaderElectionExample {
public static void main(String[] args) throws Exception {
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
String appNamespace = "default";
String appName = "leader-election-ip";
// 使用 IP 作为 Identity
String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
// 创建 ConfigMap 锁
ConfigMapLock lock = new ConfigMapLock(appNamespace, appName, lockHolderIdentityName);
// Leader 选举的配置
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(lock,
Duration.ofMillis(10000),
Duration.ofMillis(8000),
Duration.ofMillis(2000));
// 初始化 LeaderElector
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
// 选举 Leader
leaderElector.run(
() -> {
System.out.println("Do something when getting leadership.");
},
() -> {
System.out.println("Do something when losing leadership.");
});
}
}
首先初始化了 ApiClient。如果是本地开发,则默认会使用本地 ${HOME}/.kube/config
中的 kubeconfig 配置来访问集群。
然后创建了一个 ConfigMapLock,该锁就是 Leader 选举时争抢的资源锁,本质上是一个 ConfigMap。在上面代码中,锁的 Identity 是 Pod 的 IP,这样我们就可以根据锁内容判断当前哪个 Pod 是 Leader。除了 ConfigMap,还可以使用 Endpoints 和 Lease 等资源,在 Java Client 中也分别实现了对应的 EndpointsLock 和 LeaseLock。
接下来就是创建一个 LeaderElectionConfig ,该配置主要包含以下属性:
- lock:资源锁,如 ConfigMapLock、EndpointsLock 等;
- leaseDuration:Leader 持有锁的时长;
- renewDeadline:续约时间间隔,每隔一段时间 Leader 就需要对锁进行续约;
- retryPeriod:重试时间间隔,其他 Pod 不断争抢锁的时间间隔。
然后就根据 LeaderElectionConfig 创建一个 LeaderElector 实例,并调用其 run
方法进行抢锁。调用 run
方法后,Pod 会尝试更新锁资源。示例代码中是 ConfigMapLock,所以 Pod 会尝试创建或更新 ConfigMap,一旦更新成功,就会将当前 Pod 的 IP 写入到 ConfigMap 的 metadata.annotations
,写入成功后该 Pod 就成为了 Leader。
下面就是 Leader 选举后的 ConfigMap 示例:
$ kubectl get configmap leader-election-ip -o=json
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"127.0.0.1\",\"leaseDurationSeconds\":10,\"acquireTime\":\"2021-09-16T07:28:46.958Z\",\"renewTime\":\"2021-09-16T07:28:53.029Z\",\"leaderTransitions\":0}"
},
"creationTimestamp": "2021-09-16T07:28:47Z",
"name": "leader-election-ip",
"namespace": "default",
"resourceVersion": "2197494621",
"selfLink": "/api/v1/namespaces/default/configmaps/leader-election-ip",
"uid": "d2966aac-5962-403c-9d07-99e0e1c712ed"
}
}
LeaderElector 的 run
方法提供了几个抢锁相关的 hook:
- startLeadingHook:LeaderElector 客户端成为 Leader 后调用;
- stopLeadingHook:LeaderElector 客户端不再是 Leader 后调用;
- onNewLeaderHook:有新 Leader 后调用,其参数是新 Leader 的 Identity;
🔗
在分布式系统中,我们可能只需要 Leader 处理特定任务,这时也可以很方便地判断某个 Pod 是否是 Leader:
// 当前的 Pod IP
String identity = InetAddress.getLocalHost().getHostAddress();
// 创建 ConfigMap 锁
ConfigMapLock lock = new ConfigMapLock(namespace, configMapName, identity);
// 当前的 LeaderElectionRecord
LeaderElectionRecord record = lock.get();
// 当前 Leader 的 Identity
String leaderIdentity = record.getHolderIdentity();
// 如果当前 Leader 的 Identity 等于当前 Pod 的 IP,则表示当前 Pod 是 leader;否则不是
boolean isLeader = leaderIdentity.equals(ip);
整体来看,由于 k8s Client 封装了 Leader 选举的相关逻辑,所以我们使用起来非常简单。关于 LeaderElection 的具体实现,其实也比较简单易懂,就是不断抢锁、续约,感兴趣的可以直接阅读 kubernetes-client/java 的相关源码。
总结 🔗
由此可见,基于 k8s 的 Leader 选举是非常简单方便的,并且运用也非常广泛。比如将 Flink 部署到 k8s 上时,Flink 为了实现高可用就使用了基于 k8s 的 Leader 选举机制,保证同时只有一个 jobmanager 负责任务的调度(即作为 Leader),并且 Leader 出现问题时,其他能候补 Pod 够快速晋升为 Leader。