欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

SpringBoot整合RabbitMQ-創(chuàng)新互聯(lián)

RabbitMQ部署指北
下載鏡像

docker pull rabbitmq:3.8-management

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名與空間、雅安服務(wù)器托管、營銷軟件、網(wǎng)站建設(shè)、吉利網(wǎng)站維護(hù)、網(wǎng)站推廣。
執(zhí)行下面的命令來運(yùn)行MQ容器:

docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=zhangbo123456* \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \?
-p 5672:5672 \
-d \
rabbitmq:3.8-management

什么是消息隊列

MQ全稱為Message Queue,即消息隊列?!跋㈥犃小笔窃谙⒌膫鬏斶^程中保存消息的容器。它是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊列中生產(chǎn)消息,消費(fèi)者不斷的從隊列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的解耦。

RabbitMQ快速入門

RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊列,它是一種應(yīng)用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com

SpringAMQP

1,Basic Queue 簡單隊列模型

2,Work Queue 工作隊列模型

3,發(fā)布訂閱模型 fanout

4,發(fā)布訂閱模型 Direct

5,發(fā)布訂閱模型 Topic

6,消息轉(zhuǎn)換器

概念:

AMQP:是用于在應(yīng)用程序或之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn),該協(xié)議與語言和平臺無關(guān),更符合微服務(wù)中獨(dú)立性的要求

SpringAMQP:是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息,包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)

AMQP和JMS區(qū)別和聯(lián)系

MQ是消息通信的模型,并發(fā)具體實(shí)現(xiàn)?,F(xiàn)在實(shí)現(xiàn)MQ的有兩種主流方式:AMQP、JMS。

兩者間的區(qū)別和聯(lián)系:

  • JMS是定義了統(tǒng)一的接口,來對消息操作進(jìn)行統(tǒng)一;AMQP是通過規(guī)定協(xié)議來統(tǒng)一數(shù)據(jù)交互的格式

  • JMS限定了必須使用Java語言;AMQP只是協(xié)議,不規(guī)定實(shí)現(xiàn)方式,因此是跨語言的。

  • JMS規(guī)定了兩種消息模型;而AMQP的消息模型更加豐富

HelloWorld案例

官方的helloword是基于最基礎(chǔ)的消息隊列模型來實(shí)現(xiàn)的,其中包括三個角色

1,publisher:消息發(fā)布者,要將消息發(fā)布到隊列queue

2,queue:消息隊列,負(fù)責(zé)接收并緩存消息

3,consumer:訂閱隊列,處理隊列中的消息

基本消息隊列的消息發(fā)送流程

1,建立connection

2,創(chuàng)建channel

3,利用channel聲名隊列

4,利用channel向隊列發(fā)送消息

基本消息隊列的消息接收流程

1,建立connection

2,創(chuàng)建channel

3,利用channel聲名隊列

4,定義consumer的消費(fèi)行為handleDelivery

5,利用channel將消費(fèi)者與隊列綁定

快速開始

第一步導(dǎo)入依賴


?org.springframework.boot
? spring-boot-starter-amqp

第二步編寫配置文件

spring:
  rabbitmq:
    host: 47.99.139.160  #主機(jī)
    port: 5672   #端口號
    virtual-host: /    #虛擬主機(jī)
    username: itcast   #用戶名
    password: zhangbo123456*   #密碼

第三步編寫測試方法

@Autowired
RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
    String queueName = "simple.queue";
    String message = "hello , spring amqp";
    rabbitTemplate.convertAndSend(queueName,message);
}

小注:這個消息不會 創(chuàng)建隊列,所以要手動創(chuàng)建隊列

第四步在Consumer中編寫消費(fèi)邏輯,監(jiān)聽隊列

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimplateQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring消費(fèi)者接收到消息:"+msg);
    }
}

消息預(yù)取限制

修改application.yml,設(shè)置preFetch這個值,可以控制預(yù)取消息的上線

spring:
rabbitmq:
? host: 47.99.139.160 ?#主機(jī)
? port: 5672 ? #端口號
? virtual-host: / ? ?#虛擬主機(jī)
? username: itcast ? #用戶名
? password: zhangbo123456* ? #密碼
? listener:
simple:
? prefetch: 1 ?#每次只能獲取一條消息,處理完成才能獲取下一條消息?

發(fā)布 訂閱

發(fā)布訂閱模式允許將同一消息發(fā)送個多個消費(fèi)者,實(shí)現(xiàn)方式是加入了exchange

常見exchange類型包括

  • Fanout:廣播

  • Direct:路由

  • Topic:話題

發(fā)布訂閱-Fanout Exchange

Fanout Exchange會將接收到的消息路由到每一個跟其綁定的queue(可以用于實(shí)現(xiàn)廣播模式)

實(shí)現(xiàn)思路:

1,在consumer服務(wù)中,利用代碼聲明隊列,交換機(jī),并將兩者綁定

2,在consumer服務(wù)中,編寫兩個消費(fèi)者方法,分別監(jiān)聽fanout.queue1和fanout.queue2

3,在publisher中編寫測試方法,向itcast.fanout發(fā)送消息

步驟一 :在consumer服務(wù)聲名exchange,queue,binding,在consumer服務(wù)聲名一個配置類,添加@Configuration注解,并聲明FanoutExchange,queue和綁定關(guān)系對象binding

@Configuration
public class FanoutConfig {

    //聲名FanoutChange交換機(jī)
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    //聲名第一個隊列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    //綁定隊列一和交換機(jī)
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //...略,以相同的方式聲名第二個隊列,并完成綁定
}

consumer代碼

//fanout 模式
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring消費(fèi)者接收到fanout.queue1消息:"+msg);
    }

    //fanout 模式
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueueMessage2(String msg) throws InterruptedException{
        System.out.println("spring消費(fèi)者接收到fanout.queue2消息:"+msg);
    }

publisher代碼

//fanout 模式
    @Test
    public void testSendFanoutExchange(){
        //交換機(jī)名稱
        String exchangeName = "itcast.fanout";
        //消息
        String message = "hello , every one";
        //發(fā)送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

總結(jié):

交換機(jī)的作用?

1,接收publisher發(fā)送的消息

2,將消息按照路由規(guī)則路由到與之綁定的隊列

3,不能緩存消息,路由失敗,消息丟失

4,F(xiàn)anoutExchange的會將消息路由到每個綁定的隊列

聲名隊列,交換機(jī),綁定關(guān)系的bean是什么?

  • queue

  • fanoutExchange

  • Binding

發(fā)布訂閱-DirectExchange

Direct Exchange會將接收到的消息根據(jù)規(guī)則路由到指定的queue,因此稱之為路由模式(routes)

  • 每一個Queue都與Exchange設(shè)置一個BindingKey

  • 發(fā)布者發(fā)送消息時,指定消息的RoutingKey

  • Exchange將消息路由到BindingKey與消息RoutingKey一致的隊列

案例實(shí)現(xiàn)思路

1,利用@RabbitListener聲名Exchange,Queue,RoutingKey

2,zaiconsumer服務(wù)中,編寫兩個消費(fèi)者方法,分別監(jiān)聽direct.queue和direct.queue2

3,在publisher中編寫測試方法,向itcast.direct發(fā)送消息

consumer

//direct模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue(String msg){
        System.out.println("spring消費(fèi)者接收到direct.queue1消息:"+msg);
    }

    //direct模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("spring消費(fèi)者接收到direct.queue2消息:"+msg);
    }

publisher

//direct 模式
    @Test
    public void testSendDirectExchange(){
        //交換機(jī)名稱
        String exchangeName = "itcast.direct";
        //消息
        String message = "hello , smoky";
        //發(fā)送消息  參數(shù)分別是:交換機(jī)名稱 RoutingKey(暫時為空,路由key),消息
        rabbitTemplate.convertAndSend(exchangeName,"smoky",message);
    }

總結(jié):

描述direct交換機(jī)和fanout交換機(jī)的差異?

fanout交換機(jī)將消息發(fā)送給每一個與之綁定的隊列

directii交換機(jī)根據(jù)RoutingKey判斷路由給那個隊列

如果多個隊列具有相同的RoutingKey,則與Fanout功能類似

基于@RabbitListener注解聲名隊列和交換機(jī)有哪些常見注解?

@Queue

@Exchange

發(fā)布訂閱-TopicExchange

TopicExchange與DirectExchange類似,區(qū)別在于routingKey必須是多個單詞的列表,并且以.分割

Queue與Exchange指定BIndingKey時可以指定通配符

#:代指0個或多個單詞

*:代指一個單詞

案例實(shí)現(xiàn)思路

1,利用@RabbitListener聲名Exchange Queue RoutingKey

2,在consumer服務(wù)中,編寫兩個消費(fèi)者方法,分別監(jiān)聽topic.queue1和topic.queue2

3,在publisher中編寫測試方法,向itcast.topic發(fā)送消息

consumer

//topic模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "chain.#"
    ))
    public void listenTopictQueue1(String msg){
        System.out.println("spring消費(fèi)者接收到topic.queue1消息:"+msg);
    }

    //topic模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "*.news"
    ))
    public void listenTopictQueue2(String msg){
        System.out.println("spring消費(fèi)者接收到topic.queue2消息:"+msg);
    }

publisher

//direct 模式
    @Test
    public void testSendTopictExchange(){
        //交換機(jī)名稱
        String exchangeName = "itcast.topic";
        //消息
        String message = "今天天氣很好呀";
        //發(fā)送消息  參數(shù)分別是:交換機(jī)名稱 RoutingKey(暫時為空,路由key),消息
        rabbitTemplate.convertAndSend(exchangeName,"chain.weather",message);
    }
測試發(fā)送Object類型消息,消息轉(zhuǎn)換器

說明:在SpringAMQP的發(fā)送方法中,接收到的消息類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送,用的jdk的序列化器

補(bǔ)充: 使用jdk的序列化器的缺點(diǎn):1,性能比較差 2,安全性不好,容易出現(xiàn)注入的問題 3,數(shù)據(jù)長度長,占用額外內(nèi)存

測試代碼

//測試Object類型消息
    @Test
    public void sendObjectQueue(){
        Mapmsg = new HashMap<>();
        msg.put("name","柳巖");
        msg.put("age",21);
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

Spring的對消息對象的處理是由import org.springframework.messaging.converter.MessageConverter;來處理的,而默認(rèn)實(shí)現(xiàn)是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需要定義一個MessageConverter類型的bean即可,推薦使用JSON的方式序列化

引入依賴

?
?com.fasterxml.jackson.core
? jackson-databind
?

聲名一個MessageConverter類型的bean ?

@Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

consumer

引入依賴


?com.fasterxml.jackson.core
? jackson-databind
?

consumer服務(wù)定義MessageConverter

@Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

SpringAMQP中消息的序列化和反序列化是怎么實(shí)現(xiàn)的

  • 利用MessageConverter實(shí)現(xiàn)的,默認(rèn)是JDK的序列化

  • 注意發(fā)送方接收必須使用相同的MessageConverter

MQ的一些常見問題

1,消息可靠性:如何確保發(fā)送的消息至少被消費(fèi)一次

2,延遲消息問題:如何實(shí)現(xiàn)消息的延遲投遞

3,高可用問題:如何避免單點(diǎn)的MQ故障而導(dǎo)致的不可用問題

4,消息堆積問題:如何解決數(shù)百萬消息堆積,無法及時消費(fèi)的問題

消息可靠性問題

消息從生產(chǎn)者發(fā)送到exchange,再到queue,再到消費(fèi)者,有哪些導(dǎo)致消息丟失的可能性?

  • 發(fā)送時丟失,

    • 生產(chǎn)者發(fā)送的消息未到達(dá)exchange

    • 消息到達(dá)exchange后未到達(dá)queue

  • MQ宕機(jī),queue將消息丟失

  • consumer接收到消息后未消費(fèi)就宕機(jī)

生產(chǎn)者確認(rèn)機(jī)制

RabbitMq提供了publisher confirm機(jī)制避免消息發(fā)送到MQ的過程中丟失。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功,結(jié)果有兩種請求

  • publisher-confirm,發(fā)送者確認(rèn)

    • 消息成功投遞到交換機(jī)返回ack

    • 消息未投遞到交換機(jī),返回nack

  • publisher-return

    • 消息投遞到交換機(jī)了,但是沒有路由到隊列,返回ACK,及路由失敗原因

注意:確認(rèn)機(jī)制發(fā)送消息時,需要給每個消息設(shè)置一個全局唯一id,以區(qū)分不同的消息,避免ack沖突

消費(fèi)者確認(rèn)

RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,即消費(fèi)者成功處理消息后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)后才會刪除該消息,

而SpringAMQP允許配置三種確認(rèn)模式

  • manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack

  • auto:自動ack,由spring檢測listener代碼是否出現(xiàn)異常,沒有異常則返回ack,拋出異常則返回nack

  • none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會成功處理,因此消息投遞后會立即被刪除

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

分享文章:SpringBoot整合RabbitMQ-創(chuàng)新互聯(lián)
網(wǎng)站路徑:http://chinadenli.net/article46/ddehhg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、網(wǎng)站設(shè)計移動網(wǎng)站建設(shè)、面包屑導(dǎo)航、品牌網(wǎng)站制作、建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計公司