大家好,由于我想去北京找工作,但是发现北京人才济济,面试要求各种五花八门的卷,很多岗位都能胜任,但是因为专科学历,连面试机会都没有,不过今天我给大家带来的教程,是完全可以让大家应用到实际工作中快速使用的干货,并给大家准备了一套纯净版springboot+netty 4.x的源码实现方案,有支持qos1、qos2、qos3具体的示例,并有日志说明
码云地址:mqtt-netty: 基于springboot2.1.5,netty4.1.53.Final 实现mqtt服务,实现功能1.服务端发布主题消息,所用订阅该主题客户端可接收到数据2.qos类型级别校验,使用qos1级别3.账号密码校验,客户端id校验。4.发布主题是否一致校验 - Gitee.com
首先了解一个很重要的知识点,Qos类型分为3种,而服务端与客户端ack应答包类型也不同,
详细讲解:MQTT - 随笔分类 - 胖达利亚 - 博客园
下列简单描述(B为服务端需要返回的包类型)
- 对于订阅时要求
Qos = 0
的客户端 C0:B 将该消息转发给 C0(不加确认是否收到) - 对于订阅时要求
Qos = 1
的客户端 C1:B 将该消息转发给 C1,C1 收到后向 B 发送确认PUBACK
,B 收到确认则将消息从队列中删除,否则确认超时重发;
C1 只要收到消息,就会将其转发给 C1 的应用,所以 C1 的应用可能收到重复的消息。 - 对于订阅时要求
Qos = 2
的客户端 C2:B 将该消息转发给 C2,C2 收到后向 B 发送确认PUBREC
,B 收到确认后向 C2 发送确认收到确认PUBREL
;C2收到PUBREL
后向 B 发送PUBCOMP发布完成包
C2 只有收到消息并发出PUBREC
且收到对应的PUBREL
,才会将消息转发给 C2 的应用,所以 C2 的应用不会收到重复的消息
第一步.准备maven环境:springboot + netty环境 +mqtt客户端
mqtt客户端安装包:MQTT X:跨平台 MQTT 5.0 桌面客户端工具
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.53.Final</version>
</dependency>
第二步.构建netty服务
因为mqtt为TCP协议,所以我们需要创建两组线程组,(如果是UDP协议创建一组就可以,以后我会写一篇UDP协议的netty服务搭建,用UDP协议操作树莓派小车也是挺有意思的)
需要一个netty服务启动类,
一个实现ChannelInboundHandlerAdapter的实体类用于通道解析
一个用于拼装mqtt包结构请求体的工具类
package com.my.netty;
import com.my.netty.mqtt.handler.TestMqttHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestServer {
public void run(){
// 监听端口号
int port = 8888;
// 构建主线程-用于分发socket请求
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
// 构建工作线程-用于处理请求处理
EventLoopGroup workGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
// .childOption(ChannelOption.SO_BACKLOG,1024) //等待队列
.childOption(ChannelOption.SO_REUSEADDR,true) //快速复用
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 这个地方注意,如果客户端发送请求体超过此设置值,会抛异常
socketChannel.pipeline().addLast(new MqttDecoder(1024*1024));
socketChannel.pipeline().addLast( MqttEncoder.INSTANCE);
// 加载MQTT编解码协议,包含业务逻辑对象
socketChannel.pipeline().addLast(new TestMqttHandler());
}
});
serverBootstrap.bind(port).addListener(future -> {
log.info("服务端成功绑定端口号={}",port);
});
}catch (Exception e){
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
log.error("mqttServer启动失败:{}",e);
}
}
}
package com.my.netty.mqtt.handler;
import com.my.netty.core.dto.ClientDTO;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
/**
* @author: liyang
* @date: 2020/7/29 13:22
* @description: MQTT业务类
*/
@Slf4j
@ChannelHandler.Sharable
public class TestMqttHandler extends ChannelInboundHandlerAdapter {
private static final Collection<Channel> clientList = new HashSet();
private static final Map<String,Object> msgMap = new HashMap<>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MqttMessage) {
Channel channel = ctx.channel();
MqttMessage message = (MqttMessage) msg;
MqttMessageType messageType = message.fixedHeader().messageType();
log.info("MQTT接收到的发送类型===》{}",messageType);
switch (messageType) {
// 建立连接
case CONNECT:
try {
this.connect(channel, (MqttConnectMessage) message);
}catch (Exception e){
//如果用户密码,客户端ID校验不成功,会二次建立CONNECT类型连接
//但是没有实际意义
}
break;
// 发布消息
case PUBLISH:
this.publish(channel, (MqttPublishMessage) message);
break;
// 订阅主题
case SUBSCRIBE:
this.subscribe(channel, (MqttSubscribeMessage) message);
break;
// 退订主题
case UNSUBSCRIBE:
this.unSubscribe(channel, (MqttUnsubscribeMessage) message);
break;
// 心跳包
case PINGREQ:
this.pingReq(channel, message);
break;
// 断开连接
case DISCONNECT:
this.disConnect(channel, message);
break;
// 确认收到响应报文,用于服务器向客户端推送qos1/qos2后,客户端返回服务器的响应
case PUBACK:
this.puback(channel, message);
break;
// qos2类型,发布收到
case PUBREC:
this.pubrec(channel, message);
break;
// qos2类型,发布释放响应
case PUBREL:
this.pubrel(channel, message);
break;
// qos2类型,发布完成
case PUBCOMP:
this.pubcomp(channel, message);
break;
default:
if (log.isDebugEnabled()) {
log.debug("Nonsupport server message type of '{}'.", messageType);
}
break;
}
}
}
/**
* 创建连接时,需要响应对应的ACK包
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:12
*/
public void connect(Channel channel, MqttConnectMessage msg) {
//连接需要答复
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(
MqttMessageType.CONNACK, false, AT_LEAST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, true), null);
channel.writeAndFlush(okResp);
clientList.add(channel);
}
/**
* 响应ping心跳ACK包
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:13
*/
public void pingReq(Channel channel, MqttMessage msg) {
if (log.isDebugEnabled()) {
log.debug("MQTT pingReq received.");
}
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false,
AT_LEAST_ONCE, false, 0));
channel.writeAndFlush(pingResp);
}
/**
* 服务端主动断开连接
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:13
*/
public void disConnect(Channel channel, MqttMessage msg) {
clientList.remove(channel);
if (channel.isActive()) {
channel.close();
if (log.isDebugEnabled()) {
log.debug("MQTT channel '{}' was closed.", channel.id().asShortText());
}
}
}
/**
* qos2中使用,发布确认
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:14
*/
public void puback(Channel channel, MqttMessage msg){
// MqttMessageIdVariableHeader mqttMessageIdVariableHeader = msg.variableHeader();
}
/**
* qos2中发布释放ACK包
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:13
*/
public void pubrel(Channel channel, MqttMessage msg){
Object mqttMessageIdVariableHeader = msg.variableHeader();
if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {
// qos2类型,接收发布者消息
log.info("qos2客户端返回确认的消息包:{}",msg.payload());
MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.EXACTLY_ONCE, false, 0),
MqttMessageIdVariableHeader.from(header.messageId()),
0);
channel.writeAndFlush(mqttMessage);
for (Channel channel1 : clientList) {
try {
send(channel1,"aaa",MqttQoS.EXACTLY_ONCE,"我收到那");
} catch (InterruptedException e) {
log.error("该通道推送消息失败,可加入容错机制,channel:{}",channel1);
}
}
}
}
/**
* qos2:发布收到ACK包
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/14 0:09
*/
public void pubrec(Channel channel, MqttMessage msg) {
Object mqttMessageIdVariableHeader = msg.variableHeader();
if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {
// qos2类型,接收发布者消息
log.info("qos2客户端返回确认的消息包:{}",msg.payload());
MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.EXACTLY_ONCE, false, 0),
MqttMessageIdVariableHeader.from(header.messageId()),
0);
channel.writeAndFlush(mqttMessage);
}
}
/**
* qos2发布完成
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/14 0:11
*/
public void pubcomp(Channel channel, MqttMessage msg) {
Object mqttMessageIdVariableHeader = msg.variableHeader();
if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {
// qos2类型,接收发布者消息
// log.info("qos2客户端返回确认的消息包:{}",msg.payload());
// MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
// MqttMessage mqttMessage = MqttMessageFactory.newMessage(
// new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.EXACTLY_ONCE, false, 0),
// MqttMessageIdVariableHeader.from(header.messageId()),
// 0);
// channel.writeAndFlush(mqttMessage);
}
}
/**
* 客户端发布消息时使用
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:14
*/
public void publish(Channel channel, MqttPublishMessage msg) {
log.info("qos类型是{}",msg.fixedHeader().qosLevel());
String topic = msg.variableHeader().topicName();
log.info("订阅主题:{}",topic);
ByteBuf buf = msg.content().duplicate();
byte[] tmp = new byte[buf.readableBytes()];
buf.readBytes(tmp);
String content = null;
try {
content = new String(tmp,"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
//校验传入的数据是否符合要求
if(StringUtils.isBlank(content)){
log.error("MQTT接收到的数据包为空===》{}",content);
puback(channel,msg,"MQTT接收到的数据包为空");
return;
}
log.info("MQTT读取到的客户端发送信息===>{}",content);
// 如果是qos1或者qos2类型都需要响应
puback(channel,msg,content);
// 推送主题消息
log.info("推送客户端客户端消息:{}",content);
if (AT_LEAST_ONCE == msg.fixedHeader().qosLevel() || AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {
for (Channel channel1 : clientList) {
try {
send(channel1,topic,msg.fixedHeader().qosLevel(),content);
} catch (InterruptedException e) {
log.error("该通道推送消息失败,可加入容错机制,channel:{}",channel1);
}
}
}
}
/**
* 客户端订阅消息ACK包
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:14
*/
public void subscribe(Channel channel, MqttSubscribeMessage msg) {
MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
// mqttQoS = MqttQoS.EXACTLY_ONCE;
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, mqttQoS, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
new MqttSubAckPayload(0));
channel.writeAndFlush(subAckMessage);
}
/**
* 客户端取消订阅ACK包
*
* @param channel:
* @param msg:
* @author liyang
* @since 2022/3/13 23:15
*/
public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
MqttUnsubAckMessage unSubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
channel.writeAndFlush(unSubAckMessage);
disConnect(channel,msg);
}
/**
* 捕获异常状态,客户端断开钩子函数
*
* @param ctx:
* @param cause:
* @author liyang
* @since 2022/3/13 23:15
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("MQTT客户端被强制关闭:{}:{}",ctx.channel().id().asShortText(),cause);
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
ctx.channel().close();
}
}
/**
* qos1中响应客户端ACK包
*
* @param channel:
* @param msg:
* @param payLoad:
* @author liyang
* @since 2022/3/13 23:16
*/
// 客户端QOS1消息类型( MqttQoS.AT_LEAST_ONCE = qos1),需要服务器响应包
private void puback(Channel channel, MqttPublishMessage msg, String payLoad){
if (MqttQoS.AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {
// qos0消息类型,不需要ACK客户端
return;
}
if (MqttQoS.AT_LEAST_ONCE == msg.fixedHeader().qosLevel()) {
// qos1消息类型,需要向客户端返回MqttMessageType.PUBACK 类型ACK应答
MqttPubAckMessage sendMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),
payLoad);
channel.writeAndFlush(sendMessage);
return;
}
if (MqttQoS.EXACTLY_ONCE == msg.fixedHeader().qosLevel()) {
// qos2消息类型
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.EXACTLY_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),
payLoad);
channel.writeAndFlush(mqttMessage);
}
}
/**
* 向客户端发布订阅主题消息
*
* @param channel:
* @param topic:
* @param qos:
* @param sendMessage:
* @return
* @author liyang
* @since 2022/3/13 23:16
*/
public ChannelFuture send(Channel channel, String topic,MqttQoS qos ,String sendMessage ) throws InterruptedException {
MqttRequest request = new MqttRequest((sendMessage.getBytes()));
MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,
request.isDup(),
qos,
request.isRetained(),
0),
new MqttPublishVariableHeader(topic, 0),
Unpooled.buffer().writeBytes(request.getPayload()));
msgMap.put(pubMessage.variableHeader().messageId()+"",pubMessage.variableHeader().messageId()+"");
// 超过高水位,则采取同步模式
if (channel.isWritable()) {
return channel.writeAndFlush(pubMessage);
}
return channel.writeAndFlush(pubMessage).sync();
}
}
package com.my.netty.mqtt.handler;
import io.netty.handler.codec.mqtt.MqttQoS;
/**
* @author: liyang
* @date: 2020-11-04 15:58
* @description: 请求消息体
**/
public class MqttRequest {
private boolean mutable = true;
private byte[] payload;
private MqttQoS qos = MqttQoS.AT_LEAST_ONCE;
private boolean retained = false;
private boolean dup = false;
private int messageId;
public MqttRequest() {
this.setPayload(new byte[0]);
}
public MqttRequest(byte[] payload) {
this.setPayload(payload);
}
public MqttRequest(byte[] payload,MqttQoS qos) {
this.setPayload(payload);
this.setQos(qos);
}
public byte[] getPayload() {
return this.payload;
}
public void clearPayload() {
this.checkMutable();
this.payload = new byte[0];
}
public void setPayload(byte[] payload) {
this.checkMutable();
if (payload == null) {
throw new NullPointerException();
} else {
this.payload = payload;
}
}
public boolean isRetained() {
return this.retained;
}
public void setRetained(boolean retained) {
this.checkMutable();
this.retained = retained;
}
public MqttQoS getQos() {
return qos;
}
public void setQos(MqttQoS qos) {
this.qos = qos;
}
public boolean isMutable() {
return mutable;
}
public void setMutable(boolean mutable) {
this.mutable = mutable;
}
protected void checkMutable() throws IllegalStateException {
if (!this.mutable) {
throw new IllegalStateException();
}
}
public boolean isDup() {
return dup;
}
public void setDup(boolean dup) {
this.dup = dup;
}
public int getMessageId() {
return messageId;
}
public void setMessageId(int messageId) {
this.messageId = messageId;
}
@Override
public String toString() {
return new String(this.payload);
}
}
第三步.发布netty服务
由于咱们是基于springboot架构搭建,所以可以利用
@Component + @PostConstruct 标签在springboot项目初始化时运行netty服务, 但是要注意springBean注入对象需要再创建netty服务是,将单例bean对象传入使用
package com.my.netty;
//import com.my.netty.core.cache.ClientCache;
//import com.my.netty.core.dto.ClientDTO;
//import com.my.netty.core.dto.InitParamDTO;
//import com.my.netty.core.server.IServer;
//import com.my.netty.core.service.IDataService;
//import com.my.netty.mqtt.MqttServer;
//import com.my.netty.websocket.WebSocketServer;
//import com.my.netty.websocket.service.WebSocketDataServiceImpl;
//import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
//import javax.annotation.Resource;
//import java.util.Collection;
/**
* @author: liyang
* @date: 2020-11-04 14:28
* @description: netty启动服务
**/
@Slf4j
@Component
public class NettyServerInit {
// @Resource(name = "mqttDataServiceImpl")
// private IDataService mqttService;
// @Resource(name = "webSocketDataServiceImpl")
// private WebSocketDataServiceImpl webSocketDataService;
@PostConstruct
public void init(){
// 简单版测试服务
TestServer testServer = new TestServer();
testServer.run();
// // 创建mqtt服务
// MqttServer mqttServer = new MqttServer(InitParamDTO.base(9001, 1, 4, mqttService,new ClientCache()));
// // 创建websocket服务
// WebSocketServer wsServer = new WebSocketServer(InitParamDTO.base(9010, 1, 4, webSocketDataService, new ClientCache()));
// // 启动服务
// IServer mqttRun = mqttServer.run();
// IServer wsRun = wsServer.run();
//
// // todo 测试代码-测试监听服务客户端状态
// Thread thread = new Thread(new Runnable() {
// @SneakyThrows
// @Override
// public void run() {
// while (true) {
// Thread.sleep(1500);
// Collection<ClientDTO> clientList = mqttRun.getClientAllList();
// log.info("mqtt客户端连接数:{}:{}", clientList.size());
//
// Collection<ClientDTO> clientAllList = wsRun.getClientAllList();
// log.info("ws客户端连接数:{}:{}", clientAllList.size());
// }
// }
// });
// thread.run();
}
}
第四步.netty客户端测试
win客户端安装包方式,双击打开
版权声明:程序员胖胖胖虎阿 发表于 2022年10月11日 上午6:00。
转载请注明:手把手使用netty4+Springboot2.x实现mqtt协议之客户端与服务器端 | 胖虎的工具箱-编程导航
转载请注明:手把手使用netty4+Springboot2.x实现mqtt协议之客户端与服务器端 | 胖虎的工具箱-编程导航
相关文章
暂无评论...