修改:优化 EmqxService 实现

This commit is contained in:
安浩浩 2024-08-06 22:30:08 +08:00
parent b47176c96e
commit 3f01ba7538
3 changed files with 52 additions and 22 deletions

View File

@ -43,7 +43,7 @@ public class EmqxClient {
private void createMqttClient() { private void createMqttClient() {
try { try {
mqttClient = new MqttClient(mqttConfig.getHostUrl(), "yudao-" + mqttConfig.getClientId(), new MemoryPersistence()); mqttClient = new MqttClient(mqttConfig.getHostUrl(), "yudao" + mqttConfig.getClientId(), new MemoryPersistence());
mqttClient.setCallback(emqxCallback); mqttClient.setCallback(emqxCallback);
} catch (MqttException e) { } catch (MqttException e) {
log.error("创建MQTT客户端失败", e); log.error("创建MQTT客户端失败", e);

View File

@ -1,34 +1,27 @@
package cn.iocoder.yudao.module.iot.emq.service; package cn.iocoder.yudao.module.iot.emq.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
/** /**
* 用于处理MQTT消息的具体业务逻辑如订阅回调 * 用于处理MQTT消息的具体业务逻辑如订阅回调
* *
* @author ahh * @author ahh
*/ */
@Slf4j public interface EmqxService {
@Service
public class EmqxService {
public void subscribeCallback(String topic, MqttMessage mqttMessage) { /**
log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload())); * 订阅回调
// 根据不同的主题处理不同的业务逻辑 *
if (topic.contains("/property/post")) { * @param topic 主题
// 设备上报数据 * @param mqttMessage 消息
} */
} void subscribeCallback(String topic, MqttMessage mqttMessage);
public void subscribe(MqttClient client) { /**
try { * 订阅主题
// 订阅默认主题可以根据需要修改 *
client.subscribe("$share/yudao/+/+/#", 1); * @param client MQTT 客户端
log.info("订阅默认主题成功"); */
} catch (Exception e) { void subscribe(MqttClient client);
log.error("订阅默认主题失败", e);
}
}
} }

View File

@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.emq.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
/**
* 用于处理MQTT消息的具体业务逻辑如订阅回调
*
* @author ahh
*/
@Slf4j
@Service
public class EmqxServiceImpl implements EmqxService {
// TODO 多线程处理消息
@Override
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload()));
// 根据不同的主题处理不同的业务逻辑
if (topic.contains("/property/post")) {
// 设备上报数据
}
}
@Override
public void subscribe(MqttClient client) {
try {
// 订阅默认主题可以根据需要修改
client.subscribe("$share/yudao/+/+/#", 1);
log.info("订阅默认主题成功");
} catch (Exception e) {
log.error("订阅默认主题失败", e);
}
}
}