看到这个标题,可能你会感到疑惑,NIO?Netty?是什么东西。想要真正学习Netty的知识,就必须对这几个概念有一定的了解。
NIO:有人称之为New I/O,原因在于它相对于之前的I/O类库是新增的,这是它的官方叫法。但是,由于之前老的I/O类库是阻塞I/O,New I/O类库的目标就是要让Java支持非阻塞I/O,所以,更多的人喜欢称之为非阻塞I/O(Non-block I/O)。由于非阻塞I/O更能体现NIO的特点,所以,这里的NIO指的是非阻塞I/O。
Netty:Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
到这里,应该对这两个概念有了一定的了解,那接下来,进行Netty的学习。
配置pom文件:
<dependency>
<groupId>io.nettygroupId>
<artifactId>netty-allartifactId>
<version>4.0.31.Finalversion>
dependency>
先上代码,对Netty的开发有一个第一印象。
代码清单1-1 EchoServer服务端 EchoServer
/**
* Netty 服务端代码
*
*/
public class EchoServer {
public void bind(int port) throws Exception {
// 配置服务器的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 启动NIO服务端的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("报告");
System.out.println("信息:有一客户端链接到本服务端");
System.out.println("IP:"+ ch.localAddress().getHostName());
System.out.println("Port:"+ ch.localAddress().getPort());
System.out.println("报告完毕");
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder
(65535, 0,4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
// 解码器String
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
// 编码器 String
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new ProtocolDecode());
ch.pipeline().addLast(new ProtocolEncode());
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
// ChannelFuture用于异步操作的通知回调
ChannelFuture f = b.bind(port).sync();
// 等待服务器监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8082;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new EchoServer().bind(port);
}
}
我们从bind方法开始学习:
a) bind方法内首先创建了两个NioEventLoopGroup实例。NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组。这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
b) 创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
c) 调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
d) 设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
e) 配置NioServerSocketChannel的TCP参数,此处将它的SO_BACKLOG设置为1024
f) 绑定I/O事件的处理类EchoServerHandler,它的作用类似于Reactor模式中的Handler类,主要用于处理网络I/O事件,例如记录日志,对消息进行编解码等。
g) 调用启动辅助类的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成。完成之后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调。
h) 使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
i) 最后调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
服务器端绑定端口实现流程
word/media/image1.gif
详解:ChannelInitializer类重写initChannel方法,构建pipeline
实际上,我们不需要自己创建pipeline,因为使用ServerBootstrap或者Bootstrap启动服务端或者客户端时,Netty会为每个Channel连接创建一个独立的pipeline,所以我们只需要将自定义的拦截器添加到pipeline中即可。
对于类似LengthFieldBasedFrameDecoder,StringDecoder的编解码处理器ChannelHandler,它其实存在先后顺序,比如MessageToMessageDecoder解码器,在它之前往往需要有ByteToMessageDecoder解码器将ByteBuf解码为对象,然后对对象进行二次解码得到最终的POJO对象。
上面代码也是存在先后顺序的,由于TCP的粘包/拆包导致解码的时候需要考虑如何处理半包的问题,所以需要先添加LengthFieldBasedFrameDecoder半包解码器来解决TCP粘包导致的半包问题。而LengthFieldPrepender负责在待发送的ByteBuf消息头中增加一个长度字段来标识消息的长度,这样简化了开发者的编码器开发,使我们不需要额外去设置这个长度字段,所以LengthFieldPrepender添加在后面。接下来,需要把得到的整包消息通过StringDecoder解码器解码成为字符串,设置编码格式为UTF-8,还需添加StringEncoder编码器,完成待发送消息String编码为ByteBuf的操作。后面可以继续添加自定义的解码器编码器,最后,绑定I/O事件的处理类EchoServerHandler,完成通道的消息读取,事件处理设置。
绑定I/O事件处理类,实例化通道流程:
word/media/image2.gif
代码清单1-2 EchoServer服务端 EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger
.getLogger(EchoServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ProtocolBase pro = (ProtocolBase) msg;
pro.time = new Date().toString();
logger.debug(pro);
ctx.write (pro);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.debug("Unexpected exception from downstream-server:"
+ cause.getMessage());
// 发生异常,关闭链路
ctx.close();
}
}
代码解析:
a) EchoServerHandler继承自ChannelInboundHandlerAdapter,它用于对网络事件进行读写操作,通常我们只需要关注channelRead和exceptionCaught方法。
b) channelRead方法内,做类型转换,讲msg转换为ProtocolBase对象,修改该对象时间属性为当前时间,记录日志,然后通过ChannelHandlerContext的write方法异步发送应答消息给客户端。
c) channelReadComplete方法内调用了ChannelHandlerContext的flush方法,它的作用是将消息发送队列中的消息写入到SocketChannel中发送给对方。(从性能角度考虑,为了防止频繁的唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入SocketChannel中,调用write方法只是把待发送的消息放到发送缓存数组中,在通过调用flush方法,将发送缓存区中的消息全部写到SocketChannel中)
d) exceptionCaught方法内,当发生异常时,关闭ChannelHandlerContext,释放和ChannelHandlerContext相关联的句柄等资源。
代码清单2-1 EchoClient客户端 EchoClient
public class EchoClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
// 可用于处理半包消息,使得后面的都是整包消息
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0,4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
// 编码器 String
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
// 解码器String
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new ProtocolEncode());
ch.pipeline().addLast(new ProtocolDecode());
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8082;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new EchoClient().connect(port, "127.0.0.1");
}
}
EchoClient代码解析:
a) connect方法内,首先创建客户端处理I/O读写的NioEventLoopGroup线程组,然后继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置,与服务端不同的是,它的Channel需要设置为NioSocketChannel,它的option需要设置为(ChannelOption.TCP_NODELAY, true),然后为其添加Handler。此处为了简单直接创建匿名内部类,实现initChannel方法,其作用是当创建NioSocketChannel成功之后,在进行初始化时,将它的ChannelHandler设置为ChannelPipeline中,用于处理网络I/O事件。
b) initChannel方法内,添加了处理半包问题的解码类LengthFieldBasedFrameDecoder,然后添加LengthFieldPrepender编码器( 如果协议中的第一个字段为长度字段,Netty提供了LengthFieldPrepender编码器,它可以计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中)设置长度字段为4字节,添加String类型编码器StringEncoder,设置编码为UTF-8,添加String类型解码器StringDecoder。再添加协议编码器ProtocolEncode,协议解码器ProtocolDecode,最后设置EchoClientHandler处理类,处理网络I/O事件。
c) 客户端启动辅助类设置完成后,调用connect方法发起异步连接,然后调用同步方法sync等待连接成功。
d) 最后,当客户端连接关闭后,客户端主函数退出,退出之前释放NIO线程组的资源。
客户端绑定端口实现流程
word/media/image1.gif
代码清单2-2 EchoClient客户端 EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private ProtocolBase p;
public EchoClientHandler() {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("==============channel--register==============");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("==============channel--unregistered==============");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("==============channel--inactive==============");
}
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("==============channel--active==============");
System.out.println("向服务器端写入数据");
p = new ProtocolBase();
p.command = 1;
p.subcommand = 2;
p.sign = "aa4eb15663a23eef5308ed3cd2b62ab3";
p.suquence = 120;
p.body = "{\"param1\":\"value1\"}";
ctx.write(JSON.toJSONString(p));
ctx.flush();
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("==============channel--read==============");
System.out.println("the msg type is " + msg.getClass().getName());
ProtocolBase result = (ProtocolBase) msg;
System.out.println("command:" + result.command);
System.out.println("clientId:" + result.clientId);
System.out.println("sign:" + result.sign);
System.out.println("body:" + result.body);
System.out.println("接收到服务器数据是" + JSON.toJSONString(result));
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("==============channel--readcomplete==============");
ctx.flush();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
代码解析:
a) channelActive方法,当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,将消息数据发送给服务端。调用ChannelHandlerContext的write方法将消息放到消息缓存数组中,然后调用flush方法,将发送缓存区消息全部写到SocketChannel中,发送给服务端。
b) 当服务器返回应答消息时,channelRead方法被调用,读取msg并类型转化为ProtocolBase协议类型,然后打印应答消息。
c) exceptionCaught方法,当发生异常时,打印异常日志,释放客户端资源。
代码清单3-1 ProtocolEncode协议编码类
public class ProtocolEncode extends MessageToMessageEncoder
@Override
protected void encode(ChannelHandlerContext ctx, ProtocolBase msg,
Listthrows Exception {
Logger.getRootLogger().debug(msg.toString());
out.add(JSON.toJSONString(msg));
}
}
代码解析:
a) 继承MessageToMessageEncoder类的encode方法,实现msg消息的编码操作。
代码清单3-2 ProtocolDecode协议解码类
public class ProtocolDecode extends MessageToMessageDecoder
@Override
protected void decode(ChannelHandlerContext ctx, String msg,
Listthrows Exception {
Logger.getRootLogger().debug(msg);
out.add(JSON.parseObject(msg, ProtocolBase.class));
}
}
代码解析:
a) 继承MessageToMessageDecoder类的decode方法,实现msg消息的解码操作。
channel分析图:
基础研究部
本文来源:https://www.2haoxitong.net/k/doc/998acb4549649b6648d747a1.html
文档为doc格式