springboot集成mqtt

2年前 (2022) 程序员胖胖胖虎阿
201 0 0

文章目录

  • 一、MQTT说明
    • 1.1、mqtt文档
    • 1.2、MQTT消息服务质量
      • 1.1.1、归纳
  • 二、MQTT环境搭建
  • 三、boot集成原生mqtt
    • 1.1、项目结构
    • 1.2、依赖
    • 1.3、application.properties配置
    • 1.4、实体类
    • 1.5、mqtt配置类
    • 1.6、mqtt发布接口
    • 1.7、mqtt接收消息
    • 1.8、集成Swagger2配置
    • 1.9、mqtt测试类
    • 1.10、测试效果

一、MQTT说明

1.1、mqtt文档

官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt

1.2、MQTT消息服务质量

MQTT规定了3种消息等级

  1. QoS 0:
    消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。

  2. QoS 1:
    a、消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息
    b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。

  3. QoS
    a、消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。
    b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
    c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收

1.1.1、归纳

  1. QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
  2. QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
  3. QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。

二、MQTT环境搭建

有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件


第一种:centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097

mqtt客户端下载
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html

mqttx下载:https://mqttx.app/zh

第二种:安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
注明:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇

docker安装rabbitmq
https://blog.csdn.net/qq_44413835/article/details/123648048

进入docker-rabbitmq容器

docker exec  -it rabbitmq  /bin/bash

安装后开启mqq插件

# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt
#打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt

如图:
springboot集成mqtt
springboot集成mqtt

三、boot集成原生mqtt

1.1、项目结构

版本boot:2.3.6.RELEASE、web工程
springboot集成mqtt

1.2、依赖

 <!--集成MQTT-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--开启流支持-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!--gson序列化工具-->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

<!--lombok依赖-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>


<!--Swagger-UI API文档生产工具-->
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
    <version>2.7.0</version>
</dependency>


<!--健康检查-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!--SpringBoot配置处理器,自定义配置项-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

1.3、application.properties配置

spring.application.name=mqtt_demo
server.port=8080
# --------------mqtt配置-----------------------------
# 默认接受消息的主题--指定多个多级主题【物联网数据主题、对话主题-聊天室】
mqtt.receiver.defaultTopic=receive_iot_topic/#,receive_chat_topic/#
# 默认发送消息的主题
mqtt.sender.defaultTopic=test_send
# mqtt发送者的id
mqtt.sender.clientId=mqttProducer
# mqtt接收者的id-随机id来拼串
mqtt.receiver.clientId=${random.value}
# 地址和用户名密码
mqtt.url=tcp://服务器ip地址:1883
mqtt.username=用户名
mqtt.password=密码

1.4、实体类

IotData

package sqy.bean;

import com.google.gson.annotations.SerializedName;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Date;

@Component
public class IotData implements Serializable {

  @SerializedName("deviceid")
  String deviceid;//设备id

  @SerializedName("sensorid")
  String sensorid;//数据id

  @SerializedName("types")
  String types;//设备来源

  @SerializedName("loraid")
  String loraid;//loraid硬件的id

  @SerializedName("createtime")
  Date createtime;//创建时间

  @SerializedName("temp")
  float temp;//温度

  @SerializedName("humi")
  float humi;//湿度

  @SerializedName("light")
  float light;//光敏

  //get/set/tostring省略...

api响应的实体类

package sqy.rvo;

/**
 * @author suqinyi
 * @Date 2022/4/15
 * 通用返回对象
 */
public class ApiResult<T> {

    private long code;
    private String message;
    private T data;


    private final static long SUCCESS_CODE=1000;
    private final static long FAIL_CODE=2000;


    protected ApiResult() {
    }

    protected ApiResult(long code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }


    /**
     * 成功返回结果
     */
    public static <T> ApiResult<T> success(T data, String message) {
        return new ApiResult<T>(SUCCESS_CODE, message, data);
    }


    /**
     * 失败返回结果
     */
    public static <T> ApiResult<T> failed(String message) {
        return new ApiResult<T>(FAIL_CODE, message, null);
    }

	//get/set/tostring省略...
    
}

1.5、mqtt配置类

MqttConfig

package sqy.config.mqtt;


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
import sqy.service.mqtt.MqttCaseServiceImpl;

/**
 * @author suqinyi
 * @Date 2022/4/15
 * mqtt的配置类
 */
@Configuration
public class MqttConfig {

    /**
     * 发布的bean名称
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
    private static final byte[] WILL_DATA;

    static {
        WILL_DATA = "offline".getBytes();
    }

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.sender.clientId}")
    private String clientsId;

    @Value("${mqtt.sender.defaultTopic}")
    private String defaultsTopic;

    @Value("${mqtt.receiver.clientId}")
    private String clientcId;

    @Value("${mqtt.receiver.defaultTopic}")
    private String defaultcTopic;


    /**
     * MQTT连接器选项
     */
    @Bean
    public MqttConnectOptions getSenderMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置连接的用户名
        System.out.println(username);
        if (!username.trim().equals("")) {
            options.setUserName(username);
        }
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        // 设置连接的地址
        options.setServerURIs(new String[]{url});
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(100);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        // 但这个方法并没有重连的机制
        options.setKeepAliveInterval(30);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }

    /**
     * MQTT客户端
     */
    @Bean
    public MqttPahoClientFactory senderMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getSenderMqttConnectOptions());
        return factory;
    }


    /**
     * MQTT消息处理器(生产者)
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientsId, senderMqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultsTopic);
        return messageHandler;
    }


    /**
     * MQTT信息通道(生产者)
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /**
     * MQTT信息通道(消费者)
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }


    /**
     * MQTT消息订阅绑定(消费者)
     */
    @Bean
    public MessageProducer inbound() {
        String[] receiverTopics = StringUtils.split(defaultcTopic, ",");
        // 可以同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "re" + clientcId, senderMqttClientFactory(),
                        receiverTopics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        MqttCaseServiceImpl service = new MqttCaseServiceImpl();
        return service;
    }
}

1.6、mqtt发布接口

IMqttSender

package sqy.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import sqy.config.mqtt.MqttConfig;    

/**
 * MQTT生产者消息发送接口
 * 通过接口将数据传递到集成流
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {

    /**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */
    void sendToMqtt(String data);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param qos     对消息处理的几种机制。
     *                0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     *                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     *                2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

1.7、mqtt接收消息

MqttCaseServiceImpl

package sqy.service.mqtt;

import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
import sqy.bean.IotData;


/**
 * MQTT接收消息
 */
@Service
public class MqttCaseServiceImpl implements MessageHandler {

    /**
     * MessageHeaders:
     * public static final String PREFIX = "mqtt_";
     * public static final String QOS = "mqtt_qos";
     * public static final String ID = "mqtt_id";
     * public static final String RECEIVED_QOS = "mqtt_receivedQos";
     * public static final String DUPLICATE = "mqtt_duplicate";
     * public static final String RETAINED = "mqtt_retained";
     * public static final String RECEIVED_RETAINED = "mqtt_receivedRetained";
     * public static final String TOPIC = "mqtt_topic";
     * public static final String RECEIVED_TOPIC = "mqtt_receivedTopic";
     * public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt_messageExpiryInterval";
     * public static final String TOPIC_ALIAS = "mqtt_topicAlias";
     * public static final String RESPONSE_TOPIC = "mqtt_responseTopic";
     * public static final String CORRELATION_DATA = "mqtt_correlationData";
     */

    @Autowired
    Gson gson;


    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
        String payload = (String) message.getPayload();
        System.out.println("headers:" + topic + " 接收的数据:" + payload);
        if (topic.contains("receive_iot_topic")) {
            System.out.println("硬件的信息的主题");
            IotData entity = gson.fromJson(payload, IotData.class);
            if (entity!=null){
                //不是心跳数据
                if (!entity.getTypes().equals("heartbeat")) {
                    //判断硬件来源
                    if (entity.getTypes().equals("esp32")) {
                        System.out.println("来着esp32的数据");
                        //写入数据库 ...
                    }
                }
            }else {
                System.out.println("序列化失败");
            }

        }
        if (topic.contains("receive_chat_topic")) {
            System.out.println("对话的主题");
            //...构建聊天室.....
            //...相互订阅发送消息就可以了....
            //...逻辑代码...
        }
    }
}


1.8、集成Swagger2配置

Swagger2Config

package sqy.config.swagger;

import io.swagger.annotations.Api;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * @author suqinyi
 * @Date 2022/4/15
 * Swagger2API文档的配置
 * http://localhost:8081/swagger-ui.html
 */
@Configuration
@EnableSwagger2
public class Swagger2Config {
    @Bean
    public Docket createRestApi(){
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                //为当前包下controller生成API文档
                .apis(RequestHandlerSelectors.basePackage("sqy.controller"))
                //为有@Api注解的Controller生成API文档
                .apis(RequestHandlerSelectors.withClassAnnotation(Api.class))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("SwaggerUI演示")
                .description("mqtt-demo")
                .contact("sqy")
                .version("1.0")
                .build();
    }
}

1.9、mqtt测试类

MqttController

package sqy.controller;


import com.google.gson.Gson;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import sqy.bean.IotData;
import sqy.mqtt.IMqttSender;
import sqy.rvo.ApiResult;


/**
 * @author suqinyi
 * @Date 2022/4/15
 * MQTT测试接口
 */
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {


    @Autowired
    IMqttSender mqttSender;

    @Autowired
    Gson gson;

    //这个是外面配置文件里面的设置的接收主题之一
    private final static String SEND_TOPIC_PREFIX = "receive_iot_topic/";


    @ApiOperation("向指定主题发送消息")
    @PostMapping("/sendToTopic")
    public ApiResult sendToTopic(String topic, String payload) {
        /**
         * 想接收方方法消息-主题:receive_iot_topic/#,receive_chat_topic/#
         */
        mqttSender.sendToMqtt(topic,payload);
        System.out.println("发送成功=>" + "主题:" + topic + "  载荷:" + payload);
        return ApiResult.success(null, "发送成功");
    }


    /**
     * 127.0.0.1:8081/mqtt/control_command
     * post、json
     * {
     *   "createtime": "2022-04-17T07:02:23.707Z",
     *   "deviceid": "001设备",
     *   "humi": 30,
     *   "light": 55,
     *   "loraid": "r001",
     *   "sensorid": "123456789",
     *   "temp": 100,
     *   "types": "esp32"
     * }
     */
    @ApiOperation("模拟硬件发送的数据或控制指令")
    @PostMapping("/control_command")
    public ApiResult controlCommand(@RequestBody IotData iotData) {
        String deviceId = iotData.getDeviceid();
        // 前缀 + 设备号
        String topic = SEND_TOPIC_PREFIX + deviceId;
        String payload=gson.toJson(iotData);
        mqttSender.sendToMqtt( topic,payload);
        System.out.println("发送成功=>" + "主题:" + topic + "  载荷:" + payload);
        return ApiResult.success(null, "发送成功");
    }


}

1.10、测试效果

  1. 登入swagger测试:http://localhost:8081/swagger-ui.html#/
  2. 或用post测试

或者使用postman测试:

127.0.0.1:8081/mqtt/control_command

{
  
  "deviceid": "001设备",
  "humi": 30,
  "light": 55,
  "loraid": "r001",
  "sensorid": "123456789",
  "temp": 100,
  "types": "esp32",
  "createtime": "2022-04-17T07:02:23.707Z"
}

如图:

postman测试图:
springboot集成mqtt
后台打印
springboot集成mqtt
mqttbox订阅接收
springboot集成mqtt

swagger测试gif图:
springboot集成mqtt

版权声明:程序员胖胖胖虎阿 发表于 2022年10月30日 上午6:24。
转载请注明:springboot集成mqtt | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...