【Netty】5 源码 Bootstrap
上一篇讲了AbstractBootstrap,为这篇做了个铺垫。
一、概述
Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化.
Bootstrap
: 用于客户端,只需要一个单独的Channel,来与服务端进行数据交互,对应server端的子Channel。
作用职责
:EventLoop初始化,channel的注册过程 ,关于pipeline的初始化,handler的添加过程,客户端连接分析。
Netty客户端源码部分
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group) // 注册线程池
.channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//这里放入自定义助手类
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture cf = b.connect(host, port).sync(); // 异步连接服务器
cf.channel().closeFuture().sync(); // 异步等待关闭连接channel
} finally {
group.shutdownGracefully().sync(); // 释放线程池资源
}
}
从上面的客户端代码虽然简单, 但是却展示了 Netty 客户端初始化时所需的所有内容:
1. EventLoopGroup: 不论是服务器端还是客户端, 都必须指定 EventLoopGroup. 在这个例子中,
指定了 NioEventLoopGroup, 表示一个 NIO 的EventLoopGroup.
2. ChannelType: 指定 Channel 的类型. 因为是客户端, 因此使用了 NioSocketChannel.
3. Handler: 设置数据的处理器.
4. 这里的option,提供了一系列的TCP参数
下面我们深入代码, 看一下客户端通过 Bootstrap 启动后, 都做了哪些工作.
二、源码分析
1、group(group)
/**
* 直接调用父类AbstractBootstrap的方法
*/
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
直接调用父类的方法 ,说明该EventLoopGroup,作为客户端 Connector 线程,负责注册监听连接操作位,用于判断异步连接结果。
2、channel(NioServerSocketChannel.class)
在 Netty 中, Channel是一个Socket的抽象, 它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作. 每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例。
2.1源码
/**
* 同样也是直接调用父类AbstractBootstrap的方法
*/
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
我们再来看下ReflectiveChannelFactory类
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
/**
* 通过构造函数 传入 clazz
*/
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
/**
* 只用这一个方法 通过传入不同的Channel.class 创建不同的Channel 对象。
* newChannel() 什么时候调用呢 仔细追源码 发现是在绑定 IP 和 端口的 doResolveAndConnect方法里会调用
*/
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
在看channelFactory(new ReflectiveChannelFactory
/**
* 创建好Channel后,返回对象Bootstrap本身
*/
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
因此对于我们这个例子中的客户端的 Bootstrap 而言, 生成的的 Channel 实例就是 NioSocketChannel。
2.2 Channel 类型
除了 TCP 协议以外, Netty 还支持很多其他的连接协议, 并且每种协议还有 NIO(异步 IO) 和 OIO(Old-IO, 即传统的阻塞 IO) 版本的区别. 不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应下面是一些常用的 Channel 类型:
- NioSocketChannel, 代表异步的客户端 TCP Socket 连接.
- NioServerSocketChannel, 异步的服务器端 TCP Socket 连接.
- NioDatagramChannel, 异步的 UDP 连接
- NioSctpChannel, 异步的客户端 Sctp 连接.
- NioSctpServerChannel, 异步的 Sctp 服务器端连接.
- OioSocketChannel, 同步的客户端 TCP Socket 连接.
- OioServerSocketChannel, 同步的服务器端 TCP Socket 连接.
- OioDatagramChannel, 同步的 UDP 连接
- OioSctpChannel, 同步的 Sctp 服务器端连接.
- OioSctpServerChannel, 同步的客户端 TCP Socket 连接.
3、handler(ChannelHandler handler)
Netty 的一个强大和灵活之处就是基于 Pipeline 的自定义 handler 机制
. 基于此, 我们可以像添加插件一样自由组合各种各样的 handler 来完成业务逻辑. 例如我们需要处理 HTTP 数据, 那么就可以在 pipeline 前添加一个 Http 的编解码的 Handler, 然后接着添加我们自己的业务逻辑的 handler, 这样网络上的数据流就向通过一个管道一样, 从不同的 handler 中流过并进行编解码, 最终在到达我们自定义的 handler 中。
/**
* 同样也是 直接调用父类 AbstractBootstrap 的方法
*/
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return self();
}
不过我们看到代码 一般都是这样写的
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
})
那是因为Bootstrap.handler 方法接收一个 ChannelHandler, 而我们传递的是一个 派生于 ChannelInitializer 的匿名类, 它正好也实现了 ChannelHandler 接口. 我们来看一下, ChannelInitializer 类部分代码:
/**
* ChannelInboundHandlerAdapter 父类的父类 最终会继承 ChannelHandler
* 那么ChannelInitializer 也就是 ChannelHandler的 子类
*/
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger
=InternalLoggerFactory.getInstance(ChannelInitializer.class);
/**
* 这里只有这一个抽象类 所以我们只需重写这一个方法就可以了
*/
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
}
ChannelInitializer 是一个抽象类, 它有一个抽象的方法 initChannel
, 我们正是实现了这个方法, 并添加的自定义的 handler 的. 那么 initChannel 是哪里被调用的呢?
答案是 ChannelInitializer.channelRegistered 方法中。
我们来关注一下 channelRegistered 方法. 从上面的源码中, 我们可以看到, 在 channelRegistered 方法中, 会调用 initChannel 方法, 将自定义的 handler 添加到 ChannelPipeline 中, 然后调用 ctx.pipeline().remove(this) 将自己从 ChannelPipeline 中删除. 上面的分析过程, 可以用如下图片展示:
一开始, ChannelPipeline 中只有三个 handler, head, tail 和我们添加的 ChannelInitializer.
接着 initChannel 方法调用后, 添加了自定义的 handler
最后将 ChannelInitializer 删除
### 4、ChannelPipeline对象
/**
* 我们在initChannel抽象方法的实现方法中 通过 SocketChannel获得 ChannelPipeline对象
*/
ChannelPipeline p = ch.pipeline();
p.addLast(newEchoClientHandler());
在实例化一个 Channel 时, 会伴随着一个 ChannelPipeline 的实例化
, 并且此 Channel 会与这个 ChannelPipeline 相互关联, 这一点可以通过NioSocketChannel 的父类 AbstractChannel 的构造器:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
//这个可以看出
pipeline = new DefaultChannelPipeline(this);
}
当实例化一个 Channel(这里以 EchoClient 为例, 那么 Channel 就是 NioSocketChannel), 其 pipeline 字段就是我们新创建的 DefaultChannelPipeline 对象, 那么我们就来看一下 DefaultChannelPipeline 的构造方法。
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
我们调用 DefaultChannelPipeline 的构造器, 传入了一个 channel, 而这个 channel 其实就是我们实例化的 NioSocketChannel, DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在channel 字段中。DefaultChannelPipeline 中, 还有两个特殊的字段, 即 head
和 tail
, 而这两个字段是一个双向链表的头和尾
. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键。
5、.connect(host, port)
经过上面的各种分析后, 我们大致了解了 Netty 初始化时, 所做的工作, 接下来 分析一下客户端是如何发起 TCP 连接的。
/**
* 1、 这里 终于是Bootstrap 自己的方法了。 传入IP 地址 和 端口号
*/
public ChannelFuture connect(String inetHost, int inetPort) {
//通过InetSocketAddress 构造函数 绑定 IP地址+端口号
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
/**
* 2、上面调用该方法 ,该方法在调用 doResolveAndConnect方法
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
/**
* 3、这步 实例化 Channer
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//注意 这里 initAndRegister()方法就是实例化 Channer 的方法 上面说过 真正获取Channer 对象 是在这步获取的
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 这里省略的 很大一部分逻辑判断的代码
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
}
/**
* 3.1 这里 就开始 调 newChannel() 方法 也就创建了 Channel 对象
*/
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
return regFuture;
}
/**
* 4、在看doResolveAndConnect0方法
* 这一步还是对一些 参数数据 进行校验 省略了校验代码
*/
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
// 获取 当前 EventLoop线程
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
//这一步 才是 连接的关键
doConnect(resolveFuture.getNow(), localAddress, promise);
return promise;
}
接下来看重要的方法,在 connect 中, 会进行一些参数检查后, 最终调用的是 doConnect 方法,有关doConnect之后接下来源码,等自己对Netty了解更细致之后 ,再来写吧。
这里推荐一个博主,有关Netty源码分析的蛮好的:源码之下无秘密 ── 做最好的 Netty 源码分析教程
如果一个人充满快乐,正面的思想,那么好的人事物就会和他共鸣,而且被他吸引过来。同样,一个人老带悲伤,倒霉的事情也会跟过来。
——在自己心情低落的时候,告诫自己不要把负能量带给别人。(大校13)