Flink 源码之leader选举(Zookeeper方式)
Flink 系列博客
Flink QuickStart
Flink 双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink 配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
Flink 源码之WindowOperator
Flink 源码之StreamGraph生成
Flink 源码之JobGraph生成
Flink 源码之两阶段提交
Flink 源码之分布式快照
Flink 源码之时间处理
Flink 源码之节点间通信
Flink 源码之Credit Based反压
Flink 源码之快照
Flink 源码之FlinkKafkaConsumer
Flink 源码之内存管理
Flink 源码之 1.11新特性Unaligned checkpoint
Flink 源码之TaskManager启动流程
ZooKeeperHaServices
ZooKeeperHaServices
包装了使用Zookeeper方式实现的Flink一系列高可用服务,比如JobManager,ResourceManager和Dispatcher等leader的选举操作。
ZooKeeperHaServices
通过HighAvailabilityServicesUtils
创建,例如在TaskManagerRunner
创建的相关调用如下:
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
createHighAvailabilityServices
是通过工厂的方式,根据configuration
中的配置,创建不同的HighAvailabilityServices
。
下面我们回到ZooKeeperHaServices
。它提供了一系列创建LeaderElectionService
和LeaderRetrievalService
的方法:
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
return getJobManagerLeaderRetriever(jobID);
}
@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
}
下一章我们重点关注Job Manager选举服务。我们找到对应服务的创建方法,如下所示:
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
这里使用了ZooKeeperUtils
工具类,它的createLeaderElectionService
方法代码如下:
public static ZooKeeperLeaderElectionService createLeaderElectionService(
final CuratorFramework client,
final Configuration configuration,
final String pathSuffix) {
final String latchPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
final String leaderPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
这段代码读取了配置中LATCH_PATH
和LEADER_PATH
的配置信息,创建出了一个ZooKeeperLeaderElectionService
。这些配置的具体含义放在下一章分析。
LeaderElectionService
Flink有多种角色需要参与leader竞选,例如JobManager和ResourceManager,与此同时还支持多种leader竞选的方式比如Zookeeper和Standalone等。如果这些逻辑强耦合在一起,需要编写大量实现,维护起来十分困难。
为了解决这个问题,Flink将leader选举模块拆分,提供了两个接口:LeaderContender
和LeaderElectionService
。所有参与leader竞选的角色都需要实现LeaderContender
接口,例如JobManagerRunnerImpl
和ResourceManager
等。所有leader竞选的方式需要实现LeaderElectionService
,例如ZookeeperLeaderElectionService
,StandaloneLeaderElectionService
和EmbeddedLeaderElectionService
。
LeaderContender
接口和各个方法的用途说明如下所示:
public interface LeaderContender {
/**
* Callback method which is called by the {@link LeaderElectionService} upon selecting this
* instance as the new leader. The method is called with the new leader session ID.
*
* @param leaderSessionID New leader session ID
*/
// 回调函数,如果被选举为leader,LeaderElectionService会调用此方法
void grantLeadership(UUID leaderSessionID);
/**
* Callback method which is called by the {@link LeaderElectionService} upon revoking the
* leadership of a former leader. This might happen in case that multiple contenders have
* been granted leadership.
*/
// 回调函数,leader角色被收回的时候调用
void revokeLeadership();
/**
* Callback method which is called by {@link LeaderElectionService} in case of an error in the
* service thread.
*
* @param exception Caught exception
*/
// leader选举服务发生异常的时候调用
void handleError(Exception exception);
/**
* Returns the description of the {@link LeaderContender} for logging purposes.
*
* @return Description of this contender.
*/
default String getDescription() {
return "LeaderContender: " + getClass().getSimpleName();
}
}
LeaderElectionService
接口和各个方法的用途说明如下所示:
public interface LeaderElectionService {
/**
* Starts the leader election service. This method can only be called once.
*
* @param contender LeaderContender which applies for the leadership
* @throws Exception
*/
// 启动leader选举服务,需要把参与竞选的角色作为参数传入
// 只能被调用一次
void start(LeaderContender contender) throws Exception;
/**
* Stops the leader election service.
* @throws Exception
*/
// 停止leader选举服务
void stop() throws Exception;
/**
* Confirms that the {@link LeaderContender} has accepted the leadership identified by the
* given leader session id. It also publishes the leader address under which the leader is
* reachable.
*
* <p>The rational behind this method is to establish an order between setting the new leader
* session ID in the {@link LeaderContender} and publishing the new leader session ID as well
* as the leader address to the leader retrieval services.
*
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*/
// 确认LeaderContender已经接收了leader身份
// 同时也会公布leader的访问地址
// 这个方法的实现需要包含:
// 1. 设置新leader的id
// 2. 向leader获取服务公布新leader的session id和访问地址
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
/**
* Returns true if the {@link LeaderContender} with which the service has been started owns
* currently the leadership under the given leader session id.
*
* @param leaderSessionId identifying the current leader
*
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
// 如果具有leader身份,返回true
boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService
是LeaderElectionService
的一个实现,使用Zookeeper协助选举leader的过程。
ZooKeeperLeaderElectionService
借助Curator框架的LeaderLatch
,实现leader选举操作和通知回调操作。更为详细的内容参见Curator leader选举。
ZooKeeperLeaderElectionService
使用如下三个成员变量记录leader选举的状态:
private volatile UUID issuedLeaderSessionID;
private volatile UUID confirmedLeaderSessionID;
private volatile String confirmedLeaderAddress;
其中:
- issuedLeaderSessionID(提议的leader session id)是leader状态发生变更的时候,会产生一个新的session id,将这个id写入issuedLeaderSessionID,到这一步需要LeaderContender的授予leader身份操作,授权成功之后才会更新confirmedLeaderSessionID
- LeaderContender在leader选举获得leader身份的时候,通常需要进行一些额外的操作。这些操作完成之后才能确认leader的选举操作完成,才能向集群公布新leader的地址和session id信息
在使用ZooKeeperLeaderElectionService
之前需要启动。启动方法(start)的逻辑如下:
@Override
// start方法必须要传入LeaderContender
public void start(LeaderContender contender) throws Exception {
Preconditions.checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
synchronized (lock) {
// 设置错误监听器
client.getUnhandledErrorListenable().addListener(this);
// 设置leaderContender
// 需要参与leader竞选的角色需要实现LeaderContender接口
leaderContender = contender;
// 设置leaderLatch的监听器,路径为latchPath
leaderLatch.addListener(this);
// 启动leaderLatch
leaderLatch.start();
// 设置NodeCache的监听器,路径为leaderPath
cache.getListenable().addListener(this);
// 启动NodeCache
cache.start();
// 设置Curator连接状态的监听器
client.getConnectionStateListenable().addListener(listener);
// 设置运行状态为正在运行
running = true;
}
}
接收LeaderLatch
回调通知的class需要继承LeaderLatchListener
接口。ZooKeeperLeaderElectionService
中这两个方法的实现如下:
@Override
// 如果获取到leader状态,回调此方法
public void isLeader() {
synchronized (lock) {
if (running) {
// 创建一个leader 选举提议session id,采用UUID
issuedLeaderSessionID = UUID.randomUUID();
// 清除确认的leader信息
clearConfirmedLeaderInformation();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Grant leadership to contender {} with session ID {}.",
leaderContender.getDescription(),
issuedLeaderSessionID);
}
// 告诉leaderContender,授予leader角色
// leaderContender在确认了授予leader角色和进行完相关操作之后,需要调用confirmLeadership方法,确认新的leader已选举完毕
leaderContender.grantLeadership(issuedLeaderSessionID);
} else {
LOG.debug("Ignoring the grant leadership notification since the service has " +
"already been stopped.");
}
}
}
@Override
// 如果失去了leader状态,调用此方法
public void notLeader() {
synchronized (lock) {
// 如果ZooKeeperLeaderElectionService正在运行
if (running) {
LOG.debug(
"Revoke leadership of {} ({}@{}).",
leaderContender.getDescription(),
confirmedLeaderSessionID,
confirmedLeaderAddress);
// 清空issuedLeaderSessionID
issuedLeaderSessionID = null;
// 清除确认的leader信息
clearConfirmedLeaderInformation();
// 告诉leaderContender,收回leader角色
leaderContender.revokeLeadership();
} else {
LOG.debug("Ignoring the revoke leadership notification since the service " +
"has already been stopped.");
}
}
}
监听NodeCache的变化需要实现NodeCacheListener
接口。ZooKeeperLeaderElectionService
的实现方法如下所示。它的作用为当获取到leader身份的时候,将leader信息写入Zookeeper
// 如果Node内容有变化,调用此方法
@Override
public void nodeChanged() throws Exception {
try {
// leaderSessionID is null if the leader contender has not yet confirmed the session ID
// 如果竞选leader成功
if (leaderLatch.hasLeadership()) {
synchronized (lock) {
if (running) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Leader node changed while {} is the leader with session ID {}.",
leaderContender.getDescription(),
confirmedLeaderSessionID);
}
// 如果保存的有已确认的leader sessionID
if (confirmedLeaderSessionID != null) {
// 读取NodeCache的内容
ChildData childData = cache.getCurrentData();
// 如果NodeCache没有内容
if (childData == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Writing leader information into empty node by {}.",
leaderContender.getDescription());
}
// 写入leader的信息
writeLeaderInformation();
} else {
// 如果NodeCache有内容,把内容读取出来
byte[] data = childData.getData();
// 如果内容为空或者是长度为0
if (data == null || data.length == 0) {
// the data field seems to be empty, rewrite information
if (LOG.isDebugEnabled()) {
LOG.debug(
"Writing leader information into node with empty data field by {}.",
leaderContender.getDescription());
}
// 写入leader信息
writeLeaderInformation();
} else {
// 如果有数据的话
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);
// 读取leader的地址和leader sessionID
String leaderAddress = ois.readUTF();
UUID leaderSessionID = (UUID) ois.readObject();
// 如果leader地址和已确认的leader地址不同,需要写入leader信息
if (!leaderAddress.equals(confirmedLeaderAddress) ||
(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
// the data field does not correspond to the expected leader information
if (LOG.isDebugEnabled()) {
LOG.debug(
"Correcting leader information by {}.",
leaderContender.getDescription());
}
writeLeaderInformation();
}
}
}
}
} else {
LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
}
}
} catch (Exception e) {
leaderContender.handleError(new Exception("Could not handle node changed event.", e));
throw e;
}
}
上面的逻辑中多次调用了writeLeaderInformation
方法。此方法负责将leader的信息(地址和session id)写入到Zookeeper。
protected void writeLeaderInformation() {
// this method does not have to be synchronized because the curator framework client
// is thread-safe
// curator是线程安全的,因此这里不需要设置为synchronized
try {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Write leader information: Leader={}, session ID={}.",
confirmedLeaderAddress,
confirmedLeaderSessionID);
}
// 将leader的地址和session id写入baos
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeUTF(confirmedLeaderAddress);
oos.writeObject(confirmedLeaderSessionID);
oos.close();
// 标记leader信息是否已写入
boolean dataWritten = false;
// 如果leader信息没有写入,并且竞选获得了leader权限
while (!dataWritten && leaderLatch.hasLeadership()) {
// 获取leaderPath这个Node的状态
Stat stat = client.checkExists().forPath(leaderPath);
if (stat != null) {
// 获取临时节点的创建者
long owner = stat.getEphemeralOwner();
获取当前会话的ID
long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();
// 如果这个临时节点是当前会话创建的
if (owner == sessionID) {
try {
// 将baos的数据(leader地址和session id)写入节点
client.setData().forPath(leaderPath, baos.toByteArray());
// 标记data已经写入
dataWritten = true;
} catch (KeeperException.NoNodeException noNode) {
// node was deleted in the meantime
}
} else {
try {
// 如果这个节点不是当前会话创建的,删除此节点
client.delete().forPath(leaderPath);
} catch (KeeperException.NoNodeException noNode) {
// node was deleted in the meantime --> try again
}
}
} else {
try {
// 如果该节点不存在
// 创建一个临时节点,数据为baos中的数据
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
leaderPath,
baos.toByteArray());
// 标记data已经写入
dataWritten = true;
} catch (KeeperException.NodeExistsException nodeExists) {
// node has been created in the meantime --> try again
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Successfully wrote leader information: Leader={}, session ID={}.",
confirmedLeaderAddress,
confirmedLeaderSessionID);
}
} catch (Exception e) {
leaderContender.handleError(
new Exception("Could not write leader address and leader session ID to " +
"ZooKeeper.", e));
}
}
除了以上的接口方法外,ZooKeeperLeaderElectionService
还是curator client连接状态的监听器(实现了UnhandledErrorListener)。如果client连接出现问题,unhandledError
方法会被调用:
@Override
public void unhandledError(String message, Throwable e) {
// 如果遇到异常,告知leaderContender
leaderContender.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderElectionService: " + message, e));
}
ZooKeeperLeaderRetrievalService
经过上面的分析我们已经清楚了Leader选举的过程。但是有一个问题,Flink的各个角色是怎么知道leader信息的呢?
为了解决这个问题,Flink引入了LeaderRetrievalService
。这个Service负责获取当前leader的相关信息。LeaderRetrievalService
持有一个LeaderRetrievalListener
对象,用于等leader发生变更的时候发出通知。
ZooKeeperLeaderRetrievalService
为LeaderRetrievalService
的一个实现方式,从Zookeeper中读取leader的信息并通知listener。
ZooKeeperLeaderRetrievalService
在使用之前,需要先启动它。start
方法的内容如下所示。主要进行一些初始化工作。
@Override
// 这里需要传入一个LeaderRetrievalListener对象
public void start(LeaderRetrievalListener listener) throws Exception {
Preconditions.checkNotNull(listener, "Listener must not be null.");
Preconditions.checkState(leaderListener == null, "ZooKeeperLeaderRetrievalService can " +
"only be started once.");
LOG.info("Starting ZooKeeperLeaderRetrievalService {}.", retrievalPath);
synchronized (lock) {
// 记录下listener
leaderListener = listener;
// 设置client错误监听器
client.getUnhandledErrorListenable().addListener(this);
// 设置NodeCache监听器,node路径为retrievalPath,和leader选举服务中保存leader信息的node路径一致
cache.getListenable().addListener(this);
// 启动NodeCache
cache.start();
// 设置client连接状态监听器
client.getConnectionStateListenable().addListener(connectionStateListener);
running = true;
}
}
通知listener的调用时机在curator间听到NodeCache发生变化的时候,和ZooKeeperLeaderElectionService
相同,这段逻辑位于nodeChanged
方法中,如下所示:
@Override
public void nodeChanged() throws Exception {
synchronized (lock) {
if (running) {
try {
LOG.debug("Leader node has changed.");
// 读取NodeCache的内容
ChildData childData = cache.getCurrentData();
String leaderAddress;
UUID leaderSessionID;
if (childData == null) {
leaderAddress = null;
leaderSessionID = null;
} else {
byte[] data = childData.getData();
if (data == null || data.length == 0) {
leaderAddress = null;
leaderSessionID = null;
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);
// 读取leader地址和session id
leaderAddress = ois.readUTF();
leaderSessionID = (UUID) ois.readObject();
}
}
// 通知新的leader地址
notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
} catch (Exception e) {
leaderListener.handleError(new Exception("Could not handle node changed event.", e));
throw e;
}
} else {
LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
}
}
我们接下来分析下notifyIfNewLeaderAddress
方法。该方法最终通知了listener leader信息变更。
@GuardedBy("lock")
private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) {
// 新老leader address或者是session id不同的时候,才会通知listener
if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) &&
Objects.equals(newLeaderSessionID, lastLeaderSessionID))) {
if (newLeaderAddress == null && newLeaderSessionID == null) {
LOG.debug("Leader information was lost: The listener will be notified accordingly.");
} else {
LOG.debug(
"New leader information: Leader={}, session ID={}.",
newLeaderAddress,
newLeaderSessionID);
}
lastLeaderAddress = newLeaderAddress;
lastLeaderSessionID = newLeaderSessionID;
// 调用listener的notifyLeaderAddress方法
leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
}
}
本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。