VUE3基于开源项目:pure-admin
EMQX使用开源版:EMQX开源版
初衷是探索物联网平台一些基础功能的实现路线,这次主要记录一下基于VUE3+SpringBoot简易平台与EMQX消息中间件打通的方法,实现了自动订阅和自动消息接收过滤。
目前的场景是:
1.物联网平台已经可以完成设备的注册,设备对应的主题生成。设备的基础信息与主题信息都存入数据库。
2.根据获取的主题信息列表想EMQX消息平台进行批量订阅
3.完成回传信息的解析
我的实现路线是:
1.Vue3安装mqtt组件
npm install mqtt --save # or pnpm install mqtt --save # Alternatively, use yarn yarn add mqtt
2.MQTT连接+消息监听
import mqtt from "mqtt"; const connection = { protocol: "ws", host: "192.168.204.95", // ws: 8083; wss: 8084 port: 8083, endpoint: "/mqtt", clean: true, connectTimeout: 60 * 1000, // ms, keepAlive: 60 * 1000, reconnectPeriod: -1, // ms clientId: "IOT_Platform_" + Math.random().toString(16).substring(2, 8), // auth username: "test", password: "123456" }; const state = { mqttClient: null, mqttConnected: false }; const actions = { async connectMqtt({ commit, state }) { if (state.mqttClient && state.mqttConnected) { console.log("MQTT already connected"); return state.mqttClient; } const { protocol, host, port, endpoint, ...options } = connection; const connectUrl = `${protocol}://${host}:${port}${endpoint}`; const client = mqtt.connect(connectUrl, options); try { await new Promise((resolve, reject) => { client.on("connect", () => { console.log("MQTT connected"); commit("setMqttClient", client); commit("setMqttConnected", true); resolve(); }); client.on("error", err => { console.error("MQTT connection error:", err.message); reject(err); }); client.on("message", (topic, payload) => { DeviceList.value.some((item, index) => { if (item["DevicePublishMessageTopic"] === topic) { item["topic"] = payload.toString(); //console.log("Received Message:", topic, payload.toString()); return true; // 结束循环 } return false; // 继续循环 }); }); }); } catch (error) { console.error("Failed to connect to MQTT:", error.message); } return client; } };
client.on("message", (topic, payload) 主要是用于设置监听订阅消息的回调,不仅返回消息体本身,还会返回对应接收的消息主题(用于区分,发过来的消息是订阅的那个主题发过来的),在这里我们就可以用主题进行比对,从而过对消息更好的处理。
client.on("message", (topic, payload) => { DeviceList.value.some((item, index) => { if (item["DevicePublishMessageTopic"] === topic) { item["topic"] = payload.toString(); //console.log("Received Message:", topic, payload.toString()); return true; // 结束循环 } return false; // 继续循环 }); });
3.MQTT订阅基础函数
import mqtt from "mqtt"; import { createStore } from "vuex"; import { ref } from "vue"; import { GetAllInfo } from "@/api/device"; export const DeviceList = ref([]); export const messages = ref([]); const actions = { async doSubscribe(context, { topic: topic1, qos: qos1 }) { const topic = topic1; const qos = qos1; //const { topic, qos } = subscription.value; if (!state.mqttClient) { // 如果 MQTT 客户端未初始化且已连接,等待连接建立 await context.dispatch("connectMqtt"); } if (!state.mqttConnected) { // 如果 MQTT 客户端未初始化且已连接,等待连接建立 await new Promise(resolve => { const checkConnection = () => { if (state.mqttClient) { resolve(); } else { // 如果仍未建立连接,延迟一段时间后再次检查 setTimeout(checkConnection, 100); } }; // 开始检查连接 checkConnection(); }); } if (state.mqttClient && state.mqttConnected) { // 执行订阅 state.mqttClient.subscribe(topic, { qos }, (error, res) => { if (error) { console.log("Subscribe to topics error", error); return; } console.log("Subscribe to topics res", res); }); } } };
doSubscribe函数本身是通过MQTT的库完成主题订阅的函数,需要提供Topic与QoS作为输入
4.MQTT主题循环订阅
import mqtt from "mqtt"; import { createStore } from "vuex"; import { ref } from "vue"; import { GetAllInfo } from "@/api/device"; export const DeviceList = ref([]); export const messages = ref([]); const actions = { async MQTTFetchSubAllInfo(context) { messages.value = []; const qos = 1; DeviceList.value = await GetAllInfo(); await Promise.all( DeviceList.value.map(async (item: JSON) => { const topic = item["DevicePublishMessageTopic"]; await context.dispatch("doSubscribe", { topic, qos }); messages.value.push({ topic: "未收到消息" }); }) ); // 合并两个数组 DeviceList.value = DeviceList.value.map((item, index) => { return { ...item, ...messages.value[index] }; }); } };
MQTTFetchSubAllInfo 主要是从数据库获取现在所有设备的信息,包括其对应的订阅主题信息,然后对订阅主题信息进行循环遍历,一个一个去调用doSubscribe函数重新向EMQX平台订阅主题。
5. 最终就完成了平台与EMQX中间件的初步消息打通。