如何理解Python MQTT異步框架HBMQTT,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
創(chuàng)新互聯(lián)主要從事成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)巴中,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
CPU 的速度遠(yuǎn)遠(yuǎn)快于磁盤、網(wǎng)絡(luò)等 IO 操作,而在一個(gè)線程中,無論 CPU 執(zhí)行得再快,遇到 IO 操作時(shí),都得停下來等待讀寫完成,這無疑浪費(fèi)了許多時(shí)間。
為了解決這個(gè)問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標(biāo)準(zhǔn)庫中,并在 Python 3.5 中,加入了 async/await 關(guān)鍵字。用戶可以很輕松的使用在函數(shù)前加入 async 關(guān)鍵字,使函數(shù)變成異步函數(shù)。
在 Python 的 MQTT 客戶端庫中,HBMQTT 是最早支持異步 IO 的 Python MQTT 庫。
HBMQTT 是基于 Python 編寫的開源庫,實(shí)現(xiàn)了 MQTT 3.1.1 協(xié)議,特性如下:
支持 QoS 0, QoS 1 以及 QoS 2 消息
客戶端自動(dòng)重連
支持 TCP 和 WebSocket
支持 SSL
支持插件系統(tǒng)
下面我們將演示如何使用 Python MQTT 異步框架 - HBMQTT,輕松實(shí)現(xiàn)一個(gè)具備 MQTT 發(fā)布、訂閱功能的異步 Demo。
本項(xiàng)目使用 Python 3.6 進(jìn)行開發(fā)測(cè)試,讀者可用如下命令確認(rèn) Python 的版本。
因?yàn)樾枰褂?async 關(guān)鍵字,需要確保 Python 版本不低于 Python 3.5
? ~ python3 --version Python 3.6.7
Pip 是 Python 的包管理工具,該工具提供了對(duì) Python 包的查找、下載、安裝和卸載功能。
pip3 install -i https://pypi.doubanio.com/simple hbmqtt
本文將使用 EMQ X 提供的免費(fèi)公共 MQTT 服務(wù)器,該服務(wù)基于 EMQ X 的MQTT 物聯(lián)網(wǎng)云平臺(tái)創(chuàng)建。服務(wù)器接入信息如下:
Broker: broker.emqx.io
TCP Port: 1883
Websocket Port: 8083
首先,導(dǎo)入 MQTT 客戶端庫。
from hbmqtt.client import MQTTClient client = MQTTClient() # 連接服務(wù)器 client.connect('mqtt://broker.emqx.io/') # 斷開連接 client.disconnect()
異步寫法如下:
async def test_pub(): client = MQTTClient() await client.connect('mqtt://broker.emqx.io/') await client.disconnect()
發(fā)布消息函數(shù)為 MQTTClient 類的 publish 函數(shù)。
client = MQTTClient() # 函數(shù)的三個(gè)參數(shù)分別為主題、消息內(nèi)容、QoS client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
異步寫法如下:
async def test_pub(): client = MQTTClient() await Client.connect('mqtt://broker.emqx.io/') await asyncio.gather( client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0), client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1), client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2) ) logging.info("messages published") await Client.disconnect()
在這段代碼中,我們將三個(gè)發(fā)送消息函數(shù)放進(jìn) asyncio 的任務(wù)列表里,它們將會(huì)依次被運(yùn)行。當(dāng)所有任務(wù)都完成后,斷開連接。
定閱消息函數(shù)為 MQTTClient 類中的 subscribe 函數(shù)。
client = MQTTClient() # 訂閱 client.subscribe([ ('topic/0', QOS_0), ('topic/1', QOS_1), ]) # 取消訂閱 client.unsubscribe([ ('topic/0', QOS_0), ]
異步寫法如下:
async def test_sub(): client = MQTTClient() await client.connect('mqtt://broker.emqx.io/') await client.subscribe([ ('a/b', QOS_1), ]) for i in range(0, 10): message = await client.deliver_message() packet = message.publish_packet print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}") await client.disconnect()
在這段代碼中,我們?cè)诮邮障r(shí)設(shè)置了 await 等待,當(dāng)代碼執(zhí)行到如下位置時(shí),CPU 會(huì)先去執(zhí)行其它任務(wù),直到有消息傳達(dá),再將其打印。
message = await client.deliver_message()
最終,程序會(huì)等待 10 次消息接收,然后關(guān)閉連接。
# sub.py # python 3.6+ import asyncio import logging from hbmqtt.client import MQTTClient from hbmqtt.mqtt.constants import QOS_1 async def test_sub(): client = MQTTClient() await client.connect('mqtt://broker.emqx.io/') await client.subscribe([ ('a/b', QOS_1), ]) for i in range(0, 10): message = await client.deliver_message() packet = message.publish_packet print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}") await client.disconnect() if __name__ == '__main__': formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) asyncio.run(test_sub())
# pub.py # python 3.6+ import asyncio import logging from hbmqtt.client import MQTTClient from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 async def test_pub(): client = MQTTClient() await client.connect('mqtt://broker.emqx.io/') await asyncio.gather( client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0), client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1), client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2) ) logging.info("messages published") await client.disconnect() if __name__ == '__main__': formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) asyncio.run(test_pub())
運(yùn)行 MQTT 消息發(fā)布代碼,我們將看到客戶端連接成功,并且成功發(fā)布消息。
如下為 MQTT X 客戶端成功接收到 HBMQTT 客戶端發(fā)布的消息:
運(yùn)行 MQTT 消息訂閱代碼,我們將看到客戶端連接成功,此時(shí)客戶端正在等待消息進(jìn)入
使用 MQTT X 客戶端連接 broker.emqx.io,然后向主題 a/b 發(fā)送 10 次消息
回到終端,我們看到客戶端接收并打印消息,并且在收到 10 條消息后,主動(dòng)退出了程序。
至此,我們完成了 HBMQTT 庫連接到公共 MQTT 服務(wù)器,并實(shí)現(xiàn)了測(cè)試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱。通過使用 Python 異步 IO 執(zhí)行消息的發(fā)送接收,可以幫助我們實(shí)現(xiàn)更加高效的 MQTT 客戶端。
關(guān)于如何理解Python MQTT異步框架HBMQTT問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
本文題目:如何理解PythonMQTT異步框架HBMQTT
標(biāo)題網(wǎng)址:http://chinadenli.net/article4/ppgeie.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、企業(yè)建站、App開發(fā)、網(wǎng)站收錄、Google、App設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)