官网上给Netty的定义是:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
即Netty是一个异步的、基于事件驱动的、网络应用框架,是用来快速的开发高性能的客户端和服务器。
Netty对JDK自带的API进行了封装,解决了使用NIO出现的代码比较繁琐的问题。
1.线程模型
目前存在的线程模型有:传统阻塞IO服务模型,Reactor模式。不同的线程模型对程序的性能影响很大。
1.1 传统阻塞IO服务模型
其实在上篇讲NIO的时候提到过阻塞IO模型。即阻塞模式下,相关方法都会导致线程暂停。
比如在实现客户端服务器的时候,服务器监听一个特定端口,相关方法是accpet,如果没有连接,服务器就会一直阻塞到这里,当一个客户端与服务器进行连接时,这个方法才会继续往下运行,然后服务器监听是否有客户端进行写入数据,即read方法,在没有数据可读时会让线程暂停。
如果在单线程情况下,又来一个客户端想要与服务器进行连接,那么现在就不能进行连接,因为服务器正在read方法处阻塞,等待第一个客户端发送数据,只有当第一个客户端发送完数据,服务器才会处理新的accpet事件。
在多线程情况下,每次当有连接建立时,服务端都需要创建一个新的线程来处理客户端的业务,但是这样也很不好,因为系统最大的线程数是有限的,对于突发的大量客户端连接不可能创建很多线程去处理连接。并且线程的频繁上下文切换也极度浪费系统资源。
线程池在一定程度上解决了这个问题
1.2 Reactor模式
1.2.1 单Reactor单线程
Reactor就类似于NIO中的selector,因为服务器线程只有一个,所以是单Reactor,它可以同时监听多路请求,一次性将监听到的事件都取出来然后依次处理。但是如果在处理一个事件时发生阻塞,其他事件还是需要等待。
优点:服务器端用一个线程通过多路复用完成所有的io操作(连接,读,写等),模型简单
缺点:性能问题,无法发挥多核CPU的性能,服务器线程在处理一个客户端上的事件时,无法处理其他客户端的事件。也不太可靠,如果线程意外终止或者进入死循环,则整个系统通信模块不可以使用,造成节点故障。
使用NIO实现的聊天室就是单reactor单线程模型。
1.2.2 单Reactor多线程
服务器通过select监听客户端请求事件,将不同的请求分发给不同的线程处理。
优点:可以充分利用多核CPU的处理能力
缺点:多线程数据共享和访问比较复杂,reactor处理所有事件的监听和响应,是在单线程下运行的,在高并发场景下容易出现性能瓶颈。
1.2.3 主从Reactor多线程
可以有多个reactor子线程
优点:父线程与子线程的数据交互简单,Reactor父线程只需要接收新连接,子线程完成后序的业务处理。
缺点:编程复杂度高
2.Netty
netty主要基于主从Reactor(反应器)多线程模型,并做了一定改进
以下是netty中重要的组件
2.1 Channel
Netty不直接使用java nio中的Channel组件,而是对Channel组件进行了自己的封装。服务器编程应用中使用最多的是通信协议是TCP,对应的netty传输通道类型是NioSocketChannel类,netty服务器监听通道类型为NioServerSocketChannel。
在netty的NioSocketChannel内部封装了一个java nio的SelectableChannel成员,对NioSocketChannel通道上的所有io操作最终都会落地到java nio 中的SelectableChannel底层通道。
一个Channel绑定一个EventLoop,以后的所有关于这个channel的请求处理的事件都是由绑定的EventLoop接收。
2.2 EventLoop
EventLoop,即事件循环,是一个单线程执行器,其中维护了一个Selector,里面有run方法处理Channel上源源不断的io事件。
EventLoopGroup是一组EventLoop,即事件循环组,Channel会调用EventLoopGroup的register方法来绑定其中的一个EventLoop,以后这个channel上的io事件都由此EventLoop来处理。一个EventLoop可以管理多个Channel
1、构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
EventLoopGroup group = new NioEventLoopGroup();
public NioEventLoopGroup() { this(0); }
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
|
2、处理普通事件、定时事件
处理普通任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class TestEventLoop { public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(2); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next());
group.next().execute(()->{ log.debug("event"); });
group.next().execute(()->{ log.debug("event2"); });
group.next().execute(()->{ log.debug("event3"); });
log.debug("main");
优雅地关闭 group.shutdownGracefully(); } }
|
处理定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class TestEventLoop { public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(2);
group.next().scheduleAtFixedRate(()->{ log.debug("event"); },0,1, TimeUnit.SECONDS);
log.debug("main");
} }
|
3、处理IO事件
io事件就是客户端服务器事件。客户端发送数据,服务器接收数据。
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class Client { public static void main(String[] args) throws InterruptedException, IOException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel(); System.out.println(channel); System.in.read(); } }
|
服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class Server { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(StandardCharsets.UTF_8)); super.channelRead(ctx, msg); } }); } }) .bind(8080); }
}
|
3、分工
创建两个EventLoopGroup,一个只负责accept事件(称为boss),另一个负责读写事件(称为worker)。
1 2 3 4
| ServerBootstrap serverBootstrap = new ServerBootstrap();
ServerBootstrap group1 = serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup());
|
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理,只需要将这个eventLoop的处理结果传递给这个非NioEventLoop处理即可(DefaultEventLoop)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class Server {
public static void main(String[] args) { EventLoopGroup group = new DefaultEventLoop(); new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(StandardCharsets.UTF_8)); ctx.fireChannelRead(msg); } }).addLast(group,new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(StandardCharsets.UTF_8)); } }); } }) .bind(8080); }
}
|
4、如何切换EventLoop
channelRead事件的传播流程, channelRead方法是在AbstractChannelHandlerContext类的invokeChannelRead方法中被调用。这里会判断当前handler和下一个handler是否是同一个,如果是,
1 2 3 4 5 6 7 8 9 10 11 12
| static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
|
2.3 Future & Promise
2.4 Handler & Pipeline
2.5 ByteBuf