Zookeeper源码分析:Follower角色初始化

news/2024/7/7 21:36:36

参考资料

<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>
zookeeper-3.0.0

Follower角色初始化

本文主要简述一下Follower角色初始化的流程,并概述一下主要的操作。

Follower角色初始化流程

case FOLLOWING:try {LOG.info("FOLLOWING");setFollower(makeFollower(logFactory));                      // 如果是FOLLOWING状态则转换成follower 跟随主follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {follower.shutdown();setFollower(null);setPeerState(ServerState.LOOKING);}break;

处理角色流程跟Leader的角色的初始化流程类似,首先生成一个followe类,然后再调用该类的followLeader方法。我们继续查看makeFollower方法。

 protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder()));}

从执行可知,生成了一个Follower实例,并生成了一个FollowerZooKeeperServer实例作为参数,首先查看Follower的初始化方法。

    Follower(QuorumPeer self,FollowerZooKeeperServer zk) {this.self = self;                                       // 设置QuormPeer实例this.zk=zk;                                             // 设置zk实例}

主要就是保存了传入参数。在初始化完成之后紧接着就执行了followLeader方法。

followLeader方法
void followLeader() throws InterruptedException {InetSocketAddress addr = null;                                  // Find the leader by idVote current = self.getCurrentVote();                                           // 获取当前的投票for (QuorumServer s : self.quorumPeers.values()) {                              // 找到与投票id与保存的获取相同的ID if (s.id == current.id) {addr = s.addr;                                                          // 此时就是找出对应主的ID的地址break;}}if (addr == null) {LOG.warn("Couldn't find the leader with id = "+ current.id);                                                      // 如果为空则打印当前信息}LOG.info("Following " + addr);sock = new Socket();                                                            // 生成一个socket 来连接主try {QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);             // 生成一个ACK 包sock.setSoTimeout(self.tickTime * self.initLimit);                          // 设置超时时间for (int tries = 0; tries < 5; tries++) {                                   // 设置尝试次数try {//sock = new Socket();//sock.setSoTimeout(self.tickTime * self.initLimit);sock.connect(addr, self.tickTime * self.syncLimit);                 // 连接远端主机sock.setTcpNoDelay(true);break;} catch (ConnectException e) {if (tries == 4) {                                                   // 如果连接超时超过4次则直接将错误抛出LOG.error("Unexpected exception",e);throw e;} else {LOG.warn("Unexpected exception",e);                             // 如果小于4次则重新设置超时时间并重新生成socksock = new Socket();sock.setSoTimeout(self.tickTime * self.initLimit);}}Thread.sleep(1000);                                                     // 休眠}leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));                                            // 获取读的内容bufferedOutput = new BufferedOutputStream(sock.getOutputStream());          leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);                  // 获取写的内容QuorumPacket qp = new QuorumPacket();                                       // 生成一个包qp.setType(Leader.LASTZXID);                                                // 设置类型 类型为最后一次事务IDlong sentLastZxid = self.getLastLoggedZxid();                               // 获取最后一次事务IDqp.setZxid(sentLastZxid);                                                   // 设置事务IDwritePacket(qp);                                                            // 发送该包readPacket(qp);                                                             // 读返回数据long newLeaderZxid = qp.getZxid();                                          // 获取回复的的事务IDif (qp.getType() != Leader.NEWLEADER) {                                     // 如果回复的类型不是新主则报错LOG.error("First packet should have been NEWLEADER");throw new IOException("First packet should have been NEWLEADER");}readPacket(qp);                                                             synchronized (zk) {if (qp.getType() == Leader.DIFF) {                                      // 判断类型 如果是DIFF 则加载数据LOG.info("Getting a diff from the leader!");zk.loadData();}else if (qp.getType() == Leader.SNAP) {                                 // 如果是SNAP 则做快照LOG.info("Getting a snapshot from leader");// The leader is going to dump the databasezk.deserializeSnapshot(leaderIs);                                   // 做快照String signature = leaderIs.readString("signature");if (!signature.equals("BenWasHere")) {LOG.error("Missing signature. Got " + signature);throw new IOException("Missing signature");}} else if (qp.getType() == Leader.TRUNC) {                              // 如果是TRUNC类型则截取日志//we need to truncate the log to the lastzxid of the leaderLOG.warn("Truncating log to get in sync with the leader 0x"+ Long.toHexString(qp.getZxid()));boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());if (!truncated) {// not able to truncate the logLOG.error("Not able to truncate the log "+ Long.toHexString(qp.getZxid()));System.exit(13);}zk.loadData();                                                      // 截取日志之后保存日志}else {LOG.error("Got unexpected packet from leader "+ qp.getType() + " exiting ... " );System.exit(13);}zk.dataTree.lastProcessedZxid = newLeaderZxid;                         // 此时数据同步完成设置最后的事务ID}ack.setZxid(newLeaderZxid & ~0xffffffffL);                                 // 设置ack内容的事务ID writePacket(ack);                                                           // 将该包内容发送出去sock.setSoTimeout(self.tickTime * self.syncLimit);                          // 设置超时时间zk.startup();                                                               // 加载follower的处理流程while (self.running) {                                                      // follower开始运行readPacket(qp);                                                         // 读取接受到的数据switch (qp.getType()) {                                                 // 判断接受到的数据类型case Leader.PING:                                                       // 如果是PING // Send back the ping with our session dataByteArrayOutputStream bos = new ByteArrayOutputStream();  DataOutputStream dos = new DataOutputStream(bos);HashMap<Long, Integer> touchTable = zk.getTouchSnapshot();for (Entry<Long, Integer> entry : touchTable.entrySet()) {          // 返回快照里面的快照数据dos.writeLong(entry.getKey());dos.writeInt(entry.getValue());}qp.setData(bos.toByteArray());writePacket(qp);                                                    // 发送回去break;case Leader.PROPOSAL:                                                   // 如果是提交的事务TxnHeader hdr = new TxnHeader();BinaryInputArchive ia = BinaryInputArchive.getArchive(new ByteArrayInputStream(qp.getData()));Record txn = SerializeUtils.deserializeTxn(ia, hdr);if (hdr.getZxid() != lastQueued + 1) {                              // 检查事务ID是否一致LOG.warn("Got zxid 0x"+ Long.toHexString(hdr.getZxid())+ " expected 0x"+ Long.toHexString(lastQueued + 1));}lastQueued = hdr.getZxid();                                         // 获取事务IDzk.logRequest(hdr, txn);                                            // 将事务记录到日志中break;case Leader.COMMIT:zk.commit(qp.getZxid());                                            // 提交日志中的事务break;case Leader.UPTODATE:                                                   // 生成快照zk.takeSnapshot();self.cnxnFactory.setZooKeeperServer(zk);break;case Leader.REVALIDATE:                                                 // 验证session是否活跃ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());DataInputStream dis = new DataInputStream(bis);long sessionId = dis.readLong();boolean valid = dis.readBoolean();synchronized (pendingRevalidations) {ServerCnxn cnxn = pendingRevalidations.remove(sessionId);                                     // 如果返回为空则 遗失了会话if (cnxn == null) {LOG.warn("Missing session 0x"+ Long.toHexString(sessionId)+ " for validation");} else {cnxn.finishSessionInit(valid);                              // 返回是否合法}}ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(sessionId)+ " is valid: " + valid);                  break; case Leader.SYNC:zk.sync();break;}}} catch (IOException e) {LOG.warn("Exception when following the leader", e);try {sock.close();} catch (IOException e1) {e1.printStackTrace();}synchronized (pendingRevalidations) {// clear pending revalitionspendingRevalidations.clear();                       // 清理数据pendingRevalidations.notifyAll();}}}

followLeader的执行流程,相对比较多,主要就是先获取主的地址,然后去通过一定的容错次数去连接主,在连接上主之后,然后向主发送当前follower的事务ID并根据该ID,再等待主返回的数据,根据主返回的数据来检查是否需要同步本地的数据来和主保持一致,最后就进入等待阶段,等待接收主发送过来的数据请求,包括处理PING、事务开始,事务提交,会话检查等工作。

总结

本文follower的内容的处理相对比较简单,流程比较清晰易懂,主要就是启动之后尝试去连接主,然后根据主返回的事务日志,将更新本地的数据同步,并接受主发送过来的同步,快照,事务提交等处理逻辑。在后续文章中,会分析从客户端发送过来之后leader与follower的处理过程是怎样,并概述一下大致的流程。由于本人才疏学浅,如有错误请批评指正。


http://lihuaxi.xjx100.cn/news/257919.html

相关文章

ECCV22 最新54篇论文分方向整理|包含Transformer、图像处理、人脸等(附下载)...

点击上方“视学算法”&#xff0c;选择加"星标"或“置顶”重磅干货&#xff0c;第一时间送达本文首发极市平台公众号&#xff0c;转载请获得授权并标明出处。导读 最近一周&#xff0c;ECCV2022陆续放出了更多和GAN&#xff0c;Transformers相关的论文&#xff0c;为…

使用深度学习阅读和分类扫描文档

作者|小白来源|小白学视觉收集数据首先&#xff0c;我们要做的第一件事是创建一个简单的数据集&#xff0c;这样我们就可以测试我们工作流程的每一部分。理想情况下&#xff0c;我们的数据集将包含各种易读性和时间段的扫描文档&#xff0c;以及每个文档所属的高级主题。我找不…

Matlab与线性代数 -- 寻找矩阵的非零元素

本微信图文详细介绍了Matlab中find函数的用法。

oracle中的sql%rowcount,sql%found、sql%notfound、sql%rowcount和sql%isopen

Oracle 存储过程 删除表记录时删除不存在的记录也是显示删除成功 create or replace procedure delDept(p_deptno in dept.deptno%type) is begindelete from dept where deptnop_deptno;dbms_output.put_line(部门删除成功...);exception when others thendbms_output.put_lin…

AJAX范例大搜罗(转载)

1&#xff0e;每天一个AJAX 该网站提供了很多非常酷的AJAX例子&#xff0c;号称是每天更新一个。 网址&#xff1a;http://www.ajaxcompilation.com/ 2&#xff0e;210个AJAX框架 一个不错的提供Ajax范例的网站&#xff0c;Ajax框架已更新至210个。 网址&#xff1a;http:…

为什么贝叶斯统计如此重要?

↑↑↑关注后"星标"Datawhale每日干货 & 每月组队学习&#xff0c;不错过Datawhale干货 译者&#xff1a;张峰&#xff0c;Datawhale成员即使对于一个非数据科学家来说&#xff0c;贝叶斯统计这个术语也已经很流行了。你可能在大学期间把它作为必修课之一来学习&…

用不惯VMware?试试这款更轻量级的虚拟机!

欢迎关注方志朋的博客&#xff0c;回复”666“获面试宝典说起虚拟机工具大家最熟悉的自然是 VMware&#xff0c;功能很多很强大&#xff0c;最让我认可的地方就是可以非常方便的修改虚拟机的配置&#xff0c;让虚拟机达到自己想要的性能&#xff5e;&#xff5e;可是 VMware 好…

地球超2亿蛋白质结构全预测,AlphaFold引爆「蛋白质全宇宙」!

视学算法报道 编辑&#xff1a;拉燕 Aeneas 如願 桃子【导读】今天&#xff0c;AlphaFold又让学术界沸腾了。DeepMind官宣&#xff0c;AlphaFold可以预测出2亿多个蛋白质结构&#xff0c;几乎覆盖了整个「蛋白质宇宙」。今天&#xff0c;DeepMind再次引爆学术界&#xff01;Al…