小編給大家分享一下如何設(shè)置RabbitMQ延遲隊(duì)列,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
成都創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的大悟網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
延遲消費(fèi)。比如:用戶生成訂單之后,需要過一段時(shí)間校驗(yàn)訂單的支付狀態(tài),如果訂單仍未支付則需要及時(shí)地關(guān)閉訂單;用戶注冊(cè)成功之后,需要過一段時(shí)間比如一周后校驗(yàn)用戶的使用情況,如果發(fā)現(xiàn)用戶活躍度較低,則發(fā)送郵件或者短信來提醒用戶使用。
rabbitmq的消息TTL和死信Exchange結(jié)合
1.消息的TTL(Time To Live)
消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵??梢酝ㄟ^設(shè)置消息的expiration字段或者x-message-ttl屬性來設(shè)置時(shí)間,兩者是一樣的效果。
2.Dead Letter Exchanges
Exchage的概念在這里就不在贅述。一個(gè)消息在滿足如下條件下,會(huì)進(jìn)死信路由,記住這里是路由而不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)很多隊(duì)列。
①.一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。
②. 上面的消息的TTL到了,消息過期了。
③. 隊(duì)列的長(zhǎng)度限制滿了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。
Dead Letter Exchange其實(shí)就是一種普通的exchange,和創(chuàng)建其他exchange沒有兩樣。只是在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去。
3.實(shí)現(xiàn)延遲隊(duì)列
我們先設(shè)置好各個(gè)配置的字符串
public interface TestMq {/** * 隊(duì)列名 */ String TEST_QUEUE = "test";; /** * 服務(wù)添加routing key */ String ROUTING_KEY_TEST = "post.test"; /** * 死信隊(duì)列 */ String DEAD_QUEUE = "dead"; String ROURING_KEY_DEAD = "dead.routing.key"; String MQ_EXCHANGE_DEAD = "dead.exchange";}
配置信息
/** * rabbitmq配置 * */@Configurationpublic class RabbitmqConfig { /** * 死信隊(duì)列 * @return */ @Bean public Queue deadQueue() { Map<String,Object> arguments = new HashMap<>(); //此處填入死信交換機(jī) arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD); //此處填入消息隊(duì)列的路由,而非死信隊(duì)列自己的路由 arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST); return new Queue(TestMq.DEAD_QUEUE,true,false,false,arguments); } /** * 死信交換機(jī) * @return */ @Bean public DirectExchange deadExchange() { return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD); } /** * 綁定死信隊(duì)列到死信交換機(jī) * @return */ @Bean public Binding bindingDeadExchange() { return BindingBuilder.bind(deadQueue()).to(deadExchange()) .with(TestMq.ROURING_KEY_DEAD); } /** * 被消費(fèi)者偵聽的獲取消息的隊(duì)列 * @return */ @Bean public Queue testQueue() { return new Queue(TestMq.TEST_QUEUE,true,false,false); } /** * 將消息隊(duì)列綁定到死信交換機(jī),跟死信隊(duì)列的路由不同 * @return */ @Bean public Binding bindingTest() { return BindingBuilder.bind(testQueue()).to(deadExchange()) .with(TestMq.ROUTING_KEY_TEST); } }
消息生產(chǎn)者
@Slf4j@Componentpublic class TestSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); MessagePostProcessor processor = message -> {//給消息設(shè)置的過期時(shí)間,我們這里為10秒 message.getMessageProperties().setExpiration(10000 + ""); return message; }; this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor); }/** * 確認(rèn)后回調(diào): * @param correlationData * @param ack * @param cause */ @Override public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失敗后return回調(diào): * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 對(duì)消息對(duì)象進(jìn)行二進(jìn)制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
消費(fèi)者
@Slf4j@Component@RabbitListener(queues = TestMq.TEST_QUEUE)public class TestConsumer {@RabbitHandler public void receice(byte[] data, Channel channel, Message message) throws IOException {try {//告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉;否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會(huì)在發(fā) channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); Integer orderNo = unSerialize(data); log.info(orderNo + "為收到的消息"); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private Integer unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,Integer.class); }finally { input.close(); } } }
我們隨便寫個(gè)測(cè)試
@Servicepublic class TestService {@Autowired private TestSender sender; @PostConstruct public void test() {//此處順序?yàn)樗佬沤粨Q機(jī),死信隊(duì)列路由,消息 sender.send(TestMq.MQ_EXCHANGE_DEAD,TestMq.ROURING_KEY_DEAD,1); } }
經(jīng)測(cè)試
2019-10-11 17:26:18.079 INFO 879 --- [ main] c.g.rabbitdelay.config.TestSender : send content=1
2019-10-11 17:26:18.098 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort= 52345]
2019-10-11 17:26:18.337 INFO 879 --- [39.9.225.2:5672] c.g.rabbitdelay.config.TestSender : send ack success
2019-10-11 17:26:18.446 INFO 879 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751 INFO 879 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959 INFO 879 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962 INFO 879 --- [ main] c.g.rabbitdelay.RabbitdelayApplication : Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342 INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer : 1為收到的消息
通過日志可以看到,發(fā)送消息是18秒,收到消息消費(fèi)為28秒,中間隔了10秒鐘。
以上是“如何設(shè)置RabbitMQ延遲隊(duì)列”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
網(wǎng)站標(biāo)題:如何設(shè)置RabbitMQ延遲隊(duì)列
文章位置:http://chinadenli.net/article18/ppihgp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計(jì)、關(guān)鍵詞優(yōu)化、企業(yè)建站、標(biāo)簽優(yōu)化、搜索引擎優(yōu)化、網(wǎng)站導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)