這篇文章主要介紹“RocketMQ順序消息是什么意思”,在日常操作中,相信很多人在RocketMQ順序消息是什么意思問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ順序消息是什么意思”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
十余年的都江堰網(wǎng)站建設經(jīng)驗,針對設計、前端、開發(fā)、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。營銷型網(wǎng)站的優(yōu)勢是能夠根據(jù)用戶設備顯示端的尺寸不同,自動調(diào)整都江堰建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)從事“都江堰網(wǎng)站設計”,“都江堰網(wǎng)站推廣”以來,每個客戶項目都認真落實執(zhí)行。
我們知道消息隊列的特性導致其消息不是順序進行消費的,RocketMQ沒有提供所謂的順序消息來供我們使用,但是有時候一些場景需要需要順序的去接收消息。今天我們重點討論一下如何實現(xiàn)這種功能。雖然RocketMQ沒有提供順序消費但是我們可以變相的來實現(xiàn)它。我們知道消息需要放入隊列中才能被消費,而隊列本身的特性就是FIFO先進先出,我們可以將需要順序的消息放入一個隊列中,則就可以實現(xiàn)這個功能。
場景:兩個業(yè)務系統(tǒng)之間消息通過MQ傳輸,業(yè)務系統(tǒng)A數(shù)據(jù)傳輸至業(yè)務系統(tǒng)B,要求消息準確、實時。但是業(yè)務系統(tǒng)A的原始的數(shù)據(jù)可能會存在修改的情況,要求業(yè)務系統(tǒng)B需要實時的更改。保證消息的實時性、一致性、可靠性。
主題test_1發(fā)送消息到RocketMQ的雙主Broker1、Broker2上,每個broker上test_1主題對應4個隊列,消息id為001001的消息存在創(chuàng)建(create)、更新(update),MQ集群是雙主的,使用默認的消息發(fā)送算法,消息將輪詢的丟棄到各個隊列中。
默認按照輪詢算法將消息分發(fā)到各個broker的不同的隊列中,保證每個隊列的消息都是均勻分配,集群消費且消費者多個時,多個消費者會分散到不同的隊列中消費消息,保證消息能夠?qū)崟r消費。
因為消息本身是放入到不同的隊列中消費的就不能保證其順序性,更新的消息可能是最先被消費掉,創(chuàng)建的消息消費時業(yè)務需要判斷消息是否是最新的,需要進行查庫驗證,是則更新,不是則丟棄保存最新的消息,保證業(yè)務系統(tǒng)A與業(yè)務系統(tǒng)B,數(shù)據(jù)的一致性,增加了業(yè)務處理的難度。
我們在生產(chǎn)消息的時候可以將同一個消息ID的消息放入到相同的隊列中,保證同一類需要順序消費的消息放入到同一個隊列中,這樣隊列中的消息就是有序的。但是同時也需要保證消息的消費也是有序的才可以保證消息的順序消費。
集群模式下同一個消費組內(nèi)的消費者共同承擔其訂閱主題下的消息隊列的消費,同一個消息消息隊列在同一時刻只會被消費組內(nèi)的一個消費者消費,一個消費者同一時刻可以分配多個消費隊列。
集群模式下的普通消息,線程池默認創(chuàng)建20個(可配置)線程。多線程從隊列中拉取消息,提高并發(fā)加快消息的消費。
集群模式下的順序消息,順序消費是單線程,一個線程只能去一個隊列獲取數(shù)據(jù),當需要獲取某個隊列中的消息時,需要鎖定該消息隊列(PS:后面會根據(jù)源碼詳細分析其原理)。
廣播模式下的順序消息,順序消費是單線程,直接進行消費,無需鎖定消息隊列,因為相互之間無競爭
public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("order_group_test_1"); //Launch the instance. producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("order_test_1", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); }
我們根據(jù)100個消息的序號來放入到不同的隊列中,根據(jù)序號%10取模,相同的放入到一個隊列中。
上面是我們實現(xiàn)自定義隊列選擇器的算法,RocketMQ也提供了三種隊列選擇算法
從圖中我們可以看到一共三種
SelectMessageQueueByHash:通過 hash 進行選擇 queue。
SelectMessageQueueByRandom:隨機選擇 queue。
SelectMessageQueueByMachineRoom:機房選擇queue(未實現(xiàn))
我們分別來看一下實現(xiàn)
public class SelectMessageQueueByHash implements MessageQueueSelector { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value %= mqs.size(); return ((MessageQueue) mqs.get(value)); } }
我們差看源碼可以發(fā)現(xiàn),通過提供的參數(shù)獲取其HashCode,如果為負值則取絕對值,hash值與隊列的總數(shù)進行取模獲取其隊列。
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random; public SelectMessageQueueByRandom() { this.random = new Random(System.currentTimeMillis()); } public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = this.random.nextInt(mqs.size()); return ((MessageQueue) mqs.get(value)); } }
生成一個隊列數(shù)以內(nèi)的隨機數(shù),通過隨機數(shù)獲取隊列。
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } public Set<String> getConsumeridcs() { return this.consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } }
我們發(fā)現(xiàn)其select方法為null,其實是沒有進行實現(xiàn)。需要我們自己實現(xiàn)。
雖然RocketMQ提供了三種(其實2種,SelectMessageQueueByMachineRoom未實現(xiàn))隊列選擇算法,但是不建議使用,不同的業(yè)務規(guī)則其選擇隊列的算法也不盡相同,建議手動實現(xiàn)。
public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("order_consumer_test_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("order_test_1", "*"); consumer.registerMessageListener(new MessageListenerOrderly(){ @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> paramList, ConsumeOrderlyContext paramConsumeOrderlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; //消費成功 } }); consumer.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
查看結(jié)果
我們找到兩個典型的一組數(shù)字,尾號是6和9的,尾號是6的計較集中,尾號是9的比較分散,但是結(jié)果都是一樣的,按照順序消費的。
到此,關于“RocketMQ順序消息是什么意思”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
網(wǎng)站欄目:RocketMQ順序消息是什么意思
本文地址:http://chinadenli.net/article24/gphgce.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、網(wǎng)站制作、移動網(wǎng)站建設、網(wǎng)站維護、建站公司、微信小程序
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)