欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

如何理解PythonMQTT異步框架HBMQTT

如何理解Python MQTT異步框架HBMQTT,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

創(chuàng)新互聯(lián)主要從事成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)巴中,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220

什么是異步

CPU 的速度遠(yuǎn)遠(yuǎn)快于磁盤、網(wǎng)絡(luò)等 IO 操作,而在一個(gè)線程中,無論 CPU 執(zhí)行得再快,遇到 IO 操作時(shí),都得停下來等待讀寫完成,這無疑浪費(fèi)了許多時(shí)間。

為了解決這個(gè)問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標(biāo)準(zhǔn)庫中,并在 Python 3.5 中,加入了 async/await 關(guān)鍵字。用戶可以很輕松的使用在函數(shù)前加入 async 關(guān)鍵字,使函數(shù)變成異步函數(shù)。

在 Python 的 MQTT 客戶端庫中,HBMQTT 是最早支持異步 IO 的 Python MQTT 庫。

HBMQTT 庫

HBMQTT 是基于 Python 編寫的開源庫,實(shí)現(xiàn)了 MQTT 3.1.1 協(xié)議,特性如下:

  • 支持 QoS 0, QoS 1 以及 QoS 2 消息

  • 客戶端自動(dòng)重連

  • 支持 TCP 和 WebSocket

  • 支持 SSL

  • 支持插件系統(tǒng)

下面我們將演示如何使用 Python MQTT 異步框架 - HBMQTT,輕松實(shí)現(xiàn)一個(gè)具備 MQTT 發(fā)布、訂閱功能的異步 Demo。

項(xiàng)目初始化

確定 Python 版本

本項(xiàng)目使用 Python 3.6 進(jìn)行開發(fā)測(cè)試,讀者可用如下命令確認(rèn) Python 的版本。

因?yàn)樾枰褂?async 關(guān)鍵字,需要確保 Python 版本不低于 Python 3.5

?  ~ python3 --version
Python 3.6.7

使用 Pip 安裝 HBMQTT 庫

Pip 是 Python 的包管理工具,該工具提供了對(duì) Python 包的查找、下載、安裝和卸載功能。

pip3 install -i https://pypi.doubanio.com/simple hbmqtt

連接 MQTT 服務(wù)器

本文將使用 EMQ X 提供的免費(fèi)公共 MQTT 服務(wù)器,該服務(wù)基于 EMQ X 的MQTT 物聯(lián)網(wǎng)云平臺(tái)創(chuàng)建。服務(wù)器接入信息如下:

  • Broker: broker.emqx.io

  • TCP Port: 1883

  • Websocket Port: 8083

首先,導(dǎo)入 MQTT 客戶端庫。

from hbmqtt.client import MQTTClient

client = MQTTClient()
# 連接服務(wù)器
client.connect('mqtt://broker.emqx.io/')
# 斷開連接
client.disconnect()

異步寫法如下:

async def test_pub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.disconnect()

發(fā)布消息

發(fā)布消息函數(shù)為 MQTTClient 類的 publish 函數(shù)。

client = MQTTClient()
# 函數(shù)的三個(gè)參數(shù)分別為主題、消息內(nèi)容、QoS
client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)

異步寫法如下:

async def test_pub():
    client = MQTTClient()
    await Client.connect('mqtt://broker.emqx.io/')
    await asyncio.gather(
        client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
    )
    logging.info("messages published")
    await Client.disconnect()

在這段代碼中,我們將三個(gè)發(fā)送消息函數(shù)放進(jìn) asyncio 的任務(wù)列表里,它們將會(huì)依次被運(yùn)行。當(dāng)所有任務(wù)都完成后,斷開連接。

訂閱消息

定閱消息函數(shù)為 MQTTClient 類中的 subscribe 函數(shù)。

client = MQTTClient()
# 訂閱
client.subscribe([
  ('topic/0', QOS_0),
  ('topic/1', QOS_1),  
])
# 取消訂閱
client.unsubscribe([
  ('topic/0', QOS_0),
]

異步寫法如下:

async def test_sub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.subscribe([
            ('a/b', QOS_1),
         ])
    for i in range(0, 10):
        message = await client.deliver_message()
        packet = message.publish_packet
        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
    await client.disconnect()

在這段代碼中,我們?cè)诮邮障r(shí)設(shè)置了 await 等待,當(dāng)代碼執(zhí)行到如下位置時(shí),CPU 會(huì)先去執(zhí)行其它任務(wù),直到有消息傳達(dá),再將其打印。

message = await client.deliver_message()

最終,程序會(huì)等待 10 次消息接收,然后關(guān)閉連接。

完整代碼

消息訂閱代碼

# sub.py
# python 3.6+

import asyncio
import logging

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1


async def test_sub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.subscribe([
        ('a/b', QOS_1),
    ])
    for i in range(0, 10):
        message = await client.deliver_message()
        packet = message.publish_packet
        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
    await client.disconnect()


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.run(test_sub())

消息發(fā)布代碼

# pub.py
# python 3.6+

import asyncio
import logging

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2


async def test_pub():
    client = MQTTClient()

    await client.connect('mqtt://broker.emqx.io/')
    await asyncio.gather(
        client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
    )
    logging.info("messages published")
    await client.disconnect()


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.run(test_pub())

測(cè)試

消息發(fā)布

運(yùn)行 MQTT 消息發(fā)布代碼,我們將看到客戶端連接成功,并且成功發(fā)布消息。

如何理解Python MQTT異步框架HBMQTT 如下為 MQTT X 客戶端成功接收到 HBMQTT 客戶端發(fā)布的消息:

如何理解Python MQTT異步框架HBMQTT

消息訂閱

運(yùn)行 MQTT 消息訂閱代碼,我們將看到客戶端連接成功,此時(shí)客戶端正在等待消息進(jìn)入

如何理解Python MQTT異步框架HBMQTT

使用 MQTT X 客戶端連接 broker.emqx.io,然后向主題 a/b 發(fā)送 10 次消息

如何理解Python MQTT異步框架HBMQTT

回到終端,我們看到客戶端接收并打印消息,并且在收到 10 條消息后,主動(dòng)退出了程序。

如何理解Python MQTT異步框架HBMQTT

至此,我們完成了 HBMQTT 庫連接到公共 MQTT 服務(wù)器,并實(shí)現(xiàn)了測(cè)試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱。通過使用 Python 異步 IO 執(zhí)行消息的發(fā)送接收,可以幫助我們實(shí)現(xiàn)更加高效的 MQTT 客戶端。

關(guān)于如何理解Python MQTT異步框架HBMQTT問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

本文題目:如何理解PythonMQTT異步框架HBMQTT
標(biāo)題網(wǎng)址:http://chinadenli.net/article4/ppgeie.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、企業(yè)建站、App開發(fā)、網(wǎng)站收錄、Google、App設(shè)計(jì)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)