ZooKeeper选举源码阅读上
1. 选举流程
选举流程
2. 源码分析
2.1 源码入口
从QuorumPeerMain的main方法开始
2.2 QuorumPeerMain.initializeAndRun()方法
初始启动方法：解析配置文件到内存、启动清理快照文件的定时任务，最后启动服务（如果没有配置文件，或者配置文件中没有定义集群节点，就以单机模式启动）。
/**
 * Parses the optional configuration file, starts the data-directory purge task, and then runs
 * either the replicated (quorum) server or a standalone server.
 *
 * <p>The replicated server is started only when exactly one argument (the config file path) is
 * given AND that config defines a quorum; in every other case the standalone server is used.
 *
 * @param args command-line arguments; a single argument is treated as the config file path
 */
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    final boolean hasConfigFile = args.length == 1;
    if (hasConfigFile) {
        // Load the configuration from the file named on the command line.
        config.parse(args[0]);
    }

    // Start the background task that purges old snapshot files, using the retention count and
    // purge interval from the config.
    DatadirCleanupManager cleanupManager = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    cleanupManager.start();

    if (!hasConfigFile || !config.isDistributed()) {
        // No config file, or the config defines no quorum: fall back to standalone mode.
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        ZooKeeperServerMain.main(args);
        return;
    }
    // A config file defining a quorum was given: start the replicated server.
    runFromConfig(config);
}
2.3 runFromConfig()方法
本方法主要做了以下几件事：注册JMX、初始化服务端连接对象、获取本服务节点对象、设置选举类型、初始化内存数据库对象、将上面创建的服务端连接对象放入本服务节点对象，最后启动服务节点。
2.3.1 ManagedUtil.registerLog4jMBeans()
本方法注册JMX。
2.3.2 ServerCnxnFactory.createFactory()
这里初始化服务端连接对象。如果没有通过系统属性 zookeeper.serverCnxnFactory 指定实现类，默认使用NIO；官方推荐使用Netty，配置方式为添加启动参数 -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory。
/**
 * Creates the server connection factory.
 *
 * <p>The implementation class name is read from the {@code ZOOKEEPER_SERVER_CNXN_FACTORY}
 * system property; when the property is not set, {@code NIOServerCnxnFactory} is used.
 * The class is instantiated reflectively through its no-arg constructor.
 *
 * @return a new {@code ServerCnxnFactory} instance
 * @throws IOException if the class cannot be loaded or instantiated (the reflective failure is
 *     attached as the cause)
 */
public static ServerCnxnFactory createFactory() throws IOException {
    final String configured = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    final String factoryClassName =
        (configured != null) ? configured : NIOServerCnxnFactory.class.getName();
    try {
        Class<?> factoryClass = Class.forName(factoryClassName);
        ServerCnxnFactory factory =
            (ServerCnxnFactory) factoryClass.getDeclaredConstructor().newInstance();
        // Report which connection implementation is in use.
        LOG.info("Using {} as server connection factory", factoryClassName);
        return factory;
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + factoryClassName, e);
    }
}
2.3.3 getQuorumPeer()
获取本服务节点对象，下面是QuorumPeer的构造方法：
/**
 * Creates a quorum peer thread named "QuorumPeer".
 *
 * <p>Sets up the quorum stats, an empty registry of remote-peer JMX beans, the admin server and
 * the X.509 utility, then calls {@code initialize()} and records whether dynamic
 * reconfiguration is enabled.
 *
 * @throws SaslException declared by this constructor (presumably raised from
 *     {@code initialize()} — TODO confirm)
 */
public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    this.quorumStats = new QuorumStats(this);
    this.jmxRemotePeerBean = new HashMap<>();
    this.adminServer = AdminServerFactory.createAdminServer();
    this.x509Util = createX509Util();
    // Keep initialize() after the assignments above in case it relies on them.
    initialize();
    this.reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
}
2.3.4 quorumPeer.setElectionType(config.getElectionAlg())
设置选举算法类型，这里为3，后边在startLeaderElection()中会用到。
2.3.5 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())
初始化内存数据库对象
2.3.6 quorumPeer.setCnxnFactory(cnxnFactory)
将上边创建的服务连接对象放入本服务节点对象
2.3.7 quorumPeer.start()
启动服务节点,本方法是重点方法,下边单独分析。
2.4 启动服务节点
启动服务节点一共做了以下几件事:
- 加载文件数据到内存
- 启动服务端连接工厂（默认NIO，配置后为Netty）
- 启动默认jetty服务,默认8080端口,用来查看服务端状态信息
- 初始化集群选举leader相关对象数据
- 启动集群选举leader线程
quorumPeer.start()方法整体代码:
/**
 * Starts this quorum peer.
 *
 * <p>Steps, in order: verify that {@code myid} is in the current view, load on-disk data into
 * memory, start the server connection factory, start the admin server (a failure there is only
 * logged, not fatal), prepare leader election, start the JVM pause monitor, and finally start
 * this peer's own thread.
 *
 * @throws RuntimeException if {@code myid} is not in the peer list
 */
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Load on-disk data into the in-memory database.
    loadDataBase();
    // Start the server connection factory (NIO by default, Netty when configured).
    startServerCnxnFactory();
    try {
        // Start the admin server (embedded Jetty by default).
        adminServer.start();
    } catch (AdminServerException e) {
        // Admin server failure does not abort startup.
        LOG.warn("Problem starting AdminServer", e);
        // NOTE(review): duplicates the log line above on stdout; consider removing.
        System.out.println(e);
    }
    // Prepare leader election: initial vote and the election algorithm.
    startLeaderElection();
    startJvmPauseMonitor();
    // Start this peer's thread; its run() method drives the election loop.
    super.start();
}
2.4.1 loadDataBase()
加载文件数据到内存
2.4.2 startServerCnxnFactory()
启动服务端连接工厂；如果配置的是Netty，最终会调用NettyServerCnxnFactory.start()方法：
/**
 * Binds the Netty server bootstrap to {@code localAddress}, blocking (uninterruptibly) until
 * the bind completes.
 *
 * <p>{@code listenBacklog} is applied as {@code SO_BACKLOG} unless it is -1. After binding,
 * {@code localAddress} is refreshed from the bound channel so that a requested port of 0 is
 * replaced by the port actually assigned.
 */
@Override
public void start() {
    final boolean backlogConfigured = listenBacklog != -1;
    if (backlogConfigured) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    LOG.info("binding to port {}", localAddress);
    this.parentChannel = bootstrap
        .bind(localAddress)
        .syncUninterruptibly()
        .channel();
    // Re-read the address from the bound channel: if port 0 was requested,
    // this picks up the real port.
    this.localAddress = (InetSocketAddress) parentChannel.localAddress();
    LOG.info("bound to port {}", getLocalPort());
}
2.4.3 adminServer.start()
启动默认jetty服务,最终会调用JettyAdminServer.start()方法
/**
 * Starts the embedded Jetty server and logs the address, port and command URL on success.
 *
 * @throws AdminServerException wrapping any exception thrown by {@code server.start()}; the
 *     message names the address, port and command URL
 */
public void start() throws AdminServerException {
    try {
        server.start();
    } catch (Exception e) {
        // Server.start() is declared to throw plain Exception, so wrap it in an
        // identifiable subclass.
        throw new AdminServerException(
            String.format(
                "Problem starting AdminServer on address %s, port %d and command URL %s",
                address, port, commandUrl),
            e);
    }
    LOG.info("Started AdminServer on address {}, port {} and command URL {}", address, port, commandUrl);
}
2.4.4 startLeaderElection()
初始化集群选举leader相关对象数据,重点方法
/**
 * Prepares leader election for this peer.
 *
 * <p>When the peer is in the {@code LOOKING} state, its initial vote is for itself: its own id,
 * its last logged zxid and its current epoch. The election algorithm selected by
 * {@code electionType} is then created and stored in {@code electionAlg}.
 *
 * @throws RuntimeException if building the initial vote fails with an {@link IOException};
 *     the original exception is attached as the cause
 */
public synchronized void startLeaderElection() {
    try {
        // The initial state is LOOKING, i.e. searching for a leader.
        if (getPeerState() == ServerState.LOOKING) {
            // Initial vote: (my id, last logged zxid, current epoch).
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Chain the IOException as the cause instead of copying only its message and stack
        // trace, so the root-cause type and any nested causes are preserved.
        throw new RuntimeException(e.getMessage(), e);
    }
    // Pick the election algorithm by type (3 = FastLeaderElection in this walkthrough).
    this.electionAlg = createElectionAlgorithm(electionType);
}
2.4.4.1 createElectionAlgorithm(electionType)
本方法主要做了三件事:
- 初始化选举管理器
- 启动选举监听
- 启动快速选举算法相关流程
/**
 * Creates and starts the leader-election implementation for the given algorithm id.
 *
 * <p>Only algorithm 3 is supported. It creates a new QuorumCnxManager, installs it in
 * {@code qcmRef} (halting any previously installed manager), starts the manager's listener, and
 * creates and starts a FastLeaderElection. Algorithms 1 and 2 throw
 * UnsupportedOperationException.
 *
 * @param electionAlgorithm the election algorithm id
 * @return the started election; null if the manager's listener is null, or if the id is
 *     unknown and assertions are disabled
 */
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    // NOTE: a factory method would arguably be cleaner than this switch.
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // Create the connection manager used for election traffic.
        QuorumCnxManager qcm = createCnxnManager();
        // Install it; a manager left over from an earlier start is halted.
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // Start the listener thread that accepts incoming election connections.
            listener.start();
            // Create and start fast leader election on top of the manager.
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        // Unknown id: fails only when assertions are enabled; otherwise null is returned.
        assert false;
    }
    return le;
}
2.4.4.1.1 createCnxnManager()
初始化选举管理器
2.4.4.1.2 listener.start()
启动选举监听,最终调用QuorumCnxManager.Listener.run()方法
/**
 * Body of the election listener thread.
 *
 * <p>Unless shut down, creates one {@code ListenerHandler} per election address (the wildcard
 * addresses when {@code quorumListenOnAllIPs} is set, otherwise all configured addresses), runs
 * them on a fixed-size pool, and waits until every handler has counted down the shared latch.
 * All handlers are then closed. If the thread leaves while the manager is not shut down, an
 * error is logged and, when a socket exception was recorded, {@code socketBindErrorHandler} is
 * run because this host can no longer take part in leader election.
 */
@Override
public void run() {
    if (!shutdown) {
        LOG.debug("Listener thread started, myId: {}", self.getId());
        Set<InetSocketAddress> addresses;
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }
        CountDownLatch latch = new CountDownLatch(addresses.size());
        listenerHandlers = addresses.stream().map(address ->
                new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
            .collect(Collectors.toList());
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            // Each task runs ListenerHandler.run() and accepts connections on one address.
            listenerHandlers.forEach(executor::submit);
        } finally {
            // Submitted handlers keep running; shutdown() only lets the pool's threads exit
            // once they finish. Without it the pool threads are never terminated and leak
            // every time the listener is (re)started.
            executor.shutdown();
        }
        try {
            latch.await();
        } catch (InterruptedException ie) {
            LOG.error("Interrupted while sleeping. Ignoring exception", ie);
        } finally {
            // Clean up for shutdown.
            for (ListenerHandler handler : listenerHandlers) {
                try {
                    handler.close();
                } catch (IOException ie) {
                    // Don't log an error for shutdown.
                    LOG.debug("Error closing server socket", ie);
                }
            }
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error(
            "As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}",
            self.getElectionAddress().getAllAddresses().stream()
                .map(NetUtils::formatInetAddr)
                .collect(Collectors.joining("|")));
        if (socketException.get()) {
            // After leaving listener thread, the host cannot join the quorum anymore,
            // this is a severe error that we cannot recover from, so we need to exit
            socketBindErrorHandler.run();
        }
    }
}
QuorumCnxManager.ListenerHandler.run()方法
/**
 * Body of one listener-handler task.
 *
 * <p>Names the current thread after the handler's address, accepts election connections until
 * {@code acceptConnections()} returns, and then closes the handler. Unexpected exceptions are
 * logged. The shared latch is always counted down so the owning listener can stop waiting.
 */
@Override
public void run() {
    try {
        String threadName = "ListenerHandler-" + address;
        Thread.currentThread().setName(threadName);
        acceptConnections();
        try {
            close();
        } catch (IOException closeFailure) {
            LOG.warn("Exception when shutting down listener: ", closeFailure);
        }
    } catch (Exception unexpected) {
        // Not expected to happen; log it so it is at least visible.
        LOG.error("Unexpected error ", unexpected);
    } finally {
        latch.countDown();
    }
}
/**
 * Creates this handler's server socket and accepts incoming election connections (blocking in
 * {@code accept()}).
 *
 * <p>Each accepted socket has its options set. It is then handed to
 * {@code receiveConnectionAsync} when quorum SASL authentication is enabled, otherwise to
 * {@code receiveConnection} on this thread. A {@code SocketTimeoutException} from
 * {@code accept()} is logged and accepting continues (see ZOOKEEPER-2836).
 *
 * <p>An {@code IOException} (presumably e.g. a bind failure in
 * {@code createNewServerSocket()} — confirm) ends the current attempt. If shut down, the method
 * stops. Otherwise:
 * <ul>
 *   <li>the exception is logged, and recorded in {@code socketException} if it is a
 *       {@code SocketException};</li>
 *   <li>the retry counter is incremented and the server socket is closed;</li>
 *   <li>the thread sleeps one second and then closes the last client socket.</li>
 * </ul>
 *
 * <p>Attempts repeat until shutdown, or until {@code portBindMaxRetry} attempts have failed
 * without an accepted connection in between (0 means no limit). The counter is reset after
 * every handled connection.
 */
private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;
    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // Create the server socket for this handler's address.
            serverSocket = createNewServerSocket();
            LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
            while (!shutdown) {
                try {
                    client = serverSocket.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
                    // Receive and handle the connection request
                    // asynchronously if the quorum sasl authentication is
                    // enabled. This is required because sasl server
                    // authentication process may take few seconds to finish,
                    // this may delay next peer connection requests.
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        // Handle the connection synchronously on this thread.
                        receiveConnection(client);
                    }
                    // A connection was handled: reset the consecutive-failure counter.
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                    LOG.warn("The socket is listening for the election accepted "
                        + "and it timed out unexpectedly, but will retry."
                        + "see ZOOKEEPER-2836");
                }
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            if (e instanceof SocketException) {
                // Remembered so the listener can invoke its bind-error handler on exit.
                socketException.set(true);
            }
            numRetries++;
            try {
                close();
                // Back off for one second before the next attempt.
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    if (!shutdown) {
        LOG.error(
            "Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.",
            formatInetAddr(address),
            numRetries,
            ELECTION_PORT_BIND_RETRY);
    }
}
/**
 * Handles an incoming election connection on the calling thread.
 *
 * <p>Wraps the socket's input stream in a buffered {@link DataInputStream} and passes socket and
 * stream to {@code handleConnection}. The stream is not closed here on success, because
 * {@code handleConnection} may hand it to a {@code RecvWorker}. If an {@link IOException}
 * occurs, it is logged and the socket is closed.
 *
 * @param sock the accepted socket
 */
public void receiveConnection(final Socket sock) {
    try {
        final DataInputStream in =
            new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
        handleConnection(sock, in);
    } catch (IOException ex) {
        LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
        LOG.debug("Exception details: ", ex);
        closeSocket(sock);
    }
}
下面重点分析handleConnection(sock, din)方法。
2.4.5 handleConnection(sock, din)方法
首先通过protocolVersion = din.readLong()读取连接发起方发来的第一个long值，并据此解析出发送选票的机器ID（sid）。
然后进入选票流程,关键的if判断。
// Excerpt from handleConnection(sock, din): decide what to do with the connection based on the
// sender's id (sid). ZooKeeper does not let a smaller-id peer keep a connection to a
// larger-id peer. If the sender's id is smaller than ours, drop its connection and connect
// back to it ourselves.
if (sid < self.getId()) {
    SendWorker sw = senderWorkerMap.get(sid);
    if (sw != null) {
        sw.finish();
    }
    LOG.debug("Create new connection to server: {}", sid);
    // First close the incoming connection...
    closeSocket(sock);
    // ...then this (larger-id) peer opens its own socket to the smaller-id sender.
    if (electionAddr != null) {
        connectOne(sid, electionAddr);
    } else {
        connectOne(sid);
    }
} else if (sid == self.getId()) {
    // Should not normally happen: either a configuration error or a bug.
    LOG.warn("We got a connection request from a server with our own ID. "
        + "This should be either a configuration error, or a bug.");
} else {
    // The sender's id is larger than ours: keep the connection and start the vote
    // send/receive workers for it.
    // Worker that sends queued messages to the sender.
    SendWorker sw = new SendWorker(sock, sid);
    // Worker that receives the sender's messages.
    RecvWorker rw = new RecvWorker(sock, din, sid, sw);
    sw.setRecv(rw);
    // Replace any existing send worker for this sid.
    SendWorker vsw = senderWorkerMap.get(sid);
    if (vsw != null) {
        vsw.finish();
    }
    senderWorkerMap.put(sid, sw);
    queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
    sw.start();
    rw.start();
}
RecvWorker.run()方法
/**
 * Body of the receive-worker thread for peer {@code sid}.
 *
 * <p>Counts this thread in {@code threadCnt}. While running, not shut down and the socket is
 * non-null, it reads length-prefixed messages. Each message is a 4-byte int length (must be in
 * 1..PACKETMAXSIZE, otherwise an IOException is thrown) followed by exactly that many bytes.
 * The bytes are wrapped in a {@code Message} tagged with {@code sid} and added to the receive
 * queue for asynchronous processing.
 *
 * <p>Any exception ends the loop and is logged as a broken connection. In all cases the paired
 * SendWorker is finished and the socket is closed.
 */
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            final byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            // Queue the received vote for asynchronous processing.
            addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
        }
    } catch (Exception e) {
        LOG.warn(
            "Connection broken for id {}, my id = {}",
            sid,
            QuorumCnxManager.this.mySid,
            e);
    } finally {
        // Tear down the pair: stop the matching SendWorker and close the shared socket.
        LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
        sw.finish();
        closeSocket(sock);
    }
}
SendWorker.run()方法
/**
 * Body of the send-worker thread for peer {@code sid}.
 *
 * <p>Counts this thread in {@code threadCnt}. If the send queue for this peer is missing or
 * empty, it first re-sends the last message sent to this peer, if there is one. If that send
 * fails, the worker is finished.
 *
 * <p>Loop behaviour:
 * <ul>
 *   <li>While running, not shut down and the socket is non-null, it polls the send queue with
 *       a 1000 ms timeout.</li>
 *   <li>Each message obtained is recorded as the last message sent and then sent.</li>
 *   <li>A missing queue ends the loop.</li>
 *   <li>An interrupt while polling is logged and the loop continues.</li>
 *   <li>Any other exception ends the loop and is logged.</li>
 * </ul>
 * The worker is finished when the loop exits.
 */
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        /**
         * If there is nothing in the queue to send, then we
         * send the lastMessage to ensure that the last message
         * was received by the peer. The message could be dropped
         * in case self or the peer shutdown their connection
         * (and exit the thread) prior to reading/processing
         * the last message. Duplicate messages are handled correctly
         * by the peer.
         *
         * If the send queue is non-empty, then we have a recent
         * message than that stored in lastMessage. To avoid sending
         * stale message, we should send the message in the send queue.
         */
        // Look up this peer's send queue.
        BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        // Queue missing or empty: re-send the last message sent to this peer, if any.
        if (bq == null || isSendQueueEmpty(bq)) {
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid={}", sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
    try {
        // Main loop: send queued votes while the worker is active.
        while (running && !shutdown && sock != null) {
            ByteBuffer b = null;
            try {
                // Re-fetch this peer's send queue on every iteration.
                BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq != null) {
                    // Presumably waits up to 1000 ms for the next message (null on timeout).
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for server {}", sid);
                    break;
                }
                if (b != null) {
                    // Remember it as the last message sent, then send it.
                    lastMessageSent.put(sid, b);
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue", e);
            }
        }
    } catch (Exception e) {
        LOG.warn(
            "Exception when using channel: for id {} my id = {}",
            sid ,
            QuorumCnxManager.this.mySid,
            e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
}
2.4.6 快速选举流程
// Excerpt from createElectionAlgorithm(): build the FastLeaderElection instance for this peer
// on top of its QuorumCnxManager (qcm), then start it.
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
fle.start()调用的是FastLeaderElection.start()方法，其内部会调用messenger.start()：
/**
 * Starts this election's messenger, which in turn launches the sender and receiver threads.
 */
public void start() {
    messenger.start();
}
再看this.messenger.start()方法
/**
 * Launches the messenger's two worker threads: first the WorkerSender thread
 * ({@code wsThread}), then the WorkerReceiver thread ({@code wrThread}).
 */
void start() {
    // Outgoing votes.
    wsThread.start();
    // Incoming votes.
    wrThread.start();
}
2.4.6.1 发送选票流程
下次再讲这个方法
2.4.6.2 接收选票流程
下次再讲这个方法
2.4.7 super.start()
启动集群选举leader线程,实际上是执行QuorumPeer.run()方法,下次再讲这个方法。