在單機(jī)環(huán)境下實(shí)現(xiàn)字符串追加函數(shù)(Pulsar 2.4.2版本)
創(chuàng)新互聯(lián)公司主營(yíng)云南網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,成都app開(kāi)發(fā),云南h5微信小程序開(kāi)發(fā)搭建,云南網(wǎng)站營(yíng)銷推廣歡迎云南等地區(qū)企業(yè)咨詢1 啟動(dòng)單機(jī)Pulsar
$ bin/pulsar-daemon start standalone
2 創(chuàng)建函數(shù)
1) 準(zhǔn)備環(huán)境
項(xiàng)目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 創(chuàng)建JAVA函數(shù)(此函數(shù)用于數(shù)據(jù)源來(lái)的topic schema是string,輸出的tiopic schema是string)

導(dǎo)出jar包,放到pulsar服務(wù)器目錄下,本例子放在 /data/jar/下
3)使用命令行工具加載函數(shù)到Pulsar,? ? ? ? ? ? ? ? ? ? ?
bin/pulsar-admin functions create \
--classname test.AppStrFunction \
--jar /data/jar/pf.jar \
--inputs persistent://public/default/tlstest \
--output persistent://public/default/teststr \
--tenant public \
--namespace default \
--name appStrFunction
參數(shù)說(shuō)明:
| 參數(shù) | 說(shuō)明 |
| functions | 通知 pulsar broker,函數(shù)操作 |
| create | 創(chuàng)建函數(shù),默認(rèn)創(chuàng)建成功后啟動(dòng) |
| classname | 函數(shù)類名稱,需要加上包名 |
| jar | 指定 jar 包的運(yùn)行路徑 |
| inputs | 指定 函數(shù) 數(shù)據(jù)的來(lái)源在哪里,支持多個(gè) topics 作為輸入 |
| output | 如果該 函數(shù) 有輸出(有些情況下,function 沒(méi)有輸出),指定 function 輸出的 topic,只能有一個(gè)輸出 |
| tenant | 指定該 函數(shù) 運(yùn)行的租戶名 |
| namespace | 指定該 函數(shù) 運(yùn)行的命名空間 |
| name | 指定該 函數(shù) 運(yùn)行的名稱 |
停止函數(shù)
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
啟動(dòng)函數(shù)
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
刪除函數(shù)
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函數(shù)的日志在 pulsar安裝目錄 /logs/functions下
3 測(cè)試函數(shù)
根據(jù)前邊函數(shù)已成功加載啟動(dòng)
1)向tlstest主題發(fā)送消息? ?
import?java.util.concurrent.TimeUnit; import?org.apache.pulsar.client.api.Producer; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; public?class?SendMsgTest{ ??public?static?void?main(String[]?args){ ??????String?url="pulsar://192.168.1.48:6650"; ??try{ ?????PulsarClient?client?=PulsarClient.builder() ???????????.serviceUrl(url) ???????????.connectionTimeout(10,TimeUnit.SECONDS) ???????????.build(); ?????Producer<String>?producer=client.newProducer(Schema.STRING) ???????????.topic("tlstest") ???????????.sendTimeout(10,TimeUnit.SECONDS) ???????????.producerName("senduser") ???????????.create(); ???????????producer.send("this?is?a?book"); ???????????System.out.print("send?ok"); ???????????client.close(); ??????}catch(Exception?e){ ????????e.printStackTrace(); ??????} ??} }2)讀取teststr主題消息
import?org.apache.pulsar.client.api.Consumer; import?org.apache.pulsar.client.api.Message; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; import?org.apache.pulsar.client.api.SubscriptionInitialPosition; import?org.apache.pulsar.client.api.SubscriptionType; import?org.apache.pulsar.client.impl.schema.JSONSchema; import?schema.OrderModel; import?com.alibaba.fastjson.JSON; public?class?RecFunTest?{ public?static?void?main(String[]?args)?{ String?url?=?"http://192.168.1.48:8080"; try{ ??PulsarClient?client?=PulsarClient.builder() ????.serviceUrl(url) ????.build(); ?Consumer<String>?consumer=client.newConsumer(Schema.STRING) ????.topic("teststr") ????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) ????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨(dú)占,默認(rèn)模式)?Failover(災(zāi)備)Shared(共享) ????.subscriptionName("wbq")//訂閱者名稱 ????.subscribe(); ?while?(true)?{ ???Message<String>?mondmsg?=?consumer.receive(); ???String?msg=mondmsg.getValue(); ????????????????System.out.println("receive?message=:"+msg); ?????????????} ??}catch(Exception?e){ ?????e.printStackTrace(); ??} ?} }另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
網(wǎng)站欄目:PulsarFunction例子-創(chuàng)新互聯(lián)
本文來(lái)源:http://chinadenli.net/article16/cddggg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)、做網(wǎng)站、網(wǎng)站營(yíng)銷、服務(wù)器托管、品牌網(wǎng)站設(shè)計(jì)、關(guān)鍵詞優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容