欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析

這篇文章主要為大家展示了“rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析”這篇文章吧。

主要從事網(wǎng)頁(yè)設(shè)計(jì)、PC網(wǎng)站建設(shè)(電腦版網(wǎng)站建設(shè))、wap網(wǎng)站建設(shè)(手機(jī)版網(wǎng)站建設(shè))、響應(yīng)式網(wǎng)站開(kāi)發(fā)、程序開(kāi)發(fā)、微網(wǎng)站、成都微信小程序等,憑借多年來(lái)在互聯(lián)網(wǎng)的打拼,我們?cè)诨ヂ?lián)網(wǎng)網(wǎng)站建設(shè)行業(yè)積累了豐富的成都網(wǎng)站制作、成都做網(wǎng)站、網(wǎng)絡(luò)營(yíng)銷(xiāo)經(jīng)驗(yàn),集策劃、開(kāi)發(fā)、設(shè)計(jì)、營(yíng)銷(xiāo)、管理等多方位專(zhuān)業(yè)化運(yùn)作于一體,具備承接不同規(guī)模與類(lèi)型的建設(shè)項(xiàng)目的能力。

介紹之前首先拋出幾個(gè)問(wèn)題:

1. 要做負(fù)載均衡,首先要解決的一個(gè)問(wèn)題是什么?

2. 負(fù)載均衡是Client端處理還是Broker端處理?

個(gè)人理解:

1. 要做負(fù)載均衡,首先要做的就是信號(hào)收集。

所謂信號(hào)收集,就是得知道每一個(gè)consumerGroup有哪些consumer,對(duì)應(yīng)的topic是誰(shuí)。信號(hào)收集分為Client端信號(hào)收集與Broker端信號(hào)收集兩個(gè)部分。

2. 負(fù)載均衡放在Client端處理。

具體做法是:消費(fèi)者客戶(hù)端在啟動(dòng)時(shí)完善rebalanceImpl實(shí)例,同時(shí)拷貝訂閱信息存放rebalanceImpl實(shí)例對(duì)象中,另外也是很重要的一個(gè)步驟 -- 通過(guò)心跳消息,不停的上報(bào)自己到所有Broker,注冊(cè)RegisterConsumer,等待上述過(guò)程準(zhǔn)備好之后在Client端不斷執(zhí)行的負(fù)載均衡服務(wù)線程從Broker端獲取一份全局信息(該consumerGroup下所有的消費(fèi)Client),然后分配這些全局信息,獲取當(dāng)前客戶(hù)端分配到的消費(fèi)隊(duì)列。

本文具體的內(nèi)容:

I. copySubscription

Client端信號(hào)收集,拷貝訂閱信息。

在DefaultMQPushConsumerImpl.start()時(shí),會(huì)將消費(fèi)者的topic訂閱關(guān)系設(shè)置到rebalanceImpl的SubscriptionInner的map中用于負(fù)載:

private void copySubscription() throws MQClientException {
try {
//注:一個(gè)consumer對(duì)象可以訂閱多個(gè)topic
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

FilterAPI.buildSubscriptionData接口將訂閱關(guān)系轉(zhuǎn)換為SubscriptionData 數(shù)據(jù),其中subString包含訂閱tag等信息。另外,如果該消費(fèi)者的消費(fèi)模式為集群消費(fèi),則會(huì)將retry的topic一并放到。

II. 完善rebalanceImpl實(shí)例

Client繼續(xù)收集信息:

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

本文以DefaultMQPushConsumerImpl為例,因此this對(duì)象類(lèi)型為DefaultMQPushConsumerImp。

III. this.rebalanceService.start()

開(kāi)啟負(fù)載均衡服務(wù)。this.rebalanceService是一個(gè)RebalanceService實(shí)例對(duì)象,它繼承與ServiceThread,是一個(gè)線程類(lèi)。 this.rebalanceService.start()執(zhí)行時(shí),也即執(zhí)行RebalanceService線程體:

@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}

IV. this.mqClientFactory.doRebalance

客戶(hù)端遍歷消費(fèi)組table,對(duì)該客戶(hù)端上所有消費(fèi)者獨(dú)立進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)隊(duì)列:

public void doRebalance() {
for (String group : this.consumerTable.keySet()) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}

V. MQConsumerInner.doRebalance

由于本文以DefaultMQPushConsumerImpl消費(fèi)過(guò)程為例,即DefaultMQPushConsumerImpl.doRebalance:

@Override
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance();
}
}

步驟II 中完善了rebalanceImpl實(shí)例,為調(diào)用rebalanceImpl.doRebalance()提供了初始數(shù)據(jù)。

rebalanceImpl.doRebalance()過(guò)程如下:

public void doRebalance() {
     // 前文copySubscription中初始化了SubscriptionInner
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}

VI. rebalanceByTopic -- 核心步驟之一

rebalanceByTopic方法中根據(jù)消費(fèi)者的消費(fèi)類(lèi)型為BROADCASTING或CLUSTERING做不同的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集群消費(fèi)負(fù)載均衡的邏輯。

集群消費(fèi)負(fù)載均衡邏輯主要代碼如下(省略了log等代碼):

//1.從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2. 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶(hù)端clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);

     // 3.創(chuàng)建DefaultMQPushConsumer對(duì)象時(shí)默認(rèn)設(shè)置為AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
         // 4.調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列
allocateResult = strategy.allocate(
this.consumerGroup, 
this.mQClientFactory.getClientId(), 
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
    // 5. 將分配得到的allocateResult 中的隊(duì)列放入allocateResultSet 集合
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
、
     //6. 更新updateProcessQueue
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}

注:BROADCASTING邏輯只包含上述的1、6。

集群消費(fèi)負(fù)載均衡邏輯中的1、2、4這三個(gè)點(diǎn)相關(guān)知識(shí)為其核心過(guò)程,各個(gè)點(diǎn)相關(guān)知識(shí)如下:

第1點(diǎn):從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列

rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析

第2點(diǎn): 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶(hù)端clientId

首先,消費(fèi)者對(duì)象不斷地向所有broker發(fā)送心跳包,上報(bào)自己,注冊(cè)并更新訂閱關(guān)系以及客戶(hù)端ChannelInfoTable;之后,客戶(hù)端在做消費(fèi)負(fù)載均衡時(shí)獲取那些消費(fèi)客戶(hù)端,對(duì)這些客戶(hù)端進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)的隊(duì)列。具體過(guò)程如下圖所示:

rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析

第4點(diǎn):調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列

rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析

注:上圖中cId1、cId2、...、cIdN通過(guò) getConsumerIdListByGroup 獲取,它們?cè)谶@個(gè)ConsumerGroup下所有在線客戶(hù)端列表中。

當(dāng)前消費(fèi)對(duì)進(jìn)行負(fù)載均衡策略后獲取對(duì)應(yīng)的消息消費(fèi)隊(duì)列。

以上是“rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

網(wǎng)頁(yè)標(biāo)題:rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析
地址分享:http://chinadenli.net/article38/gdjssp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供商城網(wǎng)站網(wǎng)站維護(hù)網(wǎng)站導(dǎo)航網(wǎng)站內(nèi)鏈外貿(mào)建站微信公眾號(hào)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化