参考资料
《从Paxos到ZooKeeper：分布式一致性原理与实践》
ZooKeeper 3.0.0 源码
Follower角色初始化
本文简要介绍 Follower 角色的初始化流程，并概述其中的主要操作。
Follower角色初始化流程
case FOLLOWING:
    // Act as a follower: build a Follower via makeFollower() and follow the elected
    // leader until followLeader() returns or throws.
    try {
        LOG.info("FOLLOWING");
        setFollower(makeFollower(logFactory));
        follower.followLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e);
    } finally {
        // makeFollower() declares IOException and may throw before setFollower() runs,
        // in which case follower can still be null here. Without this guard,
        // follower.shutdown() would throw a NullPointerException that escapes the
        // finally block and skips the state reset below.
        if (follower != null) {
            follower.shutdown();
        }
        setFollower(null);
        // Whatever happened, drop back to LOOKING.
        setPeerState(ServerState.LOOKING);
    }
    break;
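该流程与 Leader 角色的初始化流程类似：首先通过 makeFollower 创建一个 Follower 实例，然后调用该实例的 followLeader 方法；无论 followLeader 是正常返回还是抛出异常，finally 块都会关闭该 Follower，并将节点状态重新置为 LOOKING（即重新进入选举）。下面继续查看 makeFollower 方法。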
/**
 * Creates the Follower that this peer uses while in the FOLLOWING state.
 *
 * @param logFactory transaction/snapshot log handed to the follower-side server
 * @return a new Follower bound to this peer and to a fresh FollowerZooKeeperServer
 * @throws IOException if the FollowerZooKeeperServer cannot be constructed
 */
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    // Build the follower-side server first, backed by the given log and this peer.
    FollowerZooKeeperServer followerServer = new FollowerZooKeeperServer(
            logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder());
    return new Follower(this, followerServer);
}
从代码可知，makeFollower 先创建一个 FollowerZooKeeperServer 实例，再将其作为参数创建 Follower 实例。下面先查看 Follower 的构造方法。
/**
 * Creates a follower; only records its collaborators, no I/O happens here.
 *
 * @param self the QuorumPeer that owns this follower
 * @param zk   the FollowerZooKeeperServer this follower feeds with the leader's updates
 */
Follower(QuorumPeer self, FollowerZooKeeperServer zk) {
    this.zk = zk;
    this.self = self;
}
构造方法只是保存了传入的两个参数（QuorumPeer 实例与 FollowerZooKeeperServer 实例）。Follower 创建完成后，紧接着就会调用 followLeader 方法。
followLeader方法
/**
 * Connects to the elected leader, brings local state in sync with it, and then
 * handles packets from the leader until {@code self.running} becomes false or an
 * IOException occurs.
 *
 * Steps:
 * 1. Look up the leader's address by matching the current vote's id against
 *    {@code self.quorumPeers}.
 * 2. Connect, making up to 5 attempts and sleeping 1 s after each failed attempt
 *    that is not the last.
 * 3. Send LASTZXID carrying our last logged zxid; the reply must be NEWLEADER.
 * 4. Sync according to the next packet: DIFF, SNAP or TRUNC. Any other type makes
 *    the JVM exit with status 13.
 * 5. ACK the leader, start {@code zk}, then loop handling PING, PROPOSAL, COMMIT,
 *    UPTODATE, REVALIDATE and SYNC.
 *
 * Every IOException, including the final ConnectException after the last retry,
 * is caught at the bottom of this method. There the socket is closed and pending
 * revalidations are cleared, and the method returns normally without rethrowing.
 *
 * @throws InterruptedException e.g. from Thread.sleep between connection attempts
 */
void followLeader() throws InterruptedException {
    InetSocketAddress addr = null;
    // Find the leader by id: pick the configured server whose id matches the current vote.
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.quorumPeers.values()) {
        if (s.id == current.id) {
            addr = s.addr;
            break;
        }
    }
    if (addr == null) {
        // NOTE(review): only a warning; execution continues with a null address, and
        // sock.connect(null, ...) below will fail. Per the java.net.Socket#connect docs
        // that is an IllegalArgumentException, which this method does not catch.
        LOG.warn("Couldn't find the leader with id = "+ current.id);
    }
    LOG.info("Following " + addr);
    sock = new Socket();
    try {
        // Reused after the initial sync to acknowledge the leader.
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        // Read timeout during connect/sync is tickTime * initLimit; it is switched to
        // tickTime * syncLimit once the initial sync is done (see below).
        sock.setSoTimeout(self.tickTime * self.initLimit);
        // Up to 5 connection attempts.
        for (int tries = 0; tries < 5; tries++) {
            try {
                //sock = new Socket();
                //sock.setSoTimeout(self.tickTime * self.initLimit);
                // Connect timeout is tickTime * syncLimit.
                sock.connect(addr, self.tickTime * self.syncLimit);
                sock.setTcpNoDelay(true);
                break;
            } catch (ConnectException e) {
                if (tries == 4) {
                    // Fifth failure: give up. ConnectException is an IOException, so this
                    // rethrow is caught by the catch (IOException) at the end of this method.
                    LOG.error("Unexpected exception",e);
                    throw e;
                } else {
                    // Retry with a fresh socket that has the same read timeout.
                    LOG.warn("Unexpected exception",e);
                    sock = new Socket();
                    sock.setSoTimeout(self.tickTime * self.initLimit);
                }
            }
            // Only reached after a failed (non-final) attempt; a successful connect breaks out above.
            Thread.sleep(1000);
        }
        // Wrap the socket streams in binary archives used for packet I/O with the leader.
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
        // Handshake: tell the leader the last zxid we have logged.
        QuorumPacket qp = new QuorumPacket();
        qp.setType(Leader.LASTZXID);
        long sentLastZxid = self.getLastLoggedZxid();
        qp.setZxid(sentLastZxid);
        writePacket(qp);
        // The reply must be NEWLEADER; its zxid becomes our last processed zxid after the sync.
        readPacket(qp);
        long newLeaderZxid = qp.getZxid();
        if (qp.getType() != Leader.NEWLEADER) {
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        // The next packet tells us how to bring local state in line with the leader.
        // Holding the zk monitor for the whole sync.
        readPacket(qp);
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                // DIFF: reload local data only; presumably the missing transactions then
                // arrive as PROPOSAL/COMMIT packets in the loop below - confirm against Leader.
                LOG.info("Getting a diff from the leader!");
                zk.loadData();
            }
            else if (qp.getType() == Leader.SNAP) {
                // SNAP: replace local state with a snapshot streamed by the leader
                // (this deserializes the leader's snapshot; it does not take one locally).
                LOG.info("Getting a snapshot from leader");
                // The leader is going to dump the database
                zk.deserializeSnapshot(leaderIs);
                // The snapshot must be followed by a fixed signature string.
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got " + signature);
                    throw new IOException("Missing signature");
                }
            } else if (qp.getType() == Leader.TRUNC) {
                // TRUNC: truncate our log to the zxid carried in this packet, then reload local data.
                //we need to truncate the log to the lastzxid of the leader
                LOG.warn("Truncating log to get in sync with the leader 0x"+ Long.toHexString(qp.getZxid()));
                boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    // Fatal: the whole JVM exits with status 13.
                    LOG.error("Not able to truncate the log "+ Long.toHexString(qp.getZxid()));
                    System.exit(13);
                }
                zk.loadData();
            }
            else {
                // Any other packet type here is unexpected; the whole JVM exits with status 13.
                LOG.error("Got unexpected packet from leader "+ qp.getType() + " exiting ... " );
                System.exit(13);
            }
            // Sync finished: record the NEWLEADER zxid as the last processed one.
            zk.dataTree.lastProcessedZxid = newLeaderZxid;
        }
        // Acknowledge with the NEWLEADER zxid whose low 32 bits are cleared
        // (presumably keeping only the epoch part - confirm).
        ack.setZxid(newLeaderZxid & ~0xffffffffL);
        writePacket(ack);
        // Steady-state read timeout from here on: tickTime * syncLimit.
        sock.setSoTimeout(self.tickTime * self.syncLimit);
        zk.startup();
        // Main loop: handle packets from the leader until this peer stops running.
        while (self.running) {
            readPacket(qp);
            switch (qp.getType()) {
            case Leader.PING:
                // Send back the ping with our session data
                // Each touch-table entry is written as a long key followed by an int value
                // (presumably session id -> timeout; confirm in FollowerZooKeeperServer).
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                HashMap<Long, Integer> touchTable = zk.getTouchSnapshot();
                for (Entry<Long, Integer> entry : touchTable.entrySet()) {
                    dos.writeLong(entry.getKey());
                    dos.writeInt(entry.getValue());
                }
                qp.setData(bos.toByteArray());
                writePacket(qp);
                break;
            case Leader.PROPOSAL:
                // Deserialize the proposed transaction and hand it to zk.logRequest();
                // applying it presumably waits for the matching COMMIT below.
                TxnHeader hdr = new TxnHeader();
                BinaryInputArchive ia = BinaryInputArchive.getArchive(new ByteArrayInputStream(qp.getData()));
                Record txn = SerializeUtils.deserializeTxn(ia, hdr);
                // Proposals are expected with consecutive zxids; a gap is only logged, not rejected.
                if (hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"+ Long.toHexString(hdr.getZxid())+ " expected 0x"+
                            Long.toHexString(lastQueued + 1));
                }
                lastQueued = hdr.getZxid();
                zk.logRequest(hdr, txn);
                break;
            case Leader.COMMIT:
                // Commit the proposal identified by this zxid.
                zk.commit(qp.getZxid());
                break;
            case Leader.UPTODATE:
                // Take a local snapshot, then attach zk to the peer's connection factory.
                zk.takeSnapshot();
                self.cnxnFactory.setZooKeeperServer(zk);
                break;
            case Leader.REVALIDATE:
                // Leader's answer for a session revalidation; payload is (long sessionId, boolean valid).
                ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                DataInputStream dis = new DataInputStream(bis);
                long sessionId = dis.readLong();
                boolean valid = dis.readBoolean();
                synchronized (pendingRevalidations) {
                    ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
                    if (cnxn == null) {
                        // No pending connection is registered for this session.
                        LOG.warn("Missing session 0x"+ Long.toHexString(sessionId)+ " for validation");
                    } else {
                        // Hand the leader's verdict to the waiting connection.
                        cnxn.finishSessionInit(valid);
                    }
                }
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(sessionId)+ " is valid: " + valid);
                break;
            case Leader.SYNC:
                zk.sync();
                break;
            }
            // NOTE(review): packet types not listed above are silently ignored.
        }
    } catch (IOException e) {
        // Covers connection failure after all retries, a bad handshake, and any I/O
        // error while following. Logged and swallowed: the method returns normally.
        LOG.warn("Exception when following the leader", e);
        try {
            sock.close();
        } catch (IOException e1) {
            // NOTE(review): printStackTrace bypasses LOG; consider logging instead.
            e1.printStackTrace();
        }
        synchronized (pendingRevalidations) {
            // clear pending revalidations and wake any threads waiting on this monitor
            pendingRevalidations.clear();
            pendingRevalidations.notifyAll();
        }
    }
}
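followLeader 的执行流程相对较长，主要分为以下几步：首先根据当前投票找到 Leader 的地址；然后尝试连接 Leader，最多尝试 5 次，失败后等待 1 秒再重试；连接成功后，向 Leader 发送本节点最后记录的事务 ID（LASTZXID 包），并等待 Leader 回复 NEWLEADER 包；接着根据 Leader 发来的同步指令（DIFF、SNAP 或 TRUNC）使本地数据与 Leader 保持一致，完成后向 Leader 回复 ACK；最后进入循环，持续接收并处理 Leader 发来的各类请求，包括 PING 心跳、事务提议（PROPOSAL）、事务提交（COMMIT）、UPTODATE（生成快照）、会话校验（REVALIDATE）以及 SYNC 等。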
总结
本文介绍的 Follower 处理逻辑相对简单，流程也比较清晰：启动后先尝试连接 Leader，然后根据 Leader 返回的同步指令将本地数据同步到与 Leader 一致，之后持续接收并处理 Leader 发来的事务提议、事务提交、快照、会话校验等请求。后续文章将分析客户端发送请求之后 Leader 与 Follower 的处理过程，并概述大致流程。由于本人才疏学浅，如有错误，欢迎批评指正。