下載鏡像docker pull rabbitmq:3.8-management
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名與空間、雅安服務(wù)器托管、營銷軟件、網(wǎng)站建設(shè)、吉利網(wǎng)站維護(hù)、網(wǎng)站推廣。
執(zhí)行下面的命令來運(yùn)行MQ容器:docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=zhangbo123456* \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \?
-p 5672:5672 \
-d \
rabbitmq:3.8-management
什么是消息隊列MQ全稱為Message Queue,即消息隊列?!跋㈥犃小笔窃谙⒌膫鬏斶^程中保存消息的容器。它是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊列中生產(chǎn)消息,消費(fèi)者不斷的從隊列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的解耦。
RabbitMQ快速入門RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊列,它是一種應(yīng)用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com
SpringAMQP1,Basic Queue 簡單隊列模型
2,Work Queue 工作隊列模型
3,發(fā)布訂閱模型 fanout
4,發(fā)布訂閱模型 Direct
5,發(fā)布訂閱模型 Topic
6,消息轉(zhuǎn)換器
概念:
AMQP:是用于在應(yīng)用程序或之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn),該協(xié)議與語言和平臺無關(guān),更符合微服務(wù)中獨(dú)立性的要求
SpringAMQP:是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息,包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)
AMQP和JMS區(qū)別和聯(lián)系MQ是消息通信的模型,并發(fā)具體實(shí)現(xiàn)?,F(xiàn)在實(shí)現(xiàn)MQ的有兩種主流方式:AMQP、JMS。
兩者間的區(qū)別和聯(lián)系:
JMS是定義了統(tǒng)一的接口,來對消息操作進(jìn)行統(tǒng)一;AMQP是通過規(guī)定協(xié)議來統(tǒng)一數(shù)據(jù)交互的格式
JMS限定了必須使用Java語言;AMQP只是協(xié)議,不規(guī)定實(shí)現(xiàn)方式,因此是跨語言的。
JMS規(guī)定了兩種消息模型;而AMQP的消息模型更加豐富
HelloWorld案例快速開始官方的helloword是基于最基礎(chǔ)的消息隊列模型來實(shí)現(xiàn)的,其中包括三個角色
1,publisher:消息發(fā)布者,要將消息發(fā)布到隊列queue
2,queue:消息隊列,負(fù)責(zé)接收并緩存消息
3,consumer:訂閱隊列,處理隊列中的消息
基本消息隊列的消息發(fā)送流程
1,建立connection
2,創(chuàng)建channel
3,利用channel聲名隊列
4,利用channel向隊列發(fā)送消息
基本消息隊列的消息接收流程
1,建立connection
2,創(chuàng)建channel
3,利用channel聲名隊列
4,定義consumer的消費(fèi)行為handleDelivery
5,利用channel將消費(fèi)者與隊列綁定
第一步導(dǎo)入依賴
?org.springframework.boot
? spring-boot-starter-amqp
第二步編寫配置文件
spring: rabbitmq: host: 47.99.139.160 #主機(jī) port: 5672 #端口號 virtual-host: / #虛擬主機(jī) username: itcast #用戶名 password: zhangbo123456* #密碼
第三步編寫測試方法
@Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { String queueName = "simple.queue"; String message = "hello , spring amqp"; rabbitTemplate.convertAndSend(queueName,message); }
小注:這個消息不會 創(chuàng)建隊列,所以要手動創(chuàng)建隊列
第四步在Consumer中編寫消費(fèi)邏輯,監(jiān)聽隊列
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimplateQueueMessage(String msg) throws InterruptedException{ System.out.println("spring消費(fèi)者接收到消息:"+msg); } }
消息預(yù)取限制
修改application.yml,設(shè)置preFetch這個值,可以控制預(yù)取消息的上線
spring:
rabbitmq:
? host: 47.99.139.160 ?#主機(jī)
? port: 5672 ? #端口號
? virtual-host: / ? ?#虛擬主機(jī)
? username: itcast ? #用戶名
? password: zhangbo123456* ? #密碼
? listener:
simple:
? prefetch: 1 ?#每次只能獲取一條消息,處理完成才能獲取下一條消息?
發(fā)布 訂閱發(fā)布訂閱模式允許將同一消息發(fā)送個多個消費(fèi)者,實(shí)現(xiàn)方式是加入了exchange
常見exchange類型包括
Fanout:廣播
Direct:路由
Topic:話題
發(fā)布訂閱-Fanout ExchangeFanout Exchange會將接收到的消息路由到每一個跟其綁定的queue(可以用于實(shí)現(xiàn)廣播模式)
實(shí)現(xiàn)思路:
1,在consumer服務(wù)中,利用代碼聲明隊列,交換機(jī),并將兩者綁定
2,在consumer服務(wù)中,編寫兩個消費(fèi)者方法,分別監(jiān)聽fanout.queue1和fanout.queue2
3,在publisher中編寫測試方法,向itcast.fanout發(fā)送消息
步驟一 :在consumer服務(wù)聲名exchange,queue,binding,在consumer服務(wù)聲名一個配置類,添加@Configuration注解,并聲明FanoutExchange,queue和綁定關(guān)系對象binding
@Configuration public class FanoutConfig { //聲名FanoutChange交換機(jī) @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //聲名第一個隊列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //綁定隊列一和交換機(jī) @Bean public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //...略,以相同的方式聲名第二個隊列,并完成綁定 }
consumer代碼
//fanout 模式 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueueMessage(String msg) throws InterruptedException{ System.out.println("spring消費(fèi)者接收到fanout.queue1消息:"+msg); } //fanout 模式 @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueueMessage2(String msg) throws InterruptedException{ System.out.println("spring消費(fèi)者接收到fanout.queue2消息:"+msg); }
publisher代碼
//fanout 模式 @Test public void testSendFanoutExchange(){ //交換機(jī)名稱 String exchangeName = "itcast.fanout"; //消息 String message = "hello , every one"; //發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName,"",message); }
總結(jié):
交換機(jī)的作用?
1,接收publisher發(fā)送的消息
2,將消息按照路由規(guī)則路由到與之綁定的隊列
3,不能緩存消息,路由失敗,消息丟失
4,F(xiàn)anoutExchange的會將消息路由到每個綁定的隊列
聲名隊列,交換機(jī),綁定關(guān)系的bean是什么?
queue
fanoutExchange
Binding
發(fā)布訂閱-DirectExchangeDirect Exchange會將接收到的消息根據(jù)規(guī)則路由到指定的queue,因此稱之為路由模式(routes)
每一個Queue都與Exchange設(shè)置一個BindingKey
發(fā)布者發(fā)送消息時,指定消息的RoutingKey
Exchange將消息路由到BindingKey與消息RoutingKey一致的隊列
案例實(shí)現(xiàn)思路
1,利用@RabbitListener聲名Exchange,Queue,RoutingKey
2,zaiconsumer服務(wù)中,編寫兩個消費(fèi)者方法,分別監(jiān)聽direct.queue和direct.queue2
3,在publisher中編寫測試方法,向itcast.direct發(fā)送消息
consumer
//direct模式 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDirectQueue(String msg){ System.out.println("spring消費(fèi)者接收到direct.queue1消息:"+msg); } //direct模式 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("spring消費(fèi)者接收到direct.queue2消息:"+msg); }
publisher
//direct 模式 @Test public void testSendDirectExchange(){ //交換機(jī)名稱 String exchangeName = "itcast.direct"; //消息 String message = "hello , smoky"; //發(fā)送消息 參數(shù)分別是:交換機(jī)名稱 RoutingKey(暫時為空,路由key),消息 rabbitTemplate.convertAndSend(exchangeName,"smoky",message); }
總結(jié):
描述direct交換機(jī)和fanout交換機(jī)的差異?
fanout交換機(jī)將消息發(fā)送給每一個與之綁定的隊列
directii交換機(jī)根據(jù)RoutingKey判斷路由給那個隊列
如果多個隊列具有相同的RoutingKey,則與Fanout功能類似
基于@RabbitListener注解聲名隊列和交換機(jī)有哪些常見注解?
@Queue
@Exchange
發(fā)布訂閱-TopicExchangeTopicExchange與DirectExchange類似,區(qū)別在于routingKey必須是多個單詞的列表,并且以.分割
Queue與Exchange指定BIndingKey時可以指定通配符
#:代指0個或多個單詞
*:代指一個單詞
案例實(shí)現(xiàn)思路
1,利用@RabbitListener聲名Exchange Queue RoutingKey
2,在consumer服務(wù)中,編寫兩個消費(fèi)者方法,分別監(jiān)聽topic.queue1和topic.queue2
3,在publisher中編寫測試方法,向itcast.topic發(fā)送消息
consumer
//topic模式 @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue1"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "chain.#" )) public void listenTopictQueue1(String msg){ System.out.println("spring消費(fèi)者接收到topic.queue1消息:"+msg); } //topic模式 @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue2"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "*.news" )) public void listenTopictQueue2(String msg){ System.out.println("spring消費(fèi)者接收到topic.queue2消息:"+msg); }
publisher
//direct 模式 @Test public void testSendTopictExchange(){ //交換機(jī)名稱 String exchangeName = "itcast.topic"; //消息 String message = "今天天氣很好呀"; //發(fā)送消息 參數(shù)分別是:交換機(jī)名稱 RoutingKey(暫時為空,路由key),消息 rabbitTemplate.convertAndSend(exchangeName,"chain.weather",message); }
測試發(fā)送Object類型消息,消息轉(zhuǎn)換器說明:在SpringAMQP的發(fā)送方法中,接收到的消息類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送,用的jdk的序列化器
補(bǔ)充: 使用jdk的序列化器的缺點(diǎn):1,性能比較差 2,安全性不好,容易出現(xiàn)注入的問題 3,數(shù)據(jù)長度長,占用額外內(nèi)存
測試代碼
//測試Object類型消息 @Test public void sendObjectQueue(){ Map
msg = new HashMap<>(); msg.put("name","柳巖"); msg.put("age",21); rabbitTemplate.convertAndSend("object.queue",msg); }
Spring的對消息對象的處理是由import org.springframework.messaging.converter.MessageConverter;來處理的,而默認(rèn)實(shí)現(xiàn)是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化
如果要修改只需要定義一個MessageConverter類型的bean即可,推薦使用JSON的方式序列化
引入依賴
?
?com.fasterxml.jackson.core
? jackson-databind
?
聲名一個MessageConverter類型的bean ?
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
consumer
引入依賴
?com.fasterxml.jackson.core
? jackson-databind
?
consumer服務(wù)定義MessageConverter
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
SpringAMQP中消息的序列化和反序列化是怎么實(shí)現(xiàn)的
利用MessageConverter實(shí)現(xiàn)的,默認(rèn)是JDK的序列化
注意發(fā)送方接收必須使用相同的MessageConverter
MQ的一些常見問題1,消息可靠性:如何確保發(fā)送的消息至少被消費(fèi)一次
2,延遲消息問題:如何實(shí)現(xiàn)消息的延遲投遞
3,高可用問題:如何避免單點(diǎn)的MQ故障而導(dǎo)致的不可用問題
4,消息堆積問題:如何解決數(shù)百萬消息堆積,無法及時消費(fèi)的問題
消息可靠性問題
消息從生產(chǎn)者發(fā)送到exchange,再到queue,再到消費(fèi)者,有哪些導(dǎo)致消息丟失的可能性?
發(fā)送時丟失,
生產(chǎn)者發(fā)送的消息未到達(dá)exchange
消息到達(dá)exchange后未到達(dá)queue
MQ宕機(jī),queue將消息丟失
consumer接收到消息后未消費(fèi)就宕機(jī)
生產(chǎn)者確認(rèn)機(jī)制
RabbitMq提供了publisher confirm機(jī)制避免消息發(fā)送到MQ的過程中丟失。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功,結(jié)果有兩種請求
publisher-confirm,發(fā)送者確認(rèn)
消息成功投遞到交換機(jī)返回ack
消息未投遞到交換機(jī),返回nack
publisher-return
消息投遞到交換機(jī)了,但是沒有路由到隊列,返回ACK,及路由失敗原因
注意:確認(rèn)機(jī)制發(fā)送消息時,需要給每個消息設(shè)置一個全局唯一id,以區(qū)分不同的消息,避免ack沖突
消費(fèi)者確認(rèn)
RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,即消費(fèi)者成功處理消息后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)后才會刪除該消息,
而SpringAMQP允許配置三種確認(rèn)模式
manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack
auto:自動ack,由spring檢測listener代碼是否出現(xiàn)異常,沒有異常則返回ack,拋出異常則返回nack
none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會成功處理,因此消息投遞后會立即被刪除
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
分享文章:SpringBoot整合RabbitMQ-創(chuàng)新互聯(lián)
網(wǎng)站路徑:http://chinadenli.net/article46/ddehhg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、網(wǎng)站設(shè)計、移動網(wǎng)站建設(shè)、面包屑導(dǎo)航、品牌網(wǎng)站制作、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容