From 3f01ba7538dd7533a43f3e66ee6c66b5b48b9c50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Tue, 6 Aug 2024 22:30:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E4=BC=98=E5=8C=96?= =?UTF-8?q?=20EmqxService=20=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/iot/emq/client/EmqxClient.java | 2 +- .../module/iot/emq/service/EmqxService.java | 35 +++++++----------- .../iot/emq/service/EmqxServiceImpl.java | 37 +++++++++++++++++++ 3 files changed, 52 insertions(+), 22 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/client/EmqxClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/client/EmqxClient.java index 461a748d2..de24585b0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/client/EmqxClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/client/EmqxClient.java @@ -43,7 +43,7 @@ public class EmqxClient { private void createMqttClient() { try { - mqttClient = new MqttClient(mqttConfig.getHostUrl(), "yudao-" + mqttConfig.getClientId(), new MemoryPersistence()); + mqttClient = new MqttClient(mqttConfig.getHostUrl(), "yudao" + mqttConfig.getClientId(), new MemoryPersistence()); mqttClient.setCallback(emqxCallback); } catch (MqttException e) { log.error("创建MQTT客户端失败", e); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxService.java index b323d929c..1658dc376 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxService.java @@ -1,34 +1,27 @@ package cn.iocoder.yudao.module.iot.emq.service; -import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.stereotype.Service; /** * 用于处理MQTT消息的具体业务逻辑,如订阅回调 * * @author ahh */ -@Slf4j -@Service -public class EmqxService { +public interface EmqxService { - public void subscribeCallback(String topic, MqttMessage mqttMessage) { - log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload())); - // 根据不同的主题,处理不同的业务逻辑 - if (topic.contains("/property/post")) { - // 设备上报数据 - } - } + /** + * 订阅回调 + * + * @param topic 主题 + * @param mqttMessage 消息 + */ + void subscribeCallback(String topic, MqttMessage mqttMessage); - public void subscribe(MqttClient client) { - try { - // 订阅默认主题,可以根据需要修改 - client.subscribe("$share/yudao/+/+/#", 1); - log.info("订阅默认主题成功"); - } catch (Exception e) { - log.error("订阅默认主题失败", e); - } - } + /** + * 订阅主题 + * + * @param client MQTT 客户端 + */ + void subscribe(MqttClient client); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java new file mode 100644 index 000000000..a18bb73e1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.emq.service; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +/** + * 用于处理MQTT消息的具体业务逻辑,如订阅回调 + * + * @author ahh + */ +@Slf4j +@Service +public class EmqxServiceImpl implements EmqxService { + + // TODO 多线程处理消息 + @Override + public void subscribeCallback(String topic, MqttMessage mqttMessage) { + log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload())); + // 根据不同的主题,处理不同的业务逻辑 + if (topic.contains("/property/post")) { + // 设备上报数据 + } + } + + @Override + public void subscribe(MqttClient client) { + try { + // 订阅默认主题,可以根据需要修改 + client.subscribe("$share/yudao/+/+/#", 1); + log.info("订阅默认主题成功"); + } catch (Exception e) { + log.error("订阅默认主题失败", e); + } + } +}