本篇內(nèi)容主要講解“怎么使用nsq消息中間件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“怎么使用nsq消息中間件”吧!
目前成都創(chuàng)新互聯(lián)公司已為上千余家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁(yè)空間、網(wǎng)站托管運(yùn)營(yíng)、企業(yè)網(wǎng)站設(shè)計(jì)、崇明網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
組成
nsq是一款輕量級(jí)的消息中間件,查看nsq官網(wǎng)給出的解釋,可知nsq的組成和分工:
nsqdis the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.
從上面可以看出:
nsqd是一個(gè)守護(hù)進(jìn)程,負(fù)責(zé)與客戶端打交道,負(fù)責(zé)緩存來自客戶端的消息
nsqd可以作為一個(gè)單實(shí)例獨(dú)自運(yùn)行,通常在nsqlookupd實(shí)例的協(xié)同下組成集群(集群場(chǎng)景下,nsqd能用于發(fā)現(xiàn)topics和channels)
nsqd監(jiān)聽兩個(gè)TCP端口,分別用于客戶端(默認(rèn)4150)和HTTP API(默認(rèn)4151),另外可監(jiān)聽用于HTTPS的端口
nsqlookupdis the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.
從上面可以看出:
nsqlookupd是一個(gè)守護(hù)進(jìn)程,負(fù)責(zé)管理拓?fù)湫畔ⅲ晒┛蛻舳瞬樵兊玫絥sqd節(jié)點(diǎn)(nsqd廣播topics和channels信息)
nsqlookupd提供兩種接口,TCP接口(默認(rèn)4160)被nsqd用來發(fā)送廣播,HTTP接口(默認(rèn)4161)被客戶端用于發(fā)現(xiàn)nsqd和連接nsqadmin
nsqadminis a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.
從上面可以看出:
nsqadmin是一個(gè)后臺(tái)管控Web進(jìn)程,可實(shí)時(shí)瀏覽集群狀態(tài),可發(fā)起多種管理任務(wù)(nsqadmin依賴nsqlookupd來處理用戶操作)
安裝
這里為了快速搭建,使用docker compose方式安裝(docker-compose.yaml見附件)
拷貝docker-compose.yaml到虛擬機(jī),相關(guān)命令如下:
分別啟動(dòng) nsqlookupd/nsqadmin/nsqd,對(duì)應(yīng)三個(gè)容器和端口映射
瀏覽器中可打開 http://192.168.1.91:32770 訪問 nsqadmin(虛擬機(jī)IP為192.168.1.91)
測(cè)試
package main import ( "bufio" "fmt" "github.com/bitly/go-nsq" "nsq-demo/src/config" "os" ) var producer *nsq.Producer func InitProducer(addr string) { var err error producer, err = nsq.NewProducer(addr, nsq.NewConfig()) if err != nil { panic(err) } fmt.Println("connect to ", producer.String()) } func Publish(topic, msg string) error { if producer == nil {// check producer return fmt.Errorf("producer is nil") } if msg == "" {// void empty msg return nil } return producer.Publish(topic, []byte(msg))// publish msg } func main() { InitProducer(config.Nsqd01) running := true reader := bufio.NewReader(os.Stdin) for running { data, _, _ := reader.ReadLine() command := string(data) if command == "stop" { running = false } for err := Publish(config.Topic, command); err != nil; err = Publish(config.Topic, command) { config.ExchangeNsqdIPs() InitProducer(config.Nsqd01) } } producer.Stop() }// producer直連nsqd后,接收來自控制臺(tái)的輸入,然后將消息發(fā)送給nsqd
package main import ( "fmt" "github.com/bitly/go-nsq" "nsq-demo/src/config" "time" ) type MyConsumer struct{} func (*MyConsumer) HandleMessage(msg *nsq.Message) error {// implementation Handler interface fmt.Println("receive from ", msg.NSQDAddress, "msg:", string(msg.Body)) return nil } func InitConsumer(topic, channel, addr string) { conf := nsq.NewConfig() conf.LookupdPollInterval = time.Second c, err := nsq.NewConsumer(topic, channel, conf) if err != nil { panic(err) } c.SetLogger(nil, 0)// set system log c.AddHandler(&MyConsumer{})// set Hander to handle msg //if err := c.ConnectToNSQLookupd(addr); err != nil { // panic(err) //} //if err := c.ConnectToNSQDs(config.GetNsqdIPs()); err != nil { // panic(err) //} if err := c.ConnectToNSQD(config.Nsqd01); err != nil { panic(err) } } func main() { InitConsumer(config.Topic, config.Channel, config.Lookupd) select {} }// consumer直連nsqd后,通過自定義的Handler來處理消息
附錄
version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160" # for the nsqd - "4161" # for the nsqadmin nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 # connect to nsqlookupd depends_on: - nsqlookupd ports: - "4150" # for clients - "4151" # for the HTTP API nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # connect to nsqlookupd depends_on: - nsqlookupd ports: - "4171"# docker-compose.yaml of simple nsq
到此,相信大家對(duì)“怎么使用nsq消息中間件”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
                本文標(biāo)題:怎么使用nsq消息中間件
                
                當(dāng)前地址:http://chinadenli.net/article18/pgeidp.html
            
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供、微信公眾號(hào)、全網(wǎng)營(yíng)銷推廣、品牌網(wǎng)站建設(shè)、動(dòng)態(tài)網(wǎng)站、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
