一般我们想要使用tcp通信,最原始的方法就是使用socket套接字,但是每次都要创建新的socket对象,并且在finally中关闭,好浪费资源,很麻烦。所以就是用netty工具,可以轻松地实现tcp通信,当然也可以实现http通信,因为我们需要用tcp通信,所以这里只研究tcp。
最好先了解一下netty是干嘛的,以及NIO是什么。
netty:
https://blog.csdn.net/qq_34730511/article/details/98472924?ops_request_misc=&request_id=&biz_id=102&utm_term=netty&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-8-98472924.nonecase&spm=1018.2226.3001.4187
BIO NIO:
https://blog.csdn.net/weixin_34301132/article/details/85786733?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164499824216780264047131%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=164499824216780264047131&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-85786733.first_rank_v2_pc_rank_v29&utm_term=NIO+BIO+AIO&spm=1018.2226.3001.4187
接下来就建项目了。这里我用的是maven项目,jdk1.8
- 引入netty的依赖
<!--netty配置--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency>
创建NettyServer类,在这个类中先创建一个方法,然后在这个方法中创建事件线程组,这个构造器里面可以不传参数。写一个run()方法,里边对需要的参数进行配置,
package com.yy.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LoggingHandler; public class NettyServer { public void run() { //创建事件线程组, EventLoopGroup boosGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); //创建服务启动器 ServerBootstrap serverBootstrap = new ServerBootstrap(); try { //给服务启动器配置参数 serverBootstrap.group(boosGroup, workerGroup)//绑定线程组 .channel(NioServerSocketChannel.class)//Nio模式 .option(ChannelOption.SO_BACKLOG, 1024)//标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。主要是作用于boss线程,用于处理新连接 .childOption(ChannelOption.SO_KEEPALIVE, true)//启用心跳保活机制,主要作用与worker线程,也就是已创建的channel。 .handler(new LoggingHandler("DEBUG")) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringDecoder())//加载编解码器 .addLast(new StringEncoder()) .addLast(new NettyServerHandler());//加载handler处理器 } }); //绑定端口,这里用到的是ChannelFuture类 ChannelFuture future = serverBootstrap.bind(8080).sync(); System.out.println("服务端启动成功...."); //关闭通道 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //在这里关闭事件线程组,释放资源 boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { new NettyServer().run(); } }
再创建另外一个处理类,用于接收客户端发送的消息,并且给客户端发送应答,这个类继承于ChannelInboundHandlerAdapter类,重写其中的方法即可:
package com.yy.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.log4j.Logger; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; import java.net.InetAddress; /** * 接收客户端发送的消息并返回应答 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static Logger logger = Logger.getLogger(NettyServerHandler.class); /** * 读取客户端的数据,也可以在这个方法中给ctx赋值,也就是给客户端返回的应答 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf in = (ByteBuf)msg; // int readableBytes = in.readableBytes(); // byte[] bytes =new byte[readableBytes]; // in.readBytes(bytes); // System.out.println("服务端接收的倒的消息:"+new String(bytes)); // System.out.println("远程连接到的地址为:"+ctx.channel().remoteAddress()); //System.out.print(in.toString(CharsetUtil.UTF_8)); System.out.println("服务端接收到一条来自客户端的消息:"+msg); ctx.write("我是服务端,我收到客户端的消息了,我现在给你返回"); // logger.error("服务端接受的消息 : " + msg); } /** * 发生异常时关闭 * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } /** * 读取数据完毕,就刷新ctx对象,把应答的信息推送给客户端 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); //也可以使用下面的方法,一次性将应答信息写入并且推送出去,但是一般不这么用,一般是在read方法中返回应答信息,处理复杂的业务逻辑,在complete方法中推送 //ctx.writeAndFlush(Unpooled.copiedBuffer("Hello World,I am Server.", CharsetUtil.UTF_8)); } /** * 将object类型转为字节数组的方法 * @param object * @return * @throws IOException */ public static byte[] objectToBytes(final Serializable object) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = null; try { oos = new ObjectOutputStream(baos); oos.writeObject(object); oos.flush(); return baos.toByteArray(); } finally { if (oos != null) { oos.close(); } if (baos != null) { baos.close(); } } } }
3)创建NettyClient类
package com.yy.netty; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; import java.util.concurrent.TimeUnit; public class NettyClient { private SocketChannel socketChannel; public void run(Object msg){ //配置线程组 EventLoopGroup group = new NioEventLoopGroup(); //创建服务启动器 Bootstrap bootstrap = new Bootstrap(); //配置参数 bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringEncoder()) .addLast(new StringDecoder()) .addLast(new NettyClientHandler()); } }) .remoteAddress("127.0.0.1",8080); //连接 ChannelFuture future = bootstrap.connect(); System.out.println("客户端正在连接服务端..."); //客户端断线重连逻辑 future.addListener((ChannelFutureListener) future1 -> { if (future1.isSuccess()) { System.out.println("连接Netty服务端成功"); future.channel().writeAndFlush(msg); } else { System.out.println("连接失败,进行断线重连"); future1.channel().eventLoop().schedule(() -> run(msg), 20, TimeUnit.SECONDS); } }); socketChannel = (SocketChannel) future.channel(); } public static void main(String[] args) { NettyClient nettyClient = new NettyClient(); nettyClient.run("hi"); } }
创建Client的处理类
package com.yy.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import org.apache.log4j.Logger; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger.getLogger(NettyClientHandler.class); private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static byte[] responseByte; /** * 管道就绪时触发该方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("云上建立连接时:" + sdf.format(new Date())); //ctx.writeAndFlush(Unpooled.copiedBuffer("Hello World,I am Client.", CharsetUtil.UTF_8)); } /** * 管道读取事件时会触发该方法, * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端收到服务端返回的消息:"+msg); byte[] bytes = objectToBytes((Serializable) msg); //byte []scaleMsg=(byte[]) msg; StringBuffer originalData=new StringBuffer(); for (byte b : bytes) { originalData.append(b+" "); } System.out.println(originalData.toString()); if(bytes!=null){ responseByte = bytes; } } /** * 发生异常时触发 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } public static byte[] objectToBytes(final Serializable object) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = null; try { oos = new ObjectOutputStream(baos); oos.writeObject(object); oos.flush(); return baos.toByteArray(); } finally { if (oos != null) { oos.close(); } if (baos != null) { baos.close(); } } } }
单机测试是否能连通,先启动服务端代码,再启动客户端代码,就可以直接发数据了,效果如下图所示:
服务端:
客户端:
现在暂时只能发送一次数据,但是服务端始终处于监听的状态,后边再研究吧!