NIO编程之Netty入门篇V1

发布时间:2018-07-02 05:07:16   来源:文档文库   
字号:

NIO编程之Netty入门篇

看到这个标题,可能你会感到疑惑,NIONetty?是什么东西。想要真正学习Netty的知识,就必须对这几个概念有一定的了解。

NIO:有人称之为New I/O,原因在于它相对于之前的I/O类库是新增的,这是它的官方叫法。但是,由于之前老的I/O类库是阻塞I/ONew I/O类库的目标就是要让Java支持非阻塞I/O,所以,更多的人喜欢称之为非阻塞I/ONon-block I/O)。由于非阻塞I/O更能体现NIO的特点,所以,这里的NIO指的是非阻塞I/O

NettyNetty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序

到这里,应该对这两个概念有了一定的了解,那接下来,进行Netty的学习。

配置pom文件:

<dependency>

<groupId>io.nettygroupId>

<artifactId>netty-allartifactId>

<version>4.0.31.Finalversion>

dependency>

先上代码,对Netty的开发有一个第一印象。

代码清单1-1 EchoServer服务端 EchoServer

/** 

 * Netty 服务端代码 

  *

 */

public class EchoServer {

public void bind(int port) throws Exception {

// 配置服务器的NIO线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

// 启动NIO服务端的辅助启动类

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 1024)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

System.out.println("报告");

System.out.println("信息:有一客户端链接到本服务端");

System.out.println("IP:"+ ch.localAddress().getHostName());

System.out.println("Port:"+ ch.localAddress().getPort());

System.out.println("报告完毕");

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder

(65535, 0,4, 0, 4));

ch.pipeline().addLast(new LengthFieldPrepender(4));

// 解码器String

ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));

// 编码器 String

ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));

ch.pipeline().addLast(new ProtocolDecode());

ch.pipeline().addLast(new ProtocolEncode());

ch.pipeline().addLast(new EchoServerHandler());

}

});

// 绑定端口,同步等待成功

// ChannelFuture用于异步操作的通知回调

ChannelFuture f = b.bind(port).sync();

// 等待服务器监听端口关闭

f.channel().closeFuture().sync();

} finally {

// 优雅退出,释放线程池资源

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

public static void main(String[] args) throws Exception {

int port = 8082;

if (args != null && args.length > 0) {

try {

port = Integer.valueOf(args[0]);

} catch (NumberFormatException e) {

// 采用默认值

}

}

new EchoServer().bind(port);

}

}

我们从bind方法开始学习:

a) bind方法内首先创建了两个NioEventLoopGroup实例。NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组。这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。

b) 创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度

c) 调用ServerBootstrapgroup方法,将两个NIO线程组当作入参传递到ServerBootstrap中。

d) 设置创建的ChannelNioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。

e) 配置NioServerSocketChannelTCP参数,此处将它的SO_BACKLOG设置为1024

f) 绑定I/O事件的处理类EchoServerHandler,它的作用类似于Reactor模式中的Handler类,主要用于处理网络I/O事件,例如记录日志,对消息进行编解码等。

g) 调用启动辅助类的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成。完成之后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调。

h) 使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。

i) 最后调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。

服务器端绑定端口实现流程

word/media/image1.gif

详解ChannelInitializer类重写initChannel方法,构建pipeline

实际上,我们不需要自己创建pipeline,因为使用ServerBootstrap或者Bootstrap启动服务端或者客户端时,Netty会为每个Channel连接创建一个独立的pipeline,所以我们只需要将自定义的拦截器添加到pipeline中即可。

对于类似LengthFieldBasedFrameDecoderStringDecoder的编解码处理器ChannelHandler,它其实存在先后顺序,比如MessageToMessageDecoder解码器,在它之前往往需要有ByteToMessageDecoder解码器将ByteBuf解码为对象,然后对对象进行二次解码得到最终的POJO对象。

上面代码也是存在先后顺序的,由于TCP的粘包/拆包导致解码的时候需要考虑如何处理半包的问题,所以需要先添加LengthFieldBasedFrameDecoder半包解码器来解决TCP粘包导致的半包问题。而LengthFieldPrepender负责在待发送的ByteBuf消息头中增加一个长度字段来标识消息的长度,这样简化了开发者的编码器开发,使我们不需要额外去设置这个长度字段,所以LengthFieldPrepender添加在后面。接下来,需要把得到的整包消息通过StringDecoder解码器解码成为字符串,设置编码格式为UTF-8,还需添加StringEncoder编码器,完成待发送消息String编码为ByteBuf的操作。后面可以继续添加自定义的解码器编码器,最后,绑定I/O事件的处理类EchoServerHandler,完成通道的消息读取,事件处理设置。

绑定I/O事件处理类,实例化通道流程:

word/media/image2.gif

代码清单1-2 EchoServer服务端 EchoServerHandler

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

private static final Logger logger = Logger

.getLogger(EchoServerHandler.class.getName());

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

ProtocolBase pro = (ProtocolBase) msg;

pro.time = new Date().toString();

logger.debug(pro);

ctx.write (pro);

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

super.channelInactive(ctx);

ctx.close();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

logger.debug("Unexpected exception from downstream-server"

+ cause.getMessage());

// 发生异常,关闭链路

ctx.close();

}

}

代码解析:

a) EchoServerHandler继承自ChannelInboundHandlerAdapter,它用于对网络事件进行读写操作,通常我们只需要关注channelReadexceptionCaught方法。

b) channelRead方法内,做类型转换,讲msg转换为ProtocolBase对象,修改该对象时间属性为当前时间,记录日志,然后通过ChannelHandlerContextwrite方法异步发送应答消息给客户端。

c) channelReadComplete方法内调用了ChannelHandlerContextflush方法,它的作用是将消息发送队列中的消息写入到SocketChannel中发送给对方。(从性能角度考虑,为了防止频繁的唤醒Selector进行消息发送,Nettywrite方法并不直接将消息写入SocketChannel中,调用write方法只是把待发送的消息放到发送缓存数组中,在通过调用flush方法,将发送缓存区中的消息全部写到SocketChannel中)

d) exceptionCaught方法内,当发生异常时,关闭ChannelHandlerContext,释放和ChannelHandlerContext相关联的句柄等资源。

代码清单2-1 EchoClient客户端 EchoClient

public class EchoClient {

public void connect(int port, String host) throws Exception {

// 配置客户端NIO线程组

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch)

throws Exception {

// 可用于处理半包消息,使得后面的都是整包消息

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0,4, 0, 4));

ch.pipeline().addLast(new LengthFieldPrepender(4));

// 编码器 String

ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));

// 解码器String

ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));

ch.pipeline().addLast(new ProtocolEncode());

ch.pipeline().addLast(new ProtocolDecode());

ch.pipeline().addLast(new EchoClientHandler());

}

});

// 发起异步连接操作

ChannelFuture f = b.connect(host, port).sync();

// 等待客户端链路关闭

f.channel().closeFuture().sync();

} finally {

// 优雅退出,释放NIO线程组

group.shutdownGracefully();

}

}

public static void main(String[] args) throws Exception {

int port = 8082;

if (args != null && args.length > 0) {

try {

port = Integer.valueOf(args[0]);

} catch (NumberFormatException e) {

// 采用默认值

}

}

new EchoClient().connect(port, "127.0.0.1");

}

}

EchoClient代码解析:

a) connect方法内,首先创建客户端处理I/O读写的NioEventLoopGroup线程组,然后继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置,与服务端不同的是,它的Channel需要设置为NioSocketChannel它的option需要设置为(ChannelOption.TCP_NODELAY, true)然后为其添加Handler。此处为了简单直接创建匿名内部类,实现initChannel方法,其作用是当创建NioSocketChannel成功之后,在进行初始化时,将它的ChannelHandler设置为ChannelPipeline中,用于处理网络I/O事件。

b) initChannel方法内,添加了处理半包问题的解码类LengthFieldBasedFrameDecoder,然后添加LengthFieldPrepender编码器( 如果协议中的第一个字段为长度字段Netty提供了LengthFieldPrepender编码器,它可以计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中)设置长度字段为4字节,添加String类型编码器StringEncoder,设置编码为UTF-8,添加String类型解码器StringDecoder。再添加协议编码器ProtocolEncode,协议解码器ProtocolDecode,最后设置EchoClientHandler处理类,处理网络I/O事件。

c) 客户端启动辅助类设置完成后,调用connect方法发起异步连接,然后调用同步方法sync等待连接成功。

d) 最后,当客户端连接关闭后,客户端主函数退出,退出之前释放NIO线程组的资源。

客户端绑定端口实现流程

word/media/image1.gif

代码清单2-2 EchoClient客户端 EchoClientHandler

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

private ProtocolBase p;

public EchoClientHandler() {

}

@Override

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("==============channel--register==============");

}

@Override

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("==============channel--unregistered==============");

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.out.println("==============channel--inactive==============");

}

public void channelActive(ChannelHandlerContext ctx) {

System.out.println("==============channel--active==============");

System.out.println("向服务器端写入数据");

p = new ProtocolBase();

p.command = 1;

p.subcommand = 2;

p.sign = "aa4eb15663a23eef5308ed3cd2b62ab3";

p.suquence = 120;

p.body = "{\"param1\":\"value1\"}";

ctx.write(JSON.toJSONString(p));

ctx.flush();

}

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

System.out.println("==============channel--read==============");

System.out.println("the msg type is " + msg.getClass().getName());

ProtocolBase result = (ProtocolBase) msg;

System.out.println("command:" + result.command);

System.out.println("clientId:" + result.clientId);

System.out.println("sign:" + result.sign);

System.out.println("body:" + result.body);

System.out.println("接收到服务器数据是" + JSON.toJSONString(result));

}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

System.out.println("==============channel--readcomplete==============");

ctx.flush();

}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

cause.printStackTrace();

ctx.close();

}

}

代码解析:

a) channelActive方法,当客户端和服务端TCP链路建立成功之后,NettyNIO线程会调用channelActive方法,将消息数据发送给服务端。调用ChannelHandlerContextwrite方法将消息放到消息缓存数组中,然后调用flush方法,将发送缓存区消息全部写到SocketChannel中,发送给服务端。

b) 当服务器返回应答消息时,channelRead方法被调用,读取msg并类型转化为ProtocolBase协议类型,然后打印应答消息。

c) exceptionCaught方法,当发生异常时,打印异常日志,释放客户端资源。

代码清单3-1 ProtocolEncode协议编码类

public class ProtocolEncode extends MessageToMessageEncoder {

@Override

protected void encode(ChannelHandlerContext ctx, ProtocolBase msg,

List out) throws Exception {

Logger.getRootLogger().debug(msg.toString());

out.add(JSON.toJSONString(msg));

}

}

代码解析:

a) 继承MessageToMessageEncoder类的encode方法,实现msg消息的编码操作。

代码清单3-2 ProtocolDecode协议解码类

public class ProtocolDecode extends MessageToMessageDecoder {

@Override

protected void decode(ChannelHandlerContext ctx, String msg,

List out) throws Exception {

Logger.getRootLogger().debug(msg);

out.add(JSON.parseObject(msg, ProtocolBase.class));

}

}

代码解析:

a) 继承MessageToMessageDecoder类的decode方法,实现msg消息的解码操作。

channel分析图:

基础研究部

本文来源:https://www.2haoxitong.net/k/doc/998acb4549649b6648d747a1.html

《NIO编程之Netty入门篇V1.doc》
将本文的Word文档下载到电脑,方便收藏和打印
推荐度:
点击下载文档

文档为doc格式