NIO
简介
NIO:可以叫做Non - Blocking IO,也可以成为New IO,非阻塞只是NIO的特点之一
优点
不用NIO情况下如何实现一个服务器
法一:多线程
- 特点:一个Socket连接,使用一个线程管理
- 有点:一对一服务,适合连接数少的场景
- 缺点:
- 内存占用高
- 现成上下文切换成本高
法二:线程池
- 特点:
- 使用线程池思想,控制线程数量
- 优点:避免了创建大量的线程导致内存占用过高的问题,适合短连接场景
- 缺点:属于阻塞模式(即一个线程同时只能给一个 socket 连接提供服务),线程的利用率不高
法三:使用 NIO 的 Selector
- 特点:用一个线程管理 Selector,Selector 去管理注册在此的通道(下面会详细介绍)
- 优点:适合连接数多,但是流量低的场景(low traffic)

简单了解
JavaNIO由三个核心组件构成
- Channel
- Buffer
- Selector
除了这三个还有别的如Pipe、FileLock等等
NIO与BIO对比
- BIO 是面向流的;NIO 面向缓冲区
- BIO 只能向后读,不能向前读;NIO 可以向前也可以向后读
- BIO 是单向的,一端要么读要么写;NIO 中通道是双向的,一端可读可写
- BIO 是阻塞的;NIO 是非阻塞的
大致过程

Buffer
Buffer缓冲区:可以理解为一个数组,通道必须与Buffer一起使用
具体的实现类:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
API
核心API:
put(value):将value写入缓存(从当前的position开始写入,并增加position的值,即相对位置的写入)get(arr[]):读取position位置下的数据到一个数组中(如果不传参数,默认读取一个字符)hasRemaining():判断position与limit之间的距离,如果还有数据可以处理,那么就返回 trueclear():清空缓冲区(清空的原理并不是真的清空了,只是将 position 的位置为 0,limit 设置为 capacity,即切换回写模式)flip():可以将缓冲区由写模式切换到读模式(将 limit 设为 position 的位置,再将 position 设置为 0)compact():压缩,将未读取的数据(即 position 与 limit 之间的数据)向前移动,然后 position 指向最后一个未读取的数据之后,然后 limit 指向 capacity
其他API:
capacity(); limit() ; position():返回当前capacity/limit/position的值mark():标记当前position的位置,当调用reset方法时,会重置到 mark 标记过的位置(只能在 0-position 之前)remaining():返回position与limit之间的距离reset():重置position到mark标记过的位置rewind():将position设置为 0(即让 position 回到初始的位置),取消 mark 标记位
创建Buffer
- allocate(long):
- 创建一个指定大小的缓冲区;是一个静态方法;(注意:
buffer对象不能new,只能通过allocate分配) warp():把已存在的数组包装为一个 Buffer 对象(无论操作两者中的哪一个,另一个也会变化,因为就是同一个数组)
注意:
这两种创建方式,都是间接的创建了一个缓冲区(间接:是指这个缓冲区在 JVM 堆中)
如何将一个字符串转换为一个ByteBuffer数组
- 使用
ByteBuffer的wrap方法 - 使用
Charset - 创建
ByteBuffer再put字符串的字节数组
1 | String str = "hello"; |
属性及变化
Buffer 有四个重要的属性:
capacity:表示当前 Buffer 的容量大小;创建时指定,创建后不能修改;如果写满了 capacity,那么需要等清空后才能写入数据position:表示当前的位置,初始时为 0,每读写一个数据,position就 + 1,最大为capacity-1limit:指第一个不能被读或写的位置(即可以表示只能读取或写入多少个数据)mark:设置一个标记位,当进行reset时,会将position的值变为mark原本标记的值
四个属性有这样的大小关系:0 <= mark <= position <= limit <= capacity
下面有个图,可以清晰的表示整个过程

结合代码
1 | while (true) { |
直接缓冲区
首先要明确一个概念:只有 byteBuffer 才有资格参与 IO 操作,因为数据全为 01,只有 byteBuffer 存储的才是 01 二进制,所以只有它才能 IO 操作
为什么要用直接缓冲区?
使用非直接缓冲区可能会导致性能损耗(多一次拷贝过程)
假设给一个通道传入了一个非直接缓冲区,那么通道会先创建一个临时的直接缓冲区,将非直接缓冲区的数据复制到临时的直接缓冲区,使用这个临时的直接缓冲区去执行 IO 操作(多一次拷贝,增大了开销)
直接缓存区存在的问题:
- 直接缓存区绕过了 JVM 的堆栈,不受 JVM 控制,所以有可能我们创建的直接缓存区代价会更高
- 直接缓存区分配会较慢,因为 Java 需要调用 OS 的函数进行分配,所以会较慢
如何创建一块直接缓冲区?
调用 ByteBuffer.allocateDirect(1024) 即可
分散读与集中写
这主要是两种思想
分散读就是把原本的数据一次读到多个 buffer 中,集中写就是将多个 buffer 的内容一次性写入
这两种思想分别有两个接口代表:ScatteringByteChannel(有 read 方法)、GatheringByteChannel(有 write 方法)
都可以传入一个 ByteBuffer 的数组
1 | ByteBuffer bf1 = ByteBuffer.allocate(5); |
集中写同理
TCP粘包问题
什么是粘包问题?
- 以接收端来看:因为 TCP 是面向流的协议,所以不会保持输入数据的边界,导致接收端很可能一下子收到多个应用层报文,需要应用开发者自己分开,有些人觉得这样不合理,就起名为粘包
- 以发送端来看:用户数据被 TCP 发送时,会根据 Nagle 算法,将小的数据封装在一个报文段发送出去,导致不同的报文段黏在了一起
如何解决
发送方关闭Nagle方法
- 设置
TCP_NODELAY关闭Nagle算法 - 最好不使用
- 设置
交给应用层:
固定消息长度:一条消息就发送一个固定大小的数据,然后填充空白
- 缺点:浪费带宽
格式化数据:每条数据有固定的格式(开始符,结束符),这种方法简单易行
- 缺点:但是这种方法需要保证传输的字符中没有该开始符与结束符,而且因为需要挨个遍历,传输速度不是很快
TLV 格式:即 Type 类型、Length 长度、Value 数据,在类型和长度已知的情况下就可以方便的知道消息大小,分配合适的 buffer
- 缺点:buffer 需要提前分配,如果分配过大,影响 server 的吞吐量
- HTTP1.1 是 TLV 格式(先传输类型)
- HTTP2.0 是 LTV 格式(先传输长度)
此处举一个例子:(这里属于格式化数据的一种方式,使用到
\n作为分隔符)网络上有多条数据发送给服务端。数据之间使用
\n进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合。例如原始数据有 3 条为
1
2
3Hello,world\n
I"m zhangsan\n
How are you?\n变成了下面的两个
byteBuffer(黏包,半包)1
2Hello,world\nI" m zhangsan\nHo
w are you?\n如何解决?(但这种方法需要对字节挨个遍历,所以效率不是很高)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
// 模拟粘包
source.put("Hello,world\nI' m zhangsan\nHo".getBytes());
spilt(source);
source.put("w are you?\n".getBytes());
spilt(source);
}
// 处理粘包
private static void spilt(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
if(source.get(i) == '\n'){// 带索引的get方法不会改变position的值
int len = i + 1 - source.position();
// 计算要分配的缓冲长度
ByteBuffer target = ByteBuffer.allocate(len);
for (int j = 0; j < len; j++) {
target.put(source.get());
}
}
}
source.compact();
// 注意这里不能用clear
}
Channel
Channel:通道,类似于IO流,但是与流不同的是Channel是双向的
IO流:如 InputStream、OutputStream 要么输入(读),要么输出(写)
但是 Channel 是双向的,实现类如下
FileChannel:从文件中读写数据DatagramChannel:通过 UDP 读写网络数据SocketChannel:通过 TCP 读写网络数据ServerSocketChannel:可以监听新进来的 TCP 连接,就像 Web 服务器一样,对每一个新连接都会建立一个SocketChannel
特点:
- 通道的操作是双向的(可以只读、可以只写、还可以读写)
- 通道可以操作的数据种类很多(可以是文件 IO、网络 Socket 都可以操作)
- 通道的操作是异步的
- 不能直接访问通道,需要与 Buffer 合作(通道读必须从一个 Buffer 读、通道写必须写到一个 Buffer 内)
- 通道不能复用!关闭了就没了
具体在 Java 中,就是一个接口,有两个方法
1 | public interface Channel extends Closeable { |
FileChannel
用来对文件进行操作的通道
获取FileChannel
获取 FileChannel 的方式:(不能 new 来获取,有三种获取方式)
- 获取
FileChannel可以使用RandomAccessFile这个类 - 也可以使用
InputStream(这样创建的 FileChannel 只能读) - 也可以使用
OutputStream(这样创建的 FileChannel 只能写)
读到Buffer中
FileChannel 读取数据到Buffer中
- 通道的
read()方法,必须读入到一个缓存中,而且会返回读取到的字节数(或者说是,缓冲区中的字节数有多少) - 需要在缓冲的
buffer.hasRemaining()的循环中使用 - 如果读完,会返回 - 1
Demo
1 | // 此文件有21字节,设置模式为读写均可 |
读文件大致Demo
1 | try (RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw")){ |
写到通道中
- write方法,需要传入缓冲
- 需要在缓冲的
buffer.hasRemaining()的循环中使用
1 | RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw"); |
其他方法
positon():返回从文件开头位置到当前位置的字节数position(long):设置当前位置(注意:如果指定的位置超过了文件当前的位置,然后进行了写入,会导致文件中间部分没有被写(文件空洞))size():返回文件大小truncate(long):截取前指定大小的数据,后面的数据将会被删除
1 | RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw"); |
force与RandomAccessFile
force(boolean)方法:- 此方法是为了保证对文件的所有更新操作都写回磁盘
- 参数为
false表示只写回文件内容 - 参数为
true表示既要写回文件内容,又要写回元数据信息(即文件的权限信息等)
RandomAccessFile(fileName, mode):传入一个文件的路径以及一个模式,此处的模式有四种r:只读,如果调用 write 会抛出异常rw:可读可写;如果文件不存在,将会被创建rws:可读可写,并且所有的更新操作每次都会被同步的写入磁盘,会写入文件内容以及元数据(即带有force(true)的 rw 模式)rwd:可读可写,并且所有的更新操作每次都会被同步的写入磁盘,会写入文件内容(即带有force(false)的 rw 模式)
通道间通信
通道之间可以传输数据,这两个方法仅仅是方向不同而已
(通道之间传输数据没有用到缓存,不知道底层实现是否用到了缓存)
transferFrom(ReadableByteChannel src, long position, long count):三个参数,第一个传入另外一个通道,第二个为起始位置,第三个传入想要传输的字节数
1 | RandomAccessFile rw1 = new RandomAccessFile("d:\\temp.txt", "rw"); |
注意:transferTo 方法是有上限的(最大为 2G,超过这个范围,那么只会传输 2g 的内容,剩下的数据就不会再传输了)
可以这样实现传输大于 2g 的内容:
1 | try ( |
Socket通道
此处的 Socket 通道泛指了三个实现了 AbstractSelectableChannel 的类:SocketChannel、DatagramChannel、ServerSocketChannel
为什么引入Socket通道?
传统的 Socket 会为每个 Socket 连接创建一个线程,但是 Socket 通道仅仅只开辟一个或几个线程就可以提供成百上千的服务,大大提高了性能!
好处:
1、节省了线程切换的上下文开销
2、便于管理
Socket 通道的特点
ServerSocketChannel只负责: (不负责读写)- 监听传入的连接
- 创建
SocketChannel对象
DatagramChannel与SocketChannel负责真正的读写操作- Socket 通道可以被重复使用
Socket 通道可以设置为非阻塞模式
ServerSocketChannel
ServerSocketChannel:可以理解为一个实现了非阻塞模式的 ServerSocket
特点:
socket():可以获得ServerSocket对象,然后调用其bind()方法去绑定一个端口(注意:ServerSocketChannel本身没有bind方法!!(JDK1.7 有了 bind 方法,默认绑定本地地址))ServerSocketChannel有accept()方法,会返回一个SocketChannel对象;但是使用ServerSocket的accept()方法就还是阻塞的- 如果返回的对象为
null,说明当前没有连接
- 如果返回的对象为
- 因为继承了
AbstractSelectableChannel类,所以可以设置为非阻塞模式
ServerSocketChannel 对象的创建:
不能 new,需要调用 ServerSocketChannel 的静态方法 open()
核心API
accepct():方法是阻塞方法(如果不设置为非阻塞)bind():绑定一个端口configureBlocking(boolean):默认为 true(阻塞模式)
使用单线程 + Channel 实现服务器:
1 | // 0. byteBuffer |
缺点:我们使用了死循环来一次次判断连接情况(这样也会浪费 CPU 的资源)
因此有了 Selector 来解决这个问题(使用 Seletor 后,代码的逻辑会发生变化)
ScoketChannel
SocketChannel:用来连接到TCP套接字的通道
特点:
- 实现了可选择通道,可以被多路复用
- 基于 TCP 传输协议
- 支持两种模式:阻塞与非阻塞(同样也是通过
configureBlocking调节)
创建SocketChannel
ServerSocket对象的accept方法会返回SocketChannel对象- 直接使用
SocketChannel.open()也可以创建 SocketChannel 对象
核心API
configureBlocking(boolean):默认为 true(阻塞模式)read(ByteBuffer):读write(ByteBuffer):写
Selector
提供了一种选择执行已经就绪的任务的能力
允许单线程处理多个通道,大大提高了效率
原理
- 每个Channel都在Selector上注册
- 注册后会返回一个选择键,选择键代表了当前通道是否已经就绪的信息
- 每执行一次select方法,都会更新所有的选择键,然后选择一个已经就绪的通道

注意:
- 这些 Channel 必须是
SelectableChannel的子类(比如:FileChannel就不是其子类,也就不能被选择)
有关的类
Selector:选择器类负责管理在此注册的通道的集合信息以及他们的就绪状态SelectableChannel:可选择通道,是一个抽象类,继承这个类的类为可以进行选择的类(FileChannel 就没有继承这个类)SelectionKey:选择键类,封装了通道与选择器之间的注册关系,含有两个比特集(一个代表注册关系所关心的通道操作,一个代表通道已经就绪的操作)
建立Selector系统
创建 Selector:不能 new,需要调用 Selector.open() 方法
2、设置通道为非阻塞(只有非阻塞的通道才能注册到选择器)
3、通道注册:调用 register(selector, OP) 进行注册,两个参数,第一个为选择器,第二个参数为想让选择器关心的操作,会返回一个 SelectionKey 对象
此处可以设置的关心的操作 OP 有:
1 | SelectionKey.OP_READ = 1<<0; // 是否有可读的通道就绪 |
如果我们想要关心多个操作,可以通过 | 位或运算符将 OP 连接起来
1 | channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE) |
4、select () 轮询:查看是否有就绪的通道,会返回当前就绪通道的数量
Selector三个集合
keys():此集合保存已注册的键(此集合不能修改!)selectedKeys():此集合保存关心事件发生的键,此集合我们只能移除,不能添加(添加是自动进行的)- 注意这个集合的特点:会自动添加集合,但是不会自动删除集合
- 因此我们在处理完成后要主动
remove掉,要不然会报异常
- 已取消键的集合,使用
cancel()方法后的键都会放在这里
Selector 的核心就是
select方法,它的执行过程为:
- 检查已取消集合:如果集合非空,就将集合内所有的键从另外两个集合中移除,然后注销其相关的通道
- 检查 selectedKeys 集合:确定每个通道所关心的操作是否已经就绪
- 返回值:返回上一次调用 select 后进入就绪状态的通道的数量
SelectionKey
选择键: 表示通道与 Selector 之间的注册关系,注册一个通道就会返回一个 SelectionKey
相关 API:
channel:返回对应的通道selector:返回通道注册的选择器cancel:取消注册关系isValid():判断注册关系是否有效interestOps():以整数的形式,返回所关心操作的 bit 掩码,可以用此来判断选择器是否关心通道的某个操作
1 | Boolean isAccept = interestOps & SelectionKey.OP_ACCEPT == SelectionKey.OP_ACCEPT; |
isAccept()等操作:上面的这种方式判断太麻烦了,API 直接就有相关判断方法
关于Selector
select 方法什么时候会不阻塞,向下执行?
总共用如下几种情况:
- 发生事件时
- 客户端发起连接请求,触发 accept 事件
- 客户端发送数据、正常关闭、异常关闭都会触发 read 事件
- 如果要发送的数据大于缓冲区,会触发多次 read 事件
- channel 可写,触发 write 事件
- 在 Linux 下 nio bug 发生时
- 调用
selector.wakeup() - 调用
selector.close() - selector 所在线程
interrupt
事件可以不处理吗?
不可以,事件要么就 remove,要么就 cancel,不可以不进行处理
如果不进行处理,select 每次都会返回这个事件,白白浪费 CPU 的资源
用完 key 为什么要 remove?
elector 维护的集合 SelectedKeys 添加是自动的,删除需要我们手动进行,一旦有我们关心的事件发生,那么 Selector 就会将此 key 添加到这个集合内
如果我们用完 key,没有 remove,那么此集合内就还会存在这个 key,在使用这个 key 进行操作的时候,就可能会出现异常(比如空指针异常)
注意:remove 只是将这个集合内的该键删除了,如果对应的事件还会继续发生,那么这里光删除集合内的 key 是没有用的,应该 cancel 掉对应的事件
处理read事件要注意的事情
【一】处理好客户端的正常与异常断开
正常断开依靠 read 方法返回值
异常断开依靠 catch 抓住异常后,将此 key 取消掉
1 | try{ |
【二】处理好消息边界
例如我们开辟一块 buffer 为 4 字节的缓冲区,客户端传输两个汉字 “中国”,默认字符集 UTF-8 对于 1 个汉字占用 3 个字节,就要处理好消息边界的问题!
(如何解决消息边界问题下一节详细介绍)
处理write事件要注意的事情
我们发送的数据可能不是一次性发送的!
1 | ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); |
这样处理虽然可以保证数据全部发完,但是不符合 NIO 非阻塞的思想
好的处理方式应该是,如果我们一次性发不完,可以先去处理其他事情(避免一直发送导致发送缓冲区满,导致轮询)
完整代码如下:
1 | while (true){ |
处理消息边界
比如粘包问题
异常情况
消息边界:发送一个消息可能有三种异常情况
- buffer的大小不足以放下一个完整的消息。
- 这种情况下,只能将 buffer 扩容
buffer空间足够,但是只放了 1.5 个消息,剩下的半个消息我们需要进行拼接(在 Buffer 一节,我们介绍了一个利用\n来截断消息的步骤)
如果 buffer 空间不足(即一条消息的大小就超过了 buffer),nio 是如何进行处理的呢?
例如:buffer 空间为 8 字节,传输一条 10 字节的消息
服务器收到 read 事件,将数据读到 8 字节的缓存中,然后打印输出
然后,服务器会再次收到 read 事件,将剩下 2 个字节读入,打印输出
(即:服务器会变为多次 read 事件读取消息,因此对于这种情况我们需要扩容)
如何扩容buffer
buffer 不能是局部变量
最开始我们的代码是这样的,但是如果我们想要给一个 buffer 扩容,buffer 就不能是一个局部变量
1 | //收到read事件 |
那么,不能是局部变量,我提到最外面可以不可以呢?
不可以,我们要保证一个线程的 buffer 是私有的,如果我们提到最外面,那么多个线程操控一个 buffer,这就乱套了(线程不安全)
因此我们要借助附件 attachment
注意:附件可以保证每一个线程间是私有的
regiser() 传入的第三个参数就是附件,如果要获取附件,可以调用 key 的 attachment() 方法
1 | if (key.isAcceptable()) { |
什么时候进行扩容呢?
当然是 buffer 空间不够的时候,之前我们自己实现的 split() 方法最后是使用的 compact() 方法去完成拼接
如果一个消息大于 buffer,那么他的 \n 的索引依然会大于 limit
比如:
1 | buffer大小8字节 |
因此,扩容的时机就是 position==limit
1 | SocketChannel channel = (SocketChannel) key.channel(); |
这样两次 read 事件,就能把消息拼接到一起了!
1 | buffer大小8字节 |
此处我们的扩容简单的进行了翻倍扩容,这是比较简单的实现,在 Netty 中,就设计的比较优秀了,此处我们只是抛砖引玉
buffer 大小如何抉择有两种思路:
ByteBuffer 需要设计为可变的 buffer:
- 思路一:首先分配一个较小的 buffer,如果不够用,再分配一个更大的 buffer,并将原本的 buffer 数据 copy 到新 buffer 内
- 优点:消息连续好处理
- 缺点:数据 copy 消耗性能
- 思路二:用多个数组组成 buffer,一个数组不够,就将多出来的内容写入新的数组
- 优点:避免了 copy 消耗性能
NIO服务器Demo
1 |
|
多线程优化
如何优化
前面我们都是使用单线程,即将所有的事件全部放在一个线程内进行操作,不能发挥多线程的优势
所以我们可以进行多线程优化:(这也是 Netty 的实现原则)
优化的手段
可以将 ACCEPT 请求与读写请求分开,如图:

有两类线程:
- boss 线程:专门负责处理
ACCEPT事件 - worker 线程:处理
READ与WRITE事件
boss 线程只有一个,而 worker 线程可以根据服务器的 CPU 核心数来确定,并且可以加入负载均衡策略,让每一个 worker 线程一起出力。
优化手段总结:
1、按事件分类,分给不同的线程执行
2、worker 间实现负载均衡
实现逻辑
boss 线程一开始就有,worker 线程是慢慢创建的,具体的逻辑如下

值得注意的是:要让 select 与注册全让 worker 线程来执行,通过消息队列 +wakeup() 方法 来进行传输消息(即我标为黄色的部分)
这样可以避免出现执行顺序不正确带来的顺序问题。
代码实现
1 |
|