文章目录
- 一、MQTT说明
-
- 1.1、mqtt文档
- 1.2、MQTT消息服务质量
-
- 1.1.1、归纳
- 二、MQTT环境搭建
- 三、boot集成原生mqtt
-
- 1.1、项目结构
- 1.2、依赖
- 1.3、application.properties配置
- 1.4、实体类
- 1.5、mqtt配置类
- 1.6、mqtt发布接口
- 1.7、mqtt接收消息
- 1.8、集成Swagger2配置
- 1.9、mqtt测试类
- 1.10、测试效果
一、MQTT说明
1.1、mqtt文档
官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt
1.2、MQTT消息服务质量
MQTT规定了3种消息等级
-
QoS 0:
消息最多传递一次
,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。 -
QoS 1:
a、消息传递至少 1 次
,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答
,发布者若没收到应答
,会将消息的 DUP 置为 1 并重发消息
。
b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。 -
QoS
a、消息仅传送一次
,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存
起来并等待接收者回复
PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。
b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收
。
1.1.1、归纳
- QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
- QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
- QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。
二、MQTT环境搭建
有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件
第一种:centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097
mqtt客户端下载
:
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html
mqttx下载:https://mqttx.app/zh
第二种:安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
注明:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇
docker安装rabbitmq
https://blog.csdn.net/qq_44413835/article/details/123648048
进入docker-rabbitmq容器
docker exec -it rabbitmq /bin/bash
安装后开启mqq插件
# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt
#打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt
如图:
三、boot集成原生mqtt
1.1、项目结构
版本boot:2.3.6.RELEASE、web工程
1.2、依赖
<!--集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--开启流支持-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<!--gson序列化工具-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--Swagger-UI API文档生产工具-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<!--健康检查-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--SpringBoot配置处理器,自定义配置项-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
1.3、application.properties配置
spring.application.name=mqtt_demo
server.port=8080
# --------------mqtt配置-----------------------------
# 默认接受消息的主题--指定多个多级主题【物联网数据主题、对话主题-聊天室】
mqtt.receiver.defaultTopic=receive_iot_topic/#,receive_chat_topic/#
# 默认发送消息的主题
mqtt.sender.defaultTopic=test_send
# mqtt发送者的id
mqtt.sender.clientId=mqttProducer
# mqtt接收者的id-随机id来拼串
mqtt.receiver.clientId=${random.value}
# 地址和用户名密码
mqtt.url=tcp://服务器ip地址:1883
mqtt.username=用户名
mqtt.password=密码
1.4、实体类
IotData
package sqy.bean;
import com.google.gson.annotations.SerializedName;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Date;
@Component
public class IotData implements Serializable {
@SerializedName("deviceid")
String deviceid;//设备id
@SerializedName("sensorid")
String sensorid;//数据id
@SerializedName("types")
String types;//设备来源
@SerializedName("loraid")
String loraid;//loraid硬件的id
@SerializedName("createtime")
Date createtime;//创建时间
@SerializedName("temp")
float temp;//温度
@SerializedName("humi")
float humi;//湿度
@SerializedName("light")
float light;//光敏
//get/set/tostring省略...
api响应的实体类
package sqy.rvo;
/**
* @author suqinyi
* @Date 2022/4/15
* 通用返回对象
*/
public class ApiResult<T> {
private long code;
private String message;
private T data;
private final static long SUCCESS_CODE=1000;
private final static long FAIL_CODE=2000;
protected ApiResult() {
}
protected ApiResult(long code, String message, T data) {
this.code = code;
this.message = message;
this.data = data;
}
/**
* 成功返回结果
*/
public static <T> ApiResult<T> success(T data, String message) {
return new ApiResult<T>(SUCCESS_CODE, message, data);
}
/**
* 失败返回结果
*/
public static <T> ApiResult<T> failed(String message) {
return new ApiResult<T>(FAIL_CODE, message, null);
}
//get/set/tostring省略...
}
1.5、mqtt配置类
MqttConfig
package sqy.config.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
import sqy.service.mqtt.MqttCaseServiceImpl;
/**
* @author suqinyi
* @Date 2022/4/15
* mqtt的配置类
*/
@Configuration
public class MqttConfig {
/**
* 发布的bean名称
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.sender.clientId}")
private String clientsId;
@Value("${mqtt.sender.defaultTopic}")
private String defaultsTopic;
@Value("${mqtt.receiver.clientId}")
private String clientcId;
@Value("${mqtt.receiver.defaultTopic}")
private String defaultcTopic;
/**
* MQTT连接器选项
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
System.out.println(username);
if (!username.trim().equals("")) {
options.setUserName(username);
}
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(new String[]{url});
// 设置超时时间 单位为秒
options.setConnectionTimeout(100);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(30);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}
/**
* MQTT消息处理器(生产者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientsId, senderMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultsTopic);
return messageHandler;
}
/**
* MQTT信息通道(生产者)
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
DirectChannel channel = new DirectChannel();
return channel;
}
/**
* MQTT信息通道(消费者)
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
DirectChannel channel = new DirectChannel();
return channel;
}
/**
* MQTT消息订阅绑定(消费者)
*/
@Bean
public MessageProducer inbound() {
String[] receiverTopics = StringUtils.split(defaultcTopic, ",");
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
"re" + clientcId, senderMqttClientFactory(),
receiverTopics);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
MqttCaseServiceImpl service = new MqttCaseServiceImpl();
return service;
}
}
1.6、mqtt发布接口
IMqttSender
package sqy.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import sqy.config.mqtt.MqttConfig;
/**
* MQTT生产者消息发送接口
* 通过接口将数据传递到集成流
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
/**
* 发送信息到MQTT服务器
*
* @param data 发送的文本
*/
void sendToMqtt(String data);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
1.7、mqtt接收消息
MqttCaseServiceImpl
package sqy.service.mqtt;
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
import sqy.bean.IotData;
/**
* MQTT接收消息
*/
@Service
public class MqttCaseServiceImpl implements MessageHandler {
/**
* MessageHeaders:
* public static final String PREFIX = "mqtt_";
* public static final String QOS = "mqtt_qos";
* public static final String ID = "mqtt_id";
* public static final String RECEIVED_QOS = "mqtt_receivedQos";
* public static final String DUPLICATE = "mqtt_duplicate";
* public static final String RETAINED = "mqtt_retained";
* public static final String RECEIVED_RETAINED = "mqtt_receivedRetained";
* public static final String TOPIC = "mqtt_topic";
* public static final String RECEIVED_TOPIC = "mqtt_receivedTopic";
* public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt_messageExpiryInterval";
* public static final String TOPIC_ALIAS = "mqtt_topicAlias";
* public static final String RESPONSE_TOPIC = "mqtt_responseTopic";
* public static final String CORRELATION_DATA = "mqtt_correlationData";
*/
@Autowired
Gson gson;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
String payload = (String) message.getPayload();
System.out.println("headers:" + topic + " 接收的数据:" + payload);
if (topic.contains("receive_iot_topic")) {
System.out.println("硬件的信息的主题");
IotData entity = gson.fromJson(payload, IotData.class);
if (entity!=null){
//不是心跳数据
if (!entity.getTypes().equals("heartbeat")) {
//判断硬件来源
if (entity.getTypes().equals("esp32")) {
System.out.println("来着esp32的数据");
//写入数据库 ...
}
}
}else {
System.out.println("序列化失败");
}
}
if (topic.contains("receive_chat_topic")) {
System.out.println("对话的主题");
//...构建聊天室.....
//...相互订阅发送消息就可以了....
//...逻辑代码...
}
}
}
1.8、集成Swagger2配置
Swagger2Config
package sqy.config.swagger;
import io.swagger.annotations.Api;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* @author suqinyi
* @Date 2022/4/15
* Swagger2API文档的配置
* http://localhost:8081/swagger-ui.html
*/
@Configuration
@EnableSwagger2
public class Swagger2Config {
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
//为当前包下controller生成API文档
.apis(RequestHandlerSelectors.basePackage("sqy.controller"))
//为有@Api注解的Controller生成API文档
.apis(RequestHandlerSelectors.withClassAnnotation(Api.class))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("SwaggerUI演示")
.description("mqtt-demo")
.contact("sqy")
.version("1.0")
.build();
}
}
1.9、mqtt测试类
MqttController
package sqy.controller;
import com.google.gson.Gson;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import sqy.bean.IotData;
import sqy.mqtt.IMqttSender;
import sqy.rvo.ApiResult;
/**
* @author suqinyi
* @Date 2022/4/15
* MQTT测试接口
*/
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
IMqttSender mqttSender;
@Autowired
Gson gson;
//这个是外面配置文件里面的设置的接收主题之一
private final static String SEND_TOPIC_PREFIX = "receive_iot_topic/";
@ApiOperation("向指定主题发送消息")
@PostMapping("/sendToTopic")
public ApiResult sendToTopic(String topic, String payload) {
/**
* 想接收方方法消息-主题:receive_iot_topic/#,receive_chat_topic/#
*/
mqttSender.sendToMqtt(topic,payload);
System.out.println("发送成功=>" + "主题:" + topic + " 载荷:" + payload);
return ApiResult.success(null, "发送成功");
}
/**
* 127.0.0.1:8081/mqtt/control_command
* post、json
* {
* "createtime": "2022-04-17T07:02:23.707Z",
* "deviceid": "001设备",
* "humi": 30,
* "light": 55,
* "loraid": "r001",
* "sensorid": "123456789",
* "temp": 100,
* "types": "esp32"
* }
*/
@ApiOperation("模拟硬件发送的数据或控制指令")
@PostMapping("/control_command")
public ApiResult controlCommand(@RequestBody IotData iotData) {
String deviceId = iotData.getDeviceid();
// 前缀 + 设备号
String topic = SEND_TOPIC_PREFIX + deviceId;
String payload=gson.toJson(iotData);
mqttSender.sendToMqtt( topic,payload);
System.out.println("发送成功=>" + "主题:" + topic + " 载荷:" + payload);
return ApiResult.success(null, "发送成功");
}
}
1.10、测试效果
- 登入swagger测试:http://localhost:8081/swagger-ui.html#/
- 或用post测试
或者使用postman测试:
127.0.0.1:8081/mqtt/control_command
{
"deviceid": "001设备",
"humi": 30,
"light": 55,
"loraid": "r001",
"sensorid": "123456789",
"temp": 100,
"types": "esp32",
"createtime": "2022-04-17T07:02:23.707Z"
}
如图:
postman测试图:
后台打印
mqttbox订阅接收
swagger测试gif图: