个人简介
作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。
文章目录
-
- 个人简介
- Netty
-
- Netty入门
-
- Netty著名项目
- Netty的优势
- Netty Maven
- 第一个Netty应用
- Netty组件
-
- EventLoop
- Channel
- Future&Promise
- Handler&Pipeline
- ByteBuf
- ByteBuf日志工具类
- Netty进阶
-
- 粘包和半包/拆包问题
- 粘包和半包/拆包解决方案
-
- 短连接
- 定长解码器
- 行解码器(推荐)
- 自定义分隔符解码器
- Netty协议解析
-
- Redis协议
- Http协议
- 群聊
Netty
Netty入门
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序
Netty著名项目
由Netty开发的开源框架:
- dubbo
- Zookeeper
- RocketMQ
Netty的优势
- 不需要自己构建协议,Netty自带了多种协议,例如HTTP协议
- 解决了TCP传输问题,如粘包、半包
- 解决了一个epoll空轮询的JDK bug。(作者遇到过),即selector的select方法默认是阻塞的,但是并没有阻塞会一直空轮询。
- Netty对JDK的NIO API进行增强,如下:
- ThreadLocal==>FastThreadLocal
- ByteBuffer==>ByteBuf(重要),支持动态扩容,不像原厂的JDK的ByteBuffer超过缓存就报错
Netty Maven
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
依赖说明:暂时不推荐使用Netty5,使用Netty4即可
第一个Netty应用
服务器端:
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
// Netty的服务器端启动器,装配Netty组件
new ServerBootstrap()
// NioEventLoopGroup底层就是线程池+selector
.group(new NioEventLoopGroup())
// 通道
.channel(NioServerSocketChannel.class)
//“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer
//但是每一个SocketChannel只能够让这个初始化器执行一次
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
log.info("initChannel start......");
//往处理器流水线pipeline添加处理器
//因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder
nioSocketChannel.pipeline().addLast(new StringDecoder());
//添加接收数据需要的处理器适配器
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
//重写通道的‘’读‘’方法,msg就是接收到的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.warn(msg.toString()); //打印数据
super.channelRead(ctx, msg);
}
});
log.info("initChannel end......");
}
})
.bind(8082);
}
客户端:
private static final Logger log = LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) throws InterruptedException {
//创建Netty客户端的启动器,装配Netty组件
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
//一旦执行这个应用立刻初始化,这个和childHandler有所不同
//childHandler是需要socket连接上在初始化,这个不需要。。。。。
.handler(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
log.info("initChannel start......");
//由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
//往通道流水线添加一个字符串编码器
channel.pipeline().addLast(new StringEncoder());
log.info("initChannel end......");
}
})
// connect方法是“”异步“”的
.connect("localhost", 8082)
//坑点:由于connect方法是异步的,所以要同步。。。。。
//由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
//一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步
.sync()
// 获取通道。然后发送数据
.channel()
.writeAndFlush("hello你好");
}
Netty组件
查看CPU最大核心数
int hx = NettyRuntime.availableProcessors(); //cpu核心数
EventLoop
事件循环对象EventLoop
EventLoop本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理一个或多个Channel上源源不断的io事件
事件循环组EventLoopGroup
EventLoopGroup是一组EventLoop,而每一个EventLoop都维护着一个selector,Channel 一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop。
int count=3;
EventLoopGroup ev=new NioEventLoopGroup(count);
System.out.println(ev.next().hashCode());//1
System.out.println(ev.next().hashCode());//2
System.out.println(ev.next().hashCode());//3
System.out.println(ev.next().hashCode());//4
通过上面的代码可以看出1和4是同一个对象,因为他们的hashCode相同。得出EventLoopGroup是一个线程池,里面装载着>1个的EventLoop,
EventLoop底层维护了一个线程和selector,而count可以指定EventLoopGroup的线程池大小。
EventLoop普通任务与定时任务
EventLoopGroup ev=new NioEventLoopGroup(3);
//普通任务
ev.next().submit(()->{
System.out.println("111");
});
System.out.println("222");
//定时任务
ev.next().scheduleAtFixedRate(()->{
System.out.println("333");
},0,1,TimeUnit.SECONDS);
关闭EventLoopGroup
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
eventLoopGroup.shutdownGracefully(); //优雅的关闭EventLoopGroup
分工
// Netty的服务器端启动器,装配Netty组件
new ServerBootstrap()
//******NioEventLoopGroup的分工合作,第一个NioEventLoopGroup处理accept事件
//第二个NioEventLoopGroup处理读写事件
.group(new NioEventLoopGroup(),new NioEventLoopGroup())
// 通道
.channel(NioServerSocketChannel.class)
//“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer
//但是每一个SocketChannel只能够让这个初始化器执行一次
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
log.info("initChannel start......");
//往处理器流水线pipeline添加处理器
//因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder
nioSocketChannel.pipeline().addLast(new StringDecoder());
//添加接收数据需要的处理器适配器
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
//重写通道的‘’读‘’方法,msg就是接收到的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.warn(msg.toString()); //打印数据
super.channelRead(ctx, msg);
}
});
log.info("initChannel end......");
}
})
.bind(8082);
Channel
Channel常用方法:
- close()
- 可以用来关闭Channel
- closeFuture()
- 用来处理 Channel 的关闭
- pipeline()
- 添加处理器
- write()
- 写入数据,只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
- writeAndFlush()
- 立即发送数据,相当于同时调用write和flush方法,好处是不用等缓存满了才能发出数据的问题
ChannelFuture
获取ChannelFuture
//创建Netty客户端的启动器,装配Netty组件
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
//一旦执行这个应用立刻初始化,这个和childHandler有所不同
//childHandler是需要socket连接上在初始化,这个不需要。。。。。
.handler(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
//往通道流水线添加一个字符串编码器
channel.pipeline().addLast(new StringEncoder());
}
})
// connect方法是“”异步“”的
.connect("localhost", 8082);
发送数据的两种方式
- sync同步channelFuture再发送数据
- channelFuture添加监听器
这两种方法本质上都是为了让channelFuture成功创建也就是connect方法完成调用之后才发送数据
//创建Netty客户端的启动器,装配Netty组件
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
//一旦执行这个应用立刻初始化,这个和childHandler有所不同
//childHandler是需要socket连接上在初始化,这个不需要。。。。。
.handler(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
//往通道流水线添加一个字符串编码器
channel.pipeline().addLast(new StringEncoder());
}
})
// connect方法是“”异步“”的
.connect("localhost", 8082);
//"方法一":
//由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
//一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// channel.writeAndFlush("你好");
//方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建
//使用这种监听器方法就不需要sync进行同步了
channelFuture.addListener(new ChannelFutureListener() {
//当connect成功连接之后就会进入这个方法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("operationComplete");
}
});
关闭通道channel
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
//创建Netty客户端的启动器,装配Netty组件
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
//一旦执行这个应用立刻初始化,这个和childHandler有所不同
//childHandler是需要socket连接上在初始化,这个不需要。。。。。
.handler(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//日志
channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
//由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
//往通道流水线添加一个字符串编码器
channel.pipeline().addLast(new StringEncoder());
}
})
// connect方法是“”异步“”的
.connect("localhost", 8082);
//"方法一":
//由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
//一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// channel.writeAndFlush("你好");
//方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建
//使用这种监听器方法就不需要sync进行同步了
channelFuture.addListener(new ChannelFutureListener() {
//当connect成功连接之后就会进入这个方法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("operationComplete");
//只有close之后才会调用下面的关闭监听器
channel.close(); //关闭channel,这个关闭方法也是**异步**的,所以也需要进行监听
ChannelFuture closeFuture = channel.closeFuture();
//关闭通道监听器
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.info("已经关闭channel");
//关闭group
eventLoopGroup.shutdownGracefully();
}
});
}
});
Future&Promise
Future都是用线程池去返回得到的,所以JDK Future需要依赖线程池,Netty Future需要依赖于EventLoopGroup
JDK Futhure和Netty Future、Netty Promise区别:
Netty的Future继承与JDK的Future,Netty Promise又对Netty Future进行扩展。
- JDK Future只能同步等待任务结束(或成功、或失败)才能得到结果,例如JDK Future的get是阻塞的获取结果
- Netty Future既阻塞的获取结果,也可以非阻塞的获取结果,阻塞就是get,非阻塞就是getNow。
- Netty Promise有Netty Future所有的功能且增加了几个方法,setSuccess、setFailure,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
JDK Future
ExecutorService executorService = Executors.newFixedThreadPool(2); //创建一个固定大小的线程池
//Callable有返回值。
Future<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "hello";
}
});
String res = future.get(); //get方法会阻塞,直到线程池的submit执行完毕,返回了future对象才会解除阻塞
System.out.println(res);
executorService.shutdown(); //关闭线程池
Netty Future
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
Future<String> future = eventLoopGroup.next().submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Netty Future";
}
});
// String s1 = future.get(); //阻塞方法,这个方法和jdk的future一样
// System.out.println(s1);
String s2 = future.getNow(); //非阻塞方法,如果future没有立刻返回值则不会等待,直接返回null
System.out.println(s2);
Netty Promise
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
EventLoop executors = eventLoopGroup.next();
DefaultPromise<Integer> promise = new DefaultPromise<>(executors);
new Thread(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
promise.setSuccess(100);
}).start();
Integer res = promise.get();
System.out.println(res);
Handler&Pipeline
服务端:
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
//pipeline结构
//head->handle1->handle2->handle3->handle4->handle5->handle6->tail
//且为‘双向链表’,触发Inbound事件则会从head->tail一直走Inbound方法。
//触发Outbound事件则会从tail->head一直走Outbound方法。只有触发了对应事件才会走对应的方法。。。。。。
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringDecoder());
//Inbound处理器
//为处理器取名字
socketChannel.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.warn(Thread.currentThread().getName()+"==>"+"handle1");
super.channelRead(ctx, msg); //向下传递
}
});
socketChannel.pipeline().addLast("handle2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.warn(msg.toString());
log.warn(Thread.currentThread().getName()+"==>"+"handle2");
super.channelRead(ctx, msg); //向下传递
}
});
socketChannel.pipeline().addLast("handle3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//***不能用这种方法,client会收不到
// ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello world");
//***用这种,记住*****一定要指定字符类型UTF-8***
ByteBuf byteBuf = ctx.alloc().buffer().writeBytes("hello".getBytes("utf-8"));
//发送数据,触发OutBound事件
socketChannel.writeAndFlush(byteBuf);
log.warn(Thread.currentThread().getName()+"==>"+"handle3");
super.channelRead(ctx, msg); //向下传递
}
});
//Outbound处理器
socketChannel.pipeline().addLast("handle4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.warn(Thread.currentThread().getName()+"==>"+"handle4");
super.write(ctx, msg, promise);
}
});
socketChannel.pipeline().addLast("handle5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.warn(Thread.currentThread().getName()+"==>"+"handle5");
super.write(ctx, msg, promise);
}
});
socketChannel.pipeline().addLast("handle6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.warn(Thread.currentThread().getName()+"==>"+"handle6");
super.write(ctx, msg, promise);
}
});
}
}).bind(8080);
客户端:
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("--------------"+msg.toString());
super.channelRead(ctx, msg);
}
});
}
}).connect("localhost", 8080);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("client-----");
// channel.close();
// ChannelFuture closeFuture = channel.closeFuture();
// closeFuture.addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// eventLoopGroup.shutdownGracefully();
// }
// });
}
});
通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler.
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler:
- pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler处理器
- 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
- 当有入站(Inbound)操作时,会从head开始向tail方向调用handler,直到handler不是处理Inbound操作为止
- 当有出站(Outbound)操作时,会从tail开始向head方向调用handler,直到handler不是处理Outbound操作为止
结构图:
ByteBuf
创建ByteBuf
//创建ByteBuf
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
log(byteBuf);
StringBuffer stringBuffer = new StringBuffer();
for (int i = 0; i < 50; i++) {
stringBuffer.append('1');
}
byteBuf.writeBytes(stringBuffer.toString().getBytes("utf-8"));
log(byteBuf);
运行结果:
read index:0 write index:0 capacity:10
read index:0 write index:50 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000010| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000020| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000030| 31 31 |11 |
+--------+-------------------------------------------------+----------------+
Process finished with exit code 0
根据打印的capacity可知ByteBuf是会自动扩容的,而NIO的ByteBuffer是不能超出容量的。
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
static final int DEFAULT_INITIAL_CAPACITY = 256; //默认初始化容量
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE; //最大容量
static final int DEFAULT_MAX_COMPONENTS = 16;
ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小
如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建
3种创建池化的ByteBuf方式
ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf
ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf
查看当前ByteBuf对象类型
ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf
ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf
System.out.println(byteBuf1.getClass());
System.out.println(byteBuf2.getClass());
System.out.println(byteBuf3.getClass());
输出结果:
class io.netty.buffer.PooledUnsafeDirectByteBuf
class io.netty.buffer.PooledUnsafeHeapByteBuf
class io.netty.buffer.PooledUnsafeDirectByteBuf
池化和非池化
- Netty4.1之前默认是非池化
- Netty4.1之后默认是池化,但是Android平台默认是非池化
池化优点:
- 本质上池化的意义就是可重用ByteBuf
- 没有池化的话每次需要使用ByteBuf都要重新申请内存。即使是堆内存,释放内存也会增大GC的压力
- 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
- 高并发下,池化更节约内存,减少内存溢出的可能。
IDEA IDE如何设置为非池化
只需要在IDEA IDE的VM options里面设置下面一段代码即可:
-Dio.netty.allocator.type={unpooled|pooled}
ByteBuf组成
- 最大容量与当前容量
- 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
- 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
- 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制。进行读写操作时,无需进行模式的切换
- 读指针前的部分被称为废弃部分,是已经读过的内容
- 读指针与写指针之间的空间称为可读部分
- 写指针与当前容量之间的空间称为可写部分
ByteBuf写入
ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
byteBuf1.writeBytes("hello".getBytes("utf-8"));
write和set方法的区别:
ByteBuf中set开头的一系列方法,也可以写入数据,但不会改变写指针位置
ByteBuf的扩容机制
当ByteBuf中的当前容量无法容纳写入的数据时,会自动进行扩容
触发扩容:
ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
log(byteBuf1);
byteBuf1.writeBytes("helloaaaaaaaa".getBytes("utf-8"));
log(byteBuf1);
结果:
read index:0 write index:0 capacity:10
read index:0 write index:13 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 61 61 61 61 61 61 61 61 |helloaaaaaaaa |
+--------+-------------------------------------------------+----------------+
扩容机制如下:
有两种情况:
- 写入后的数据小于512字节
- 这种情况会选择使用16的整数倍进行扩容,比如写入后的数据是14字节,则16*1为最小整数倍,则会扩容到16字节
- 写入后的数据大于512字节
- 这种情况会以2的n次方扩容,例如写入后的数据是600字节,此时大于512字节,那么容纳它的容量为2的10次方,因为2的9次方是512容纳不了,所以会扩容到1024字节
- 如果扩容后的大小大于maxCapacity,则会抛出java.lang.IndexOutOfBoundsException异常
ByteBuf读取
读取后会移动读指针
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
byteBuf.writeBytes("hello".getBytes("utf-8"));
byte b[]=new byte[5];
byteBuf.readBytes(b);
System.out.println(Arrays.toString(b));
ByteBuf以get开头的方法,这些方法不会改变读指针的位置
ByteBuf日志工具类
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(StringUtil.NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(StringUtil.NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(StringUtil.NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
io.netty.buffer.ByteBufUtil.appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
ByteBuf的释放
由于ByteBuf中有堆外内存(直接内存)的实现,堆外内存最好是手动来释放,而不是等GC来进行垃圾回收。
- UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可。
- UnpooledDirectByteBuf使用的是直接内存,需要特殊的方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
具体如下:
- 新创建的ByteBuf默认计数为1
- 调用release方法会使计数-1,如果计数为0,则内存将会被回收。
- 调用retain方法会使计数+1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。
ByteBuf内存释放规则是:谁最后使用这块内存,谁就要调用release方法进行释放。
- 入站(Inbound处理器链)ByteBuf处理原则:
- 可以遵循谁最后使用内存谁就release。也可以让尾释放内存。
- 我们知道Inbound是从head->tail,所以tail是入站的终点,TailContext也会处理内存释放的问题。
- 可以遵循谁最后使用内存谁就release。也可以让尾释放内存。
- 出站(Outbound处理器链)ByteBuf处理原则
- 可以遵循谁最后使用内存谁就release。也可以让头释放内存。
- 我们知道Outbound是从tail->head,所以head是出站的终点,HeadContext也会处理内存释放的问题。
- 有时候不清楚ByteBuf的计数是多少次,但又必须彻底释放,可以循环调用release直到返回true
while (!buffer.release()) {}
内存释放源码
public interface ReferenceCounted {
ReferenceCounted retain();
/**
* Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
* {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release();
}
从注释可以看出,让release成功释放内存后将会返回true。
头尾释放内存源码:
/**
* Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
* in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
* to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
*/
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
使用被释放的内存会怎样
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
byteBuf.writeBytes("helloWorld".getBytes("utf-8"));
byteBuf.release(); //释放内存
ByteBufferUtil.log(byteBuf);
结果:
Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0
注意:一旦ByteBuf的计数到0,再进行retain也没用
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
byteBuf.writeBytes("helloWorld".getBytes("utf-8"));
byteBuf.release(); //-1
byteBuf.retain(); //+1
ByteBufferUtil.log(byteBuf);
结果
Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
内存切片slice
public abstract ByteBuf slice(int index, int length);
- ByteBuf的内存切片也是零拷贝的体现之一,切片后的内存还是原来ByteBuf的内存,过程没有发生过内存复制,切片后的 ByteBuf 维护独立的 read,write 指针.
- 切片后的ByteBuf需要调用retain使计数+1,防止原来的ByetBuf调用release释放内存导致切片的内存不可用。
- 修改原ByteBuf中的值,也会影响切片后得到的ByteBuf。
代码案例:
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
byteBuf.writeBytes("helloWorld".getBytes("utf-8"));
ByteBufferUtil.log(byteBuf);
ByteBuf buf = byteBuf.slice(0, 5); //内存分片
ByteBufferUtil.log(buf);
System.out.println("---------------");
buf.setByte(1,'g'); //修改分片内存的值
//重新打印
ByteBufferUtil.log(byteBuf);
ByteBufferUtil.log(buf);
结果:
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 57 6f 72 6c 64 |helloWorld |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
---------------
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 67 6c 6c 6f 57 6f 72 6c 64 |hglloWorld |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 67 6c 6c 6f |hgllo |
+--------+-------------------------------------------------+----------------+
Process finished with exit code 0
结论:可以看出修改分片的内存的值,原内存也会受到影响,因为他们都是用同一块内存。
ByteBuf优势
- 池化思想-可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf
Netty进阶
粘包和半包/拆包问题
粘包问题演示
服务器端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已成功连接服务器");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
客户端:
private static final Logger log= LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
//不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LoggingHandler());
}
}).connect("localhost", 8080);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
ByteBuf byteBuf = channel.alloc().buffer(16);
for (int i=0;i<10;i++){
byteBuf.retain();
byteBuf.writeBytes(("hello").getBytes("utf-8"));
channel.writeAndFlush(byteBuf);
byteBuf.clear();
}
channel.close();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
eventLoopGroup.shutdownGracefully();
}
});
}
});
}
服务器端输出结果:
16:00:37.869 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa191631f, L:/127.0.0.1:8080 - R:/127.0.0.1:53693] READ: 50B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 |hellohellohelloh|
|00000010| 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 |ellohellohellohe|
|00000020| 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c |llohellohellohel|
|00000030| 6c 6f |lo |
+--------+-------------------------------------------------+----------------+
可以看出原来我们是在客户端分10次发送,而服务器端却一下把10次的数据都粘在一起了,这就是粘包问题。
半包问题展示
服务器端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
// 半包问题:例如,发送方发送100字节数据,而接收方最多只能接收30字节数据,这就是半包问题
//option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小(滑动窗口)
.option(ChannelOption.SO_RCVBUF,10)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline()
.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已成功连接服务器");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.info("msg={}", msg);
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}
只需使用这个方法即可
- option(ChannelOption.SO_RCVBUF,10)
option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小。由于接收缓存区的大小<发送方发送的数据大小,所以产生了半包问题。
现象分析
粘包:
- 产生现象
- 第一次发送abc,第二次发送def,接收到的是一整个abcdef
- 原因
- Netty层
- 接收方的接收缓冲区太大,Netty的接收缓冲区默认是1024字节
- 网络层
- TCP滑动窗口:假如发送方发送100字节数据,而滑动窗口缓冲区可容纳>100字节数据,这时候就会出现粘包问题。
- Nagle 算法:会造成粘包
- Netty层
半包/拆包:
- 产生现象
- 发送abcdef数据,接收方第一次收到ab,第二次收到cd,第三次收到ef
- 原因
- Netty层
- 接收方的接收缓冲区太小,发送方的数据过大,导致接收方无法一次接收下所有数据,就会半包/拆包
- 网络层
- 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包
- 数据链路层
- MSS 限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包
- Netty层
发送这些问题的本质:因为 TCP 是流式协议,消息无边界
粘包和半包/拆包解决方案
短连接
短连接:即每发送一条数据就重新连接再次发送,反复此操作。
短连接的缺点是显而易见的,每次发送一条数据都要重新连接,这样会大大的浪费时间,因为连接是需要时间的。
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。
这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。
但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
采用短连接解决粘包代码
服务端:
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline()
.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已成功连接服务器");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg="+msg);
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}
客户端:
private static final Logger log= LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) {
//采用短连接解决“”粘包“”问题,无法解决半包问题
for (int i = 0; i < 10; i++) {
sendMessage("hello");
}
}
public static void sendMessage(String msg){
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
//不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(msg.getBytes("utf-8"));
ch.writeAndFlush(buffer);
ch.close();
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
eventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost", 8080);
}
定长解码器
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度。
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码
行解码器(推荐)
对于其他解码器,我还是更喜欢行解码器。行解码器主要是靠分隔符\n来判断行进行解码,不过需要进行限制长度,以免服务器一直搜索\n造成卡死。
改造前的粘包代码
服务端:
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
客户端:
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
for(int i=0;i<10;i++){
buffer.retain();
buffer.writeBytes("hello world".getBytes("utf-8"));
ctx.channel().writeAndFlush(buffer);
}
ch.close();//关闭Channel
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
nioEventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
结果:
13:37:15.286 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x36ce6c5f, L:/127.0.0.1:8080 - R:/127.0.0.1:64550] READ: 110B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f |hello worldhello|
|00000010| 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c | worldhello worl|
|00000020| 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c |dhello worldhell|
|00000030| 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 |o worldhello wor|
|00000040| 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c |ldhello worldhel|
|00000050| 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f |lo worldhello wo|
|00000060| 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 |rldhello world |
+--------+-------------------------------------------------+----------------+
接收方使用行解码器改造后
服务端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
客户端:
//把消息加工成可以被行解码器识别的消息
private static String getMsg(String oldMsg){
oldMsg+='\n';
return oldMsg;
}
public static void main(String[] args) {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
for(int i=0;i<10;i++){
buffer.retain();
String msg = getMsg("hello world");
buffer.writeBytes(msg.getBytes("utf-8"));
ctx.channel().writeAndFlush(buffer);
//清理缓存,防止数据堆叠
buffer.clear();
}
ch.close();//关闭Channel
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
nioEventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
输出结果:
13:47:15.199 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] REGISTERED
13:47:15.199 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] ACTIVE
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 12, cap: 2048))
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 12, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 2048))
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 2048))
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 36, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 36, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36, widx: 36, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 512))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 512)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 |hello world |
+--------+-------------------------------------------------+----------------+
可以看出已经解决了粘包问题。
自定义分隔符解码器
核心类DelimiterBasedFrameDecoder
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiter the delimiter
*/
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
this(maxFrameLength, true, delimiter);
}
服务端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ByteBuf delimiter = ch.alloc().buffer(6);
delimiter.writeBytes("\r".getBytes("utf-8")); //自定义分隔符
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
客户端:
//把消息加工成可以被行解码器识别的消息
private static String getMsg(String oldMsg){
oldMsg+='\r';
return oldMsg;
}
public static void main(String[] args) {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
for(int i=0;i<10;i++){
buffer.retain();
String msg = getMsg("hello world");
buffer.writeBytes(msg.getBytes("utf-8"));
ctx.channel().writeAndFlush(buffer);
//清理缓存,防止数据堆叠
buffer.clear();
}
ch.close();//关闭Channel
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
nioEventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
Netty协议解析
Redis协议
我们要用netty执行Redis命令就需要遵循Redis协议。
Redis协议格式
*<参数数量> \r\n
$<参数1的字节数量> \r\n
<参数1的数据> \r\n
...
$<参数N的字节数量> \r\n
<参数N的数据> \r\n
使用Netty搭建一个Redis client
/**
* @author 游政杰
* @date 2022/1/13
* 模拟 Redis client
*/
public class RedisSender {
//true为继续循环,false是退出循环
private static ThreadLocal<Boolean> threadLocal=new ThreadLocal<Boolean>();
private static final Logger log= LoggerFactory.getLogger(RedisSender.class);
public static void main(String[] args) {
//Netty“”客户端“”执行Redis命令
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
threadLocal.set(true); //默认是继续循环
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){
//连接成功之后调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Scanner sc = new Scanner(System.in);
for (;;){
if(!threadLocal.get()){
System.out.println("退出成功");
break;
}
printInfo();
String sel = sc.next();
switch (sel)
{
case "a":
sc.nextLine(); //***上面会传下来字符,导致无法输入字符串,所以要加上这句,目的是吸收上面传下来的多余字符串
System.out.println("请输入Redis命令[以单空格分隔]:");
String redis_cmd = sc.nextLine();
String decodeProtocol = decodeProtocol(redis_cmd);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(decodeProtocol.getBytes("utf-8"));
ctx.writeAndFlush(buffer);
buffer.clear();
break;
case "q":
ch.close();
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
nioEventLoopGroup.shutdownGracefully();
threadLocal.set(false); //退出只需要设置为false即可
}
});
break;
default:
System.out.println("无该选项");
break;
}
}
}
});
}
}).connect("localhost",6379);
}catch (Exception e){
e.printStackTrace();
nioEventLoopGroup.shutdownGracefully();
}
}
private static void printInfo()
{
System.out.println("请输入以下字符选项:");
System.out.println("输入a:执行Redis命令");
System.out.println("输入q:退出");
}
/**
* 协议解析
* @param redis_cmd 命令
* @return
*/
//set myname abc
//del key
//get key
private static synchronized String decodeProtocol(String redis_cmd){
String delimiter1="*";
String delimiter2="$";
String delimiter3="\r\n";
StringBuffer decodeCmd = new StringBuffer();//使用线程安全的StringBuffer
List<String> cmd = Arrays.asList(redis_cmd.split(" "));
decodeCmd.append(delimiter1+cmd.size()+delimiter3);
cmd.forEach((e)->{
decodeCmd.append(delimiter2+e.length()+delimiter3);
decodeCmd.append(e+delimiter3);
});
return decodeCmd.toString();
}
}
Http协议
http服务端:
public class HttpServer {
//http服务器
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
//netty自带的http协议转换
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//msg有两种类型
//class io.netty.handler.codec.http.DefaultHttpRequest
//class io.netty.handler.codec.http.LastHttpContent$1
if(msg instanceof HttpRequest){
HttpRequest request=(HttpRequest)msg;
//输出响应DefaultFullHttpResponse(HttpVersion version, HttpResponseStatus status)
DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
String s="hello 2022";
byte b[]=s.getBytes("utf-8");
//从请求头设置响应数据长度,以免浏览器空转
//content_length是io.netty.handler.codec.http包下的类
response.headers().setInt(CONTENT_LENGTH,b.length);
//输出内容
response.content().writeBytes(b);
ctx.channel().writeAndFlush(response);
}
super.channelRead(ctx, msg);
}
});
}
}).bind("localhost",8080);
}
}
输出结果:
16:03:05.785 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] REGISTERED
16:03:05.785 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] ACTIVE
16:03:05.788 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ: 756B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 48 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 3a |Host: localhost:|
|00000020| 38 30 38 30 0d 0a 55 73 65 72 2d 41 67 65 6e 74 |8080..User-Agent|
|00000030| 3a 20 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 57 |: Mozilla/5.0 (W|
|00000040| 69 6e 64 6f 77 73 20 4e 54 20 31 30 2e 30 3b 20 |indows NT 10.0; |
|00000050| 57 69 6e 36 34 3b 20 78 36 34 3b 20 72 76 3a 39 |Win64; x64; rv:9|
|00000060| 35 2e 30 29 20 47 65 63 6b 6f 2f 32 30 31 30 30 |5.0) Gecko/20100|
|00000070| 31 30 31 20 46 69 72 65 66 6f 78 2f 39 35 2e 30 |101 Firefox/95.0|
|00000080| 0d 0a 41 63 63 65 70 74 3a 20 74 65 78 74 2f 68 |..Accept: text/h|
|00000090| 74 6d 6c 2c 61 70 70 6c 69 63 61 74 69 6f 6e 2f |tml,application/|
|000000a0| 78 68 74 6d 6c 2b 78 6d 6c 2c 61 70 70 6c 69 63 |xhtml+xml,applic|
|000000b0| 61 74 69 6f 6e 2f 78 6d 6c 3b 71 3d 30 2e 39 2c |ation/xml;q=0.9,|
|000000c0| 69 6d 61 67 65 2f 61 76 69 66 2c 69 6d 61 67 65 |image/avif,image|
|000000d0| 2f 77 65 62 70 2c 2a 2f 2a 3b 71 3d 30 2e 38 0d |/webp,*/*;q=0.8.|
|000000e0| 0a 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 |.Accept-Language|
|000000f0| 3a 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 38 |: zh-CN,zh;q=0.8|
|00000100| 2c 7a 68 2d 54 57 3b 71 3d 30 2e 37 2c 7a 68 2d |,zh-TW;q=0.7,zh-|
|00000110| 48 4b 3b 71 3d 30 2e 35 2c 65 6e 2d 55 53 3b 71 |HK;q=0.5,en-US;q|
|00000120| 3d 30 2e 33 2c 65 6e 3b 71 3d 30 2e 32 0d 0a 41 |=0.3,en;q=0.2..A|
|00000130| 63 63 65 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 |ccept-Encoding: |
|00000140| 67 7a 69 70 2c 20 64 65 66 6c 61 74 65 0d 0a 43 |gzip, deflate..C|
|00000150| 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 2d |onnection: keep-|
|00000160| 61 6c 69 76 65 0d 0a 43 6f 6f 6b 69 65 3a 20 48 |alive..Cookie: H|
|00000170| 6d 5f 6c 76 74 5f 62 33 39 33 64 31 35 33 61 65 |m_lvt_b393d153ae|
|00000180| 62 32 36 62 34 36 65 39 34 33 31 66 61 62 61 66 |b26b46e9431fabaf|
|00000190| 30 66 36 31 39 30 3d 31 36 32 31 39 30 35 38 35 |0f6190=162190585|
|000001a0| 30 3b 20 49 64 65 61 2d 33 35 32 63 36 33 39 66 |0; Idea-352c639f|
|000001b0| 3d 31 32 37 64 31 61 65 35 2d 34 34 31 37 2d 34 |=127d1ae5-4417-4|
|000001c0| 61 62 36 2d 61 61 64 33 2d 36 32 36 62 66 38 36 |ab6-aad3-626bf86|
|000001d0| 34 62 62 62 33 3b 20 55 4d 5f 64 69 73 74 69 6e |4bbb3; UM_distin|
|000001e0| 63 74 69 64 3d 31 37 61 61 65 35 65 65 63 33 34 |ctid=17aae5eec34|
|000001f0| 35 31 30 2d 30 39 39 37 35 66 34 36 64 62 65 63 |510-09975f46dbec|
|00000200| 33 38 38 2d 34 63 33 65 32 35 37 61 2d 31 34 34 |388-4c3e257a-144|
|00000210| 30 30 30 2d 31 37 61 61 65 35 65 65 63 33 36 33 |000-17aae5eec363|
|00000220| 61 62 3b 20 43 4e 5a 5a 44 41 54 41 31 32 35 38 |ab; CNZZDATA1258|
|00000230| 35 36 36 39 36 33 3d 31 36 31 32 35 36 38 34 34 |566963=161256844|
|00000240| 38 2d 31 36 32 36 34 32 32 37 30 36 2d 25 37 43 |8-1626422706-%7C|
|00000250| 31 36 32 36 34 32 38 31 35 30 0d 0a 55 70 67 72 |1626428150..Upgr|
|00000260| 61 64 65 2d 49 6e 73 65 63 75 72 65 2d 52 65 71 |ade-Insecure-Req|
|00000270| 75 65 73 74 73 3a 20 31 0d 0a 53 65 63 2d 46 65 |uests: 1..Sec-Fe|
|00000280| 74 63 68 2d 44 65 73 74 3a 20 64 6f 63 75 6d 65 |tch-Dest: docume|
|00000290| 6e 74 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f |nt..Sec-Fetch-Mo|
|000002a0| 64 65 3a 20 6e 61 76 69 67 61 74 65 0d 0a 53 65 |de: navigate..Se|
|000002b0| 63 2d 46 65 74 63 68 2d 53 69 74 65 3a 20 6e 6f |c-Fetch-Site: no|
|000002c0| 6e 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 55 73 |ne..Sec-Fetch-Us|
|000002d0| 65 72 3a 20 3f 31 0d 0a 43 61 63 68 65 2d 43 6f |er: ?1..Cache-Co|
|000002e0| 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d 30 |ntrol: max-age=0|
|000002f0| 0d 0a 0d 0a |.... |
+--------+-------------------------------------------------+----------------+
16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] WRITE: 49B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 31 30 0d 0a 0d 0a 68 65 6c 6c 6f 20 32 30 32 | 10....hello 202|
|00000030| 32 |2 |
+--------+-------------------------------------------------+----------------+
16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] FLUSH
16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET / HTTP/1.1
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8
Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2
Accept-Encoding: gzip, deflate
Connection: keep-alive
Cookie: Hm_lvt_b393d153aeb26b46e9431fabaf0f6190=1621905850; Idea-352c639f=127d1ae5-4417-4ab6-aad3-626bf864bbb3; UM_distinctid=17aae5eec34510-09975f46dbec388-4c3e257a-144000-17aae5eec363ab; CNZZDATA1258566963=1612568448-1626422706-%7C1626428150
Upgrade-Insecure-Requests: 1
Sec-Fetch-Dest: document
Sec-Fetch-Mode: navigate
Sec-Fetch-Site: none
Sec-Fetch-User: ?1
Cache-Control: max-age=0 that reached at the tail of the pipeline. Please check your pipeline configuration.
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LoggingHandler#0, HttpServerCodec#0, HttpServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486].
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message EmptyLastHttpContent that reached at the tail of the pipeline. Please check your pipeline configuration.
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LoggingHandler#0, HttpServerCodec#0, HttpServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486].
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ COMPLETE
16:03:05.809 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ: 693B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 55 73 65 72 2d 41 67 65 6e 74 3a 20 4d 6f 7a |.User-Agent: Moz|
|00000040| 69 6c 6c 61 2f 35 2e 30 20 28 57 69 6e 64 6f 77 |illa/5.0 (Window|
|00000050| 73 20 4e 54 20 31 30 2e 30 3b 20 57 69 6e 36 34 |s NT 10.0; Win64|
|00000060| 3b 20 78 36 34 3b 20 72 76 3a 39 35 2e 30 29 20 |; x64; rv:95.0) |
|00000070| 47 65 63 6b 6f 2f 32 30 31 30 30 31 30 31 20 46 |Gecko/20100101 F|
|00000080| 69 72 65 66 6f 78 2f 39 35 2e 30 0d 0a 41 63 63 |irefox/95.0..Acc|
|00000090| 65 70 74 3a 20 69 6d 61 67 65 2f 61 76 69 66 2c |ept: image/avif,|
|000000a0| 69 6d 61 67 65 2f 77 65 62 70 2c 2a 2f 2a 0d 0a |image/webp,*/*..|
|000000b0| 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 3a |Accept-Language:|
|000000c0| 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 38 2c | zh-CN,zh;q=0.8,|
|000000d0| 7a 68 2d 54 57 3b 71 3d 30 2e 37 2c 7a 68 2d 48 |zh-TW;q=0.7,zh-H|
|000000e0| 4b 3b 71 3d 30 2e 35 2c 65 6e 2d 55 53 3b 71 3d |K;q=0.5,en-US;q=|
|000000f0| 30 2e 33 2c 65 6e 3b 71 3d 30 2e 32 0d 0a 41 63 |0.3,en;q=0.2..Ac|
|00000100| 63 65 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 67 |cept-Encoding: g|
|00000110| 7a 69 70 2c 20 64 65 66 6c 61 74 65 0d 0a 43 6f |zip, deflate..Co|
|00000120| 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 2d 61 |nnection: keep-a|
|00000130| 6c 69 76 65 0d 0a 52 65 66 65 72 65 72 3a 20 68 |live..Referer: h|
|00000140| 74 74 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a |ttp://localhost:|
|00000150| 38 30 38 30 2f 0d 0a 43 6f 6f 6b 69 65 3a 20 48 |8080/..Cookie: H|
|00000160| 6d 5f 6c 76 74 5f 62 33 39 33 64 31 35 33 61 65 |m_lvt_b393d153ae|
|00000170| 62 32 36 62 34 36 65 39 34 33 31 66 61 62 61 66 |b26b46e9431fabaf|
|00000180| 30 66 36 31 39 30 3d 31 36 32 31 39 30 35 38 35 |0f6190=162190585|
|00000190| 30 3b 20 49 64 65 61 2d 33 35 32 63 36 33 39 66 |0; Idea-352c639f|
|000001a0| 3d 31 32 37 64 31 61 65 35 2d 34 34 31 37 2d 34 |=127d1ae5-4417-4|
|000001b0| 61 62 36 2d 61 61 64 33 2d 36 32 36 62 66 38 36 |ab6-aad3-626bf86|
|000001c0| 34 62 62 62 33 3b 20 55 4d 5f 64 69 73 74 69 6e |4bbb3; UM_distin|
|000001d0| 63 74 69 64 3d 31 37 61 61 65 35 65 65 63 33 34 |ctid=17aae5eec34|
|000001e0| 35 31 30 2d 30 39 39 37 35 66 34 36 64 62 65 63 |510-09975f46dbec|
|000001f0| 33 38 38 2d 34 63 33 65 32 35 37 61 2d 31 34 34 |388-4c3e257a-144|
|00000200| 30 30 30 2d 31 37 61 61 65 35 65 65 63 33 36 33 |000-17aae5eec363|
|00000210| 61 62 3b 20 43 4e 5a 5a 44 41 54 41 31 32 35 38 |ab; CNZZDATA1258|
|00000220| 35 36 36 39 36 33 3d 31 36 31 32 35 36 38 34 34 |566963=161256844|
|00000230| 38 2d 31 36 32 36 34 32 32 37 30 36 2d 25 37 43 |8-1626422706-%7C|
|00000240| 31 36 32 36 34 32 38 31 35 30 0d 0a 53 65 63 2d |1626428150..Sec-|
|00000250| 46 65 74 63 68 2d 44 65 73 74 3a 20 69 6d 61 67 |Fetch-Dest: imag|
|00000260| 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f 64 |e..Sec-Fetch-Mod|
|00000270| 65 3a 20 6e 6f 2d 63 6f 72 73 0d 0a 53 65 63 2d |e: no-cors..Sec-|
|00000280| 46 65 74 63 68 2d 53 69 74 65 3a 20 73 61 6d 65 |Fetch-Site: same|
|00000290| 2d 6f 72 69 67 69 6e 0d 0a 43 61 63 68 65 2d 43 |-origin..Cache-C|
|000002a0| 6f 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d |ontrol: max-age=|
|000002b0| 30 0d 0a 0d 0a |0.... |
+--------+-------------------------------------------------+----------------+
16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] WRITE: 49B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 31 30 0d 0a 0d 0a 68 65 6c 6c 6f 20 32 30 32 | 10....hello 202|
|00000030| 32 |2 |
+--------+-------------------------------------------------+----------------+
16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] FLUSH
16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET /favicon.ico HTTP/1.1
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0
Accept: image/avif,image/webp,*/*
Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2
Accept-Encoding: gzip, deflate
Connection: keep-alive
Referer: http://localhost:8080/
Cookie: Hm_lvt_b393d153aeb26b46e9431fabaf0f6190=1621905850; Idea-352c639f=127d1ae5-4417-4ab6-aad3-626bf864bbb3; UM_distinctid=17aae5eec34510-09975f46dbec388-4c3e257a-144000-17aae5eec363ab; CNZZDATA1258566963=1612568448-1626422706-%7C1626428150
Sec-Fetch-Dest: image
Sec-Fetch-Mode: no-cors
Sec-Fetch-Site: same-origin
群聊
服务端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
//维护所有channel,key=名称,value为channel对象
private static Map<String, NioSocketChannel> sessions=new ConcurrentHashMap<>();
public static Map<String, NioSocketChannel> getSessions() {
return sessions;
}
public static void putSession(String name,NioSocketChannel channel){
sessions.put(name,channel);
}
public static void removeSession(String name){
sessions.remove(name);
}
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
//读消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf=(ByteBuf)msg;
byte b[]=new byte[byteBuf.readableBytes()];
byteBuf.readBytes(b);
String str=new String(b,"utf8");
JSONObject jsonObject = JSONObject.parseObject(str);
int state = (int) jsonObject.get("state");
String username = (String) jsonObject.get("username");
switch (state)
{
case 0: //上线
NettyServer.putSession(username, ch);
if(username.equals("client4")){ //如果是client4用户登录则群发
sessions.forEach((k,v)->{
ByteBuf buffer = ctx.alloc().buffer(16);
try {
buffer.writeBytes("群发hhhh".getBytes("utf8"));
v.writeAndFlush(buffer);
// buffer.clear();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
}
System.out.println("当前在线人数:"+NettyServer.getSessions().size());
break;
case 1: //下线
NettyServer.removeSession(username);
NettyServer.getSessions().forEach((k,v)->{
System.out.println(k);
System.out.println(v.hashCode());
});
System.out.println("当前在线人数:"+NettyServer.getSessions().size());
break;
default:
break;
}
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
客户端1:
public static void main(String[] args) {
String name="client1";
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(0);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(1);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
super.channelRead(ctx, msg);
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
客户端2:
public static void main(String[] args) {
String name="client2";
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(0);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(1);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
super.channelRead(ctx, msg);
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
客户端3:
public static void main(String[] args) {
String name="client3";
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(0);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(1);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
super.channelRead(ctx, msg);
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
客户端4:
public static void main(String[] args) {
String name="client4";
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(0);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setUsername(name);
user.setState(1);
String jsonString = JSON.toJSONString(user);
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(jsonString.getBytes("utf8"));
ch.writeAndFlush(buffer);
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
super.channelRead(ctx, msg);
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
实体类:
public class User implements Serializable {
private String username;
private int state; //用户状态,0在线,1下线
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", state=" + state +
'}';
}
}