哈哈哈哈哈操欧洲电影,久草网在线,亚洲久久熟女熟妇视频,麻豆精品色,久久福利在线视频,日韩中文字幕的,淫乱毛视频一区,亚洲成人一二三,中文人妻日韩精品电影

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何在Python中使用MQTT

瑞科慧聯(lián)(RAK) ? 2022-12-22 10:41 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

Python 是一種跨平臺的計算機程序設計語言,是ABC 語言的替代品,屬于面向?qū)ο蟮膭討B(tài)類型語言。它最初被設計用于編寫自動化腳本,隨著版本的不斷更新和語言新功能的添加,越來越多被用于獨立的、大型項目的開發(fā)。

MQTT 是一個物聯(lián)網(wǎng)傳輸協(xié)議,用于輕量級的發(fā)布/訂閱式消息傳輸,旨在為低帶寬和不穩(wěn)定的網(wǎng)絡環(huán)境中的物聯(lián)網(wǎng)設備提供可靠的網(wǎng)絡服務。其輕量、簡單、開放和易于實現(xiàn)等特點,使得它適用范圍更加廣泛。

本文主要介紹如何在 Python 項目中使用paho-mqtt客戶端庫 ,實現(xiàn)客戶端與MQTT服務器的連接、訂閱、取消訂閱、收發(fā)消息等功能。

一、項目準備

本項目使用 Python 3.10進行開發(fā)測試。

用戶可用以下命令來確認 Python的版本:

python3 --version

Python 3.10.9

測試設備:

瑞科慧聯(lián)(RAK)網(wǎng)關(guān)RAK7268 V2、帶溫濕度傳感器的數(shù)據(jù)采集器Sensor Hub

二、選擇 MQTT 客戶端庫

paho-mqtt是目前 Python 中使用較多的 MQTT 客戶端庫。它為 Python 2.7 或 3.x 版本以上的客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持,還提供了一些幫助程序功能。這使得消息發(fā)布到 MQTT 服務器變得更簡單。

三、Pip 安裝 Paho MQTT 客戶端

Pip 是 Python 包管理工具。該工具提供了對 Python 包的查找、下載、安裝、卸載的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、連接 MQTT 服務器

本文將使用瑞科慧聯(lián)LoRaWAN?網(wǎng)關(guān)提供的內(nèi)置 MQTT服務,該服務基于 Mosquitto的開源消息代理。服務器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、導入 Paho MQTT客戶端

from paho.mqtt import client as mqtt

3、設置 MQTT Broker 連接參數(shù)

設置 MQTT Broker 連接地址,端口以及 topic,同時調(diào)用 Pythonrandom.randint函數(shù)隨機生成 MQTT 客戶端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、編寫 MQTT 連接函數(shù)

編寫連接回調(diào)函數(shù) on_connect,該函數(shù)將在客戶端連接后會被調(diào)用。在該函數(shù)中可以依據(jù)rc來判斷客戶端是否連接成功。同時可創(chuàng)建一個 MQTT 客戶端連接到broker.emqx.io。

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務器賬號密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

5、發(fā)布消息

定義一個 while 循環(huán)語句,在循環(huán)中設置每秒調(diào)用 MQTT 客戶端publish函數(shù)向/python/mqtt主題發(fā)送消息。

ddefon_publish():

    # 發(fā)布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 發(fā)布的主題,訂閱時需要使用這個主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發(fā)送成功'.format(msg_count))

        else:

            print('第{}條消息發(fā)送失敗'.format(msg_count))

        msg_count+=1

6、訂閱消息

編寫消息回調(diào)函數(shù) on_message,函數(shù)將在客戶端從 MQTT Broker 收到消息后被調(diào)用,并打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代碼

消息訂閱代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調(diào)此方法"""

    rc_status= ["連接成功","協(xié)議版本錯誤","無效的客戶端標識","服務器無法使用","用戶名或密碼錯誤","無授權(quán)"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦訂閱到消息, 回調(diào)此方法"""

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解碼前的data為: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解碼后的data為: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("溫度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("濕度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("濕度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務器賬號密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息發(fā)布代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調(diào)此方法"""

    rc_status= ["連接成功","協(xié)議版本錯誤","無效的客戶端標識","服務器無法使用","用戶名或密碼錯誤","無授權(quán)"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務器賬號密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_publish():

    # 發(fā)布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 發(fā)布的主題,訂閱時需要使用這個主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要發(fā)布的消息內(nèi)容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發(fā)送成功'.format(msg_count))

        else:

            print('第{}條消息發(fā)送失敗'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

測試

消息發(fā)布

運行 MQTT消息發(fā)布代碼,將看到客戶端連接成功,并且成功將消息發(fā)布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息訂閱

通過瑞科慧聯(lián)帶溫濕度傳感器的 Sensor hub進行數(shù)據(jù)傳輸,訂閱并解析數(shù)據(jù)結(jié)果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、總結(jié)

至此,我們完成了使用paho-mqtt客戶端連接到LoRaWAN?網(wǎng)關(guān)內(nèi)置 MQTT服務器,并實現(xiàn)了測試客戶端與 MQTT 服務器的連接、消息發(fā)布和訂閱并解析。

與 C ++ 或 Java 之類的高級語言不同,Python 比較適合設備側(cè)的業(yè)務邏輯實現(xiàn)。使用 Python 可以減少代碼上的邏輯復雜度,降低與設備的交互成本。未來,我們相信在物聯(lián)網(wǎng)領(lǐng)域 Python 將會有更廣泛的應用!

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 物聯(lián)網(wǎng)
    +關(guān)注

    關(guān)注

    2950

    文章

    48091

    瀏覽量

    417957
  • python
    +關(guān)注

    關(guān)注

    58

    文章

    4882

    瀏覽量

    90281
  • MQTT
    +關(guān)注

    關(guān)注

    5

    文章

    736

    瀏覽量

    25248
收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點推薦

    何在 S32 配置工具中添加 ADC 并在 Simulink 中使用?

    對于 S32K3,如何在 S32 配置工具中添加 ADC 并在 Simulink 中使用?
    發(fā)表于 04-07 07:09

    [VirtualLab] 使用Python運行VirtualLab Fusion光學仿真

    : https://code.visualstudio.com/docs/python/python-tutorial 請注意,這里我們將演示如何在全局環(huán)境中安裝所需的軟件包。對于使用多個Py
    發(fā)表于 03-31 09:39

    何在 VisionFive 上使用 Python 包?

    VisionFive Fedora 下的本地目錄,請在源代碼目錄下執(zhí)行以下命令: 提示:源代碼可從以下位置下載:愿景五.gpio. sudo yum install python
    發(fā)表于 03-30 08:28

    【創(chuàng)芯工坊】PowerWriter 0048 如何在其他IDE中使用PowerWriter的Debugger(燒錄器常見使用問題)

    【創(chuàng)芯工坊】PowerWriter 0048 如何在其他IDE中使用PowerWriter的Debugger(燒錄器常見使用問題)
    發(fā)表于 03-26 10:38

    何在 S32 DS 中使用 BMS GEN2 SDK?

    do not support the BJB MC33777. 如何在 S32 DS 中使用 BMS GEN2 SDK?
    發(fā)表于 03-23 08:16

    何在 Vision Five 2 上安裝 python 庫?

    這可能是一個完全愚蠢的問題,但我如何在 Vision Five 2 上安裝 python 庫。 使用該命令后,它給了我這個錯誤。 默認為用戶安裝,因為普通站點包不可寫 錯誤:找不到滿足要求
    發(fā)表于 03-06 07:51

    DR1平臺Linux應用開發(fā)指南:含GDB調(diào)試、PythonMQTT實戰(zhàn)

    流程,以及 LED、按鍵、CAN、TCP/UDP、串口等常用開發(fā)案例,同時覆蓋 Python 腳本開發(fā)與 MQTT 消息發(fā)布 / 訂閱實戰(zhàn)。文檔基于 Ubuntu22.04
    的頭像 發(fā)表于 01-05 16:48 ?4774次閱讀
    DR1平臺Linux應用開發(fā)指南:含GDB調(diào)試、<b class='flag-5'>Python</b>及<b class='flag-5'>MQTT</b>實戰(zhàn)

    何在AMD Vitis Unified IDE中使用系統(tǒng)設備樹

    您將在這篇博客中了解系統(tǒng)設備樹 (SDT) 以及如何在 AMD Vitis Unified IDE 中使用 SDT 維護來自 XSA 的硬件元數(shù)據(jù)。本文還講述了如何對 SDT 進行操作,以便在 Vitis Unified IDE 中實現(xiàn)更靈活的使用場景。
    的頭像 發(fā)表于 11-18 11:13 ?3282次閱讀
    如<b class='flag-5'>何在</b>AMD Vitis Unified IDE<b class='flag-5'>中使</b>用系統(tǒng)設備樹

    何在vivadoHLS中使用.TLite模型

    本帖欲分享如何在vivadoHLS中使用.TLite模型。在Vivado HLS中導入模型后,需要設置其輸入和輸出接口以與您的設計進行適配。 1. 在Vivado HLS項目中導入模型文件 可以
    發(fā)表于 10-22 06:29

    物聯(lián)網(wǎng)MQTT網(wǎng)關(guān)是什么

    物聯(lián)網(wǎng)MQTT網(wǎng)關(guān)是一種采用MQTT物聯(lián)網(wǎng)協(xié)議的智能設備或軟件組件,其核心功能是連接不同通信協(xié)議的物聯(lián)網(wǎng)設備與消息代理服務器,實現(xiàn)設備間的數(shù)據(jù)交換與集中管理,同時支持邊緣計算、安全防護和協(xié)議轉(zhuǎn)換
    的頭像 發(fā)表于 08-29 15:24 ?1206次閱讀

    請問如何在 Keil μVision 或 IAR EWARM 中使用觀察點進行調(diào)試?

    何在 Keil μVision 或 IAR EWARM 中使用觀察點進行調(diào)試?
    發(fā)表于 08-20 06:29

    第二十三章 W55MH32 MQTT_OneNET示例

    本文講解了如何在 W55MH32?芯片上實現(xiàn) MQTT?協(xié)議并連接 OneNET?平臺,通過實戰(zhàn)例程展示了從準備工作、連接配置到消息訂閱、發(fā)布及接收處理的完整過程。文章詳細介紹了 MQTT?協(xié)議
    的頭像 發(fā)表于 07-24 14:59 ?1310次閱讀
    第二十三章 W55MH32 <b class='flag-5'>MQTT</b>_OneNET示例

    MQTT介紹

    一、什么是MQTT 物聯(lián)網(wǎng)(IoT)發(fā)展迅猛,傳感器、網(wǎng)關(guān)、云平臺之間如何高效通信成了核心問題。MQTT(Message Queuing Telemetry Transport)作為一種輕量級
    的頭像 發(fā)表于 07-14 09:34 ?3818次閱讀
    <b class='flag-5'>MQTT</b>介紹

    精通 MQTT:消息隊列遙測傳輸指南!

    ,解釋了其關(guān)鍵組件,并演示了如何使用Python實現(xiàn)MQTT客戶端。MQTT代理MQTT系統(tǒng)的核心是代理,它負責管理客戶端之間的消息交換。MQTT
    的頭像 發(fā)表于 06-16 16:56 ?1115次閱讀
    精通 <b class='flag-5'>MQTT</b>:消息隊列遙測傳輸指南!

    何在MQTT中發(fā)布和訂閱實體

    MQTT中發(fā)布和訂閱實體(主題)是MQTT通信的核心操作,下面將詳細介紹其原理、步驟以及示例代碼,幫助你全面理解這一過程。 一、MQTT發(fā)布與訂閱的基本概念 發(fā)布(Publish):客戶端將
    的頭像 發(fā)表于 05-20 17:21 ?1597次閱讀
    凤台县| 河东区| 安泽县| 恩施市| 阳信县| 崇仁县| 卢湾区| 翼城县| 伊宁市| 南投市| 晴隆县| 大英县| 巨野县| 呼伦贝尔市| 广平县| 华阴市| 武冈市| 洮南市| 华池县| 云浮市| 乌兰察布市| 邯郸市| 什邡市| 铜陵市| 若羌县| 广昌县| 海原县| 界首市| 新竹市| 确山县| 靖州| 余庆县| 浦东新区| 东城区| 利津县| 五家渠市| 汕头市| 交口县| 英山县| 报价| 蕲春县|