Java网络编程-Netty

官网上给Netty的定义是:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

即Netty是一个异步的、基于事件驱动的、网络应用框架,是用来快速的开发高性能的客户端和服务器。

Netty对JDK自带的API进行了封装,解决了使用NIO出现的代码比较繁琐的问题。

1.线程模型

目前存在的线程模型有:传统阻塞IO服务模型,Reactor模式。不同的线程模型对程序的性能影响很大。

1.1 传统阻塞IO服务模型

其实在上篇讲NIO的时候提到过阻塞IO模型。即阻塞模式下,相关方法都会导致线程暂停。

比如在实现客户端服务器的时候,服务器监听一个特定端口,相关方法是accpet,如果没有连接,服务器就会一直阻塞到这里,当一个客户端与服务器进行连接时,这个方法才会继续往下运行,然后服务器监听是否有客户端进行写入数据,即read方法,在没有数据可读时会让线程暂停。

如果在单线程情况下,又来一个客户端想要与服务器进行连接,那么现在就不能进行连接,因为服务器正在read方法处阻塞,等待第一个客户端发送数据,只有当第一个客户端发送完数据,服务器才会处理新的accpet事件。

在多线程情况下,每次当有连接建立时,服务端都需要创建一个新的线程来处理客户端的业务,但是这样也很不好,因为系统最大的线程数是有限的,对于突发的大量客户端连接不可能创建很多线程去处理连接。并且线程的频繁上下文切换也极度浪费系统资源。

线程池在一定程度上解决了这个问题

1.2 Reactor模式

1.2.1 单Reactor单线程

Reactor就类似于NIO中的selector,因为服务器线程只有一个,所以是单Reactor,它可以同时监听多路请求,一次性将监听到的事件都取出来然后依次处理。但是如果在处理一个事件时发生阻塞,其他事件还是需要等待。

优点:服务器端用一个线程通过多路复用完成所有的io操作(连接,读,写等),模型简单

缺点:性能问题,无法发挥多核CPU的性能,服务器线程在处理一个客户端上的事件时,无法处理其他客户端的事件。也不太可靠,如果线程意外终止或者进入死循环,则整个系统通信模块不可以使用,造成节点故障。

使用NIO实现的聊天室就是单reactor单线程模型。

1.2.2 单Reactor多线程

服务器通过select监听客户端请求事件,将不同的请求分发给不同的线程处理。

优点:可以充分利用多核CPU的处理能力

缺点:多线程数据共享和访问比较复杂,reactor处理所有事件的监听和响应,是在单线程下运行的,在高并发场景下容易出现性能瓶颈。

1.2.3 主从Reactor多线程

可以有多个reactor子线程

优点:父线程与子线程的数据交互简单,Reactor父线程只需要接收新连接,子线程完成后序的业务处理。

缺点:编程复杂度高

2.Netty

netty主要基于主从Reactor(反应器)多线程模型,并做了一定改进

以下是netty中重要的组件

2.1 Channel

Netty不直接使用java nio中的Channel组件,而是对Channel组件进行了自己的封装。服务器编程应用中使用最多的是通信协议是TCP,对应的netty传输通道类型是NioSocketChannel类,netty服务器监听通道类型为NioServerSocketChannel。

在netty的NioSocketChannel内部封装了一个java nio的SelectableChannel成员,对NioSocketChannel通道上的所有io操作最终都会落地到java nio 中的SelectableChannel底层通道。

一个Channel绑定一个EventLoop,以后的所有关于这个channel的请求处理的事件都是由绑定的EventLoop接收。

2.2 EventLoop

EventLoop,即事件循环,是一个单线程执行器,其中维护了一个Selector,里面有run方法处理Channel上源源不断的io事件。

EventLoopGroup是一组EventLoop,即事件循环组,Channel会调用EventLoopGroup的register方法来绑定其中的一个EventLoop,以后这个channel上的io事件都由此EventLoop来处理。一个EventLoop可以管理多个Channel

1、构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//1.创建事件循环组,可以处理io事件、普通事件、定时事件
//每个事件循环是一个线程
//selector+线程池
EventLoopGroup group = new NioEventLoopGroup();

public NioEventLoopGroup() {
this(0);
}

//所以如果不指定事件循环对象的个数,默认创建的个数是NettyRuntime.availableProcessors() * 2个,即cpu的核心数×2
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

2、处理普通事件、定时事件

处理普通任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TestEventLoop {
public static void main(String[] args) {

EventLoopGroup group = new NioEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
/**
* 输出结果:说明只有两个EventLoop
* io.netty.channel.nio.NioEventLoop@4facf68f
* io.netty.channel.nio.NioEventLoop@76508ed1
* io.netty.channel.nio.NioEventLoop@4facf68f
* io.netty.channel.nio.NioEventLoop@76508ed1
*/

//通过EventLoop执行普通任务
group.next().execute(()->{
log.debug("event");
});

group.next().execute(()->{
log.debug("event2");
});

group.next().execute(()->{
log.debug("event3");
});

log.debug("main");

/**
* 17:06:22.731 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event
* 17:06:22.731 [nioEventLoopGroup-2-2] DEBUG vickkkyz.TestEventLoop - event2
* 17:06:22.732 [main] DEBUG vickkkyz.TestEventLoop - main
* 17:06:22.732 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event3
*/

优雅地关闭
group.shutdownGracefully();
}
}

处理定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestEventLoop {
public static void main(String[] args) {

EventLoopGroup group = new NioEventLoopGroup(2);

//通过EventLoop执行普通任务
group.next().scheduleAtFixedRate(()->{
log.debug("event");
},0,1, TimeUnit.SECONDS);

log.debug("main");

/**
* 17:09:50.779 [main] DEBUG vickkkyz.TestEventLoop - main
* 17:09:50.780 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event
* 17:09:51.781 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event
* 17:09:52.775 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event
* 17:09:53.778 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event
* 17:09:54.776 [nioEventLoopGroup-2-1] DEBUG vickkkyz.TestEventLoop - event
*/
}
}

3、处理IO事件

io事件就是客户端服务器事件。客户端发送数据,服务器接收数据。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Client {
public static void main(String[] args) throws InterruptedException, IOException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
// 此处打断点调试,调用 channel.writeAndFlush(...);
System.in.read();
}
}

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Server {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}

}

3、分工

创建两个EventLoopGroup,一个只负责accept事件(称为boss),另一个负责读写事件(称为worker)。

1
2
3
4
//启动器,负责装配netty组件,将它们组合起来,启动服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//创建 NioEventLoopGroup组件(),第一个是boss,第二个是worker
ServerBootstrap group1 = serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup());

当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理,只需要将这个eventLoop的处理结果传递给这个非NioEventLoop处理即可(DefaultEventLoop)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Server {

public static void main(String[] args) {
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
// 调用下一个handler
ctx.fireChannelRead(msg);
}
}).addLast(group,new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}

}
/**
* 18:49:28.514 [nioEventLoopGroup-3-2] DEBUG vickkkyz.Server - 1
* 18:49:28.514 [defaultEventLoop-1-1] DEBUG vickkkyz.Server - 1
*/

4、如何切换EventLoop

channelRead事件的传播流程, channelRead方法是在AbstractChannelHandlerContext类的invokeChannelRead方法中被调用。这里会判断当前handler和下一个handler是否是同一个,如果是,

1
2
3
4
5
6
7
8
9
10
11
12
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRegistered();
}
});
}
}

2.3 Future & Promise

2.4 Handler & Pipeline

2.5 ByteBuf


Java网络编程-Netty
https://vickkkyz.fun/2022/05/20/Netty/netty/
作者
Vickkkyz
发布于
2022年5月20日
许可协议