這篇文章給大家分享的是有關(guān)怎么使用RabbitMQ的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。
創(chuàng)新互聯(lián)為企業(yè)提供:成都品牌網(wǎng)站建設(shè)、網(wǎng)絡(luò)營(yíng)銷策劃、小程序開(kāi)發(fā)、營(yíng)銷型網(wǎng)站建設(shè)和網(wǎng)站運(yùn)營(yíng)托管,一站式網(wǎng)絡(luò)營(yíng)銷整體服務(wù)。實(shí)現(xiàn)不斷獲取潛在客戶之核心目標(biāo),建立了企業(yè)專屬的“營(yíng)銷型網(wǎng)站建設(shè)”,就用不著再為了獲取潛在客戶而苦惱,相反,客戶會(huì)主動(dòng)找您,生意就找上門來(lái)了!
前言
我們先來(lái)看一下一條消息在RabbitMQ中的流轉(zhuǎn)過(guò)程

圖示的主要流程如下
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
生產(chǎn)者發(fā)送消息的時(shí)候指定RoutingKey,然后消息被發(fā)送到Exchange
Exchange根據(jù)一些列規(guī)則將消息路由到指定的隊(duì)列中
消費(fèi)者從隊(duì)列中消費(fèi)消息
整個(gè)流程主要就4個(gè)參與者message,exchange,queue,consumer,我們就來(lái)認(rèn)識(shí)一下這4個(gè)參與者
Message
消息可以設(shè)置一些列屬性,每種屬性的作用可以參考《深入RabbitMQ》一書(shū)

Exchange
接收消息,并根據(jù)路由鍵轉(zhuǎn)發(fā)消息到所綁定的隊(duì)列,常用的屬性如下

我們最常使用的就是type屬性,下面就詳細(xì)解釋type屬性
Fanout Exchange

發(fā)送到該交換機(jī)的消息都會(huì)路由到與該交換機(jī)綁定的所有隊(duì)列上,可以用來(lái)做廣播
不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上
Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的
Direct Exchage

把消息路由到BindingKey和RoutingKey完全匹配的隊(duì)列中
Topic Exchange

前面說(shuō)到,direct類型的交換器路由規(guī)則是完全匹配RoutingKey和BindingKey。topic和direct類似,也是將消息發(fā)送到RoutingKey和BindingKey相匹配的隊(duì)列中,只不過(guò)可以模糊匹配。
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
RoutinKey為一個(gè)被“.”號(hào)分割的字符串(如com.rabbitmq.client)
BindingKey和RoutingKey也是“.”號(hào)分割的字符串
BindKey中可以存在兩種特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配不多不少一個(gè)詞,“#”用于匹配多個(gè)單詞(包含0個(gè),1個(gè))

假如現(xiàn)在有2個(gè)RoutingKey為java.lang和java.util.concurrent的消息,java.lang會(huì)被路由到Consumer1和Consumer2,java.util.concurrent會(huì)被路由到Consumer2。

Headers Exchange
headers類型的交換器不依賴于路由鍵的匹配規(guī)則來(lái)路由消息,而是根據(jù)發(fā)送消息內(nèi)容中的headers屬性進(jìn)行匹配。headers類型的交換器性能差,不實(shí)用,基本上不會(huì)使用。
Queue
隊(duì)列的常見(jiàn)屬性如下

arguments中可以設(shè)置的隊(duì)列的常見(jiàn)參數(shù)如下

rabbitmq-api(rabbitmq api的使用)
chapter_1: 快速開(kāi)始,手寫(xiě)一個(gè)RabbitMQ的生產(chǎn)者和消費(fèi)者
chapter_2: 演示了各種exchange的使用
來(lái)回顧一下上面說(shuō)的各種exchange機(jī)器路由規(guī)則

chapter_3: 拉取消息
消息的獲得方式有2種
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
拉取消息(get message)
推送消息(consume message)
那我們應(yīng)該拉取消息還是推送消息?get是一個(gè)輪詢模型,而consumer是一個(gè)推送模型。get模型會(huì)導(dǎo)致每條消息都會(huì)產(chǎn)生與RabbitMQ同步通信的開(kāi)銷,這一個(gè)請(qǐng)求由發(fā)送請(qǐng)求幀的客戶端應(yīng)用程序和發(fā)送應(yīng)答的RabbitMQ組成。所以推送消息,避免拉取
chapter_4: 手動(dòng)ack
消息的確認(rèn)方式有2種
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
自動(dòng)確認(rèn)(autoAck=true)
手動(dòng)確認(rèn)(autoAck=false)
消費(fèi)者在消費(fèi)消息的時(shí)候,可以指定autoAck參數(shù)
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ會(huì)等待消費(fèi)者顯示回復(fù)確認(rèn)消息后才從內(nèi)存(或者磁盤)中移出消息
autoAck=true: RabbitMQ會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正的消費(fèi)了這些消息
手動(dòng)確認(rèn)的方法如下,有2個(gè)參數(shù)
basicAck(long deliveryTag, boolean multiple)
deliveryTag: 用來(lái)標(biāo)識(shí)信道中投遞的消息。RabbitMQ 推送消息給Consumer時(shí),會(huì)附帶一個(gè)deliveryTag,以便Consumer可以在消息確認(rèn)時(shí)告訴RabbitMQ到底是哪條消息被確認(rèn)了。
RabbitMQ保證在每個(gè)信道中,每條消息的deliveryTag從1開(kāi)始遞增
multiple=true: 消息id<=deliveryTag的消息,都會(huì)被確認(rèn)
myltiple=false: 消息id=deliveryTag的消息,都會(huì)被確認(rèn)
消息一直不確認(rèn)會(huì)發(fā)生啥?
如果隊(duì)列中的消息發(fā)送到消費(fèi)者后,消費(fèi)者不對(duì)消息進(jìn)行確認(rèn),那么消息會(huì)一直留在隊(duì)列中,直到確認(rèn)才會(huì)刪除。
如果發(fā)送到A消費(fèi)者的消息一直不確認(rèn),只有等到A消費(fèi)者與rabbitmq的連接中斷,rabbitmq才會(huì)考慮將A消費(fèi)者未確認(rèn)的消息重新投遞給另一個(gè)消費(fèi)者
chapter_5: 拒絕消息的兩種方式
確認(rèn)消息只有一種方法
basicAck(long deliveryTag, boolean multiple)
而拒絕消息有兩種方式
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
basicNack(long deliveryTag, boolean multiple, boolean requeue)
basicReject(long deliveryTag, boolean requeue)
basicNack和basicReject的區(qū)別只有一個(gè),basicNack支持批量拒絕
deliveryTag和multiple參數(shù)前面已經(jīng)說(shuō)過(guò)。
requeue=true: 消息會(huì)被再次發(fā)送到隊(duì)列中
requeue=false: 消息會(huì)被直接丟失
chapter_6: 失敗通知
chapter_6到chapter_10主要簡(jiǎn)述了消息發(fā)布時(shí)的權(quán)衡

我們最常用的就是失敗通知和發(fā)布者確認(rèn)
當(dāng)消息不能被路由到某個(gè)queue時(shí),我們?nèi)绾潍@取到不能正確路由的消息呢?
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
在發(fā)送消息時(shí)設(shè)置mandatory為true
生產(chǎn)者可以通過(guò)調(diào)用channel.addReturnListener來(lái)添加ReturnListener監(jiān)聽(tīng)器獲取沒(méi)有被路由到隊(duì)列中的消息
mandatory是channel.basicPublish()方法中的參數(shù)
mandatory=true: 交換器無(wú)法根據(jù)路由鍵找到一個(gè)符合條件的隊(duì)列,那么RabbitMQ會(huì)調(diào)用Basic.Return命令將消息返回給生產(chǎn)者
mandatory=false: 出現(xiàn)上述情形,則消息直接被丟棄
chapter_7: 發(fā)布者確認(rèn)
當(dāng)消息被發(fā)送后,消息到底有沒(méi)有到達(dá)exchange呢?默認(rèn)情況下生產(chǎn)者是不知道消息有沒(méi)有到達(dá)exchange
RabbitMQ針對(duì)這個(gè)問(wèn)題,提供了兩種解決方式
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
事務(wù)(后面會(huì)講到)
發(fā)布者確認(rèn)(publisher confirm)
而發(fā)布者確認(rèn)有三種編程方式
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
普通confirm模式:每發(fā)送一條消息后,調(diào)用waitForConfirms()方法,等待服務(wù)器端confirm。實(shí)際上是一種串行confirm了。
批量confirm模式:每發(fā)送一批消息后,調(diào)用waitForConfirms()方法,等待服務(wù)器端confirm。
異步confirm模式:提供一個(gè)回調(diào)方法,服務(wù)端confirm了一條或者多條消息后Client端會(huì)回調(diào)這個(gè)方法。
異步confirm模式的性能最高,因此經(jīng)常使用,我想把這個(gè)分享的細(xì)一下
channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{log.info("handleAck,deliveryTag:{},multiple:{}",deliveryTag,multiple);}@OverridepublicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{log.info("handleNack,deliveryTag:{},multiple:{}",deliveryTag,multiple);}});寫(xiě)過(guò)異步confirm代碼的小伙伴應(yīng)該對(duì)這段代碼不陌生,可以看到這里也有deliveryTag和multiple。但是我要說(shuō)的是這里的deliveryTag和multiple和消息的ack沒(méi)有一點(diǎn)關(guān)系。
confirmListener中的ack: rabbitmq控制的,用來(lái)確認(rèn)消息是否到達(dá)exchange
消息的ack: 上面說(shuō)到可以自動(dòng)確認(rèn),也可以手動(dòng)確認(rèn),用來(lái)確認(rèn)queue中的消息是否被consumer消費(fèi)
chapter_8: 備用交換器
生產(chǎn)者在發(fā)送消息的時(shí)候如果不設(shè)置 mandatory 參數(shù)那么消息在未被路由到queue的情況下將會(huì)丟失,如果設(shè)置了 mandatory 參數(shù),那么需要添加 ReturnListener 的編程邏輯,生產(chǎn)者的代碼將變得復(fù)雜。如果既不想復(fù)雜化生產(chǎn)者的編程邏輯,又不想消息丟失,那么可以使用備用交換器,這樣可以將未被路由到queue的消息存儲(chǔ)在RabbitMQ 中,在需要的時(shí)候去處理這些消息
chapter_9: 事務(wù)
RabbitMQ中與事務(wù)機(jī)制相關(guān)的方法有3個(gè)

消息成功被發(fā)送到RabbitMQ的exchange上,事務(wù)才能提交成功,否則便可在捕獲異常之后進(jìn)行事務(wù)回滾,與此同時(shí)可以進(jìn)行消息重發(fā)
因?yàn)槭聞?wù)會(huì)榨干RabbitMQ的性能,所以一般使用發(fā)布者確認(rèn)代替事務(wù)
chapter_10: 消息持久化
消息做持久化,只需要將消息屬性的delivery-mode設(shè)置為2即可
RabbitMQ給我們封裝了這個(gè)屬性,即MessageProperties.PERSISTENT_TEXT_PLAIN,
詳細(xì)使用可以參考github的代碼
當(dāng)我們想做消息的持久化時(shí),最好同時(shí)設(shè)置隊(duì)列和消息的持久化,因?yàn)橹辉O(shè)置隊(duì)列的持久化,重啟之后消息會(huì)丟失。只設(shè)置隊(duì)列的持久化,重啟后隊(duì)列消失,繼而消息也丟失
chapter_11: 死信隊(duì)列
DLX,全稱為Dead-Letter-Exchange,稱之為死信交換器。當(dāng)一個(gè)消息在隊(duì)列中變成死信(dead message)之后,它能被重新發(fā)送到另一個(gè)交換器中,這個(gè)交換器就是DLX,綁定DLX的隊(duì)列就稱之為死信隊(duì)列。
DLX也是一個(gè)正常的交換器,和一般的交換器沒(méi)有區(qū)別,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性
消息變成死信一般是由于以下幾種情況
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
消息被拒絕(Basic.Reject/Basic.Nack)且不重新投遞(requeue=false)
消息過(guò)期
隊(duì)列達(dá)到最大長(zhǎng)度
死信交換器和備用交換器的區(qū)別
備用交換器: 1.消息無(wú)法路由時(shí)轉(zhuǎn)到備用交換器 2.備用交換器是在聲明主交換器的時(shí)候定義的
死信交換器: 1.消息已經(jīng)到達(dá)隊(duì)列,但是被消費(fèi)者拒絕等的消息會(huì)轉(zhuǎn)到死信交換器。2.死信交換器是在聲明隊(duì)列的時(shí)候定義的
chapter_12: 流量控制(服務(wù)質(zhì)量保證)
qos即服務(wù)端限流,qos對(duì)于拉模式的消費(fèi)方式無(wú)效
使用qos只要進(jìn)行如下2個(gè)步驟即可
autoAck設(shè)置為false(autoAck=true的時(shí)候不生效)
調(diào)用basicConsume方法前先調(diào)用basicQos方法,這個(gè)方法有3個(gè)參數(shù)
basicQos(int prefetchSize, int prefetchCount, boolean global)

為什么要使用qos?
提高服務(wù)穩(wěn)定性。假設(shè)消費(fèi)端有一段時(shí)間不可用,導(dǎo)致隊(duì)列中有上萬(wàn)條未處理的消息,如果開(kāi)啟客戶端,
巨量的消息推送過(guò)來(lái),可能會(huì)導(dǎo)致消費(fèi)端變卡,也有可能直接不可用,所以服務(wù)端限流很重要
提高吞吐量。當(dāng)隊(duì)列有多個(gè)消費(fèi)者時(shí),隊(duì)列收到的消息以輪詢的方式發(fā)送給消費(fèi)者。但由于機(jī)器性能等的原因,每個(gè)消費(fèi)者的消費(fèi)能力不一樣,
這就會(huì)導(dǎo)致一些消費(fèi)者處理完了消費(fèi)的消息,而另一些則還堆積了一些消息,會(huì)造成整體應(yīng)用吞吐量的下降
感謝各位的閱讀!關(guān)于“怎么使用RabbitMQ”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!
當(dāng)前題目:怎么使用RabbitMQ
URL鏈接:http://chinadenli.net/article28/gsjejp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護(hù)、做網(wǎng)站、、建站公司、網(wǎng)站導(dǎo)航、響應(yīng)式網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)