rabbitMQ是一個(gè)在AMQP協(xié)議標(biāo)準(zhǔn)基礎(chǔ)上完整的,可服用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開(kāi)源協(xié)議。采用 Erlang 實(shí)現(xiàn)的工業(yè)級(jí)的消息隊(duì)列(MQ)服務(wù)器。

AMQP(高級(jí)消息隊(duì)列協(xié)議) 是一個(gè)異步消息傳遞所使用的應(yīng)用層協(xié)議規(guī)范,作為線路層協(xié)議,而不是API(例如JMS),AMQP 客戶端能夠無(wú)視消息的來(lái)源任意發(fā)送和接受信息。AMQP的原始用途只是為金融界提供一個(gè)可以彼此協(xié)作的消息協(xié)議,而現(xiàn)在的目標(biāo)則是為通用消息隊(duì)列架構(gòu)提供通用構(gòu)建工具。因此,面向消息的中間件 (MOM)系統(tǒng),例如發(fā)布/訂閱隊(duì)列,沒(méi)有作為基本元素實(shí)現(xiàn)。反而通過(guò)發(fā)送簡(jiǎn)化的AMQ實(shí)體,用戶被賦予了構(gòu)建例如這些實(shí)體的能力。這些實(shí)體也是規(guī)范的一 部分,形成了在線路層協(xié)議頂端的一個(gè)層級(jí):AMQP模型。這個(gè)模型統(tǒng)一了消息模式,諸如之前提到的發(fā)布/訂閱,隊(duì)列,事務(wù)以及流數(shù)據(jù),并且添加了額外的特性,例如更易于擴(kuò)展,基于內(nèi)容的路由。
AMQP當(dāng)中有四個(gè)概念非常重要
virtual host,虛擬主機(jī) exchange,交換機(jī) queue,隊(duì)列 binding,綁定一個(gè)虛擬主機(jī)持有一組交換機(jī)、隊(duì)列和綁定。
為什么需要多個(gè)虛擬主機(jī)呢?因?yàn)镽abbitMQ當(dāng)中,用戶只能在虛擬主機(jī)的粒度進(jìn)行權(quán)限控制。因此,如果需要禁止A組訪問(wèn)B組的交換機(jī)/隊(duì)列/綁定,必須為A和B分別創(chuàng)建一個(gè)虛擬主機(jī)。每一個(gè)RabbitMQ服務(wù)器都有一個(gè)默認(rèn)的虛擬主機(jī)/。
何謂虛擬主機(jī)(virtual host),交換機(jī)(exchange),隊(duì)列(queue)和綁定(binding)
隊(duì)列(Queues)是你的消息(messages)的終點(diǎn),可以理解成裝消息的容器。消息就一直在里面,直到有客戶端(也就是消費(fèi)者,Consumer)連接到這個(gè)隊(duì)列并且將其取走為止。不過(guò),也可以將一個(gè)隊(duì)列配置成這樣的:一旦消息進(jìn)入這個(gè)隊(duì)列,此消息就被刪除。
隊(duì)列是由消費(fèi)者(Consumer)通過(guò)程序建立的,不是通過(guò)配置文件或者命令行工具。這沒(méi)什么問(wèn)題,如果一個(gè)消費(fèi)者試圖創(chuàng)建一個(gè)已經(jīng)存在的隊(duì)列,RabbitMQ會(huì)直接忽略這個(gè)請(qǐng)求。因此我們可以將消息隊(duì)列的配置寫(xiě)在應(yīng)用程序的代碼里面。
而要把一個(gè)消息放進(jìn)隊(duì)列前,需要有一個(gè)交換機(jī)(Exchange)。
交換機(jī)(Exchange)可以理解成具有路由表的路由程序。每個(gè)消息都有一個(gè)稱(chēng)為路由鍵(routing key)的屬性,就是一個(gè)簡(jiǎn)單的字符串。交換機(jī)當(dāng)中有一系列的綁定(binding),即路由規(guī)則(routes)。(例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊(duì)列當(dāng)中去。)
消費(fèi)者程序(Consumer)要負(fù)責(zé)創(chuàng)建你的交換機(jī)。交換機(jī)可以存在多個(gè),每個(gè)交換機(jī)在自己獨(dú)立的進(jìn)程當(dāng)中執(zhí)行,因此增加多個(gè)交換機(jī)就是增加多個(gè)進(jìn)程,可以充分利用服務(wù)器上的CPU核以便達(dá)到更高的效率。例如,在一個(gè)8核的服務(wù)器上,可以創(chuàng)建5個(gè)交換機(jī)來(lái)用5個(gè)核,另外3個(gè)核留下來(lái)做消息處理。類(lèi)似的,在RabbitMQ的集群當(dāng)中,你可以用類(lèi)似的思路來(lái)擴(kuò)展交換機(jī)一邊獲取更高的吞吐量。
交換機(jī)如何判斷要把消息送到哪個(gè)隊(duì)列?你需要路由規(guī)則,即綁定(binding)。一個(gè)綁定就是一個(gè)類(lèi)似這樣的規(guī)則:將交換機(jī)“desert(沙漠)”當(dāng)中具有路由鍵“阿里巴巴”的消息送到隊(duì)列“hideout(山洞)”里面去。換句話說(shuō),一個(gè)綁定就是一個(gè)基于路由鍵將交換機(jī)和隊(duì)列連接起來(lái)的路由規(guī)則。例如,具有路由鍵“audit”的消息需要被送到兩個(gè)隊(duì)列,“log-forever”和“alert-the-big-dude”。要做到這個(gè),就需要?jiǎng)?chuàng)建兩個(gè)綁定,每個(gè)都連接一個(gè)交換機(jī)和一個(gè)隊(duì)列,兩者都是由“audit”路由鍵觸發(fā)。在這種情況下,交換機(jī)會(huì)復(fù)制一份消息并且把它們分別發(fā)送到兩個(gè)隊(duì)列當(dāng)中。交換機(jī)不過(guò)就是一個(gè)由綁定構(gòu)成的路由表。
交換機(jī)有多種類(lèi)型。他們都是做路由的,但是它們接受不同類(lèi)型的綁定。為什么不創(chuàng)建一種交換機(jī)來(lái)處理所有類(lèi)型的路由規(guī)則呢?因?yàn)槊糠N規(guī)則用來(lái)做匹配分子的CPU開(kāi)銷(xiāo)是不同的。例如,一個(gè)“topic”類(lèi)型的交換機(jī)試圖將消息的路由鍵與類(lèi)似“dogs.*”的模式進(jìn)行匹配。匹配這種末端的通配符比直接將路由鍵與“dogs”比較(“direct”類(lèi)型的交換機(jī))要消耗更多的CPU。如果你不需要“topic”類(lèi)型的交換機(jī)帶來(lái)的靈活性,你可以通過(guò)使用“direct”類(lèi)型的交換機(jī)獲取更高的處理效率。那么有哪些類(lèi)型,他們又是怎么處理的呢?
Exchange
Exchange Direct Exchange Fanout Exchange Topic你花了大量的時(shí)間來(lái)創(chuàng)建隊(duì)列、交換機(jī)和綁定,然后,服務(wù)器程序掛了。你的隊(duì)列、交換機(jī)和綁定怎么樣了?還有,放在隊(duì)列里面但是尚未處理的消息們呢?
如果你是用默認(rèn)參數(shù)構(gòu)造的這一切的話,那么,他們都灰飛煙滅了。RabbitMQ重啟之后會(huì)干凈的像個(gè)新生兒。你必須重做所有的一切,亡羊補(bǔ)牢,如何避免將來(lái)再度發(fā)生此類(lèi)杯具?
隊(duì)列和交換機(jī)有一個(gè)創(chuàng)建時(shí)候指定的標(biāo)志durable。durable的唯一含義就是具有這個(gè)標(biāo)志的隊(duì)列和交換機(jī)會(huì)在重啟之后重新建立,它不表示說(shuō)在隊(duì)列當(dāng)中的消息會(huì)在重啟后恢復(fù)。那么如何才能做到不只是隊(duì)列和交換機(jī),還有消息都是持久的呢?
但是首先需要考慮的問(wèn)題是:是否真的需要消息的持久化?如果需要重啟后消息可以回復(fù),那么它需要被寫(xiě)入磁盤(pán)。但即使是最簡(jiǎn)單的磁盤(pán)操作也是要消耗時(shí)間的。所以需要衡量判斷。
當(dāng)你將消息發(fā)布到交換機(jī)的時(shí)候,可以指定一個(gè)標(biāo)志“Delivery Mode”(投遞模式)。根據(jù)你使用的AMQP的庫(kù)不同,指定這個(gè)標(biāo)志的方法可能不太一樣。簡(jiǎn)單的說(shuō),就是將Delivery Mode設(shè)置成2,也就是持久的(persistent)即可。一般的AMQP庫(kù)都是將Delivery Mode設(shè)置成1,也就是非持久的。所以要持久化消息的步驟如下:
將交換機(jī)設(shè)成 durable。 將隊(duì)列設(shè)成 durable。 將消息的 Delivery Mode 設(shè)置成2 。綁定(Bindings)怎么辦?綁定無(wú)法在創(chuàng)建的時(shí)候設(shè)置成durable。沒(méi)問(wèn)題,如果你綁定了一個(gè)durable的隊(duì)列和一個(gè)durable的交換機(jī),RabbitMQ會(huì)自動(dòng)保留這個(gè)綁定。類(lèi)似的,如果刪除了某個(gè)隊(duì)列或交換機(jī)(無(wú)論是不是durable),依賴(lài)它的綁定都會(huì)自動(dòng)刪除。
注意:
RabbitMQ 不允許你綁定一個(gè)非堅(jiān)固(non-durable)的交換機(jī)和一個(gè)durable的隊(duì)列。反之亦然。要想成功必須隊(duì)列和交換機(jī)都是durable的。 一旦創(chuàng)建了隊(duì)列和交換機(jī),就不能修改其標(biāo)志了。例如,如果創(chuàng)建了一個(gè)non-durable的隊(duì)列,然后想把它改變成durable的,唯一的辦法就是刪除這個(gè)隊(duì)列然后重現(xiàn)創(chuàng)建。因此,最好仔細(xì)檢查創(chuàng)建的標(biāo)志。安裝Rabbit MQ
Rabbit MQ 是建立在強(qiáng)大的Erlang OTP平臺(tái)上,因此安裝Rabbit MQ的前提是安裝Erlang。通過(guò)下面兩個(gè)連接下載安裝3.2.3 版本:
下載并安裝Eralng OTP For Windows(vR16B03) 運(yùn)行安裝Rabbit MQ Server Windows Installer(v3.2.3)默認(rèn)安裝的Rabbit MQ 監(jiān)聽(tīng)端口是5672
激活Rabbit MQ\'s Management Plugin
使用Rabbit MQ 管理插件,可以更好的可視化方式查看Rabbit MQ 服務(wù)器實(shí)例的狀態(tài),你可以在命令行中使用下面的命令激活:
"C:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbinrabbitmq-plugins.bat" enable rabbitmq_management
要重啟服務(wù)才能生效,可以執(zhí)行
net stop RabbitMQ && net start RabbitMQ
下面我們使用rabbitmqctl控制臺(tái)命令(位于C:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>)來(lái)創(chuàng)建用戶,密碼,綁定權(quán)限等。
Microsoft Windows [版本 6.3.9600]
(c) 2013 Microsoft Corporation。保留所有權(quán)利。
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin 的目錄
2014/11/01 15:04 <DIR> .
2014/11/01 15:04 <DIR> ..
2014/01/23 22:57 817 rabbitmq-echopid.bat
2014/01/23 22:57 1,900 rabbitmq-plugins.bat
2014/01/23 22:57 4,356 rabbitmq-server.bat
2014/01/23 22:57 7,123 rabbitmq-service.bat
2014/01/23 22:57 1,621 rabbitmqctl.bat
5 個(gè)文件 15,817 字節(jié)
2 個(gè)目錄 96,078,618,624 可用字節(jié)
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t list_users
Listing users ...
guest [administrator]
...done.
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t list_vhosts
Listing vhosts ...
/
...done.
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t add_user geffzhang zsy@2014
Creating user "geffzhang" ...
...done.
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t list_users
Listing users ...
geffzhang []
guest [administrator]
...done.
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t set_user_tags geffzhang administrator
Setting tags for user "geffzhang" to [administrator] ...
...done.
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t set_permissions -p / geffzhang ".*" ".*" ".*"
Setting permissions for user "geffzhang" in vhost "/" ...
...done.
c:Program Files (x86)RabbitMQ Serverrabbitmq_server-3.2.3sbin>rabbitmqctl.ba
t list_users
Listing users ...
geffzhang [administrator]
guest [administrator]
...done.
在.NET上使用Rabbit MQ
通過(guò)Nuget 獲取Rabbit MQNET client bindings from NuGet:
PM> Install-Package RabbitMQ.Client
我們最常見(jiàn)的一個(gè)場(chǎng)景是發(fā)送和接收Rabbit MQ 持久化消息:
第一步是聲明durable Exchange 和 Queue
private readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory { HostName = "Geffzhang-NB", UserName="geffzhang", Password ="zsy@2014 ", VirtualHost ="/" };
conststringExchangeName="test.exchange";
conststringQueueName="test.queue";
using(IConnectionconn=rabbitMqFactory.CreateConnection())
using(IModelchannel=conn.CreateModel())
{
channel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);
channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);
channel.QueueBind(QueueName,ExchangeName,routingKey:QueueName);
}
下面對(duì)上面代碼進(jìn)行說(shuō)明:
1. 使用ConnectionFactory創(chuàng)建連接,雖然創(chuàng)建時(shí)指定了多個(gè)server address,但每個(gè)connection只與一個(gè)物理的server進(jìn)行連接。
2. 定義交換方式,創(chuàng)建了Direct Exchange和Durable Queue,并使用QueueName作為routing key,可以把消息直接投遞到某個(gè)隊(duì)列。rabbitmq交換方式分為三種,分別是:
Direct Exchange– 處理路由鍵。需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard,只會(huì)轉(zhuǎn)發(fā)dog。
Fanout Exchange– 不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。
Topic Exchange– 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會(huì)匹配到“audit.irs”。
運(yùn)行上述代碼,可以在Rabbit MQ的管理控制臺(tái)上看到test.exchangeExchange 綁定到 創(chuàng)建的隊(duì)列test.queue
第二步就是發(fā)布持久化消息到隊(duì)列
Exchange和Queue建立好以后,就可以發(fā)送消息到隊(duì)列了。RabbitMq 可以接受byte[]的數(shù)據(jù),字符串采用utf-8編碼的字節(jié)數(shù)組。確保消息可持久化的,需要設(shè)置PersistMode為true,參看下面的代碼:
varprops=channel.CreateBasicProperties();
props.SetPersistent(true);
varmsgBody=Encoding.UTF8.GetBytes("Hello, World!");
channel.BasicPublish(ExchangeName,routingKey:QueueName,basicProperties:props,body:msgBody);
第三步就是消費(fèi)消息了,有幾種不同的方法從隊(duì)列中消費(fèi)消息,最常見(jiàn)的是使用BasicGet:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
NoAck:true告訴RabbitMQ立即從隊(duì)列中刪除消息,另一個(gè)非常受歡迎的方式是從隊(duì)列中刪除已經(jīng)確認(rèn)接收的消息,可以通過(guò)單獨(dú)調(diào)用BasicAck 進(jìn)行確認(rèn):
BasicGetResultmsgResponse=channel.BasicGet(QueueName,noAck:false);
//process message ...
channel.BasicAck(msgResponse.DeliveryTag,multiple:false);
使用BasicAck方式來(lái)告之是否從隊(duì)列中移除該條消息,這一點(diǎn)很重要,因?yàn)樵谀承?yīng)用場(chǎng)景下,比如從隊(duì)列中獲取消息并用它來(lái)操作數(shù)據(jù)庫(kù)或日志文件時(shí),如果出現(xiàn)操作失敗時(shí),則該條消息應(yīng)該保留在隊(duì)列中,只到操作成功時(shí)才從隊(duì)列中移除。
另一種方法是通過(guò)基于推送的事件訂閱。您可以使用內(nèi)置的QueueingBasicConsumer提供簡(jiǎn)化的編程模型,通過(guò)允許您在共享隊(duì)列上阻塞,直到收到一條消息,例如
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue(); //blocking
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
分享題目:在Windows上安裝RabbitMQ指南
當(dāng)前路徑:http://chinadenli.net/article26/cpcijg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供云服務(wù)器、全網(wǎng)營(yíng)銷(xiāo)推廣、網(wǎng)站設(shè)計(jì)、定制開(kāi)發(fā)、虛擬主機(jī)、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)