幾個(gè)概念說明:

Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
消息隊(duì)列的使用過程大概如下:
(1)客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel。
(2)客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange。
exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。
exchange也有幾個(gè)類型,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會(huì)投遞到隊(duì)列。對key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號(hào)”#”匹配一個(gè)或多個(gè)詞,符號(hào)”*”匹配正好一個(gè)詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。
RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會(huì)選擇持久化。消息隊(duì)列持久化包括3個(gè)部分:
(1)exchange持久化,在聲明時(shí)指定durable => 1
(2)queue持久化,在聲明時(shí)指定durable => 1
(3)消息持久化,在投遞時(shí)指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個(gè)持久化,一個(gè)非持久化,就不允許建立綁定。
1.必需掌握的指令
添加用戶:
rabbitmqctl add_user rainbird password
添加權(quán)限:
rabbitmqctl set_permissions -p "/" rainbird ".*" ".*" ".*"
刪除測試用戶:
rabbitmqctl delete_user guest
所有指令列表(很簡單的英文):
add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
2.vhost / 不能刪除
刪除/以后,新建立的vhost不能正常使用(即便不刪除/,新建立的vhost也是不能正常使用).不知道為什么,有待研究.
3.關(guān)于持久化
示例里沒有一點(diǎn)兒和持久化相關(guān)的東東,而這卻是筆者最關(guān)心的,想想作為消息服務(wù)器如果不能保證消息一定被接收到,算什么事兒啊?比著網(wǎng)上狂轉(zhuǎn)的python版本從php-amqp的庫里一點(diǎn)一點(diǎn)兒翻,找到了如下持久化的設(shè)置:
接收端聲明隊(duì)列和交換機(jī)自動(dòng)建立:
$ch->queue_declare($_QUEUE,false,true,false,false);
第三個(gè)參數(shù)設(shè)置true保證服務(wù)器重啟后,自動(dòng)建立隊(duì)列
第五個(gè)參數(shù)設(shè)置成false防止接收端沒連接的時(shí)候丟失消息
$ch->exchange_declare($EXCHANGE, \'direct\', false, true, false);
第四個(gè)參數(shù)設(shè)置true保證重啟后,自動(dòng)建立交換機(jī)
第五個(gè)參數(shù)設(shè)置false防止接收端斷開后,交換機(jī)被刪除
發(fā)布端聲明消息持久:
$message = new AMQPMessage(serialize($object), array(\'content_type\' => \'text/plain\', \'delivery_mode\' => 2));
同時(shí)滿足了上面三個(gè)條件,就可以保證未接收的消息在服務(wù)器意外重啟以后依然存在了.
4.持久化的后遺癥
比如說你初始化了一個(gè)隊(duì)列msgs.你會(huì)發(fā)現(xiàn)它真的持久了!每次服務(wù)器端重啟后,通過list_queues命令查看的時(shí)候都存在.但是時(shí)間久了,這個(gè)msgs我們并不需要了,怎么辦呢?筆者發(fā)現(xiàn),想清除這個(gè)隊(duì)列只能刪除它所在的vhost,然后再重建vhost,再設(shè)置vhost的權(quán)限.
rabbitmqctl delete_vhost /
rabbitmqctl add_vhost /
rabbitmqctl set_permissions -p / rainbird \'.*\' \'.*\' \'.*\'
要注意,如果這個(gè)操作過程中有接收端處于連接狀態(tài)它們不會(huì)自動(dòng)斷開,但也不會(huì)再收到消息,需要手動(dòng)重新連接一下.
5.關(guān)于修改監(jiān)聽ip和監(jiān)聽端口
出于一些需要,比如我們有多個(gè)ip,我們希望rabbitmq僅運(yùn)行在指定的ip上.或者考慮到安全問題,我們希望修改一下rabbitmq的監(jiān)聽端口.默認(rèn)安裝完成以后,在/etc下面會(huì)有一個(gè)rabbitmq的空目錄,這時(shí)候我們需要手工創(chuàng)建rabbitmq.conf,并寫入相關(guān)內(nèi)容.
vi /etc/rabbitmq/rabbitmq.conf
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=2222
保存以后重啟服務(wù)就生效了.
這個(gè)東東網(wǎng)上又沒介紹,翻了半天+無限嘗試才搞出來.
6.關(guān)于運(yùn)行接收端cpu100%問題
第一眼看到接收端會(huì)運(yùn)行一個(gè)while等待消息的時(shí)候,筆者就知道這個(gè)進(jìn)程肯定cpu占用會(huì)100%.在代碼里幾處while嘗試添加usleep無效后,筆者最后還是在官方的問題列表里找到了答案:
vi +286 amqp_wire.inc
293 while ($read < $n && (false !== ($buf = fread($this->sock, $n - $read))))
294 {
295 usleep(50000);
296 $read += strlen($buf);
297 $res .= $buf;
298 }
筆者的出發(fā)點(diǎn)是對的,只是沒找對while.可能有人會(huì)奇怪為什么要用usleep(50000)呢?實(shí)際上筆者有遇到運(yùn)行php起來的daemon導(dǎo)致cpu100%的情況.當(dāng)時(shí)筆者加的是usleep(500000)也就是半秒鐘.這樣就可以使進(jìn)程看上去cpu占用為0.沒想到再降一個(gè)數(shù)量級(jí)也是可以正常的,這次算賺到了.
7.學(xué)到了error_log函數(shù)
以前有見過這個(gè)函數(shù),以為是向系統(tǒng)日志里寫log的時(shí)候才用得到呢,沒想到還可以像下面這樣用:
function debug_msg($s)
{
//error_log($s);
}
在不同的地方寫上debug_msg,最后不用的時(shí)候時(shí)候,直接注釋掉error_log,不錯(cuò)的小技巧!
RabbitMQ將消息投遞到客戶端后,客戶端如果沒處理完這個(gè)消息就死掉了,這個(gè)消息還會(huì)不會(huì)存在?這取決于RabbitMQ的消息確認(rèn)機(jī)制(Message acknowledgment)是否打開。
為了確保消息不會(huì)丟失,RabbitMQ支持消息確認(rèn)機(jī)制。客戶端在接受到消息并處理完后,可以發(fā)送一個(gè)ack消息給RabbitMQ,告訴它該消息可以安全的刪除了。假如客戶端在發(fā)送ack之前意外死掉了,那么RabbitMQ會(huì)將消息投遞到下一個(gè)consumer客戶端。如果有多個(gè)consumer客戶端,RabbitMQ在投遞消息時(shí)是輪詢的。
RabbitMQ如何判斷客戶端死掉了?唯一根據(jù)是客戶端連接是否斷開。這里沒有超時(shí)機(jī)制,也就是說客戶端可以處理一個(gè)消息很長時(shí)間,只要沒斷開連接,RabbitMQ就一直等待ack消息。
消息確認(rèn)機(jī)制默認(rèn)是打開的,除非你設(shè)置no_ack=True標(biāo)記來手工關(guān)閉它。
通過如下命令查看系統(tǒng)里的未確認(rèn)消息:
# rabbitmqctl list_queues -p /path name messages_unacknowledged
Listing queues …
queue_storm_actionlog 0
dev_queue_storm_actionlog 0
…done.
標(biāo)題名稱:RibbitMQ初識(shí)
標(biāo)題來源:http://chinadenli.net/article34/cjiipe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、App開發(fā)、App設(shè)計(jì)、外貿(mào)建站、品牌網(wǎng)站制作、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)