写在前面

在3月20日的博客里,我通过代码来理解了一下NIO和BIO的区别,可以看出一部分代码是比较麻烦和复杂的,在项目开发中一般遇到这种普适性的问题,都会有前人造出轮子来让我们使用,而对于NIO的网络编程,最佳的轮子就是Netty。

Netty的语言实现是Java,所以去看源码什么的也会比较容易一点。

Reactor设计模式

Netty的核心在于异步、事件驱动。

在了解Netty之前必须先了解Reactor设计模式。

Reactor的翻译是反应堆,其实本质上就是事件驱动模式。

在Reactor模式中,有一些比较重要的概念:

  1. 事件,Event,对应于Netty中的channelActive、channelRead、channelInactive等等
  2. 事件处理器,EventHandler,对应于Netty中的ChannelInboundHandler,对应的事件就要有对应的事件处理逻辑
  3. Reactor,事件分发器,负责响应并分发I/O事件。事件发生了,需要将事件交给对应的事件处理器去处理,在Netty中,对应的工作由EventLoop来执行。
  4. 事件多路处理器,Selector,在上一篇博客里我们重点讨论过这个部分,在Netty中,Selector的工作由EventLoop来执行。

Netty实现了Reactor的三种模型,分别为:

  1. 单Reactor单线程模型,对应Netty中的单线程模型
  2. 单Reactor多线程模型,对应Netty中的多线程模型
  3. 主从Reactor多线程模型,对应Netty中的主从多线程模型

单线程模型

我们在上一篇博客中实现的NIO模式,其实就是单Reactor单线程模式。

只启用一个Selector,这个Selector又要监听Accept事件,获得Channel之后将对应的Channel也注册到同一个Selector上去,这个Selector需要负责监听这个Channel的读写事件以及之前的ServerChannel的注册事件。

单Reactor

这里我用了掘金的图,我觉得画的比较符合我的认知。

那么在我们上篇博客里,服务器的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {
public static void main(String[] args) throws Exception {
// NIO基于Channel控制,所以有Selector管理所有的Channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 设置监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 设置Selector管理所有Channel
Selector selector = Selector.open();
// 注册并设置连接时处理
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功,监听端口为:" + 8080);
// NIO使用轮询,当有请求连接时,则启动一个线程
int keySelect = 0;
while (serverSocketChannel.isOpen()) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isAcceptable()) { // 如果是连接的
SocketChannel accept = serverSocketChannel.accept();
if (accept != null) {
//把新的会话测channel注册到selector里去,让Selector来管理它
accept.configureBlocking(false);
//并且把它的感兴趣状态变为可读状态
accept.register(selector,SelectionKey.OP_READ);
}
iterator.remove();
}
//一旦可读了,就代表客户端发来了消息,那我们就去处理这个消息
if(next.isReadable()){
handleRead(next);
}
}
}
serverSocketChannel.close();
}

//其实处理这个消息我们依然有一处是阻塞的,就是我们返回给客户端的时候,要求客户端是写可用的。
//但是写可用的触发是很多次的,一般都是写可用的,所以我们就没有做过多的干预
private static void handleRead(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(50);
buffer.clear();
int read = channel.read(buffer);
String msg = new String(buffer.array(), 0, read).trim();
System.out.println("服务端收到消息:"+msg);
String outMsg = "【Echo】" + msg; // 生成回应信息
//模拟消息处理时长
Thread.sleep(5000);
buffer.clear();
buffer.put(outMsg.getBytes()); //回传信息放入缓冲区
buffer.flip();
channel.write(buffer);// 回传信息
}
}

那么如果这段代码用Netty实现该如何实现呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyServer {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup singleReactor = new NioEventLoopGroup(1);//限制为单线程
bootstrap.group(singleReactor)
.channel(NioServerSocketChannel.class)
.handler(new BossLogHandler())
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline channelPipeline = channel.pipeline();
// 添加 HTTP 编解码器
channelPipeline.addLast(new HttpServerCodec());
// 聚合 HTTP 消息
channelPipeline.addLast(new HttpObjectAggregator(65536));
// 处理 WebSocket 升级请求
channelPipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//超时处理器
channelPipeline.addLast(new IdleStateHandler(10,10,1000, TimeUnit.SECONDS));
// 自定义处理器
channelPipeline.addLast(new MyChannelHandler());
}
});
ChannelFuture future = bootstrap.bind("127.0.0.1", 8080).sync();
System.out.println("服务器启动");
// 等待连接关闭
future.channel().closeFuture().sync();
}
}

这个服务端实现起来是比较简单的,因为只有一个EventLoop,但其实多线程模式和主从模式其实也只需要稍作修改即可。

单线程模式的优势为:

  1. 简化并发模型
  2. 减少上下文切换的开销
  3. 资源消耗低

存在的问题:

阻塞模式、CPU利用率低、不适合高并发场景

多线程模型

和单线程模型不同的点在于多了一个handler线程池。

Reactor设计模式的单Reactor多线程模式和Netty的多线程模型其实并不完全对应。

Reactor的单Reactor多线程模式只有一个线程的问题,但是仍然只有一个Reactor在同时监听ACCEPT事件和READ事件。

但是Netty的多线程模式则并不是,所有线程都负责处理连接和数据。

1
EventLoopGroup singleReactor = new NioEventLoopGroup();

就把这个1去掉就行了。

多线程模型的优点包括:

  1. 提高吞吐率
  2. 更好利用多核

存在的问题:

  1. 没有区分监听新连接和处理已有连接的读写操作,这种设计可能导致某些线程在处理复杂或耗时的数据处理任务时无法及时响应新的连接请求,从而影响新连接的接受速度。
  2. 难以实现有效的负载均衡,所有线程都在执行相似的任务,可能会出现部分线程过载而其他线程空闲的情况,难以实现真正的负载均衡。
  3. 资源竞争加剧,所有线程都可以访问I/O、数据库等共享资源

为了解决这种问题Netty设计出了主从模型。

主从模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyServer {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);//限制为单线程
EventLoopGroup workers = new NioEventLoopGroup(1);//限制为单线程
bootstrap.group(boss,workers)
.channel(NioServerSocketChannel.class)
.handler(new BossLogHandler())
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline channelPipeline = channel.pipeline();
// 添加 HTTP 编解码器
channelPipeline.addLast(new HttpServerCodec());
// 聚合 HTTP 消息
channelPipeline.addLast(new HttpObjectAggregator(65536));
// 处理 WebSocket 升级请求
channelPipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//超时处理器
channelPipeline.addLast(new IdleStateHandler(10,10,1000, TimeUnit.SECONDS));
// 自定义处理器
channelPipeline.addLast(new MyChannelHandler());
}
});
ChannelFuture future = bootstrap.bind("127.0.0.1", 8080).sync();
System.out.println("服务器启动");
// 等待连接关闭
future.channel().closeFuture().sync();
}
}

通过将线程之间的任务分开,分为负责监听并建立新连接的boss线程池和负责处理已经连接的连接的事件的workers线程池,能够有效实现不同分工的负载均衡。

EventLoop

EvenLoop其实是Netty的核心所在,我们常常定义的EventLoopGroup是用来管理EventLoop实例线程池的。常见的实现包括NioEventLoopGroup和EpollEventLoopGroup。

NioEventLoopGroup

  • 基于Java NIO: NioEventLoopGroup 使用的是 Java 原生的非阻塞 I/O(NIO)API。这意味着它可以跨平台使用,无论是在 Windows、Linux 还是 macOS 上都能正常工作。
  • 跨平台兼容性: 由于依赖于标准的 Java NIO 库,因此具有很好的跨平台兼容性。
  • 性能特点: 对于大多数应用场景来说,NioEventLoopGroup 提供了足够的性能。然而,在高并发场景下,尤其是在 Linux 系统上,它可能不如 EpollEventLoopGroup 高效。

EpollEventLoopGroup

  • 基于Linux的epoll: EpollEventLoopGroup是专门为 Linux 系统设计的,它利用了 Linux 特有的 epoll I/O 事件通知机制。相比于 Java NIO 的 select/poll 方法,epoll 在处理大量文件描述符时更加高效,特别是在有大量连接但活跃连接相对较少的情况下。
  • 性能优化: 在高负载和高并发场景下,EpollEventLoopGroup 能够提供比 NioEventLoopGroup 更好的性能。这是因为 epoll 能够更有效地管理大量的文件描述符,并且在监听大量连接的同时保持较低的CPU使用率。
  • 限制: 只能在 Linux 系统上运行,因为它依赖于 Linux 内核特有的功能。

就像我想在mac上运行就会报错,只在Linux上支持

1
2
3
4
5
6
7
8
9
10
11
12
Exception in thread "main" java.lang.UnsatisfiedLinkError: failed to load the required native library
at io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
at io.netty.channel.epoll.EpollEventLoopGroup.<clinit>(EpollEventLoopGroup.java:41)
at NettyServer.main(NettyServer.java:16)
Caused by: java.lang.ExceptionInInitializerError
at io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:40)
... 2 more
Caused by: java.lang.IllegalStateException: Only supported on Linux
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:317)
at io.netty.channel.epoll.Native.<clinit>(Native.java:85)
... 3 more

Channel

Channel其实就是类似于Socket的网络连接,可以是客户端连接或服务器监听端口。Channel负责读写网络数据,并注册到EventLoop中等待事件处理。

而Channel的I/O事件则会交给它的ChannelPipeline来处理,用户可以在ChannelPipeline添加一系列ChannelHandler,包括解码器、聚合器、WebSocketServerProtocolHandler这种http升级ws的Handler以及用户自定义的ChannelHandler(通过继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter或者实现ChannelInboundHandler)来定义

Netty事件驱动模型的工作原理

Netty的事件驱动模型通过EventLoopGroup、EventLoop、Channel、ChannelPipeline和ChannelHandler之间的协同工作来实现。其工作流程如下:

  1. 初始化:服务器启动时,创建一个或多个EventLoopGroup,分别用于接收连接和处理I/O操作。
  2. 注册Channel:为每个客户端连接创建一个Channel,并将其注册到一个EventLoop中。每个Channel会绑定一个ChannelPipeline。
  3. 事件循环:EventLoop在其绑定的线程中不断循环,监听I/O事件。当有事件发生时,EventLoop会将事件分发到ChannelPipeline。
  4. 事件处理:ChannelPipeline根据事件类型,将事件传递给相应的ChannelHandler进行处理。ChannelHandler可以处理入站、出站事件,或者进行数据编码、解码等操作。