Zookeeper源码分析:Leader角色初始化

news/2024/7/5 2:56:05

参考资料

<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>
zookeeper-3.0.0

Leader角色初始化

在上文的选举完成之后,每个zk实例都会根据选举结果进入对应的角色,本文主要就是讲述Leader的初始化相关内容。

Leader初始化流程

case LEADING:LOG.info("LEADING");try {setLeader(makeLeader(logFactory));                          // 设置成主状态leader.lead();                                              // 接听所有事件请求setLeader(null);                                            // 如果失去当前主  则将主设置为空} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {if (leader != null) {                                       // 设置为空并重置状态leader.shutdown("Forcing shutdown");setLeader(null);}setPeerState(ServerState.LOOKING);}break;

在角色进入到LEADING时, 此时就会进入生产一个leader实例并调用该leader实例的lead方法进入主角色开始执行。首先查看makeLeader方法。

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {return new Leader(this, new LeaderZooKeeperServer(logFactory,this,new ZooKeeperServer.BasicDataTreeBuilder()));}

此时就是初始化了一个新的Leader类并传入QuormPeer实例并初始化了一个LeaderZooKeeperServer实例。

该类的初始化方法如下;

    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {this.self = self;try {ss = new ServerSocket(self.getQuorumAddress().getPort());           // 监听一下本地服务端口} catch (BindException e) {LOG.error("Couldn't bind to port "+ self.getQuorumAddress().getPort());throw e;}this.zk=zk;}

主要就是保存对应的实例并监听本地的端口。接着就执行了该类的lead方法。

leader.lead方法
    void lead() throws IOException, InterruptedException {self.tick = 0;                                                              // 计数置零zk.loadData();                                                              // zk加载数据 主要就是将会话删除旧的恢复可用的zk.startup();                                                               // zk创建会话 并注册调用链处理函数long epoch = self.getLastLoggedZxid() >> 32L;                               // 获取epoch 值 并加1epoch++;zk.setZxid(epoch << 32L);                                                   // 设置zxid值zk.dataTree.lastProcessedZxid = zk.getZxid();                               // 获取最后一次提交事物idlastProposed = zk.getZxid();newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);                                                        // 生成一个新leader的包if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {              // 判断值是否为0LOG.warn("NEWLEADER proposal has Zxid of "+ newLeaderProposal.packet.getZxid());}outstandingProposals.add(newLeaderProposal);                                // 添加事务// Start thread that waits for connection requests from // new followers.cnxAcceptor = new FollowerCnxAcceptor();                                    // 开启线程接受follower的信息cnxAcceptor.start();// We have to get at least a majority of servers in sync with// us. We do this by waiting for the NEWLEADER packet to get// acknowledgednewLeaderProposal.ackCount++;                                               // ack统计 默认包括自己 所有先加1while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {         // 检查回复是否大于集群总数的一半if (self.tick > self.initLimit) {                                       // 检查tick是否超过限制次数// Followers aren't syncing fast enough,// renounce leadership!shutdown("Waiting for " + (self.quorumPeers.size() / 2)             // 超过限制次数 则停止并返回  并继续进行选举+ " followers, only synced with "+ newLeaderProposal.ackCount);if (followers.size() >= self.quorumPeers.size() / 2) {              LOG.warn("Enough followers present. "+"Perhaps the initTicks need to be increased.");}return;}Thread.sleep(self.tickTime);                                            // 休眠self.tick++;                                                            // 增加tick 值}if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {    // 如果获取类熟属性 如果不是leaderServes则设置zkself.cnxnFactory.setZooKeeperServer(zk);}// Everything is a go, simply start counting the ticks// WARNING: I couldn't find any wait statement on a synchronized// block that would be notified by this notifyAll() call, so// I commented it out//synchronized (this) {//    notifyAll();//}// We ping twice a tick, so we only update the tick every other// iterationboolean tickSkip = true;while (true) {Thread.sleep(self.tickTime / 2);                                        // 休眠一半的tickTime时间if (!tickSkip) { self.tick++;}int syncedCount = 0;// lock on the followers when we use it.synchronized (followers) {                                              // 获取所有的followers并发送synced请求for (FollowerHandler f : followers) {if (f.synced()) {syncedCount++;}f.ping();                                                       // 发送ping请求}}if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {           // 检查是否获得半数以上的回复 如果没有则停止并重新进入选举流程// Lost quorum, shutdownshutdown("Only " + syncedCount + " followers, need "+ (self.quorumPeers.size() / 2));// make sure the order is the same!// the leader goes to lookingreturn;}tickSkip = !tickSkip;}}

lead方法,主要就是先加载会话相关的数据,然后再注册请求过来的调用链处理函数;在完成之后就进入等待,等待followers发来的确认消息,当获得的响应数超过一半时,就跳出等待;然后就定时检查followers的周期是否超时,并且是否存活,定时给followers发送ping消息。

FollowerCnxAcceptor获取followers的响应
class FollowerCnxAcceptor extends Thread{private volatile boolean stop = false;@Overridepublic void run() {try {while (!stop) {                                                         // 检查是否在运行try{Socket s = ss.accept();                                         // 接受follower的连接请求s.setSoTimeout(self.tickTime * self.syncLimit);                 // 设置该连接的过期时间s.setTcpNoDelay(true);                                          // 是否开启TCP_NODELAYnew FollowerHandler(s, Leader.this);                            // 新注册一个FollowerHandler} catch (SocketException e) {if (stop) {LOG.info("exception while shutting down acceptor: "+ e);// When Leader.shutdown() calls ss.close(),// the call to accept throws an exception.// We catch and set stop to true.stop = true;} else {throw e;}}}} catch (Exception e) {LOG.warn("Exception while accepting follower", e);}}public void halt() {stop = true;}}

通过一个线程来完成接受followers的连接,每接受一个连接就初始化一个FollowerHandler,并设置连接的超时时间等条件,并且设置最多网络只有一个未被确认的网络包,依次提高传输效率降低分组的报文个数。

FollowerHandler的处理流程

FollowerHandler类就是处理有关消息的发送的相关具体操作类。

    FollowerHandler(Socket sock, Leader leader) throws IOException {super("FollowerHandler-" + sock.getRemoteSocketAddress()); this.sock = sock;this.leader = leader;leader.addFollowerHandler(this);                                    // 添加到leader的followers列表中start();                                                            // 开启run方法运行}

由于该类继承自线程类,调用start方法就是执行了run函数;

    @Overridepublic void run() {try {ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));                                                // 初始化接入流bufferedOutput = new BufferedOutputStream(sock.getOutputStream());          // 初始化输入流oa = BinaryOutputArchive.getArchive(bufferedOutput);QuorumPacket qp = new QuorumPacket();                                       // 生成一个包ia.readRecord(qp, "packet");                                                // 读取输入数据if (qp.getType() != Leader.LASTZXID) {                                      // 检查类型LOG.error("First packet " + qp.toString()+ " is not LASTZXID!");                                         // 如果不等于最后的事务ID则报错返回return;}long peerLastZxid = qp.getZxid();                                           // 获取事务IDint packetToSend = Leader.SNAP;boolean logTxns = true;long zxidToSend = 0;// we are sending the diffsynchronized(leader.zk.committedLog) {                                      // 如果提交日志的大小不等于0if (leader.zk.committedLog.size() != 0) {if ((leader.zk.maxCommittedLog >= peerLastZxid)                     // 如果当前的最大日志大于接受事务ID&& (leader.zk.minCommittedLog <= peerLastZxid)) {           // 并且当前的最小日志小于接受事务IDpacketToSend = Leader.DIFF;zxidToSend = leader.zk.maxCommittedLog;                         // 发送日志设置成最大日志for (Proposal propose: leader.zk.committedLog) {                // 遍历获取事务日志if (propose.packet.getZxid() > peerLastZxid) {              // 如果获取的日志大于当前接受的事务IDqueuePacket(propose.packet);                            // 将数据发送给followers同步数据QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null, null);queuePacket(qcommit);                                   // 添加到发送队列中}}}}else {logTxns = false;}            }long leaderLastZxid = leader.startForwarding(this, peerLastZxid);           // 加入到要处理的列表中QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,leaderLastZxid, null, null);                                        // 生成一个新的包oa.writeRecord(newLeaderQP, "packet");                                      // 发送该包bufferedOutput.flush();// a special case when both the ids are the sameif (peerLastZxid == leaderLastZxid) {                                       // 检查事务ID与当前最后的事务ID是否相同packetToSend = Leader.DIFF;                                             // 检查日志是否有不一样的zxidToSend = leaderLastZxid;}//check if we decided to send a diff or we need to send a truncate// we avoid using epochs for truncating because epochs make things// complicated. Two epochs might have the last 32 bits as same.// only if we know that there is a committed zxid in the queue that// is less than the one the peer has we send a trunc else to make// things simple we just send sanpshot.if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {// this is the only case that we are sure that// we can ask the follower to truncate the logpacketToSend = Leader.TRUNC;                                           // 截断日志zxidToSend = leader.zk.maxCommittedLog;}oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");       // 写入新的包确定了类型与ID值bufferedOutput.flush();// only if we are not truncating or fast sycningif (packetToSend == Leader.SNAP) {                                          // 如果数据没有改变LOG.warn("Sending snapshot last zxid of peer is 0x"+ Long.toHexString(peerLastZxid) + " " + " zxid of leader is 0x"+ Long.toHexString(leaderLastZxid));// Dump data to followerleader.zk.serializeSnapshot(oa);                                        // 将序列化快照发送给followeroa.writeString("BenWasHere", "signature"); }bufferedOutput.flush();//// Mutation packets will be queued during the serialize,// so we need to mark when the follower can actually start// using the data//queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));            // 添加到队列中// Start sending packetsnew Thread() {public void run() {Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());try {sendPackets();                                                  // 启动线程发送数据} catch (InterruptedException e) {LOG.warn("Interrupted",e);}}}.start();while (true) {qp = new QuorumPacket();                                                // 生成一个包ia.readRecord(qp, "packet");                                            // 读包的数据long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;if (qp.getType() == Leader.PING) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);tickOfLastAck = leader.self.tick;ByteBuffer bb;long sessionId;int cxid;int type;switch (qp.getType()) {                                                 // 获取读入包的类型case Leader.ACK:leader.processAck(qp.getZxid(), sock.getLocalSocketAddress());      // 确认获取了ACK信息break;case Leader.PING:// Process the touchesByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());DataInputStream dis = new DataInputStream(bis);                     // 处理ping类型消息while (dis.available() > 0) {long sess = dis.readLong();int to = dis.readInt();leader.zk.touch(sess, to);                                      // 获取sess值更新seesion}break;case Leader.REVALIDATE:bis = new ByteArrayInputStream(qp.getData());                       // 验证session是否存活dis = new DataInputStream(bis);long id = dis.readLong();int to = dis.readInt();ByteArrayOutputStream bos = new ByteArrayOutputStream();DataOutputStream dos = new DataOutputStream(bos);dos.writeLong(id);boolean valid = leader.zk.touch(id, to);ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(id)+ " is valid: "+ valid);dos.writeBoolean(valid);qp.setData(bos.toByteArray());queuedPackets.add(qp);break;case Leader.REQUEST:bb = ByteBuffer.wrap(qp.getData());                                 // 处理请求sessionId = bb.getLong();cxid = bb.getInt();type = bb.getInt();bb = bb.slice();if(type == OpCode.sync){leader.zk.submitRequest(new FollowerSyncRequest(this, sessionId, cxid, type, bb,qp.getAuthinfo()));                                     // 如果是同步则提交请求到同步请求} else {leader.zk.submitRequest(null, sessionId, type, cxid, bb,qp.getAuthinfo());                                          // 否则直接提交数据去处理}break;default:}}} catch (IOException e) {if (sock != null && !sock.isClosed()) {LOG.error("FIXMSG",e);}} catch (InterruptedException e) {LOG.error("FIXMSG",e);} finally {LOG.warn("******* GOODBYE " + (sock != null ? sock.getRemoteSocketAddress() : "<null>")         // 打印信息+ " ********");// Send the packet of deathtry {queuedPackets.put(proposalOfDeath);                                     // 关闭发送的线程} catch (InterruptedException e) {LOG.error("FIXMSG",e);}shutdown();                                                                 // 重置并移除在leader中的该handler}}public void shutdown() {try {if (sock != null && !sock.isClosed()) {                                     // 检查sock是否关闭 如果没关则关闭sock.close();}} catch (IOException e) {LOG.error("FIXMSG",e);}leader.removeFollowerHandler(this);                                             // 移除该handler           }

run函数主要就是先同步数据,检查获取从的包的事务ID如果ID不同则将当前主的数据同步发送给从,主要完成了数据同步的工作,在检查完成之后,就会启动一个单独的线程去发送数据给从,并且主会监听从发送过来的请求并将该请求处理。从这段执行流程也可知followe会转发客户端的请求到主上面来,全局只有主来处理客户端的数据请求。

    private void sendPackets() throws InterruptedException {long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;while (true) {QuorumPacket p;p = queuedPackets.take();                                   // 获取队列中的数据if (p == proposalOfDeath) {                                 // 如果要停止则停止循环// Packet of death!break;}if (p.getType() == Leader.PING) {                           // 获取待发送消息类型traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);try {oa.writeRecord(p, "packet");                           // 发送该消息bufferedOutput.flush();} catch (IOException e) {if (!sock.isClosed()) {LOG.warn("Unexpected exception",e);}break;}}}

启动之后就又一个单独的线程专门监听发送队列并从该队列中取数据发送给从。至此,Leader角色的主要的流程基本执行完成。

总结

本文主要是分析了Leader角色的启动流程,主要就是先恢复重建本地的日志和事物数据,然后接受从的请求,并比较从的数据是否和主数据一致,如果不一致则从主中发送数据给从达到数据同步。然后再监听从的响应请求并处理,其中包括如果从接受的客户端的请求会转发给主处理,基本的处理流程就是这样。由于本人才疏学浅,如有错误请批评指正。


http://lihuaxi.xjx100.cn/news/257926.html

相关文章

什么是标记符控制的分水岭算法

本文首先指出传统分水岭算法的不足&#xff0c;然后介绍了标记符控制的分水岭算法&#xff0c;最后通过实例演示了该算法在图像分割中的应用。该微信图文由安晟提供。

[Flash开发笔记] 如何在as2.0中使用自定义类事件

as2编程中&#xff0c;我们通常要处理一些异步加载的数据&#xff0c;有点类似ajax中的callback&#xff0c;即我们不知道何时数据才会返回&#xff0c;并且只有当数据返回时&#xff0c;执行我们定义的操作。在flash6及以前&#xff0c;我们会常常碰到从外部加载一张图片或一段…

CentOS搭建msmtp+mutt实现邮件发送

1&#xff1a;搭建配置msmtp下载msmtp包&#xff1a;官方地址&#xff1a;http://msmtp.sourceforge.net/download.html编译&#xff0c;安装(官方下载的包为tar.xz格式):#xz -d msmtp-1.6.3.tar.xz #tar -xvf msmtp-1.6.3.tar #cd msmtp-1.6.3 #./configure --prefix /opt/app…

开源困境:Log4j2 维护者发声:没有工资,还要挨骂!!

欢迎关注方志朋的博客&#xff0c;回复”666“获面试宝典文 | 罗奇奇出品 | OSC开源社区&#xff08;ID&#xff1a;oschina2013&#xff09;基于 Java 的日志记录工具 Apache Log4j2 近日出现了一个高危漏洞&#xff0c;攻击者可以利用其 JNDI 注入漏洞远程执行代码&#xff0…

bzoj1079: [SCOI2008]着色方案(DP)

1079: [SCOI2008]着色方案 题目&#xff1a;传送门 题解&#xff1a; DP刚神多年前讲过的一道神题。 二话不说&#xff0c;上来就是一个六维数组&#xff1a;F[i][a][b][c][d][e]//表示上一次涂的颜色是还剩下i次可用的&#xff0c;a~e表示不同次数的颜色种数。 次数一样的颜色…

Datawhale厦门大学分享记录!

Datawhale线下 作者&#xff1a;李明夷&#xff0c;厦门大学 WISER CLUB2021 年 5 月 16 日下午&#xff0c;Datawhale 团队受邀来到厦门大学&#xff0c;同 WISER CLUB 在经济楼 N402 共同举办学习、竞赛及工作经验分享会&#xff0c;吸引了校内各学院的同学参加。本次活动由 …

Zookeeper源码分析:Follower角色初始化

参考资料 <<从PAXOS到ZOOKEEPER分布式一致性原理与实践>> zookeeper-3.0.0Follower角色初始化 本文主要简述一下Follower角色初始化的流程&#xff0c;并概述一下主要的操作。 Follower角色初始化流程 case FOLLOWING:try {LOG.info("FOLLOWING");se…

ECCV22 最新54篇论文分方向整理|包含Transformer、图像处理、人脸等(附下载)...

点击上方“视学算法”&#xff0c;选择加"星标"或“置顶”重磅干货&#xff0c;第一时间送达本文首发极市平台公众号&#xff0c;转载请获得授权并标明出处。导读 最近一周&#xff0c;ECCV2022陆续放出了更多和GAN&#xff0c;Transformers相关的论文&#xff0c;为…