OneNET平台MQTT设备通信实战手把手教你实现两个ESP32开发板之间的Topic订阅与发布在物联网项目中设备间的直接通信往往比单一设备与云平台的交互更具实用价值。想象一下这样的场景安装在温室大棚的ESP32温湿度传感器需要将数据实时传输给控制通风系统的另一块ESP32开发板而OneNET平台则充当可靠的消息中转站。这种设备到设备D2D的通信模式正是MQTT协议的强项所在。本文将带您深入实战使用两块ESP32开发板分别作为MQTT的发布者和订阅者通过OneNET平台实现跨设备通信。不同于基础的单设备数据上报我们将重点探讨如何设计高效的Topic系统、构建规范的JSON数据包以及处理设备间的联动逻辑。无论您是正在开发智能家居系统还是工业监控项目这些技能都将大幅提升您的物联网开发能力。1. 环境准备与硬件配置1.1 硬件清单与连接实现双ESP32通信需要以下硬件组件ESP32开发板×2推荐使用ESP32-WROOM-32DMicro-USB数据线×2用于供电和程序烧录DHT22温湿度传感器可选用于模拟真实数据源面包板和杜邦线用于传感器连接对于发布者设备数据采集端连接DHT22传感器的步骤如下将DHT22的VCC引脚接ESP32的3.3V将GND引脚接ESP32的GND将DATA引脚接ESP32的GPIO4可根据实际需求调整订阅者设备数据接收端若需要执行具体操作可连接执行元件如继电器模块但本教程主要演示数据接收部分。1.2 软件环境搭建确保您的开发环境已配置以下组件Arduino IDE1.8.x或更高版本ESP32开发板支持包通过Arduino IDE的板管理器安装必要的库文件PubSubClientMQTT客户端库ArduinoJsonJSON数据处理DHT sensor library如需使用温湿度传感器安装库文件时建议通过Arduino IDE的库管理器直接搜索安装确保版本兼容性。以下是检查库是否安装成功的简单代码#include PubSubClient.h #include ArduinoJson.h #include WiFi.h void setup() { Serial.begin(115200); Serial.println(库加载检查通过); }2. OneNET平台配置2.1 产品与设备创建登录OneNET平台后按以下步骤创建MQTT通信环境创建新产品进入开发者中心选择产品创建协议类型选择MQTT(旧版)其他参数根据实际需求填写网络运营商建议选择中国移动添加设备在产品详情页点击添加设备为两块ESP32分别创建设备记录下各自的设备ID和鉴权信息建议命名规范如ESP32_Publisher和ESP32_Subscriber以便区分Topic系统设计在产品详情页的Topic列表中添加以下自定义Topic/sys/{product_id}/{device_id}/thing/event/property/post设备属性上报/sys/{product_id}/{device_id}/thing/event/property/post_reply平台响应/sys/{product_id}/{device_id}/thing/service/property/set属性设置/custom/{product_id}/{device_id}/data自定义数据通道注意OneNET的Topic格式有严格限制自定义Topic必须以/custom/开头否则无法正常使用。2.2 安全认证配置OneNET MQTT接入采用三元组认证方式需要准备以下信息product_id产品IDdevice_name设备名称access_key设备鉴权密钥认证参数生成算法如下建议在设备端预先计算好String generateMqttPassword(String product_id, String device_name, String access_key) { String timestamp String(millis() / 1000); String src products/ product_id /devices/ device_name; String signature sha256_HMAC(access_key, src \n timestamp); String password version2018-10-31res urlEncode(src) et timestamp methodsha256sign urlEncode(signature); return password; }3. 发布者设备实现3.1 WiFi与MQTT连接发布者设备需要先连接WiFi再建立与OneNET平台的MQTT连接。以下是核心代码框架#include WiFi.h #include PubSubClient.h const char* ssid your_SSID; const char* password your_PASSWORD; const char* mqtt_server mqtt.heclouds.com; const int mqtt_port 1883; WiFiClient espClient; PubSubClient client(espClient); void setup_wifi() { delay(10); WiFi.begin(ssid, password); while (WiFi.status() ! WL_CONNECTED) { delay(500); Serial.print(.); } } void reconnect() { while (!client.connected()) { if (client.connect(ESP32_Publisher, your_product_id, your_auth_info)) { Serial.println(MQTT connected); } else { delay(5000); } } } void setup() { Serial.begin(115200); setup_wifi(); client.setServer(mqtt_server, mqtt_port); }3.2 数据采集与发布对于温湿度传感器数据我们需要定期采集并发布到指定Topic。以下是典型实现#include DHT.h #define DHTPIN 4 #define DHTTYPE DHT22 DHT dht(DHTPIN, DHTTYPE); void publishSensorData() { float h dht.readHumidity(); float t dht.readTemperature(); if (isnan(h) || isnan(t)) { Serial.println(Failed to read from DHT sensor!); return; } DynamicJsonDocument doc(1024); doc[humidity] h; doc[temperature] t; doc[device] ESP32_Publisher; doc[timestamp] millis(); char jsonBuffer[512]; serializeJson(doc, jsonBuffer); String topic /custom/your_product_id/ESP32_Publisher/data; client.publish(topic.c_str(), jsonBuffer); } void loop() { if (!client.connected()) { reconnect(); } client.loop(); static unsigned long lastMsg 0; if (millis() - lastMsg 5000) { publishSensorData(); lastMsg millis(); } }4. 订阅者设备实现4.1 Topic订阅与消息处理订阅者设备需要订阅发布者使用的Topic并实现消息回调函数void callback(char* topic, byte* payload, unsigned int length) { Serial.print(Message arrived [); Serial.print(topic); Serial.print(] ); for (int i 0; i length; i) { Serial.print((char)payload[i]); } Serial.println(); // JSON解析 DynamicJsonDocument doc(1024); deserializeJson(doc, payload, length); float humidity doc[humidity]; float temperature doc[temperature]; String device doc[device]; unsigned long timestamp doc[timestamp]; Serial.print(From: ); Serial.println(device); Serial.print(Humidity: ); Serial.print(humidity); Serial.println(%); Serial.print(Temperature: ); Serial.print(temperature); Serial.println(°C); } void setup() { // ...其他初始化代码... client.setCallback(callback); } void loop() { if (!client.connected()) { reconnect(); // 订阅Topic String topic /custom/your_product_id//data; client.subscribe(topic.c_str()); } client.loop(); }4.2 多级Topic与通配符使用OneNET支持MQTT通配符订阅可以实现更灵活的消息路由通配符说明示例单级通配符/custom///data#多级通配符/custom/product1/#例如订阅/custom/your_product_id//data可以接收该产品下所有设备的数据消息这在设备数量较多时特别有用。5. 高级功能与优化5.1 QoS等级与消息可靠性MQTT提供三种服务质量等级QoS 0最多交付一次消息可能丢失QoS 1至少交付一次消息可能重复QoS 2精确交付一次最可靠但开销大在PubSubClient中设置QoS级别// 发布消息时指定QoS client.publish(topic, message, true); // 第三个参数为retain标志 // 订阅时指定QoS client.subscribe(topic, 1); // QoS 15.2 断线重连与持久会话物联网设备常面临网络不稳定的情况完善的断线处理机制必不可少void reconnect() { while (!client.connected()) { if (client.connect(ESP32_Subscriber, MQTT_USER, MQTT_PASSWORD, /will, 1, true, offline)) { // 重新订阅Topic client.subscribe(topic); } else { delay(5000); } } }5.3 数据加密与安全虽然OneNET平台已经提供了传输层安全但敏感数据建议额外加密String encryptData(String plaintext, String key) { // 简单的AES加密示例 // 实际项目中应使用更安全的加密库 char ciphertext[plaintext.length() * 2]; for (int i 0; i plaintext.length(); i) { ciphertext[i] plaintext[i] ^ key[i % key.length()]; } return String(ciphertext); }6. 实际项目扩展建议在真实物联网项目中您可以考虑以下扩展方向设备影子服务利用OneNET的设备影子功能同步设备状态解决设备离线时的状态同步问题规则引擎配置平台级的数据转发规则实现设备数据的自动处理与转发OTA升级通过MQTT通道实现固件无线升级设计差分升级包减少流量消耗设备联动基于Topic设计设备间触发逻辑例如温度超过阈值自动开启风扇// 简单的联动逻辑示例 if (temperature 30.0) { String controlTopic /custom/product_id/ESP32_Fan/control; String message {\command\:\on\, \duration\:300}; client.publish(controlTopic.c_str(), message.c_str()); }