VUE3+SpringBoot+EMQX实现数据打通

VUE3基于开源项目:pure-admin

EMQX使用开源版:EMQX开源版

初衷是探索物联网平台一些基础功能的实现路线,这次主要记录一下基于VUE3+SpringBoot简易平台与EMQX消息中间件打通的方法,实现了自动订阅和自动消息接收过滤。

目前的场景是:

1.物联网平台已经可以完成设备的注册,设备对应的主题生成。设备的基础信息与主题信息都存入数据库。

2.根据获取的主题信息列表想EMQX消息平台进行批量订阅

3.完成回传信息的解析

我的实现路线是:

1.Vue3安装mqtt组件

npm install mqtt --save
# or
pnpm install mqtt --save
# Alternatively, use yarn
yarn add mqtt

2.MQTT连接+消息监听

import mqtt from "mqtt";
const connection = {
  protocol: "ws",
  host: "192.168.204.95",
  // ws: 8083; wss: 8084
  port: 8083,
  endpoint: "/mqtt",
  clean: true,
  connectTimeout: 60 * 1000, // ms,
  keepAlive: 60 * 1000,
  reconnectPeriod: -1, // ms
  clientId: "IOT_Platform_" + Math.random().toString(16).substring(2, 8),
  // auth
  username: "test",
  password: "123456"
};

const state = {
  mqttClient: null,
  mqttConnected: false
};

const actions = {
  async connectMqtt({ commit, state }) {
    if (state.mqttClient && state.mqttConnected) {
      console.log("MQTT already connected");
      return state.mqttClient;
    }

    const { protocol, host, port, endpoint, ...options } = connection;
    const connectUrl = `${protocol}://${host}:${port}${endpoint}`;

    const client = mqtt.connect(connectUrl, options);

    try {
      await new Promise((resolve, reject) => {
        client.on("connect", () => {
          console.log("MQTT connected");
          commit("setMqttClient", client);
          commit("setMqttConnected", true);
          resolve();
        });
        client.on("error", err => {
          console.error("MQTT connection error:", err.message);
          reject(err);
        });
        client.on("message", (topic, payload) => {
          DeviceList.value.some((item, index) => {
            if (item["DevicePublishMessageTopic"] === topic) {
              item["topic"] = payload.toString();
              //console.log("Received Message:", topic, payload.toString());

              return true; // 结束循环
            }
            return false; // 继续循环
          });
        });
      });
    } catch (error) {
      console.error("Failed to connect to MQTT:", error.message);
    }
    return client;
  }
};

 client.on("message", (topic, payload)  主要是用于设置监听订阅消息的回调,不仅返回消息体本身,还会返回对应接收的消息主题(用于区分,发过来的消息是订阅的那个主题发过来的),在这里我们就可以用主题进行比对,从而过对消息更好的处理。

        client.on("message", (topic, payload) => {
          DeviceList.value.some((item, index) => {
            if (item["DevicePublishMessageTopic"] === topic) {
              item["topic"] = payload.toString();
              //console.log("Received Message:", topic, payload.toString());

              return true; // 结束循环
            }
            return false; // 继续循环
          });
        });

3.MQTT订阅基础函数

import mqtt from "mqtt";
import { createStore } from "vuex";
import { ref } from "vue";
import { GetAllInfo } from "@/api/device";
export const DeviceList = ref([]);

export const messages = ref([]);

const actions = {
  async doSubscribe(context, { topic: topic1, qos: qos1 }) {
    const topic = topic1;
    const qos = qos1;
    //const { topic, qos } = subscription.value;

    if (!state.mqttClient) {
      // 如果 MQTT 客户端未初始化且已连接,等待连接建立
      await context.dispatch("connectMqtt");
    }

    if (!state.mqttConnected) {
      // 如果 MQTT 客户端未初始化且已连接,等待连接建立
      await new Promise(resolve => {
        const checkConnection = () => {
          if (state.mqttClient) {
            resolve();
          } else {
            // 如果仍未建立连接,延迟一段时间后再次检查
            setTimeout(checkConnection, 100);
          }
        };

        // 开始检查连接
        checkConnection();
      });
    }

    if (state.mqttClient && state.mqttConnected) {
      // 执行订阅
      state.mqttClient.subscribe(topic, { qos }, (error, res) => {
        if (error) {
          console.log("Subscribe to topics error", error);
          return;
        }
        console.log("Subscribe to topics res", res);
      });
    }
  }
};

doSubscribe函数本身是通过MQTT的库完成主题订阅的函数,需要提供Topic与QoS作为输入

4.MQTT主题循环订阅

import mqtt from "mqtt";
import { createStore } from "vuex";
import { ref } from "vue";
import { GetAllInfo } from "@/api/device";
export const DeviceList = ref([]);

export const messages = ref([]);

const actions = {
  async MQTTFetchSubAllInfo(context) {
    messages.value = [];
    const qos = 1;
    DeviceList.value = await GetAllInfo();

    await Promise.all(
      DeviceList.value.map(async (item: JSON) => {
        const topic = item["DevicePublishMessageTopic"];
        await context.dispatch("doSubscribe", { topic, qos });
        messages.value.push({ topic: "未收到消息" });
      })
    );
    // 合并两个数组
    DeviceList.value = DeviceList.value.map((item, index) => {
      return { ...item, ...messages.value[index] };
    });
  }
};

MQTTFetchSubAllInfo 主要是从数据库获取现在所有设备的信息,包括其对应的订阅主题信息,然后对订阅主题信息进行循环遍历,一个一个去调用doSubscribe函数重新向EMQX平台订阅主题。

5. 最终就完成了平台与EMQX中间件的初步消息打通。