轉(zhuǎn)自:
創(chuàng)新互聯(lián)建站從2013年開始,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目網(wǎng)站制作、網(wǎng)站設(shè)計網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元濟(jì)源做網(wǎng)站,已為上家服務(wù),為濟(jì)源各地企業(yè)和個人服務(wù),聯(lián)系電話:18980820575
h2引言/h2
p你是否遇到過兩個(多個)系統(tǒng)間需要通過定時任務(wù)來同步某些數(shù)據(jù)?你是否在為異構(gòu)系統(tǒng)的不同進(jìn)程間相互調(diào)用、通訊的問題而苦惱、掙扎?如果是,那么恭喜你,消息服務(wù)讓你可以很輕松地解決這些問題。br /
消息服務(wù)擅長于解決多系統(tǒng)、異構(gòu)系統(tǒng)間的數(shù)據(jù)交換(消息通知/通訊)問題,你也可以把它用于系統(tǒng)間服務(wù)的相互調(diào)用(RPC)。本文將要介紹的RabbitMQ就是當(dāng)前最主流的消息a href="" title="中間件"中間件/a之一。/p
h2RabbitMQ簡介/h2
pAMQP,即Advanced Message Queuing Protocol,高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的a href="" title="中間件"中間件/a設(shè)計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。br /
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。br /
RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。br /
下面將重點介紹RabbitMQ中的一些基礎(chǔ)概念,了解了這些概念,是使用好RabbitMQ的基礎(chǔ)。/p
h2ConnectionFactory、Connection、Channel/h2
pConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。ConnectionFactory為Connection的制造工廠。br /
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業(yè)務(wù)操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。/p
h2Queue/h2
pQueue(隊列)是RabbitMQ的內(nèi)部對象,用于存儲消息,用下圖表示。br /
1.1、什么是RabbitMQ?
RabbitMQ 是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現(xiàn),由以高性能、健壯以及可伸縮性出名的 Erlang 寫成,因此也是繼承了這些優(yōu)點。
1.2、什么是AMQP?
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計。它從生產(chǎn)者接收消息并遞送給消費(fèi)者,在這個過程中,根據(jù)規(guī)則進(jìn)行路由,緩存與持久化。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如: Python 、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。
1.3、RabbitMQ的基礎(chǔ)概念
1.4、RabbitMQ的特性
2.1、環(huán)境:兩臺Ubuntu16.04主機(jī)
10.27.0.53 rabbitmq-1
10.27.0.130 rabbitmq-2
必須保證各個主機(jī)名之間可以ping通
2.2、安裝Erlang
2.3、安裝rabbitmq
2.4、安裝完成后,驗證一下服務(wù)是否正常
搭建好的rabbitmq默認(rèn)是沒有配置文件的,需要我們來手動添加
Rabbitmq的一些運(yùn)行腳本存放在 /usr/sbin 下面
2.5、開啟web管理插件
2.5.1、創(chuàng)建一個用戶nova,并設(shè)置密碼為123456
2.5.2、查看現(xiàn)有用戶表
這個時候nova用戶是不能訪問web管理插件的,需要配置用戶角色,用戶角色可分為五類,超級管理員, 監(jiān)控者, 策略制定者, 普通管理者以及其他。
可登陸管理控制臺(啟用management plugin的情況下),可查看所有的信息,并且可以對用戶,策略(policy)進(jìn)行操作。
可登陸管理控制臺(啟用management plugin的情況下),同時可以查看rabbitmq節(jié)點的相關(guān)信息(進(jìn)程數(shù),內(nèi)存使用情況,磁盤使用情況等)
可登陸管理控制臺(啟用management plugin的情況下), 同時可以對policy進(jìn)行管理。但無法查看節(jié)點的相關(guān)信息。
僅可登陸管理控制臺(啟用management plugin的情況下),無法看到節(jié)點信息,也無法對策略進(jìn)行管理。
無法登陸管理控制臺,通常就是普通的生產(chǎn)者和消費(fèi)者。
將nova添加到administrator用戶組
此時的nova用戶只能通過本地來登錄其他的IP無法直接使用這個賬號。所以需要對他進(jìn)行授權(quán),使用戶nova /(可以訪問虛擬主機(jī)) 中所有資源的配置、寫、讀權(quán)限以便管理其中的資源
查看用戶授權(quán)
2.5.3、開啟web管理插件并重啟rabbitmq服務(wù)
以下為關(guān)閉插件命令
通過瀏覽器訪問 []輸入用戶名nova 密碼123456就可以看到后臺了
3.1、從管理界面可以看到,此時只有一個節(jié)點rabbitmq-1,我們需要把rabbitmq-2加進(jìn)來,rabbitmq-2按照步驟2進(jìn)行一系列安裝就可以。此處不再細(xì)說。
3.2、添加節(jié)點
兩臺主機(jī)上安裝的 RabbitMQ 都保證都可以正常啟動,才可以進(jìn)行以下操作
3.2.1、設(shè)置不同節(jié)點間統(tǒng)一認(rèn)證的Erlang Cookie
這里將 rabbitmq-1 的該文件復(fù)制到 rabbitmq-2由于這個文件權(quán)限是 400為方便傳輸,先修改權(quán)限,非必須操作,所以需要先修改rabbitmq-2中的該文件權(quán)限為 777
然后將rabbitmq-1中的該文件拷貝的rabbitmq-2中
最后將權(quán)限和所屬用戶/組修改回來
此時rabbitmq-2節(jié)點需要重啟一下服務(wù)
注意事項
cookie在所有節(jié)點上必須完全一樣,同步時一定要注意。
erlang是通過主機(jī)名來連接服務(wù),必須保證各個主機(jī)名之間可以ping通。可以通過編輯/etc/hosts來手工添加主機(jī)名和IP對應(yīng)關(guān)系。如果主機(jī)名ping不通,rabbitmq服務(wù)啟動會失敗。
3.2.2、通過rabbitmqctl cluster_status命令,可以查看和個節(jié)點的狀態(tài),節(jié)點的名稱是rabbit@shorthostname,
rabbitmq-1
rabbitmq-2
3.2.3
將兩個節(jié)點組成集群
因為rabbitmq-server啟動時,會一起啟動節(jié)點和應(yīng)用,它預(yù)先設(shè)置RabbitMQ應(yīng)用為standalone模式。要將一個節(jié)點加入到現(xiàn)有的集群中,你需要停止這個應(yīng)用并將節(jié)點設(shè)置為原始狀態(tài),然后就為加入集群準(zhǔn)備好了。使用rabbitmqctl stop_app僅僅關(guān)閉應(yīng)用。
將2加入1中
啟動節(jié)點2的應(yīng)用
如果要使用內(nèi)存節(jié)點,則可以使用以下命令:其中–ram指的是作為內(nèi)存節(jié)點,要是想做為磁盤節(jié)點的話,就不用加–ram這個參數(shù)了
集群配置好后,可以在 RabbitMQ 任意節(jié)點上執(zhí)行 rabbitmqctl cluster_status 來查看是否集群配置成功。
Rabbitmq-1
Rabbitmq-2
同時在Web管理工具中也可以看到效果
上面配置RabbitMQ默認(rèn)集群模式,但并不保證隊列的高可用性,盡管交換機(jī)、綁定這些可以復(fù)制到集群里的任何一個節(jié)點,但是隊列內(nèi)容不會復(fù)制,雖然該模式解決一部分節(jié)點壓力,但隊列節(jié)點宕機(jī)直接導(dǎo)致該隊列無法使用,只能等待重啟,所以要想在隊列節(jié)點宕機(jī)或故障也能正常使用,就要復(fù)制隊列內(nèi)容到集群里的每個節(jié)點,需要創(chuàng)建鏡像隊列。
鏡像隊列概念:鏡像隊列可以同步queue和message,當(dāng)主queue掛掉,從queue中會有一個變?yōu)橹鱭ueue來接替工作。鏡像隊列是基于普通的集群模式的,所以你還是得先配置普通集群,然后才能設(shè)置鏡像隊列。鏡像隊列設(shè)置后,會分一個主節(jié)點和多個從節(jié)點,如果主節(jié)點宕機(jī),從節(jié)點會有一個選為主節(jié)點,原先的主節(jié)點起來后會變?yōu)閺墓?jié)點。queue和message雖然會存在所有鏡像隊列中,但客戶端讀取時不論物理面連接的主節(jié)點還是從節(jié)點,都是從主節(jié)點讀取數(shù)據(jù),然后主節(jié)點再將queue和message的狀態(tài)同步給從節(jié)點,因此多個客戶端連接不同的鏡像隊列不會產(chǎn)生同一message被多次接受的情況。
設(shè)置鏡像隊列策略
在普通集群的中任意節(jié)點啟用策略,策略會自動同步到集群節(jié)點
命令格式
在任意一個節(jié)點上執(zhí)行
集群重啟
集群重啟時,最后一個掛掉的節(jié)點應(yīng)該第一個重啟,如果因特殊原因(比如同時斷電),而不知道哪個節(jié)點最后一個掛掉。可用以下方法重啟:
先在一個節(jié)點上執(zhí)行
在其他節(jié)點上執(zhí)行
查看cluster狀態(tài)是否正常(要在所有節(jié)點上查詢)。
添加用戶
處于安全的考慮,guest這個默認(rèn)的用戶只能通過 來登錄,其他的IP無法直接使用這個賬號。所以我們需要添加一個其他用戶。
命令格式
刪除用戶
命令格式
修改密碼
命令格式
用戶授權(quán)
命令格式
該命令使用戶nova /(可以訪問虛擬主機(jī)) 中所有資源的配置、寫、讀權(quán)限以便管理其中的資源
查看用戶授權(quán)
命令格式
查看當(dāng)前用戶列表
可以看到添加用戶成功了,但不是administrator角色
添加角色
這里我們也將nova用戶設(shè)置為administrator角色
命令格式
再次查看權(quán)限
清除權(quán)限信息
命令格式
下一篇為測試文章
上一篇主要介紹了AMQP的一些知識,接下來開始正式步入Spring AMQP。
Message:在AMQP中并沒有定義消息的模型,Spring為了方便我們理解與使用,新增了Message接口,在構(gòu)建消息的時候Spring提供了builde API,MessageBuilder.xx.xx的形式使用起來很方便。
Exchange:這個接口和AMQP中定義的exchange基本相同,就不說了
Queue:同上。
Binding:一般叫他綁定關(guān)系,AMQP也有對其的抽象模型,只不過我認(rèn)為他只不過相當(dāng)于是附加在隊列與交換機(jī)上的屬性,所以在上篇關(guān)于AMQP的介紹中并沒有詳細(xì)說明。呃,其實spring對其的定義就是代表了隊列與交換機(jī)的綁定關(guān)系。。。
spring提供了ConnectionFactory接口,當(dāng)我們使用的時候會使用它的實現(xiàn)類CachingConnectionFactory,看名字也知道就是基于緩存的連接池,默認(rèn)的池大小為25。Spring也提供了對于多個connectionFactory的支持接口例如SimpleRoutingConnectionFactory等。
我們使用SpringBoot進(jìn)行測試,最小化的配置如下
這里先給出一個簡單的例子然后再具體講解。
如圖,我們提前聲明了一個名為hello的隊列,瀏覽器訪問/send時,可以看到控制臺打印了相應(yīng)的時間信息,即被@RabbitListener注解的方法被調(diào)用了。如果我們打開RabbitMq的webUI,會發(fā)現(xiàn)名為hello的隊列中消息數(shù)量由0變?yōu)?再變?yōu)?。注意,這里我們并沒有聲明Exchange,MQ會為我們將隊列綁定到默認(rèn)的Exchange。
接下來就詳細(xì)的說一下這個例子。對于操作RabbitMQ,Spring提供了 RabbitTemplate(對于batch操作,相應(yīng)的是BatchingRabbitTemplate,在1.6版本以后,spring提供了異步的Template--AsyncRabbitTemplate)。我們使用它來發(fā)送與接收消息。當(dāng)發(fā)送完消息的時候如何知道本次操作的成功或者失敗呢?默認(rèn)情況下不能被路由的消息將會被丟棄,這會導(dǎo)致消息丟失,不能保證消息可靠性(消息可靠性請參照上一篇AMQP介紹中的推薦)。發(fā)布確認(rèn)機(jī)制是保證消息可靠性的第一步,發(fā)布確認(rèn)保證我們知道消息是否成功到達(dá)隊列中,返回ack則代表成功,nack則代表失敗。要使用這個特性,我們需要將RabbitTemplate的mandatory屬性和ConnectionFactory的publisherConfirms屬性都設(shè)為true。這時我們可以在RabbitTemplate上設(shè)置setReturnCallback監(jiān)聽來接收MQ服務(wù)器返回的狀態(tài)信息了。對于消息的確認(rèn),我們只需要設(shè)置RabbitTemplate.ConfirmCallback的回調(diào)方法即可。
當(dāng)我們每次發(fā)送請求時,都會打印相應(yīng)的ack,其中correlationData是生產(chǎn)者在發(fā)送數(shù)據(jù)時可以攜帶的相關(guān)信息。這里有個問題需要注意一下,RabbitTemplate只允許設(shè)置一個callback方法,這時你可以將RabbitTemplate的bean設(shè)為單例然后設(shè)置回調(diào)。
這樣的缺點是所有使用這個template的地方都會使用這個回調(diào),那么當(dāng)我們想要為不同的操作定制callBack該怎么做?如果直接在別的地方繼續(xù)設(shè)置會報"Only one ConfirmCallback is supported by each RabbitTemplate"異常,這時候我們就需要將RabbitTemplate的作用域設(shè)為@Scope,這樣每個bean都是一個新的。難道這樣就可以了么?我們的service類一般都是單例的,這意味著當(dāng)service類生成后,注入的RabbitTemplate就已經(jīng)不變了,這個就是Single域的bean中注入Scope域bean的問題。一種解決方法是實現(xiàn)ApplicationAware接口注入ApplicationContext,每次使用RabbitTemplate時調(diào)用其getBean方法。一個更好的解決方案是使用spring提供的lookup方法。
spring會幫我們代理lookup注解的方法,每次調(diào)用都會返回一個全新的bean。但其實平常使用一般都會將發(fā)送方單獨(dú)抽取出來實現(xiàn)回調(diào)接口,不會涉及上面的問題,一般都如下配置,注意將template配置成scope即可。
RabbitTemplate可以添加消息轉(zhuǎn)換器,作用就類似于mvc中配置的@ResponseBody消息轉(zhuǎn)換器。
具體如何發(fā)送與接收消息感覺不用咋說了。。。就send,receive(x,x,x)這個用IDE看一下方法doc就知道咋用了。receive為拉模式,很少使用,關(guān)于接收方法我們更常使用的是異步接收,即推模式,一般使用@RabbitListener 實現(xiàn)
當(dāng)hello隊列中有消息時,方法會自動調(diào)用。
像我們平常做web開發(fā),前端想要接受來自后臺的消息無非倆個方法,前臺請求和后臺推送,前臺輪詢一般就是ajax定時器,推送一般使用WebSocket實現(xiàn),MQ同樣有兩種模式:輪詢請求隊列看是否有消息即拉模式,隊列中有消息即對消費(fèi)者進(jìn)行通知即推模式。
對于拉模式,Spring提供了receive,receiveAndConvert,和receiveAndReply方法。接收并回復(fù)的方法很有用,比如訂單系統(tǒng),下單消息被MQ處理完后再返回消息給其他隊列,告訴她這個訂單已經(jīng)完成,可以進(jìn)行付費(fèi)操作了。接收并回復(fù)調(diào)用template.receiveAndReply實現(xiàn)自己的接收回調(diào)。對于推模式,項目中基本上使用@RabbitListener注解完成,該注解結(jié)合@SendTo注解完成receiveAndReply功能,若沒有sendto,這個方法是不允許有返回值的。對于異常情況,配置@RabbitListener的errorHandler和returnExceptions即可。關(guān)于@RabbitListener注解的具體使用其實也挺復(fù)雜的,推薦直接看文檔。使用監(jiān)聽器的過程中消息是默認(rèn)經(jīng)過消息轉(zhuǎn)換器的,可以手動為其設(shè)置消息轉(zhuǎn)換器。關(guān)于RabbitMQ LIstener的配置可以使用Config方式或者SpringBoot的配置文件方式。
上面只是官方文檔的一部分,其實除了Listener大部分Config方式的配置都可以用配置文件方式替代。
聲明隊列與交換機(jī):分為xml方式和Java Config方式(懶得寫了,這個基本官網(wǎng)就是復(fù)制粘貼)
配置Broker:Spring對其的抽象為RabbitAdmin,也是官網(wǎng)。。
延時隊列實現(xiàn):設(shè)置交換機(jī)延時屬性為true,通過convertAndSend中的MessagePostProcessor實現(xiàn)發(fā)送延時消息,這個方法需要安裝延時交換機(jī)這樣的一個插件(也可以通過死信隊列實現(xiàn))
好了。今天就先寫這么多,因為實在是寫的太亂了,以后有時間整理一下。。。
近期正在探索前端、后端、系統(tǒng)端各類常用組件與工具,對其一些常見的組件進(jìn)行再次整理一下,形成標(biāo)準(zhǔn)化組件專題,后續(xù)該專題將包含各類語言中的一些常用組件。歡迎大家進(jìn)行持續(xù)關(guān)注。
本節(jié)我們分享的是基于Golang實現(xiàn)的高性能和彈性的流處理器 benthos ,它能夠以各種代理模式連接各種 源 和 接收器,并對有效負(fù)載執(zhí)行 水合、濃縮、轉(zhuǎn)換和過濾 。
它帶有 強(qiáng)大的映射語言 ,易于部署和監(jiān)控,并且可以作為靜態(tài)二進(jìn)制文件、docker 映像或 無服務(wù)器函數(shù) 放入您的管道,使其成為云原生。
Benthos 是完全聲明性的,流管道在單個配置文件中定義,允許您指定連接器和處理階段列表:
Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP UDP, sockets and ZMQ4.
1、docker安裝
具體使用方式可以參見該 文檔
有關(guān)如何配置更高級的流處理概念(例如流連接、擴(kuò)充工作流等)的指導(dǎo),請查看 說明書部分。
有關(guān)在 Go 中構(gòu)建您自己的自定義插件的指導(dǎo),請查看 公共 API。
網(wǎng)頁名稱:amqp實現(xiàn)go語言,go amqp
瀏覽地址:http://chinadenli.net/article44/heseee.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站、Google、網(wǎng)站營銷、軟件開發(fā)、網(wǎng)站維護(hù)、網(wǎng)站導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)