本篇文章給大家分享的是有關(guān)如何理解RocketMQ存儲(chǔ)中的主從同步,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
從策劃到設(shè)計(jì)制作,每一步都追求做到細(xì)膩,制作可持續(xù)發(fā)展的企業(yè)網(wǎng)站。為客戶提供成都網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、網(wǎng)站策劃、網(wǎng)頁設(shè)計(jì)、主機(jī)域名、虛擬主機(jī)、網(wǎng)絡(luò)營銷、VI設(shè)計(jì)、 網(wǎng)站改版、漏洞修補(bǔ)等服務(wù)。為客戶提供更好的一站式互聯(lián)網(wǎng)解決方案,以客戶的口碑塑造優(yōu)易品牌,攜手廣大客戶,共同發(fā)展進(jìn)步。
1.消息存儲(chǔ)在Master上了,如何同步到Slave上了呢?
2.同步復(fù)制和異步復(fù)制流程是怎么樣的?
1.HA初始化調(diào)用鏈
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore
@4 DefaultMessageStore#DefaultMessageStore()
this.haService = new HAService(this);
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig()
.getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
2.啟動(dòng)調(diào)用鏈
@1 BrokerStartup#start
controller.start();
@2 BrokerController#start
this.messageStore.start();
@3 DefaultMessageStore#start
@4 this.haService.start();
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
小結(jié):從初始化和啟動(dòng)調(diào)用鏈中可以看到,在Broker啟動(dòng)時(shí),初始化并啟動(dòng)了三個(gè)線程類,分別為AcceptSocketService, GroupTransferService, HAClient。
問題:這三個(gè)線程類在干啥?
小結(jié):AcceptSocketService職責(zé)初始化TCP通道,監(jiān)聽新的連接并創(chuàng)建HAConnection。
問題:HAConnection在做什么?
//構(gòu)造方法
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
//獲取客戶端請(qǐng)求地址
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
//將通道調(diào)整為非阻塞
this.socketChannel.configureBlocking(false);
//關(guān)閉連接前將數(shù)據(jù)發(fā)送完畢
this.socketChannel.socket().setSoLinger(false, -1);
//將Nagle算法關(guān)閉,客戶端每發(fā)送一次數(shù)據(jù)無論大小,都會(huì)將其發(fā)送出去
this.socketChannel.socket().setTcpNoDelay(true);
//設(shè)置接受緩存區(qū)為64K
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
//設(shè)置發(fā)包緩存區(qū)為64K
this.socketChannel.socket().setSendBufferSize(1024 * 64);
//寫數(shù)據(jù)線程類
this.writeSocketService = new WriteSocketService(this.socketChannel);
//讀數(shù)據(jù)線程類
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
//啟動(dòng)
public void start() {
//啟動(dòng)讀數(shù)據(jù)線程
this.readSocketService.start();
//啟動(dòng)寫數(shù)據(jù)線程
this.writeSocketService.start();
}
疑問:HAConnection除了對(duì)通道做了一些設(shè)置外,啟動(dòng)了兩個(gè)線程服務(wù)類,分別為readSocketService和writeSocketService,他們職責(zé)是什么呢?
2.1 writeSocketService職責(zé)
流程圖
小結(jié):writeSocketService主要職責(zé),將數(shù)據(jù)不斷寫入socketChannel通道;寫入數(shù)據(jù)的大小為nextTransferFromWhere與最大可讀位置getReadPosition之間數(shù)據(jù);每次寫完傳輸指針自增this.nextTransferFromWhere += size;每隔5秒發(fā)送心跳包到socketChannel通道。
2.2 readSocketService職責(zé)
流程圖
小結(jié):readSocketService主要職責(zé)解析slave發(fā)來的請(qǐng)求位點(diǎn),并更新push3SlaveMaxOffset為該請(qǐng)求位點(diǎn);喚醒groupTransferService線程。
3.GroupTransferService職責(zé)
小結(jié):GroupTransferService職責(zé)判斷主從同步是否完成,完成后喚醒消息發(fā)送線程。
4.HAClient職責(zé)
小結(jié):HAClient職責(zé)Slave封裝實(shí)現(xiàn)類,負(fù)責(zé)與Master建立連接通道,并從通道中獲取數(shù)據(jù)存儲(chǔ);并向Master上報(bào)Slave存儲(chǔ)的最大物理偏移量。
五、主從同步示意圖
1.主從同步交互消息格式
1.1 Slave上報(bào)物理偏移量reportOffset量格式
00000018516677754880|長(zhǎng)度為8位的20位數(shù)字
1.2 Master寫入Slave的信息由Header與Body構(gòu)成
00000018516677754880+size|Header部分由8位物理偏移量+消息體大
消息Body具體內(nèi)容|Slave請(qǐng)求的位點(diǎn)與Master可讀位置之間的數(shù)據(jù)
2.主從同步示意圖
以上就是如何理解RocketMQ存儲(chǔ)中的主從同步,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
本文標(biāo)題:如何理解RocketMQ存儲(chǔ)中的主從同步
網(wǎng)頁地址:http://chinadenli.net/article20/ppchco.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、網(wǎng)頁設(shè)計(jì)公司、微信小程序、網(wǎng)站設(shè)計(jì)、關(guān)鍵詞優(yōu)化、企業(yè)網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)