This article explains how FastLeaderElection, the fast leader election algorithm in ZooKeeper (zk), is implemented.
Election concepts
Server state
Vote
How is a vote chosen?
Protocol
Election
How is an election run?
epoch
Sender
Receiver
Send queue
Receive queue
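Server state
public enum ServerState {
    LOOKING,   // looking for a leader: the ensemble currently has no leader, so a leader election is needed
    FOLLOWING, // follower state: this server is a Follower
    LEADING,   // leader state: this server is the Leader
    OBSERVING  // observer state: this server is an Observer
}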
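Vote
| Field | Meaning |
| --- | --- |
| id | sid of the proposed leader |
| zxid | transaction id (zxid) of the proposed leader |
| electionEpoch | tells whether several votes belong to the same election round; on each server it is an increasing counter (logicalclock) that is incremented by 1 whenever a new round of voting starts |
| peerEpoch | epoch of the proposed leader |
| state | current state of the server casting the vote |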
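The following minimal sketch models a vote as an immutable Java value. It is meant to make the table concrete. The names `VoteSketch`, `Vote`, `sameRound` and `sameCandidate` are hypothetical. ZooKeeper's real `Vote` class has more constructors and its own `equals` rules.

```java
/** Hypothetical, simplified model of the vote fields in the table above. */
final class VoteSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    static final class Vote {
        final long id;            // sid of the proposed leader
        final long zxid;          // last logged zxid of the proposed leader
        final long electionEpoch; // election round the vote was cast in
        final long peerEpoch;     // epoch of the proposed leader
        final ServerState state;  // state of the server casting the vote

        Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
            this.id = id;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
            this.state = state;
        }

        /** True if both votes were cast in the same election round. */
        boolean sameRound(Vote other) {
            return electionEpoch == other.electionEpoch;
        }

        /** True if both votes back the same candidate (sid, zxid, peerEpoch). */
        boolean sameCandidate(Vote other) {
            return id == other.id && zxid == other.zxid && peerEpoch == other.peerEpoch;
        }
    }

    public static void main(String[] args) {
        // A LOOKING server with sid 3 (zxid: epoch 1, counter 5) votes for itself in round 1.
        Vote self = new Vote(3, 0x100000005L, 1, 1, ServerState.LOOKING);
        Vote fromPeer = new Vote(3, 0x100000005L, 1, 1, ServerState.LOOKING);
        System.out.println(self.sameRound(fromPeer) && self.sameCandidate(fromPeer)); // true
    }
}
```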
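Inner classes

FastLeaderElection has three inner classes: Messenger, ToSend and Notification.
Notification tells other servers that a given server's vote has changed. The server has either just joined the election, or it has learned of a vote with a higher zxid, or of one with the same zxid and a higher server id.
ToSend wraps a message that is to be sent to another server.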

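Messenger consists of two workers, WorkerReceiver and WorkerSender, which mainly fill and drain the following two queues. WorkerSender takes ToSend messages from sendqueue and hands them to QuorumCnxManager for delivery. WorkerReceiver turns the messages QuorumCnxManager receives into Notification objects and puts them on recvqueue.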
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
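The following hypothetical, self-contained sketch shows how the two workers share the two queues. A local `wire` queue stands in for the network connections that `QuorumCnxManager` manages. `ToSend` and `Notification` here carry only a few of the real fields, and `send`/`awaitNotification` are helper names invented for the sketch.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of Messenger: WorkerSender and WorkerReceiver around two queues. */
final class MessengerSketch {
    /** Simplified ToSend: a vote addressed to server sid. */
    static final class ToSend {
        final long sid, leader, zxid, electionEpoch;
        ToSend(long sid, long leader, long zxid, long electionEpoch) {
            this.sid = sid;
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
        }
    }

    /** Simplified Notification: a vote received from server sid. */
    static final class Notification {
        final long sid, leader, zxid, electionEpoch;
        Notification(long sid, long leader, long zxid, long electionEpoch) {
            this.sid = sid;
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
        }
    }

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();
    // Hypothetical stand-in for the connections QuorumCnxManager owns.
    private final LinkedBlockingQueue<ToSend> wire = new LinkedBlockingQueue<>();
    private final long mySid;

    MessengerSketch(long mySid) {
        this.mySid = mySid;
    }

    void start() {
        // WorkerSender: drain sendqueue and "transmit" each message.
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    wire.put(sendqueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerSender");
        // WorkerReceiver: turn each incoming message into a Notification.
        Thread receiver = new Thread(() -> {
            try {
                while (true) {
                    ToSend m = wire.take();
                    // Loopback only: the message came from this server itself.
                    recvqueue.put(new Notification(mySid, m.leader, m.zxid, m.electionEpoch));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerReceiver");
        sender.setDaemon(true);   // let the JVM exit without stopping the workers
        receiver.setDaemon(true);
        sender.start();
        receiver.start();
    }

    void send(ToSend m) {
        sendqueue.offer(m); // unbounded queue, so offer always succeeds
    }

    /** Like recvqueue.poll in lookForLeader: waits up to millis, null on timeout. */
    Notification awaitNotification(long millis) {
        try {
            return recvqueue.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        MessengerSketch m = new MessengerSketch(1);
        m.start();
        m.send(new ToSend(1, 1, 0x100000002L, 1));
        Notification n = m.awaitNotification(1000);
        System.out.println(n == null ? "timeout" : "got vote for leader " + n.leader);
    }
}
```

The blocking `poll` with a timeout mirrors how `lookForLeader` waits on `recvqueue`. A `null` result is the signal to resend or reconnect.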
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
self.start_fle = Time.currentElapsedTime();
try {
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
SyncedLearnerTracker voteSet;
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding vote: from=" + n.sid
+ ", proposed leader=" + n.leader
+ ", proposed zxid=0x" + Long.toHexString(n.zxid)
+ ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
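In the LOOKING branch, `getVoteTracker(...).hasAllQuorums()` decides whether enough received votes agree with the current proposal. For an ensemble with plain majority quorums, that check boils down to counting, as in the simplified sketch below. `QuorumSketch` and `hasMajority` are hypothetical names. ZooKeeper delegates the real check to `SyncedLearnerTracker` and the configured `QuorumVerifier`. This sketch also leaves `electionEpoch` out of the comparison.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical majority check standing in for getVoteTracker(...).hasAllQuorums(). */
final class QuorumSketch {
    /** Minimal vote: proposed leader sid, its zxid and peerEpoch. */
    static final class Vote {
        final long leader, zxid, peerEpoch;
        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }
        boolean agreesWith(Vote o) {
            return leader == o.leader && zxid == o.zxid && peerEpoch == o.peerEpoch;
        }
    }

    /** True if strictly more than half of the voting members back the proposal. */
    static boolean hasMajority(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        int acks = 0;
        for (Vote v : recvset.values()) {
            if (v.agreesWith(proposal)) {
                acks++;
            }
        }
        return acks > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Vote> recvset = new HashMap<>(); // sid -> latest vote from that sid
        Vote proposal = new Vote(3, 0x100000009L, 1);
        recvset.put(3L, proposal);                 // our own vote (we are sid 3)
        recvset.put(1L, new Vote(2, 0x100000008L, 1));
        System.out.println(hasMajority(recvset, proposal, 3)); // false: 1 of 3
        recvset.put(2L, new Vote(3, 0x100000009L, 1));
        System.out.println(hasMajority(recvset, proposal, 3)); // true: 2 of 3
    }
}
```

Reaching a quorum is not the end in the real code. `lookForLeader` still waits `finalizeWait` milliseconds for a better vote before it settles on the leader.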
Vote-related functions
Updating the proposal fields
synchronized void updateProposal(long leader, long zxid, long epoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating proposal: " + leader
+ " (newleader), 0x" + Long.toHexString(zxid)
+ " (newzxid), " + proposedLeader
+ " (oldleader), 0x" + Long.toHexString(proposedZxid)
+ " (oldzxid)");
}
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
Building a vote from the current proposal
public synchronized Vote getVote() {
return new Vote(proposedLeader, proposedZxid, proposedEpoch);
}
Getting the learner state (FOLLOWING for participants, OBSERVING for observers)
private ServerState learningState() {
if (self.getLearnerType() == LearnerType.PARTICIPANT) {
LOG.debug("I am a participant: {}", self.getId());
return ServerState.FOLLOWING;
} else {
LOG.debug("I am an observer: {}", self.getId());
return ServerState.OBSERVING;
}
}
Getting the id this server votes with (its own sid if it is a voting member, otherwise Long.MIN_VALUE)
private long getInitId() {
if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
return self.getId();
} else {
return Long.MIN_VALUE;
}
}
Getting the last logged transaction id (Long.MIN_VALUE for non-participants)
private long getInitLastLoggedZxid() {
if (self.getLearnerType() == LearnerType.PARTICIPANT) {
return self.getLastLoggedZxid();
} else {
return Long.MIN_VALUE;
}
}
Getting the current epoch saved in a file
public long getCurrentEpoch() throws IOException {
if (currentEpoch == -1) {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
}
return currentEpoch;
}
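`getCurrentEpoch()` uses -1 as a "not loaded yet" marker, so the file is read only once. The sketch below shows that lazy-caching pattern and assumes the file holds a single decimal number. `EpochFileSketch` and its own `readLongFromFile` helper were written for this illustration and are not ZooKeeper code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of lazily reading and caching an epoch stored in a file. */
final class EpochFileSketch {
    private final Path epochFile;
    private long currentEpoch = -1; // -1 means "not read yet"

    EpochFileSketch(Path epochFile) {
        this.epochFile = epochFile;
    }

    long getCurrentEpoch() throws IOException {
        if (currentEpoch == -1) {
            currentEpoch = readLongFromFile(epochFile); // first call only
        }
        return currentEpoch;
    }

    /** Assumed file format: one decimal long, optionally surrounded by whitespace. */
    private static long readLongFromFile(Path file) throws IOException {
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("currentEpoch", ".txt");
        Files.write(f, "7\n".getBytes(StandardCharsets.UTF_8));
        EpochFileSketch s = new EpochFileSketch(f);
        System.out.println(s.getCurrentEpoch()); // 7
        Files.write(f, "8\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(s.getCurrentEpoch()); // still 7: the value is cached
        Files.delete(f);
    }
}
```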
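Election-related functions
Deciding whether a new vote (server id, zxid, epoch) beats the current one. The rule is not simply "the larger id wins". peerEpoch is compared first, then zxid, and only when both are equal does the larger server id win. A candidate whose quorum weight is 0 never wins.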
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("id: " + newId
+ ", proposed id: " + curId
+ ", zxid: 0x" + Long.toHexString(newZxid)
+ ", proposed zxid: 0x" + Long.toHexString(curZxid));
}
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}
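The three-way comparison can be exercised on its own. In this sketch a plain `Map` from sid to weight replaces `QuorumVerifier.getWeight` (an assumption made for illustration). Servers missing from the map count as weight 0.

```java
import java.util.Map;

/** Standalone sketch of the ordering used by totalOrderPredicate. */
final class TotalOrderSketch {
    static boolean totalOrderPredicate(Map<Long, Long> weights,
                                       long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        // A zero-weight (or, in this sketch, unknown) server can never win.
        if (weights.getOrDefault(newId, 0L) == 0L) {
            return false;
        }
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // 1. higher peerEpoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // 2. same epoch: higher zxid wins
        }
        return newId > curId;           // 3. same epoch and zxid: higher sid wins
    }

    public static void main(String[] args) {
        Map<Long, Long> w = Map.of(1L, 1L, 2L, 1L, 3L, 1L); // three voters, weight 1 each
        // Same epoch, higher zxid wins even with a smaller sid.
        System.out.println(totalOrderPredicate(w, 1, 0x10000000aL, 1, 3, 0x100000009L, 1)); // true
        // Same epoch and zxid: the larger sid (3) keeps the vote.
        System.out.println(totalOrderPredicate(w, 2, 0x100000009L, 1, 3, 0x100000009L, 1)); // false
    }
}
```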
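Checking the proposed leader. The check passes by default, and the function singles out only the cases where it fails. If the proposed leader is another server, we must have received a notification from that server saying it is LEADING. If the proposed leader is this server, the election round must match our logicalclock.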
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
boolean predicate = true;
/*
* If everyone else thinks I'm the leader, I must be the leader.
* The other two checks are just for the case in which I'm not the
* leader. If I'm not the leader and I haven't received a message
* from leader stating that it is leading, then predicate is false.
*/
if (leader != self.getId()) {
if (votes.get(leader) == null) {
predicate = false;
} else if (votes.get(leader).getState() != ServerState.LEADING) {
predicate = false;
}
} else if (logicalclock.get() != electionEpoch) {
predicate = false;
}
return predicate;
}
Starting a new election round
public Vote lookForLeader() throws InterruptedException

Data structures used in the election

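Difference between electionEpoch and peerEpoch
electionEpoch is the election round. It is used to check whether votes belong to the same round, and it counts up from 0.
peerEpoch is the current epoch, i.e. the epoch of the proposed leader.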
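electionEpoch only decides which round a notification belongs to, while peerEpoch takes part in ranking candidates in `totalOrderPredicate`. The three electionEpoch branches at the top of the LOOKING case can be summarised as a small decision function. `EpochBranchSketch`, `Action` and `onNotification` are hypothetical names used only for illustration.

```java
/** Hypothetical summary of how a LOOKING server treats a notification's electionEpoch. */
final class EpochBranchSketch {
    enum Action {
        /** Peer is in a newer round: jump to it, clear recvset, recompute and broadcast our vote. */
        ADOPT_NEWER_ROUND,
        /** Peer is in an older round: ignore the notification. */
        IGNORE_STALE,
        /** Same round: compare the two votes with totalOrderPredicate. */
        COMPARE_VOTES
    }

    static Action onNotification(long notificationElectionEpoch, long logicalclock) {
        if (notificationElectionEpoch > logicalclock) {
            return Action.ADOPT_NEWER_ROUND;
        } else if (notificationElectionEpoch < logicalclock) {
            return Action.IGNORE_STALE;
        }
        return Action.COMPARE_VOTES;
    }

    public static void main(String[] args) {
        long logicalclock = 3;
        System.out.println(onNotification(4, logicalclock)); // ADOPT_NEWER_ROUND
        System.out.println(onNotification(2, logicalclock)); // IGNORE_STALE
        System.out.println(onNotification(3, logicalclock)); // COMPARE_VOTES
    }
}
```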
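Rules for comparing two votes
Compare peerEpoch, zxid and sid, in that order.
peerEpoch is the epoch a vote belongs to; the larger it is, the newer the vote.
If peerEpoch is equal, compare zxid, which orders transactions within an epoch; the larger it is, the newer the vote.
If both peerEpoch and zxid are equal, the vote with the larger sid wins.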
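Applying these rules repeatedly is what makes every server settle on the same candidate. The single-threaded simulation below is a hypothetical illustration only. Each server starts by voting for itself and adopts any better vote it sees until nothing changes. The real FastLeaderElection, by contrast, is asynchronous and stops as soon as a quorum agrees.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of servers converging on one vote under the comparison rules. */
final class ConvergenceSketch {
    static final class Proposal {
        final long sid, zxid, peerEpoch;
        Proposal(long sid, long zxid, long peerEpoch) {
            this.sid = sid;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }

        /** peerEpoch first, then zxid, then sid. */
        boolean beats(Proposal o) {
            if (peerEpoch != o.peerEpoch) return peerEpoch > o.peerEpoch;
            if (zxid != o.zxid) return zxid > o.zxid;
            return sid > o.sid;
        }
    }

    /** Returns the sid every server ends up voting for. */
    static long elect(List<Proposal> servers) {
        List<Proposal> votes = new ArrayList<>(servers); // votes.get(i) = server i's current vote
        boolean changed = true;
        while (changed) {
            changed = false;
            // Snapshot of the votes broadcast in this round.
            List<Proposal> broadcast = new ArrayList<>(votes);
            for (int i = 0; i < votes.size(); i++) {
                for (Proposal seen : broadcast) {
                    if (seen.beats(votes.get(i))) {
                        votes.set(i, seen); // adopt the better vote
                        changed = true;
                    }
                }
            }
        }
        return votes.get(0).sid; // all servers now hold the same vote
    }

    public static void main(String[] args) {
        List<Proposal> servers = List.of(
                new Proposal(1, 0x100000005L, 1),
                new Proposal(2, 0x100000007L, 1),
                new Proposal(3, 0x100000007L, 1));
        // Servers 2 and 3 tie on peerEpoch and zxid, so the larger sid wins.
        System.out.println("elected sid = " + elect(servers)); // elected sid = 3
    }
}
```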
Title: How FastLeaderElection (fast leader election) is implemented in zk
Source: http://chinadenli.net/article34/ggjppe.html