高性能网络通信框架Netty一套就够(作者原创)

2年前 (2022) 程序员胖胖胖虎阿
174 0 0

个人简介

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。

文章目录

    • 个人简介
    • Netty
      • Netty入门
        • Netty著名项目
        • Netty的优势
        • Netty Maven
        • 第一个Netty应用
        • Netty组件
          • EventLoop
          • Channel
          • Future&Promise
          • Handler&Pipeline
          • ByteBuf
          • ByteBuf日志工具类
      • Netty进阶
        • 粘包和半包/拆包问题
        • 粘包和半包/拆包解决方案
          • 短连接
          • 定长解码器
          • 行解码器(推荐)
          • 自定义分隔符解码器
        • Netty协议解析
          • Redis协议
          • Http协议
        • 群聊

Netty

Netty入门

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序

Netty著名项目

由Netty开发的开源框架:

  • dubbo
  • Zookeeper
  • RocketMQ

Netty的优势

  • 不需要自己构建协议,Netty自带了多种协议,例如HTTP协议
  • 解决了TCP传输问题,如粘包、半包
  • 解决了一个epoll空轮询的JDK bug。(作者遇到过),即selector的select方法默认是阻塞的,但是并没有阻塞会一直空轮询。
  • Netty对JDK的NIO API进行增强,如下:
    • ThreadLocal==>FastThreadLocal
    • ByteBuffer==>ByteBuf(重要),支持动态扩容,不像原厂的JDK的ByteBuffer超过缓存就报错

Netty Maven

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.65.Final</version>
        </dependency>

依赖说明:暂时不推荐使用Netty5,使用Netty4即可

第一个Netty应用

服务器端:

  private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

    // Netty的服务器端启动器,装配Netty组件
    new ServerBootstrap()
        // NioEventLoopGroup底层就是线程池+selector
        .group(new NioEventLoopGroup())
        // 通道
        .channel(NioServerSocketChannel.class)
        //“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer
        //但是每一个SocketChannel只能够让这个初始化器执行一次
        .childHandler(
            new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                  log.info("initChannel start......");
                  //往处理器流水线pipeline添加处理器
                  //因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder
                  nioSocketChannel.pipeline().addLast(new StringDecoder());
                  //添加接收数据需要的处理器适配器
                  nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                      //重写通道的‘’读‘’方法,msg就是接收到的数据
                      @Override
                      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                          log.warn(msg.toString()); //打印数据
                          super.channelRead(ctx, msg);
                      }
                  });
                  log.info("initChannel end......");
              }
            })
        .bind(8082);
  }

客户端:

  private static final Logger log = LoggerFactory.getLogger(NettyClient.class);

  public static void main(String[] args) throws InterruptedException {

      //创建Netty客户端的启动器,装配Netty组件
    new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        //一旦执行这个应用立刻初始化,这个和childHandler有所不同
        //childHandler是需要socket连接上在初始化,这个不需要。。。。。
        .handler(
            new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel channel) throws Exception {
                  log.info("initChannel start......");
                  //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                  //往通道流水线添加一个字符串编码器
                  channel.pipeline().addLast(new StringEncoder());
                  log.info("initChannel end......");
              }
            })
        // connect方法是“”异步“”的
        .connect("localhost", 8082)
        //坑点:由于connect方法是异步的,所以要同步。。。。。
        //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
        //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步
        .sync()
        // 获取通道。然后发送数据
        .channel()
        .writeAndFlush("hello你好");
  }

Netty组件

查看CPU最大核心数

int hx = NettyRuntime.availableProcessors(); //cpu核心数
EventLoop

事件循环对象EventLoop

EventLoop本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理一个或多个Channel上源源不断的io事件

事件循环组EventLoopGroup

EventLoopGroup是一组EventLoop,而每一个EventLoop都维护着一个selector,Channel 一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop。

      int count=3;
      EventLoopGroup ev=new NioEventLoopGroup(count);
      System.out.println(ev.next().hashCode());//1
      System.out.println(ev.next().hashCode());//2
      System.out.println(ev.next().hashCode());//3
      System.out.println(ev.next().hashCode());//4

通过上面的代码可以看出1和4是同一个对象,因为他们的hashCode相同。得出EventLoopGroup是一个线程池,里面装载着>1个的EventLoop,
EventLoop底层维护了一个线程和selector,而count可以指定EventLoopGroup的线程池大小。

EventLoop普通任务与定时任务

      EventLoopGroup ev=new NioEventLoopGroup(3);
      //普通任务
      ev.next().submit(()->{

          System.out.println("111");

      });

      System.out.println("222");

      //定时任务
      ev.next().scheduleAtFixedRate(()->{

          System.out.println("333");

      },0,1,TimeUnit.SECONDS);

关闭EventLoopGroup

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      eventLoopGroup.shutdownGracefully(); //优雅的关闭EventLoopGroup

分工

      // Netty的服务器端启动器,装配Netty组件
      new ServerBootstrap()
               //******NioEventLoopGroup的分工合作,第一个NioEventLoopGroup处理accept事件
              //第二个NioEventLoopGroup处理读写事件
              .group(new NioEventLoopGroup(),new NioEventLoopGroup())
              // 通道
              .channel(NioServerSocketChannel.class)
              //“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer
              //但是每一个SocketChannel只能够让这个初始化器执行一次
              .childHandler(
                      new ChannelInitializer<NioSocketChannel>() {
                          @Override
                          protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                              log.info("initChannel start......");
                              //往处理器流水线pipeline添加处理器
                              //因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder
                              nioSocketChannel.pipeline().addLast(new StringDecoder());
                              //添加接收数据需要的处理器适配器
                              nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                  //重写通道的‘’读‘’方法,msg就是接收到的数据
                                  @Override
                                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                      log.warn(msg.toString()); //打印数据
                                      super.channelRead(ctx, msg);
                                  }
                              });
                              log.info("initChannel end......");
                          }
                      })
              .bind(8082);
Channel

Channel常用方法:

  • close()
    • 可以用来关闭Channel
  • closeFuture()
    • 用来处理 Channel 的关闭
  • pipeline()
    • 添加处理器
  • write()
    • 写入数据,只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush()
    • 立即发送数据,相当于同时调用write和flush方法,好处是不用等缓存满了才能发出数据的问题

ChannelFuture

获取ChannelFuture

      //创建Netty客户端的启动器,装配Netty组件
      ChannelFuture channelFuture = new Bootstrap()
              .group(new NioEventLoopGroup())
              .channel(NioSocketChannel.class)
              //一旦执行这个应用立刻初始化,这个和childHandler有所不同
              //childHandler是需要socket连接上在初始化,这个不需要。。。。。
              .handler(
                      new ChannelInitializer<Channel>() {
                          @Override
                          protected void initChannel(Channel channel) throws Exception {
                              //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                              //往通道流水线添加一个字符串编码器
                              channel.pipeline().addLast(new StringEncoder());
                          }
                      })
              // connect方法是“”异步“”的
              .connect("localhost", 8082);

发送数据的两种方式

  • sync同步channelFuture再发送数据
  • channelFuture添加监听器

这两种方法本质上都是为了让channelFuture成功创建也就是connect方法完成调用之后才发送数据

      //创建Netty客户端的启动器,装配Netty组件
      ChannelFuture channelFuture = new Bootstrap()
              .group(new NioEventLoopGroup())
              .channel(NioSocketChannel.class)
              //一旦执行这个应用立刻初始化,这个和childHandler有所不同
              //childHandler是需要socket连接上在初始化,这个不需要。。。。。
              .handler(
                      new ChannelInitializer<Channel>() {
                          @Override
                          protected void initChannel(Channel channel) throws Exception {
                              //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                              //往通道流水线添加一个字符串编码器
                              channel.pipeline().addLast(new StringEncoder());
                          }
                      })
              // connect方法是“”异步“”的
              .connect("localhost", 8082);

      //"方法一":
      //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
      //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步

//      channelFuture.sync();
//      Channel channel = channelFuture.channel();
//      channel.writeAndFlush("你好");

      //方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建
      //使用这种监听器方法就不需要sync进行同步了
      channelFuture.addListener(new ChannelFutureListener() {
          //当connect成功连接之后就会进入这个方法
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {

              Channel channel = future.channel();
              channel.writeAndFlush("operationComplete");
          }
      });

关闭通道channel

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      //创建Netty客户端的启动器,装配Netty组件
      ChannelFuture channelFuture = new Bootstrap()
              .group(eventLoopGroup)
              .channel(NioSocketChannel.class)
              //一旦执行这个应用立刻初始化,这个和childHandler有所不同
              //childHandler是需要socket连接上在初始化,这个不需要。。。。。
              .handler(
                      new ChannelInitializer<Channel>() {
                          @Override
                          protected void initChannel(Channel channel) throws Exception {
                             //日志
                              channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                              //往通道流水线添加一个字符串编码器
                              channel.pipeline().addLast(new StringEncoder());
                          }
                      })
              // connect方法是“”异步“”的
              .connect("localhost", 8082);

      //"方法一":
      //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
      //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步

//      channelFuture.sync();
//      Channel channel = channelFuture.channel();
//      channel.writeAndFlush("你好");

      //方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建
      //使用这种监听器方法就不需要sync进行同步了
      channelFuture.addListener(new ChannelFutureListener() {
          //当connect成功连接之后就会进入这个方法
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {

              Channel channel = future.channel();
              channel.writeAndFlush("operationComplete");
              //只有close之后才会调用下面的关闭监听器
              channel.close(); //关闭channel,这个关闭方法也是**异步**的,所以也需要进行监听

              ChannelFuture closeFuture = channel.closeFuture();

              //关闭通道监听器
              closeFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      log.info("已经关闭channel");
                      //关闭group
                      eventLoopGroup.shutdownGracefully();
                  }
              });

          }
      });
Future&Promise

Future都是用线程池去返回得到的,所以JDK Future需要依赖线程池,Netty Future需要依赖于EventLoopGroup

JDK Futhure和Netty Future、Netty Promise区别:

Netty的Future继承与JDK的Future,Netty Promise又对Netty Future进行扩展

  • JDK Future只能同步等待任务结束(或成功、或失败)才能得到结果,例如JDK Future的get是阻塞的获取结果
  • Netty Future既阻塞的获取结果,也可以非阻塞的获取结果,阻塞就是get,非阻塞就是getNow。
  • Netty Promise有Netty Future所有的功能且增加了几个方法,setSuccess、setFailure,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。

JDK Future

      ExecutorService executorService = Executors.newFixedThreadPool(2); //创建一个固定大小的线程池
      //Callable有返回值。
      Future<String> future = executorService.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
              Thread.sleep(1000);
              return "hello";
          }
      });

      String res = future.get(); //get方法会阻塞,直到线程池的submit执行完毕,返回了future对象才会解除阻塞
      System.out.println(res);
      executorService.shutdown(); //关闭线程池

Netty Future

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
      Future<String> future = eventLoopGroup.next().submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
              Thread.sleep(1000);
              return "Netty Future";
          }
      });

//      String s1 = future.get(); //阻塞方法,这个方法和jdk的future一样
//      System.out.println(s1);

      String s2 = future.getNow(); //非阻塞方法,如果future没有立刻返回值则不会等待,直接返回null
      System.out.println(s2);

Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果

      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      EventLoop executors = eventLoopGroup.next();
      DefaultPromise<Integer> promise = new DefaultPromise<>(executors);
      new Thread(()->{

          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          promise.setSuccess(100);

      }).start();
      Integer res = promise.get();
      System.out.println(res);
Handler&Pipeline

服务端:

              new ServerBootstrap()
              .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<SocketChannel>() {

                  //pipeline结构
                  //head->handle1->handle2->handle3->handle4->handle5->handle6->tail
                  //且为‘双向链表’,触发Inbound事件则会从head->tail一直走Inbound方法。
                  //触发Outbound事件则会从tail->head一直走Outbound方法。只有触发了对应事件才会走对应的方法。。。。。。
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {

                      socketChannel.pipeline().addLast(new StringDecoder());

                      //Inbound处理器
                      //为处理器取名字
                      socketChannel.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle1");
                              super.channelRead(ctx, msg); //向下传递
                          }
                      });

                      socketChannel.pipeline().addLast("handle2",new ChannelInboundHandlerAdapter(){
                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.warn(msg.toString());
                              log.warn(Thread.currentThread().getName()+"==>"+"handle2");
                              super.channelRead(ctx, msg); //向下传递
                          }
                      });

                      socketChannel.pipeline().addLast("handle3",new ChannelInboundHandlerAdapter(){
                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                              //***不能用这种方法,client会收不到
//                              ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello world");


                              //***用这种,记住*****一定要指定字符类型UTF-8***
                              ByteBuf byteBuf = ctx.alloc().buffer().writeBytes("hello".getBytes("utf-8"));
                              //发送数据,触发OutBound事件
                              socketChannel.writeAndFlush(byteBuf);

                              log.warn(Thread.currentThread().getName()+"==>"+"handle3");
                              super.channelRead(ctx, msg); //向下传递
                          }
                      });

                      //Outbound处理器
                      socketChannel.pipeline().addLast("handle4",new ChannelOutboundHandlerAdapter(){

                          @Override
                          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle4");
                              super.write(ctx, msg, promise);
                          }
                      });

                      socketChannel.pipeline().addLast("handle5",new ChannelOutboundHandlerAdapter(){

                          @Override
                          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle5");
                              super.write(ctx, msg, promise);
                          }
                      });

                      socketChannel.pipeline().addLast("handle6",new ChannelOutboundHandlerAdapter(){

                          @Override
                          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle6");
                              super.write(ctx, msg, promise);
                          }
                      });
                  }
              }).bind(8080);

客户端:

      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      ChannelFuture channelFuture = new Bootstrap()
              .group(eventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new StringEncoder());
                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              System.out.println("--------------"+msg.toString());
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).connect("localhost", 8080);

      channelFuture.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {

              Channel channel = future.channel();
              channel.writeAndFlush("client-----");
//              channel.close();
//              ChannelFuture closeFuture = channel.closeFuture();
//              closeFuture.addListener(new ChannelFutureListener() {
//                  @Override
//                  public void operationComplete(ChannelFuture future) throws Exception {
//                      eventLoopGroup.shutdownGracefully();
//                  }
//              });
          }
      });

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler.

handler需要放入通道的pipeline中,才能根据放入顺序来使用handler:

  • pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler处理器
    • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向tail方向调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向head方向调用handler,直到handler不是处理Outbound操作为止

结构图:

高性能网络通信框架Netty一套就够(作者原创)

ByteBuf

创建ByteBuf

      //创建ByteBuf
      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      log(byteBuf);

      StringBuffer stringBuffer = new StringBuffer();

    for (int i = 0; i < 50; i++) {
      stringBuffer.append('1');
    }
    byteBuf.writeBytes(stringBuffer.toString().getBytes("utf-8"));
    log(byteBuf);

运行结果:

read index:0 write index:0 capacity:10

read index:0 write index:50 capacity:64
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000010| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000020| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000030| 31 31                                           |11              |
+--------+-------------------------------------------------+----------------+

Process finished with exit code 0

根据打印的capacity可知ByteBuf是会自动扩容的,而NIO的ByteBuffer是不能超出容量的。

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    static final int DEFAULT_INITIAL_CAPACITY = 256; //默认初始化容量
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE; //最大容量
    static final int DEFAULT_MAX_COMPONENTS = 16;

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

3种创建池化的ByteBuf方式

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf

      ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf

      ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf

查看当前ByteBuf对象类型

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf

      ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf

      ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf

      System.out.println(byteBuf1.getClass());
      System.out.println(byteBuf2.getClass());
      System.out.println(byteBuf3.getClass());

输出结果:

class io.netty.buffer.PooledUnsafeDirectByteBuf
class io.netty.buffer.PooledUnsafeHeapByteBuf
class io.netty.buffer.PooledUnsafeDirectByteBuf

池化和非池化

  • Netty4.1之前默认是非池化
  • Netty4.1之后默认是池化,但是Android平台默认是非池化

池化优点:

  • 本质上池化的意义就是可重用ByteBuf
    • 没有池化的话每次需要使用ByteBuf都要重新申请内存。即使是堆内存,释放内存也会增大GC的压力
    • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
    • 高并发下,池化更节约内存,减少内存溢出的可能。

IDEA IDE如何设置为非池化

只需要在IDEA IDE的VM options里面设置下面一段代码即可:

-Dio.netty.allocator.type={unpooled|pooled}

ByteBuf组成

  • 最大容量与当前容量
    • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
    • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
  • 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针写指针两个指针控制。进行读写操作时,无需进行模式的切换
    • 读指针前的部分被称为废弃部分,是已经读过的内容
    • 读指针与写指针之间的空间称为可读部分
    • 写指针与当前容量之间的空间称为可写部分

高性能网络通信框架Netty一套就够(作者原创)

ByteBuf写入

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
      
      byteBuf1.writeBytes("hello".getBytes("utf-8"));

write和set方法的区别:

ByteBuf中set开头的一系列方法,也可以写入数据,但不会改变写指针位置

ByteBuf的扩容机制

当ByteBuf中的当前容量无法容纳写入的数据时,会自动进行扩容

触发扩容:

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
      log(byteBuf1);
      byteBuf1.writeBytes("helloaaaaaaaa".getBytes("utf-8"));
      log(byteBuf1);

结果:

read index:0 write index:0 capacity:10

read index:0 write index:13 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 61 61 61 61 61 61 61 61          |helloaaaaaaaa   |
+--------+-------------------------------------------------+----------------+

扩容机制如下:

有两种情况:

  • 写入后的数据小于512字节
    • 这种情况会选择使用16的整数倍进行扩容,比如写入后的数据是14字节,则16*1为最小整数倍,则会扩容到16字节
  • 写入后的数据大于512字节
    • 这种情况会以2的n次方扩容,例如写入后的数据是600字节,此时大于512字节,那么容纳它的容量为2的10次方,因为2的9次方是512容纳不了,所以会扩容到1024字节
    • 如果扩容后的大小大于maxCapacity,则会抛出java.lang.IndexOutOfBoundsException异常

ByteBuf读取

读取后会移动读指针

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      byteBuf.writeBytes("hello".getBytes("utf-8"));

      byte b[]=new byte[5];
      byteBuf.readBytes(b);

     System.out.println(Arrays.toString(b));

ByteBuf以get开头的方法,这些方法不会改变读指针的位置

ByteBuf日志工具类
    public class ByteBufferUtil {
    private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {
        final char[] DIGITS = "0123456789abcdef".toCharArray();
        for (int i = 0; i < 256; i++) {
            HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
            HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
        }

        int i;

        // Generate the lookup table for hex dump paddings
        for (i = 0; i < HEXPADDING.length; i++) {
            int padding = HEXPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding * 3);
            for (int j = 0; j < padding; j++) {
                buf.append("   ");
            }
            HEXPADDING[i] = buf.toString();
        }

        // Generate the lookup table for the start-offset header in each row (up to 64KiB).
        for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
            StringBuilder buf = new StringBuilder(12);
            buf.append(StringUtil.NEWLINE);
            buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
            buf.setCharAt(buf.length() - 9, '|');
            buf.append('|');
            HEXDUMP_ROWPREFIXES[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-hex-dump conversion
        for (i = 0; i < BYTE2HEX.length; i++) {
            BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
        }

        // Generate the lookup table for byte dump paddings
        for (i = 0; i < BYTEPADDING.length; i++) {
            int padding = BYTEPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding);
            for (int j = 0; j < padding; j++) {
                buf.append(' ');
            }
            BYTEPADDING[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-char conversion
        for (i = 0; i < BYTE2CHAR.length; i++) {
            if (i <= 0x1f || i >= 0x7f) {
                BYTE2CHAR[i] = '.';
            } else {
                BYTE2CHAR[i] = (char) i;
            }
        }
    }

    /**
     * 打印所有内容
     * @param buffer
     */
    public static void debugAll(ByteBuffer buffer) {
        int oldlimit = buffer.limit();
        buffer.limit(buffer.capacity());
        StringBuilder origin = new StringBuilder(256);
        appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
        System.out.println("+--------+-------------------- all ------------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
        System.out.println(origin);
        buffer.limit(oldlimit);
    }

    /**
     * 打印可读取内容
     * @param buffer
     */
    public static void debugRead(ByteBuffer buffer) {
        StringBuilder builder = new StringBuilder(256);
        appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
        System.out.println("+--------+-------------------- read -----------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
        System.out.println(builder);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
        if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
            throw new IndexOutOfBoundsException(
                    "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                            + ") <= " + "buf.capacity(" + buf.capacity() + ')');
        }
        if (length == 0) {
            return;
        }
        dump.append(
                "         +-------------------------------------------------+" +
                        StringUtil.NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                        StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

        final int startIndex = offset;
        final int fullRows = length >>> 4;
        final int remainder = length & 0xF;

        // Dump the rows which have 16 bytes.
        for (int row = 0; row < fullRows; row++) {
            int rowStartIndex = (row << 4) + startIndex;

            // Per-row prefix.
            appendHexDumpRowPrefix(dump, row, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + 16;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(" |");

            // ASCII dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append('|');
        }

        // Dump the last row which has less than 16 bytes.
        if (remainder != 0) {
            int rowStartIndex = (fullRows << 4) + startIndex;
            appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + remainder;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(HEXPADDING[remainder]);
            dump.append(" |");

            // Ascii dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append(BYTEPADDING[remainder]);
            dump.append('|');
        }

        dump.append(StringUtil.NEWLINE +
                "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
        if (row < HEXDUMP_ROWPREFIXES.length) {
            dump.append(HEXDUMP_ROWPREFIXES[row]);
        } else {
            dump.append(StringUtil.NEWLINE);
            dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
            dump.setCharAt(dump.length() - 9, '|');
            dump.append('|');
        }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {
        return (short) (buffer.get(index) & 0xFF);
    }

    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        io.netty.buffer.ByteBufUtil.appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

ByteBuf的释放

由于ByteBuf中有堆外内存(直接内存)的实现,堆外内存最好是手动来释放,而不是等GC来进行垃圾回收。

  • UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可。
  • UnpooledDirectByteBuf使用的是直接内存,需要特殊的方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

具体如下:

  • 新创建的ByteBuf默认计数为1
  • 调用release方法会使计数-1如果计数为0,则内存将会被回收
  • 调用retain方法会使计数+1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。

ByteBuf内存释放规则是:谁最后使用这块内存,谁就要调用release方法进行释放。

  • 入站(Inbound处理器链)ByteBuf处理原则:
    • 可以遵循谁最后使用内存谁就release。也可以让尾释放内存。
      • 我们知道Inbound是从head->tail,所以tail是入站的终点,TailContext也会处理内存释放的问题。
  • 出站(Outbound处理器链)ByteBuf处理原则
    • 可以遵循谁最后使用内存谁就release。也可以让头释放内存。
    • 我们知道Outbound是从tail->head,所以head是出站的终点,HeadContext也会处理内存释放的问题。
  • 有时候不清楚ByteBuf的计数是多少次,但又必须彻底释放,可以循环调用release直到返回true
while (!buffer.release()) {}

内存释放源码

public interface ReferenceCounted {
    ReferenceCounted retain();
    /**
         * Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
         * {@code 0}.
         *
         * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
         */
    boolean release();
}

从注释可以看出,让release成功释放内存后将会返回true。

头尾释放内存源码:

    /**
     * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
     * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
     */
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    /**
     * Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
     * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
     */
    public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            return ((ReferenceCounted) msg).release();
        }
        return false;
    }

使用被释放的内存会怎样

     ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      byteBuf.writeBytes("helloWorld".getBytes("utf-8"));

      byteBuf.release(); //释放内存
      ByteBufferUtil.log(byteBuf);

结果:

Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0

注意:一旦ByteBuf的计数到0,再进行retain也没用

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      byteBuf.writeBytes("helloWorld".getBytes("utf-8"));

      byteBuf.release(); //-1
      byteBuf.retain(); //+1
      ByteBufferUtil.log(byteBuf);

结果

Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1

内存切片slice

public abstract ByteBuf slice(int index, int length);
  • ByteBuf的内存切片也是零拷贝的体现之一,切片后的内存还是原来ByteBuf的内存,过程没有发生过内存复制,切片后的 ByteBuf 维护独立的 read,write 指针.
  • 切片后的ByteBuf需要调用retain使计数+1,防止原来的ByetBuf调用release释放内存导致切片的内存不可用。
  • 修改原ByteBuf中的值,也会影响切片后得到的ByteBuf。

代码案例:

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      byteBuf.writeBytes("helloWorld".getBytes("utf-8"));

      ByteBufferUtil.log(byteBuf);

      ByteBuf buf = byteBuf.slice(0, 5); //内存分片

      ByteBufferUtil.log(buf);

      System.out.println("---------------");

      buf.setByte(1,'g'); //修改分片内存的值

      //重新打印
      ByteBufferUtil.log(byteBuf);

      ByteBufferUtil.log(buf);

结果:

read index:0 write index:10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 57 6f 72 6c 64                   |helloWorld      |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
---------------
read index:0 write index:10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 67 6c 6c 6f 57 6f 72 6c 64                   |hglloWorld      |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 67 6c 6c 6f                                  |hgllo           |
+--------+-------------------------------------------------+----------------+

Process finished with exit code 0

结论:可以看出修改分片的内存的值,原内存也会受到影响,因为他们都是用同一块内存。

ByteBuf优势

  • 池化思想-可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf

Netty进阶

粘包和半包/拆包问题

粘包问题演示

服务器端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);

      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      //不进行加解密不然展示不出粘包效果
//                      ch.pipeline().addLast(new StringDecoder());

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                              log.info("客户端已成功连接服务器");
                              super.channelActive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });


                  }
              }).bind(8080);
  }

客户端:

  private static final Logger log= LoggerFactory.getLogger(NettyClient.class);

  public static void main(String[] args) {

    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    ChannelFuture channelFuture = new Bootstrap()
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel ch) throws Exception {

                //不进行加解密不然展示不出粘包效果
//                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new LoggingHandler());



              }
            }).connect("localhost", 8080);

    channelFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {

        Channel channel = future.channel();

        ByteBuf byteBuf = channel.alloc().buffer(16);
        for (int i=0;i<10;i++){
          byteBuf.retain();
          byteBuf.writeBytes(("hello").getBytes("utf-8"));
          channel.writeAndFlush(byteBuf);
          byteBuf.clear();
        }

        channel.close();

        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            eventLoopGroup.shutdownGracefully();
          }
        });
      }
    });

  }

服务器端输出结果:

16:00:37.869 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa191631f, L:/127.0.0.1:8080 - R:/127.0.0.1:53693] READ: 50B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 |hellohellohelloh|
|00000010| 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 |ellohellohellohe|
|00000020| 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c |llohellohellohel|
|00000030| 6c 6f                                           |lo              |
+--------+-------------------------------------------------+----------------+

可以看出原来我们是在客户端分10次发送,而服务器端却一下把10次的数据都粘在一起了,这就是粘包问题。

半包问题展示

服务器端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);

    new ServerBootstrap()
        .group(boss, worker)
        .channel(NioServerSocketChannel.class)
        // 半包问题:例如,发送方发送100字节数据,而接收方最多只能接收30字节数据,这就是半包问题
            //option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小(滑动窗口)
        .option(ChannelOption.SO_RCVBUF,10)
        .childHandler(
            new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel ch) throws Exception {

                // 不进行加解密不然展示不出粘包效果
                //                      ch.pipeline().addLast(new StringDecoder());

                ch.pipeline().addLast(new LoggingHandler());

                ch.pipeline()
                    .addLast(
                        new ChannelInboundHandlerAdapter() {

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.info("客户端已成功连接服务器");
                            super.channelActive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg)
                              throws Exception {

                            log.info("msg={}", msg);
                            super.channelRead(ctx, msg);
                          }
                        });
              }
            })
        .bind(8080);
  }

只需使用这个方法即可

  • option(ChannelOption.SO_RCVBUF,10)

option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小。由于接收缓存区的大小<发送方发送的数据大小,所以产生了半包问题。

现象分析

粘包:

  • 产生现象
    • 第一次发送abc,第二次发送def,接收到的是一整个abcdef
  • 原因
    • Netty层
      • 接收方的接收缓冲区太大,Netty的接收缓冲区默认是1024字节
    • 网络层
      • TCP滑动窗口:假如发送方发送100字节数据,而滑动窗口缓冲区可容纳>100字节数据,这时候就会出现粘包问题。
      • Nagle 算法:会造成粘包

半包/拆包:

  • 产生现象
    • 发送abcdef数据,接收方第一次收到ab,第二次收到cd,第三次收到ef
  • 原因
    • Netty层
      • 接收方的接收缓冲区太小,发送方的数据过大,导致接收方无法一次接收下所有数据,就会半包/拆包
    • 网络层
      • 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包
    • 数据链路层
      • MSS 限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包

发送这些问题的本质:因为 TCP 是流式协议,消息无边界

粘包和半包/拆包解决方案

短连接

短连接:即每发送一条数据就重新连接再次发送,反复此操作。

短连接的缺点是显而易见的,每次发送一条数据都要重新连接,这样会大大的浪费时间,因为连接是需要时间的。

客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。
这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。
但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

采用短连接解决粘包代码

服务端:

  private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(6);

    new ServerBootstrap()
        .group(boss, worker)
        .channel(NioServerSocketChannel.class)
        .childHandler(
            new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel ch) throws Exception {

                ch.pipeline().addLast(new LoggingHandler());

                ch.pipeline()
                    .addLast(
                        new ChannelInboundHandlerAdapter() {

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.info("客户端已成功连接服务器");
                            super.channelActive(ctx);
                          }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg="+msg);
                              super.channelRead(ctx, msg);
                            }
                        });
              }
            })
        .bind(8080);
  }

客户端:

  private static final Logger log= LoggerFactory.getLogger(NettyClient.class);

  public static void main(String[] args) {

    //采用短连接解决“”粘包“”问题,无法解决半包问题
    for (int i = 0; i < 10; i++) {
      sendMessage("hello");
    }


  }


  public static void sendMessage(String msg){

     NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
     ChannelFuture channelFuture = new Bootstrap()
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel ch) throws Exception {

                //不进行加解密不然展示不出粘包效果
//                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new LoggingHandler());

                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                  @Override
                  public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    ByteBuf buffer = ctx.alloc().buffer(16);
                    buffer.writeBytes(msg.getBytes("utf-8"));
                    ch.writeAndFlush(buffer);
                    ch.close();
                    ChannelFuture closeFuture = ch.closeFuture();
                    closeFuture.addListener(new ChannelFutureListener() {
                      @Override
                      public void operationComplete(ChannelFuture future) throws Exception {
                        eventLoopGroup.shutdownGracefully();
                      }
                    });
                  }
                });

              }
            }).connect("localhost", 8080);


  }
定长解码器

客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度。
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码

行解码器(推荐)

对于其他解码器,我还是更喜欢行解码器。行解码器主要是靠分隔符\n来判断行进行解码,不过需要进行限制长度,以免服务器一直搜索\n造成卡死。

改造前的粘包代码

服务端:

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).bind(8080);

客户端:

     NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              ByteBuf buffer = ctx.alloc().buffer(16);

                              for(int i=0;i<10;i++){
                                  buffer.retain();
                                  buffer.writeBytes("hello world".getBytes("utf-8"));
                                  ctx.channel().writeAndFlush(buffer);
                              }

                              ch.close();//关闭Channel
                              ChannelFuture closeFuture = ch.closeFuture();
                              closeFuture.addListener(new ChannelFutureListener() {
                                  @Override
                                  public void operationComplete(ChannelFuture future) throws Exception {
                                      nioEventLoopGroup.shutdownGracefully();
                                  }
                              });
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

结果:

13:37:15.286 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x36ce6c5f, L:/127.0.0.1:8080 - R:/127.0.0.1:64550] READ: 110B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f |hello worldhello|
|00000010| 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c | worldhello worl|
|00000020| 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c |dhello worldhell|
|00000030| 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 |o worldhello wor|
|00000040| 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c |ldhello worldhel|
|00000050| 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f |lo worldhello wo|
|00000060| 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64       |rldhello world  |
+--------+-------------------------------------------------+----------------+

接收方使用行解码器改造后

服务端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器
                      
                      ch.pipeline().addLast(new LoggingHandler());
                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).bind(8080);

客户端:

 //把消息加工成可以被行解码器识别的消息
    private static String getMsg(String oldMsg){

        oldMsg+='\n';
        return oldMsg;
    }

  public static void main(String[] args) {

      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              ByteBuf buffer = ctx.alloc().buffer(16);

                              for(int i=0;i<10;i++){
                                  buffer.retain();
                                  String msg = getMsg("hello world");
                                  buffer.writeBytes(msg.getBytes("utf-8"));
                                  ctx.channel().writeAndFlush(buffer);
                                  //清理缓存,防止数据堆叠
                                  buffer.clear();
                              }

                              ch.close();//关闭Channel
                              ChannelFuture closeFuture = ch.closeFuture();
                              closeFuture.addListener(new ChannelFutureListener() {
                                  @Override
                                  public void operationComplete(ChannelFuture future) throws Exception {
                                      nioEventLoopGroup.shutdownGracefully();
                                  }
                              });
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }

输出结果:

13:47:15.199 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] REGISTERED
13:47:15.199 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] ACTIVE
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 12, cap: 2048))
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 12, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 2048))
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 2048))
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 36, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 36, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36, widx: 36, cap: 1024))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 512))
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 512)) that reached at the tail of the pipeline. Please check your pipeline configuration.
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].
13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+

可以看出已经解决了粘包问题。

自定义分隔符解码器

核心类DelimiterBasedFrameDecoder

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param delimiter  the delimiter
     */
    public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
        this(maxFrameLength, true, delimiter);
    }

服务端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      ByteBuf delimiter = ch.alloc().buffer(6);
                      delimiter.writeBytes("\r".getBytes("utf-8")); //自定义分隔符
                      ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));

                      ch.pipeline().addLast(new LoggingHandler());
                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).bind(8080);
  }

客户端:

 //把消息加工成可以被行解码器识别的消息
    private static String getMsg(String oldMsg){

        oldMsg+='\r';
        return oldMsg;
    }

  public static void main(String[] args) {

      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              ByteBuf buffer = ctx.alloc().buffer(16);

                              for(int i=0;i<10;i++){
                                  buffer.retain();
                                  String msg = getMsg("hello world");
                                  buffer.writeBytes(msg.getBytes("utf-8"));
                                  ctx.channel().writeAndFlush(buffer);
                                  //清理缓存,防止数据堆叠
                                  buffer.clear();
                              }

                              ch.close();//关闭Channel
                              ChannelFuture closeFuture = ch.closeFuture();
                              closeFuture.addListener(new ChannelFutureListener() {
                                  @Override
                                  public void operationComplete(ChannelFuture future) throws Exception {
                                      nioEventLoopGroup.shutdownGracefully();
                                  }
                              });
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }

Netty协议解析

Redis协议

我们要用netty执行Redis命令就需要遵循Redis协议。

Redis协议格式

*<参数数量> \r\n
$<参数1的字节数量> \r\n
<参数1的数据> \r\n
...
$<参数N的字节数量> \r\n
<参数N的数据> \r\n

使用Netty搭建一个Redis client

/**
 * @author 游政杰
 * @date 2022/1/13
 * 模拟 Redis client
 */
public class RedisSender {

    //true为继续循环,false是退出循环
  private static ThreadLocal<Boolean> threadLocal=new ThreadLocal<Boolean>();
  private static final Logger log= LoggerFactory.getLogger(RedisSender.class);

  public static void main(String[] args) {

      //Netty“”客户端“”执行Redis命令
      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{

      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      threadLocal.set(true); //默认是继续循环

                      ch.pipeline().addLast(new LoggingHandler());


                      ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){

                          //连接成功之后调用
                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              Scanner sc = new Scanner(System.in);
                              for (;;){
                                  if(!threadLocal.get()){
                                      System.out.println("退出成功");
                                      break;
                                  }
                                  printInfo();
                                  String sel = sc.next();
                                  switch (sel)
                                  {
                                      case "a":
                                          sc.nextLine(); //***上面会传下来字符,导致无法输入字符串,所以要加上这句,目的是吸收上面传下来的多余字符串
                                          System.out.println("请输入Redis命令[以单空格分隔]:");
                                          String redis_cmd = sc.nextLine();
                                          String decodeProtocol = decodeProtocol(redis_cmd);
                                          ByteBuf buffer = ctx.alloc().buffer(16);
                                          buffer.writeBytes(decodeProtocol.getBytes("utf-8"));
                                          ctx.writeAndFlush(buffer);
                                          buffer.clear();
                                          break;
                                      case "q":
                                          ch.close();
                                          ChannelFuture closeFuture = ch.closeFuture();
                                          closeFuture.addListener(new ChannelFutureListener() {
                                              @Override
                                              public void operationComplete(ChannelFuture future) throws Exception {
                                                  nioEventLoopGroup.shutdownGracefully();
                                                  threadLocal.set(false); //退出只需要设置为false即可
                                              }
                                          });
                                          break;
                                      default:
                                          System.out.println("无该选项");
                                          break;
                                  }
                              }


                          }
                      });
                  }
              }).connect("localhost",6379);
      }catch (Exception e){
          e.printStackTrace();
          nioEventLoopGroup.shutdownGracefully();
      }

  }

  private static void printInfo()
  {
    System.out.println("请输入以下字符选项:");
    System.out.println("输入a:执行Redis命令");
    System.out.println("输入q:退出");
  }


    /**
     * 协议解析
     * @param redis_cmd 命令
     * @return
     */
    //set myname abc
    //del key
    //get key
  private static synchronized String decodeProtocol(String redis_cmd){
      String delimiter1="*";
      String delimiter2="$";
      String delimiter3="\r\n";
      StringBuffer decodeCmd = new StringBuffer();//使用线程安全的StringBuffer
      List<String> cmd = Arrays.asList(redis_cmd.split(" "));
      decodeCmd.append(delimiter1+cmd.size()+delimiter3);
      cmd.forEach((e)->{

          decodeCmd.append(delimiter2+e.length()+delimiter3);
          decodeCmd.append(e+delimiter3);
      });

      return decodeCmd.toString();
  }

}

Http协议

http服务端:

public class HttpServer {
    //http服务器
  public static void main(String[] args) {


      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {

                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());
                      //netty自带的http协议转换
                      ch.pipeline().addLast(new HttpServerCodec());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                              //msg有两种类型
                              //class io.netty.handler.codec.http.DefaultHttpRequest
                              //class io.netty.handler.codec.http.LastHttpContent$1

                              if(msg instanceof HttpRequest){

                                  HttpRequest request=(HttpRequest)msg;

                                  //输出响应DefaultFullHttpResponse(HttpVersion version, HttpResponseStatus status)
                                  DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);

                                  String s="hello 2022";
                                  byte b[]=s.getBytes("utf-8");
                                  //从请求头设置响应数据长度,以免浏览器空转
                                  //content_length是io.netty.handler.codec.http包下的类
                                  response.headers().setInt(CONTENT_LENGTH,b.length);

                                  //输出内容
                                  response.content().writeBytes(b);

                                  ctx.channel().writeAndFlush(response);
                              }

                              super.channelRead(ctx, msg);
                          }
                      });
                  }
              }).bind("localhost",8080);


  }
}

高性能网络通信框架Netty一套就够(作者原创)

输出结果:

16:03:05.785 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] REGISTERED
16:03:05.785 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] ACTIVE
16:03:05.788 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ: 756B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 48 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 3a |Host: localhost:|
|00000020| 38 30 38 30 0d 0a 55 73 65 72 2d 41 67 65 6e 74 |8080..User-Agent|
|00000030| 3a 20 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 57 |: Mozilla/5.0 (W|
|00000040| 69 6e 64 6f 77 73 20 4e 54 20 31 30 2e 30 3b 20 |indows NT 10.0; |
|00000050| 57 69 6e 36 34 3b 20 78 36 34 3b 20 72 76 3a 39 |Win64; x64; rv:9|
|00000060| 35 2e 30 29 20 47 65 63 6b 6f 2f 32 30 31 30 30 |5.0) Gecko/20100|
|00000070| 31 30 31 20 46 69 72 65 66 6f 78 2f 39 35 2e 30 |101 Firefox/95.0|
|00000080| 0d 0a 41 63 63 65 70 74 3a 20 74 65 78 74 2f 68 |..Accept: text/h|
|00000090| 74 6d 6c 2c 61 70 70 6c 69 63 61 74 69 6f 6e 2f |tml,application/|
|000000a0| 78 68 74 6d 6c 2b 78 6d 6c 2c 61 70 70 6c 69 63 |xhtml+xml,applic|
|000000b0| 61 74 69 6f 6e 2f 78 6d 6c 3b 71 3d 30 2e 39 2c |ation/xml;q=0.9,|
|000000c0| 69 6d 61 67 65 2f 61 76 69 66 2c 69 6d 61 67 65 |image/avif,image|
|000000d0| 2f 77 65 62 70 2c 2a 2f 2a 3b 71 3d 30 2e 38 0d |/webp,*/*;q=0.8.|
|000000e0| 0a 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 |.Accept-Language|
|000000f0| 3a 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 38 |: zh-CN,zh;q=0.8|
|00000100| 2c 7a 68 2d 54 57 3b 71 3d 30 2e 37 2c 7a 68 2d |,zh-TW;q=0.7,zh-|
|00000110| 48 4b 3b 71 3d 30 2e 35 2c 65 6e 2d 55 53 3b 71 |HK;q=0.5,en-US;q|
|00000120| 3d 30 2e 33 2c 65 6e 3b 71 3d 30 2e 32 0d 0a 41 |=0.3,en;q=0.2..A|
|00000130| 63 63 65 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 |ccept-Encoding: |
|00000140| 67 7a 69 70 2c 20 64 65 66 6c 61 74 65 0d 0a 43 |gzip, deflate..C|
|00000150| 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 2d |onnection: keep-|
|00000160| 61 6c 69 76 65 0d 0a 43 6f 6f 6b 69 65 3a 20 48 |alive..Cookie: H|
|00000170| 6d 5f 6c 76 74 5f 62 33 39 33 64 31 35 33 61 65 |m_lvt_b393d153ae|
|00000180| 62 32 36 62 34 36 65 39 34 33 31 66 61 62 61 66 |b26b46e9431fabaf|
|00000190| 30 66 36 31 39 30 3d 31 36 32 31 39 30 35 38 35 |0f6190=162190585|
|000001a0| 30 3b 20 49 64 65 61 2d 33 35 32 63 36 33 39 66 |0; Idea-352c639f|
|000001b0| 3d 31 32 37 64 31 61 65 35 2d 34 34 31 37 2d 34 |=127d1ae5-4417-4|
|000001c0| 61 62 36 2d 61 61 64 33 2d 36 32 36 62 66 38 36 |ab6-aad3-626bf86|
|000001d0| 34 62 62 62 33 3b 20 55 4d 5f 64 69 73 74 69 6e |4bbb3; UM_distin|
|000001e0| 63 74 69 64 3d 31 37 61 61 65 35 65 65 63 33 34 |ctid=17aae5eec34|
|000001f0| 35 31 30 2d 30 39 39 37 35 66 34 36 64 62 65 63 |510-09975f46dbec|
|00000200| 33 38 38 2d 34 63 33 65 32 35 37 61 2d 31 34 34 |388-4c3e257a-144|
|00000210| 30 30 30 2d 31 37 61 61 65 35 65 65 63 33 36 33 |000-17aae5eec363|
|00000220| 61 62 3b 20 43 4e 5a 5a 44 41 54 41 31 32 35 38 |ab; CNZZDATA1258|
|00000230| 35 36 36 39 36 33 3d 31 36 31 32 35 36 38 34 34 |566963=161256844|
|00000240| 38 2d 31 36 32 36 34 32 32 37 30 36 2d 25 37 43 |8-1626422706-%7C|
|00000250| 31 36 32 36 34 32 38 31 35 30 0d 0a 55 70 67 72 |1626428150..Upgr|
|00000260| 61 64 65 2d 49 6e 73 65 63 75 72 65 2d 52 65 71 |ade-Insecure-Req|
|00000270| 75 65 73 74 73 3a 20 31 0d 0a 53 65 63 2d 46 65 |uests: 1..Sec-Fe|
|00000280| 74 63 68 2d 44 65 73 74 3a 20 64 6f 63 75 6d 65 |tch-Dest: docume|
|00000290| 6e 74 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f |nt..Sec-Fetch-Mo|
|000002a0| 64 65 3a 20 6e 61 76 69 67 61 74 65 0d 0a 53 65 |de: navigate..Se|
|000002b0| 63 2d 46 65 74 63 68 2d 53 69 74 65 3a 20 6e 6f |c-Fetch-Site: no|
|000002c0| 6e 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 55 73 |ne..Sec-Fetch-Us|
|000002d0| 65 72 3a 20 3f 31 0d 0a 43 61 63 68 65 2d 43 6f |er: ?1..Cache-Co|
|000002e0| 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d 30 |ntrol: max-age=0|
|000002f0| 0d 0a 0d 0a                                     |....            |
+--------+-------------------------------------------------+----------------+
16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] WRITE: 49B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 31 30 0d 0a 0d 0a 68 65 6c 6c 6f 20 32 30 32 | 10....hello 202|
|00000030| 32                                              |2               |
+--------+-------------------------------------------------+----------------+
16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] FLUSH
16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET / HTTP/1.1
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8
Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2
Accept-Encoding: gzip, deflate
Connection: keep-alive
Cookie: Hm_lvt_b393d153aeb26b46e9431fabaf0f6190=1621905850; Idea-352c639f=127d1ae5-4417-4ab6-aad3-626bf864bbb3; UM_distinctid=17aae5eec34510-09975f46dbec388-4c3e257a-144000-17aae5eec363ab; CNZZDATA1258566963=1612568448-1626422706-%7C1626428150
Upgrade-Insecure-Requests: 1
Sec-Fetch-Dest: document
Sec-Fetch-Mode: navigate
Sec-Fetch-Site: none
Sec-Fetch-User: ?1
Cache-Control: max-age=0 that reached at the tail of the pipeline. Please check your pipeline configuration.
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LoggingHandler#0, HttpServerCodec#0, HttpServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486].
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message EmptyLastHttpContent that reached at the tail of the pipeline. Please check your pipeline configuration.
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LoggingHandler#0, HttpServerCodec#0, HttpServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486].
16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ COMPLETE
16:03:05.809 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ: 693B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 55 73 65 72 2d 41 67 65 6e 74 3a 20 4d 6f 7a |.User-Agent: Moz|
|00000040| 69 6c 6c 61 2f 35 2e 30 20 28 57 69 6e 64 6f 77 |illa/5.0 (Window|
|00000050| 73 20 4e 54 20 31 30 2e 30 3b 20 57 69 6e 36 34 |s NT 10.0; Win64|
|00000060| 3b 20 78 36 34 3b 20 72 76 3a 39 35 2e 30 29 20 |; x64; rv:95.0) |
|00000070| 47 65 63 6b 6f 2f 32 30 31 30 30 31 30 31 20 46 |Gecko/20100101 F|
|00000080| 69 72 65 66 6f 78 2f 39 35 2e 30 0d 0a 41 63 63 |irefox/95.0..Acc|
|00000090| 65 70 74 3a 20 69 6d 61 67 65 2f 61 76 69 66 2c |ept: image/avif,|
|000000a0| 69 6d 61 67 65 2f 77 65 62 70 2c 2a 2f 2a 0d 0a |image/webp,*/*..|
|000000b0| 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 3a |Accept-Language:|
|000000c0| 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 38 2c | zh-CN,zh;q=0.8,|
|000000d0| 7a 68 2d 54 57 3b 71 3d 30 2e 37 2c 7a 68 2d 48 |zh-TW;q=0.7,zh-H|
|000000e0| 4b 3b 71 3d 30 2e 35 2c 65 6e 2d 55 53 3b 71 3d |K;q=0.5,en-US;q=|
|000000f0| 30 2e 33 2c 65 6e 3b 71 3d 30 2e 32 0d 0a 41 63 |0.3,en;q=0.2..Ac|
|00000100| 63 65 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 67 |cept-Encoding: g|
|00000110| 7a 69 70 2c 20 64 65 66 6c 61 74 65 0d 0a 43 6f |zip, deflate..Co|
|00000120| 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 2d 61 |nnection: keep-a|
|00000130| 6c 69 76 65 0d 0a 52 65 66 65 72 65 72 3a 20 68 |live..Referer: h|
|00000140| 74 74 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a |ttp://localhost:|
|00000150| 38 30 38 30 2f 0d 0a 43 6f 6f 6b 69 65 3a 20 48 |8080/..Cookie: H|
|00000160| 6d 5f 6c 76 74 5f 62 33 39 33 64 31 35 33 61 65 |m_lvt_b393d153ae|
|00000170| 62 32 36 62 34 36 65 39 34 33 31 66 61 62 61 66 |b26b46e9431fabaf|
|00000180| 30 66 36 31 39 30 3d 31 36 32 31 39 30 35 38 35 |0f6190=162190585|
|00000190| 30 3b 20 49 64 65 61 2d 33 35 32 63 36 33 39 66 |0; Idea-352c639f|
|000001a0| 3d 31 32 37 64 31 61 65 35 2d 34 34 31 37 2d 34 |=127d1ae5-4417-4|
|000001b0| 61 62 36 2d 61 61 64 33 2d 36 32 36 62 66 38 36 |ab6-aad3-626bf86|
|000001c0| 34 62 62 62 33 3b 20 55 4d 5f 64 69 73 74 69 6e |4bbb3; UM_distin|
|000001d0| 63 74 69 64 3d 31 37 61 61 65 35 65 65 63 33 34 |ctid=17aae5eec34|
|000001e0| 35 31 30 2d 30 39 39 37 35 66 34 36 64 62 65 63 |510-09975f46dbec|
|000001f0| 33 38 38 2d 34 63 33 65 32 35 37 61 2d 31 34 34 |388-4c3e257a-144|
|00000200| 30 30 30 2d 31 37 61 61 65 35 65 65 63 33 36 33 |000-17aae5eec363|
|00000210| 61 62 3b 20 43 4e 5a 5a 44 41 54 41 31 32 35 38 |ab; CNZZDATA1258|
|00000220| 35 36 36 39 36 33 3d 31 36 31 32 35 36 38 34 34 |566963=161256844|
|00000230| 38 2d 31 36 32 36 34 32 32 37 30 36 2d 25 37 43 |8-1626422706-%7C|
|00000240| 31 36 32 36 34 32 38 31 35 30 0d 0a 53 65 63 2d |1626428150..Sec-|
|00000250| 46 65 74 63 68 2d 44 65 73 74 3a 20 69 6d 61 67 |Fetch-Dest: imag|
|00000260| 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f 64 |e..Sec-Fetch-Mod|
|00000270| 65 3a 20 6e 6f 2d 63 6f 72 73 0d 0a 53 65 63 2d |e: no-cors..Sec-|
|00000280| 46 65 74 63 68 2d 53 69 74 65 3a 20 73 61 6d 65 |Fetch-Site: same|
|00000290| 2d 6f 72 69 67 69 6e 0d 0a 43 61 63 68 65 2d 43 |-origin..Cache-C|
|000002a0| 6f 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d |ontrol: max-age=|
|000002b0| 30 0d 0a 0d 0a                                  |0....           |
+--------+-------------------------------------------------+----------------+
16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] WRITE: 49B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 31 30 0d 0a 0d 0a 68 65 6c 6c 6f 20 32 30 32 | 10....hello 202|
|00000030| 32                                              |2               |
+--------+-------------------------------------------------+----------------+
16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] FLUSH
16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET /favicon.ico HTTP/1.1
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0
Accept: image/avif,image/webp,*/*
Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2
Accept-Encoding: gzip, deflate
Connection: keep-alive
Referer: http://localhost:8080/
Cookie: Hm_lvt_b393d153aeb26b46e9431fabaf0f6190=1621905850; Idea-352c639f=127d1ae5-4417-4ab6-aad3-626bf864bbb3; UM_distinctid=17aae5eec34510-09975f46dbec388-4c3e257a-144000-17aae5eec363ab; CNZZDATA1258566963=1612568448-1626422706-%7C1626428150
Sec-Fetch-Dest: image
Sec-Fetch-Mode: no-cors
Sec-Fetch-Site: same-origin

群聊

服务端:

private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

   //维护所有channel,key=名称,value为channel对象
    private static Map<String, NioSocketChannel> sessions=new ConcurrentHashMap<>();

    public static Map<String, NioSocketChannel> getSessions() {
        return sessions;
    }
    public static void putSession(String name,NioSocketChannel channel){

        sessions.put(name,channel);

    }

    public static void removeSession(String name){

        sessions.remove(name);
    }

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

//                      ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器

                      ch.pipeline().addLast(new LoggingHandler());
                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

                              super.exceptionCaught(ctx, cause);
                          }
                          //读消息
                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                              ByteBuf byteBuf=(ByteBuf)msg;
                              byte b[]=new byte[byteBuf.readableBytes()];
                              byteBuf.readBytes(b);
                              String str=new String(b,"utf8");
                              JSONObject jsonObject = JSONObject.parseObject(str);
                              int state = (int) jsonObject.get("state");
                              String username = (String) jsonObject.get("username");
                              switch (state)
                                  {
                                      case 0: //上线
                                          NettyServer.putSession(username, ch);

                                          if(username.equals("client4")){ //如果是client4用户登录则群发

                                              sessions.forEach((k,v)->{

                                                  ByteBuf buffer = ctx.alloc().buffer(16);
                                                  try {
                                                      buffer.writeBytes("群发hhhh".getBytes("utf8"));
                                                      v.writeAndFlush(buffer);
//                                                      buffer.clear();
                                                  } catch (UnsupportedEncodingException e) {
                                                      e.printStackTrace();
                                                  }
                                              });

                                          }
                                          System.out.println("当前在线人数:"+NettyServer.getSessions().size());
                                          break;
                                      case 1: //下线
                                          NettyServer.removeSession(username);
                                          NettyServer.getSessions().forEach((k,v)->{
                                              System.out.println(k);
                                              System.out.println(v.hashCode());
                                          });
                                          System.out.println("当前在线人数:"+NettyServer.getSessions().size());
                                          break;
                                      default:
                                          break;
                                  }


                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).bind(8080);



  }

客户端1:

public static void main(String[] args) {
        String name="client1";
      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                              User user = new User();
                              user.setUsername(name);
                              user.setState(0);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelActive(ctx);
                          }

                          @Override
                          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                              User user = new User();
                              user.setUsername(name);
                              user.setState(1);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelInactive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              System.out.println(msg);
                              super.channelRead(ctx, msg);
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }

客户端2:

 public static void main(String[] args) {
        String name="client2";
      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                              User user = new User();
                              user.setUsername(name);
                              user.setState(0);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelActive(ctx);
                          }

                          @Override
                          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                              User user = new User();
                              user.setUsername(name);
                              user.setState(1);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelInactive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              System.out.println(msg);
                              super.channelRead(ctx, msg);
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }

客户端3:

 public static void main(String[] args) {
        String name="client3";
      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              User user = new User();
                              user.setUsername(name);
                              user.setState(0);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelActive(ctx);
                          }

                          @Override
                          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                              User user = new User();
                              user.setUsername(name);
                              user.setState(1);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelInactive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              System.out.println(msg);
                              super.channelRead(ctx, msg);
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }

客户端4:

public static void main(String[] args) {
        String name="client4";
      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {


                              User user = new User();
                              user.setUsername(name);
                              user.setState(0);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelActive(ctx);
                          }

                          @Override
                          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                              User user = new User();
                              user.setUsername(name);
                              user.setState(1);
                              String jsonString = JSON.toJSONString(user);
                              ByteBuf buffer = ctx.alloc().buffer(16);
                              buffer.writeBytes(jsonString.getBytes("utf8"));
                              ch.writeAndFlush(buffer);
                              super.channelInactive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              System.out.println(msg);
                              super.channelRead(ctx, msg);
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }

实体类:

public class User implements Serializable {

    private String username;
    private int state; //用户状态,0在线,1下线

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", state=" + state +
                '}';
    }
}

相关文章

暂无评论

暂无评论...