這篇文章主要介紹RocketMQ消費(fèi)中Broker端處理邏輯的示例分析,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!
成都服務(wù)器托管,創(chuàng)新互聯(lián)建站提供包括服務(wù)器租用、成都聯(lián)通服務(wù)器托管、帶寬租用、云主機(jī)、機(jī)柜租用、主機(jī)租用托管、CDN網(wǎng)站加速、域名注冊等業(yè)務(wù)的一體化完整服務(wù)。電話咨詢:13518219792
1.Broker是如何處理消費(fèi)流程的?
2.消費(fèi)進(jìn)度是如何流轉(zhuǎn)的?
說明:本文分析均為PUSH消費(fèi)模式
本部分將消費(fèi)的切分成三塊梳理:Broker消費(fèi)處理流程概覽、查找消息流程、以及消息查詢結(jié)果處理流程。
小結(jié):在拉取消息時會進(jìn)行Broker和主題讀權(quán)限的判斷,實(shí)戰(zhàn)中若有必要可以封鎖Broker的拉取權(quán)限從而禁止從該broker進(jìn)行消費(fèi);或者封鎖某主題的讀權(quán)限禁止消費(fèi)組從該主題消費(fèi)消息。
小結(jié):如果需要從磁盤拉取消息則一次默認(rèn)最多拉取8條,一次消息的消息大小最大為64K。如果從緩存中拉取默認(rèn)最多32條,一次拉取的消息大小最大256K。使用tagcode會在查找消息前進(jìn)行過濾,使用SQL92過濾再消息查找出來后進(jìn)行過濾。
小結(jié):建議開啟slaveReadEnable=true,當(dāng)拉取的消息超過Broker內(nèi)存40%時會從Slave節(jié)點(diǎn)消費(fèi),Master不必從磁盤重新讀取數(shù)據(jù);transferMsgByHeap默認(rèn)為true即消息先拉取到堆空間再返回到客戶端;如果設(shè)置為false則使用Netty#FileRegion,可用零字節(jié)拷貝不必再拷貝到堆內(nèi)存提高性能。
//@1 順序消費(fèi)/并發(fā)消費(fèi)流程相同
//ConsumeMessageOrderlyService#processConsumeResult
//ConsumeMessageConcurrentlyService#processConsumeResult
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消費(fèi)進(jìn)度偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
@2 RemoteBrokerOffsetStore#updateOffset
AtomicLong offsetOld = this.offsetTable.get(mq);
MixAll.compareAndIncreaseOnly(offsetOld, offset);
@3 offsetTable存儲結(jié)構(gòu):key為MessageQueue value為消費(fèi)的偏移量進(jìn)度
ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()
@4 定時同步消費(fèi)進(jìn)度
//持久化消息消費(fèi)進(jìn)度,默認(rèn)5秒保存一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
@5 RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
this.updateConsumeOffsetToBroker(mq, offset.get());
小結(jié):PUSH消費(fèi)中消費(fèi)進(jìn)度存儲在offsetTable中,定時任務(wù)每5秒鐘上報Broker一次。
//@1 ConsumerManageProcessor#processRequest#updateConsumerOffset
this.brokerController.getConsumerOffsetManager().commitOffset
//@2 ConsumerOffsetManager#commitOffset
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
Long storeOffset = map.put(queueId, offset);
//@3 消費(fèi)進(jìn)度緩存結(jié)構(gòu)
//key=topic@group
//value=ConcurrentMap<Integer/* queueId*/, Long/*offset*/>>
offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
//@4 5秒鐘一次存儲消費(fèi)進(jìn)度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//@5 consumerOffset.json文件格式
"zeus-package-mismatch-topic@autosort-packagelog":{0:9055300,1:9055157,2:9055304,3:9055232}
小結(jié):Broker接到客戶端消費(fèi)進(jìn)度上報后更新緩存offsetTable,每隔5秒中定時任務(wù)將offsetTable消費(fèi)進(jìn)度存儲在磁盤文件consumerOffset.json中。
//@1 PullMessageProcessor#processRequest
if (storeOffsetEnable) {
//更新消費(fèi)進(jìn)度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
小結(jié):PUSH消費(fèi)客戶端拉取消息后會實(shí)時更新消費(fèi)的進(jìn)度。
以上是“RocketMQ消費(fèi)中Broker端處理邏輯的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
網(wǎng)頁標(biāo)題:RocketMQ消費(fèi)中Broker端處理邏輯的示例分析
分享網(wǎng)址:http://chinadenli.net/article20/gojgco.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、網(wǎng)站策劃、搜索引擎優(yōu)化、服務(wù)器托管、做網(wǎng)站、商城網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)