一、EMQX介绍
EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用。EMQX 作为物联网应用开发和物联网平台搭建必须用到的基础设施软件,主要在边缘和云端实现物联网设备互联与设备上云,提供物联网设备接入、协议处理、消息路由、数据存储、流数据处理等核心能力。
二、EMQX安装
访问官网下载安装包:下载 EMQX
解压zip文件得到软件目录
运行EMQX,打开cmd命令窗口,进入软件bin目录,输入
登录emqx控制台,访问http://127.0.0.1:18083/,默认用户名、密码是admin、public。
三、Java实现发送和订阅消息
3.1 基础代码案例
- 引入pom依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
- 订阅和发布消息相关代码
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class App { public static void main(String[] args) { String subTopic = "testtopic/#"; String pubTopic = "testtopic/1"; String content = "Hello World"; int qos = 2; String broker = "tcp://127.0.0.1:1883"; String clientId = "emqx_test"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("用户名"); connOpts.setPassword("密码".toCharArray()); // 保留会话 connOpts.setCleanSession(true); MqttCallback callback = new OnMessageCallback(); // 设置回调 client.setCallback(callback); // 建立连接 System.out.println("Connecting to broker: " + broker); client.connect(connOpts); System.out.println("Connected"); System.out.println("Publishing message: " + content); // 订阅主题 client.subscribe(subTopic); // 消息发布所需参数 MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); // 发布消息 client.publish(pubTopic, message); System.out.println("Message published"); // client.disconnect(); // System.out.println("Disconnected"); // client.close(); // System.exit(0); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
- 接收消息相关代码
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class OnMessageCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息内容:" + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
3.2 进阶代码案例
- 引入pom依赖
<!--mqtt--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.1</version> </dependency> <!-- fastJSON --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency>
- 定义发送消息客户端的配置
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration @IntegrationComponentScan public class MqttSenderConfig { /** * 发布的bean名称 */ public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel"; /** * 客户端与服务器之间的连接意外中断,服务器将发布客户端的"遗嘱"消息 */ private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } private static final String username = "admin"; private static final String password = "DCDremote@997"; private static final String url = "tcp://127.0.0.1:1883"; private static final String clientId = "honeywell-server1"; private static final String defaultTopic = "default"; // @Value("${mqtt.username}") // private String username; // // @Value("${mqtt.password}") // private String password; // // @Value("${mqtt.url}") // private String url; // // @Value("${mqtt.sender.clientId}") // private String clientId; // // @Value("${mqtt.sender.topic}") // private String defaultTopic; @Bean public MqttConnectOptions getMqttConnectOption(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{url}); mqttConnectOptions.setKeepAliveInterval(30); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientsFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOption()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientsFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setDefaultQos(1); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
- 编写接收消息的客户端的相关配置
package org.jianying.emqxstudy.mqtt; import com.alibaba.fastjson.JSON; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; import java.util.Map; @Configuration public class MqttReceiverConfig { final static Logger logger = LoggerFactory.getLogger(MqttReceiverConfig.class); /** * 订阅的bean名称 */ public static final String CHANNEL_NAME_IN = "mqttInboundChannel"; // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } private static final String username = "admin"; private static final String password = "DCDremote@997"; private static final String url = "tcp://127.0.0.1:1883"; // 接收消息的客户端id private static final String clientId = "test-server"; // 接收的消息主题, $SYS/brokers 表示发送的是系统主题 private static final String defaultTopic = "$SYS/brokers/+/clients/#,hello/info/faceid/#,hello/server/result/#,info_topic"; // @Value("${mqtt.username}") // private String username; // // @Value("${mqtt.password}") // private String password; // // @Value("${mqtt.url}") // private String url; // // @Value("${mqtt.receiver.clientId}") // private String clientId; // // @Value("${mqtt.receiver.topic}") // private String defaultTopic; @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{url}); mqttConnectOptions.setKeepAliveInterval(60); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), topics); adapter.setCompletionTimeout(10000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { logger.info(("收到消息" + message.getHeaders().get("mqtt_receivedTopic") + message.getPayload())); // 主题 String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 消息体 Map maps = (Map) JSON.parse(message.getPayload().toString().trim()); // 判断设备状态 if (topic.contains("$SYS/brokers") && !topic.contains("faceid-server") && !topic.contains("faceid-mqtt-server")) { if (maps.get("clientid").toString().contains("uniwin-mqtt-client")) { } } else if (topic.contains("uniwin/server/result/faceid")) { //结果返回 if (maps.get("type") != null && !maps.get("type").equals("")) { } } else { System.out.println("info..."); if (maps.get("type") != null && !maps.get("type").equals("")) { String type = maps.get("type").toString(); // 设备心跳检测 if (type.equals("heart")) { } // 上传打卡记录 if (type.equals("note")) { } // 上传设备参数 if (type.equals("param_upload")) { } } } } }; } }
- 编写发送消息的工具类
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 发送信息到MQTT服务器 * * @param data 发送的文本 */ void sendToMqtt(String data); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param qos 对消息处理的几种机制。 * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。 * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。 * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }