NIO

简介

NIO:可以叫做Non - Blocking IO,也可以成为New IO,非阻塞只是NIO的特点之一

优点

不用NIO情况下如何实现一个服务器

法一:多线程

  • 特点:一个Socket连接,使用一个线程管理
  • 有点:一对一服务,适合连接数少的场景
  • 缺点:
    • 内存占用高
    • 现成上下文切换成本高

法二:线程池

  • 特点:
  • 使用线程池思想,控制线程数量
  • 优点:避免了创建大量的线程导致内存占用过高的问题,适合短连接场景
  • 缺点:属于阻塞模式(即一个线程同时只能给一个 socket 连接提供服务),线程的利用率不高

法三:使用 NIO 的 Selector

  • 特点:用一个线程管理 Selector,Selector 去管理注册在此的通道(下面会详细介绍)
  • 优点:适合连接数多,但是流量低的场景(low traffic)

示意图

简单了解

JavaNIO由三个核心组件构成

  • Channel
  • Buffer
  • Selector

除了这三个还有别的如Pipe、FileLock等等

NIO与BIO对比

  1. BIO 是面向的;NIO 面向缓冲区
  2. BIO 只能向后读,不能向前读;NIO 可以向前也可以向后读
  3. BIO 是单向的,一端要么读要么写;NIO 中通道是双向的,一端可读可写
  4. BIO 是阻塞的;NIO 是非阻塞

大致过程

image-20240913112720081

Buffer

Buffer缓冲区:可以理解为一个数组,通道必须与Buffer一起使用

具体的实现类:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

API

核心API:

  • put(value):将 value 写入缓存(从当前的 position 开始写入,并增加 position 的值,即相对位置的写入
  • get(arr[]):读取 position 位置下的数据到一个数组中(如果不传参数,默认读取一个字符)
  • hasRemaining():判断 positionlimit 之间的距离,如果还有数据可以处理,那么就返回 true
  • clear() :清空缓冲区(清空的原理并不是真的清空了,只是将 position 的位置为 0,limit 设置为 capacity,即切换回写模式
  • flip():可以将缓冲区由写模式切换到读模式(将 limit 设为 position 的位置,再将 position 设置为 0)
  • compact():压缩,将未读取的数据(即 position 与 limit 之间的数据)向前移动,然后 position 指向最后一个未读取的数据之后,然后 limit 指向 capacity

其他API

  • capacity(); limit() ; position():返回当前 capacity/limit/position 的值
  • mark():标记当前 position 的位置,当调用 reset 方法时,会重置到 mark 标记过的位置(只能在 0-position 之前)
  • remaining():返回 positionlimit 之间的距离
  • reset():重置 positionmark 标记过的位置
  • rewind():将 position 设置为 0(即让 position 回到初始的位置),取消 mark 标记位

创建Buffer

  • allocate(long):
  • 创建一个指定大小的缓冲区;是一个静态方法;(注意:buffer 对象不能 new,只能通过 allocate 分配)
  • warp():把已存在的数组包装为一个 Buffer 对象(无论操作两者中的哪一个,另一个也会变化,因为就是同一个数组)

注意:

这两种创建方式,都是间接的创建了一个缓冲区(间接:是指这个缓冲区在 JVM 堆中)

如何将一个字符串转换为一个ByteBuffer数组

  1. 使用 ByteBufferwrap 方法
  2. 使用 Charset
  3. 创建 ByteBufferput 字符串的字节数组
1
2
3
4
5
6
7
8
String str = "hello";
//1、使用ByteBuffer的wrap
ByteBuffer bf1 = ByteBuffer.wrap(str.getBytes());
//2、使用Charset
ByteBuffer bf2 = StandardCharsets.UTF_8.encode(str);
//3、创建ByteBuffer再放入
ByteBuffer bf3 = ByteBuffer.allocate(16);
bf3.put(str.getBytes());

属性及变化

Buffer 有四个重要的属性

  • capacity:表示当前 Buffer 的容量大小;创建时指定,创建后不能修改;如果写满了 capacity,那么需要等清空后才能写入数据
  • position:表示当前的位置,初始时为 0,每读写一个数据,position 就 + 1,最大为 capacity-1
  • limit:指第一个不能被读或写的位置(即可以表示只能读取或写入多少个数据)
  • mark:设置一个标记位,当进行 reset 时,会将 position 的值变为 mark 原本标记的值

四个属性有这样的大小关系:0 <= mark <= position <= limit <= capacity

下面有个图,可以清晰的表示整个过程

buffer存储过程

结合代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
while (true) {
int len = fc.read(bf);
if (len == -1) {
break;
}
bf.flip();
//1、转换为读模式!将limit设为position,再将position设置为0
while (bf.hasRemaining()){
byte b = bf.get();
//2、每读取一个字节,就将position+1
System.out.print((char) b);
}
bf.clear();
//3、转换为写模式!将position的位置为0,limit设置为capacity
}

直接缓冲区

首先要明确一个概念:只有 byteBuffer 才有资格参与 IO 操作,因为数据全为 01,只有 byteBuffer 存储的才是 01 二进制,所以只有它才能 IO 操作

为什么要用直接缓冲区?

使用非直接缓冲区可能会导致性能损耗(多一次拷贝过程)

假设给一个通道传入了一个非直接缓冲区,那么通道会先创建一个临时的直接缓冲区,将非直接缓冲区的数据复制到临时的直接缓冲区,使用这个临时的直接缓冲区去执行 IO 操作(多一次拷贝,增大了开销)

直接缓存区存在的问题:

  • 直接缓存区绕过了 JVM 的堆栈,不受 JVM 控制,所以有可能我们创建的直接缓存区代价会更高
  • 直接缓存区分配会较慢,因为 Java 需要调用 OS 的函数进行分配,所以会较慢

如何创建一块直接缓冲区?

调用 ByteBuffer.allocateDirect(1024) 即可

分散读与集中写

这主要是两种思想

分散读就是把原本的数据一次读到多个 buffer 中,集中写就是将多个 buffer 的内容一次性写入

这两种思想分别有两个接口代表:ScatteringByteChannel(有 read 方法)、GatheringByteChannel(有 write 方法)

都可以传入一个 ByteBuffer 的数组

1
2
3
4
5
6
7
8
9
10
11
ByteBuffer bf1 = ByteBuffer.allocate(5);
ByteBuffer bf2 = ByteBuffer.allocate(5);
ByteBuffer bf3 = ByteBuffer.allocate(5);

//分散读
try (FileChannel fc = new RandomAccessFile("D:\\temp.txt", "r").getChannel()) {
fc.read(new ByteBuffer[]{bf1, bf2, bf3});
// 这个read方法传入的是一个ByteBuffer数组
// 可以将数据分别读入三个缓冲区中
} catch (IOException e) {
}

集中写同理

TCP粘包问题

什么是粘包问题?

  1. 以接收端来看:因为 TCP 是面向流的协议,所以不会保持输入数据的边界,导致接收端很可能一下子收到多个应用层报文,需要应用开发者自己分开,有些人觉得这样不合理,就起名为粘包
  2. 以发送端来看:用户数据被 TCP 发送时,会根据 Nagle 算法,将小的数据封装在一个报文段发送出去,导致不同的报文段黏在了一起

如何解决

  1. 发送方关闭Nagle方法

    • 设置TCP_NODELAY关闭Nagle算法
    • 最好不使用
  2. 交给应用层:

    • 固定消息长度:一条消息就发送一个固定大小的数据,然后填充空白

      • 缺点:浪费带宽
    • 格式化数据:每条数据有固定的格式(开始符,结束符),这种方法简单易行

      • 缺点:但是这种方法需要保证传输的字符中没有该开始符与结束符,而且因为需要挨个遍历,传输速度不是很快
    • TLV 格式:即 Type 类型、Length 长度、Value 数据,在类型和长度已知的情况下就可以方便的知道消息大小,分配合适的 buffer

      • 缺点:buffer 需要提前分配,如果分配过大,影响 server 的吞吐量
      • HTTP1.1 是 TLV 格式(先传输类型)
      • HTTP2.0 是 LTV 格式(先传输长度)

    此处举一个例子:(这里属于格式化数据的一种方式,使用到 \n 作为分隔符)

    网络上有多条数据发送给服务端。数据之间使用 \n 进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合。

    例如原始数据有 3 条为

    1
    2
    3
    Hello,world\n
    I"m zhangsan\n
    How are you?\n

    变成了下面的两个 byteBuffer(黏包,半包)

    1
    2
    Hello,world\nI" m zhangsan\nHo
    w are you?\n

    如何解决?(但这种方法需要对字节挨个遍历,所以效率不是很高

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public static void main(String[] args) {
    ByteBuffer source = ByteBuffer.allocate(32);
    // 模拟粘包
    source.put("Hello,world\nI' m zhangsan\nHo".getBytes());
    spilt(source);
    source.put("w are you?\n".getBytes());
    spilt(source);

    }

    // 处理粘包
    private static void spilt(ByteBuffer source) {
    source.flip();
    for (int i = 0; i < source.limit(); i++) {
    if(source.get(i) == '\n'){// 带索引的get方法不会改变position的值
    int len = i + 1 - source.position();
    // 计算要分配的缓冲长度
    ByteBuffer target = ByteBuffer.allocate(len);
    for (int j = 0; j < len; j++) {
    target.put(source.get());
    }
    }
    }
    source.compact();
    // 注意这里不能用clear
    }

Channel

Channel:通道,类似于IO流,但是与流不同的是Channel是双向的

IO流:如 InputStreamOutputStream 要么输入(读),要么输出(写)

但是 Channel 是双向的,实现类如下

  • FileChannel:从文件中读写数据
  • DatagramChannel:通过 UDP 读写网络数据
  • SocketChannel:通过 TCP 读写网络数据
  • ServerSocketChannel:可以监听新进来的 TCP 连接,就像 Web 服务器一样,对每一个新连接都会建立一个 SocketChannel

特点:

  • 通道的操作是双向的(可以只读、可以只写、还可以读写)
  • 通道可以操作的数据种类很多(可以是文件 IO、网络 Socket 都可以操作)
  • 通道的操作是异步
  • 不能直接访问通道,需要与 Buffer 合作(通道读必须从一个 Buffer 读、通道写必须写到一个 Buffer 内)
  • 通道不能复用!关闭了就没了

具体在 Java 中,就是一个接口,有两个方法

1
2
3
4
5
6
public interface Channel extends Closeable {
//检测通道是否正常打开
public boolean isOpen();
//关闭这个通道:(如果通道关闭了,对这个通道的操作都会抛出异常)
public void close() throws IOException;
}

FileChannel

用来对文件进行操作的通道

获取FileChannel

获取 FileChannel 的方式:(不能 new 来获取,有三种获取方式

  • 获取 FileChannel 可以使用 RandomAccessFile 这个类
  • 也可以使用 InputStream(这样创建的 FileChannel 只能读)
  • 也可以使用 OutputStream(这样创建的 FileChannel 只能写)

读到Buffer中

FileChannel 读取数据到Buffer中

  • 通道的 read() 方法,必须读入到一个缓存中,而且会返回读取到的字节数(或者说是,缓冲区中的字节数有多少)
  • 需要在缓冲的 buffer.hasRemaining() 的循环中使用
  • 如果读完,会返回 - 1

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 此文件有21字节,设置模式为读写均可
RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw");
// 使用RandomAccessFile类的getChannel方法获得此文件的通道
FileChannel channel = rw.getChannel();

// 关于buffer的内容先不做介绍,此处为分配一个大小为10字节的缓存
ByteBuffer buffer = ByteBuffer.allocate(10);

// 通道的read方法会返回读取到的字节数,因为我的文件21字节,所以需要读三次
// 第一次:10字节、第二次10字节、第三次1字节
int read = channel.read(buffer);
// 这一步是不会阻塞的!会继续向下执行
while (read != -1){
System.out.println("读取了:"+ read);
buffer.flip();
while (buffer.hasRemaining()){
System.out.println((char)buffer.get());
}
buffer.clear();
read = channel.read(buffer);
}
rw.close();
channel.close();

读文件大致Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
try (RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw")){
// JDK7 新特性
//在 try 的后边可以增加一个 (),在括号中可以定义流对象,
//那么这个流对象的作用域就在 try 有效,try 中的代码执行完毕,会把流对象自动释放,不用写 finally
FileChannel fc = rw.getChannel();
ByteBuffer bf = ByteBuffer.allocate(10);
while (true) {
int len = fc.read(bf);
if (len == -1) {
break;
}
bf.flip();
while (bf.hasRemaining()){
byte b = bf.get();
System.out.print((char) b);
}
bf.clear();
}
} catch (IOException e) {
e.printStackTrace();
}

写到通道中

  • write方法,需要传入缓冲
  • 需要在缓冲的buffer.hasRemaining()的循环中使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw");
FileChannel channel = rw.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(15);
buffer.clear();
byte[] bytes = "I love you too".getBytes();
System.out.println(bytes.length);
buffer.put(bytes);

buffer.flip();
while (buffer.hasRemaining()){
int write = channel.write(buffer);
// 返回写入的字节,与读取一样,并不能确定一次会写入多少字节
// 所以需要在循环内执行write方法,并且要用hasRemaining方法判断是否有剩余
System.out.println(write);
}

// 关闭
channel.close();

其他方法

  • positon():返回从文件开头位置到当前位置的字节数
  • position(long):设置当前位置(注意:如果指定的位置超过了文件当前的位置,然后进行了写入,会导致文件中间部分没有被写(文件空洞)
  • size():返回文件大小
  • truncate(long):截取前指定大小的数据,后面的数据将会被删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RandomAccessFile rw = new RandomAccessFile("D:\\temp.txt", "rw");
FileChannel channel = rw.getChannel();
long pos = channel.position();
// 初始位置为 0
long size = channel.size();
System.out.println(size);
channel.position(pos+size);
// 指定从什么位置开始,这里指定了当前位置+文件大小,就是文件的末尾开始写
ByteBuffer buffer = ByteBuffer.allocate(30);
buffer.put(" but she don't fond of me".getBytes());
buffer.flip();
while (buffer.hasRemaining()){
channel.write(buffer);
}
rw.close();
channel.close();

force与RandomAccessFile

  • force(boolean) 方法:
    • 此方法是为了保证对文件的所有更新操作都写回磁盘
    • 参数为 false 表示只写回文件内容
    • 参数为 true 表示既要写回文件内容,又要写回元数据信息(即文件的权限信息等)
  • RandomAccessFile(fileName, mode):传入一个文件的路径以及一个模式,此处的模式有四种
    • r:只读,如果调用 write 会抛出异常
    • rw:可读可写;如果文件不存在,将会被创建
    • rws:可读可写,并且所有的更新操作每次都会被同步的写入磁盘,会写入文件内容以及元数据(即带有 force(true) 的 rw 模式)
    • rwd:可读可写,并且所有的更新操作每次都会被同步的写入磁盘,会写入文件内容(即带有 force(false) 的 rw 模式)

通道间通信

通道之间可以传输数据,这两个方法仅仅是方向不同而已

(通道之间传输数据没有用到缓存,不知道底层实现是否用到了缓存)

  • transferFrom(ReadableByteChannel src, long position, long count):三个参数,第一个传入另外一个通道,第二个为起始位置,第三个传入想要传输的字节数
1
2
3
4
5
6
7
8
9
10
RandomAccessFile rw1 = new RandomAccessFile("d:\\temp.txt", "rw");
FileChannel channelFrom = rw1.getChannel();
RandomAccessFile rw2 = new RandomAccessFile("d:\\temp2.txt", "rw");
FileChannel channelTo = rw2.getChannel();
// 这样就将temp的文件写入到了temp2
channelTo.transferFrom(channelFrom, 0, channelFrom.size());
// channelFrom.transferTo(0, channelFrom.size(), channelTo);
// 也可以这么实现,两个方法只不过是方向反了一下
channelFrom.close();
channelTo.close();

注意:transferTo 方法是有上限的(最大为 2G,超过这个范围,那么只会传输 2g 的内容,剩下的数据就不会再传输了)

可以这样实现传输大于 2g 的内容:

1
2
3
4
5
6
7
8
9
10
11
try (
FileChannel fc1 = new RandomAccessFile("D:\\temp.txt", "r").getChannel();
FileChannel fc2 = new RandomAccessFile("D:\\temp1.txt", "rw").getChannel()
) {
// 重点在这里:
long size = fc1.size();
for (long left = size; left > 0 ;) {
left -= fc1.transferTo(size-left, left, fc2);
}
} catch (IOException e) {
}

Socket通道

此处的 Socket 通道泛指了三个实现了 AbstractSelectableChannel 的类:SocketChannelDatagramChannelServerSocketChannel

为什么引入Socket通道?

传统的 Socket 会为每个 Socket 连接创建一个线程,但是 Socket 通道仅仅只开辟一个或几个线程就可以提供成百上千的服务,大大提高了性能!

好处:

1、节省了线程切换的上下文开销

2、便于管理

Socket 通道的特点

  • ServerSocketChannel只负责: (不负责读写)

    1. 监听传入的连接
    2. 创建 SocketChannel 对象
  • DatagramChannelSocketChannel负责真正的读写操作

    • Socket 通道可以被重复使用
  • Socket 通道可以设置为非阻塞模式

ServerSocketChannel

ServerSocketChannel:可以理解为一个实现了非阻塞模式的 ServerSocket

特点:

  • socket():可以获得 ServerSocket 对象,然后调用其 bind() 方法去绑定一个端口(注意:ServerSocketChannel 本身没有 bind 方法!!(JDK1.7 有了 bind 方法,默认绑定本地地址))
  • ServerSocketChannelaccept() 方法,会返回一个 SocketChannel 对象;但是使用 ServerSocketaccept() 方法就还是阻塞的
    • 如果返回的对象为 null,说明当前没有连接
  • 因为继承了 AbstractSelectableChannel 类,所以可以设置为非阻塞模式

ServerSocketChannel 对象的创建:

不能 new,需要调用 ServerSocketChannel 的静态方法 open()

核心API

  • accepct():方法是阻塞方法(如果不设置为非阻塞)
  • bind():绑定一个端口
  • configureBlocking(boolean):默认为 true(阻塞模式)

使用单线程 + Channel 实现服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 0. byteBuffer
ByteBuffer bf = ByteBuffer.allocate(16);
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);// 设置为非阻塞模式
// 2. 绑定端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
ArrayList<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. SSC建立与客户端的连接,sc与客户端通信
SocketChannel sc = ssc.accept();// 设置为非阻塞,不会再阻塞运行
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false);// sc也设置为非阻塞
channels.add(sc);
}
// 5. 遍历集合处理请求
for (SocketChannel channel : channels) {
int read = channel.read(bf);// 设置为非阻塞
if (read > 0) {
bf.flip();
readBuffer(bf);// 此方法简单的输出了一下buffer(模拟实际操作)
bf.clear();
log.debug("after read...");
}
}
}

缺点:我们使用了死循环来一次次判断连接情况(这样也会浪费 CPU 的资源)

因此有了 Selector 来解决这个问题(使用 Seletor 后,代码的逻辑会发生变化)

ScoketChannel

SocketChannel:用来连接到TCP套接字的通道

特点:

  • 实现了可选择通道,可以被多路复用
  • 基于 TCP 传输协议
  • 支持两种模式:阻塞与非阻塞(同样也是通过 configureBlocking 调节)

创建SocketChannel

  • ServerSocket 对象的 accept 方法会返回 SocketChannel 对象
  • 直接使用 SocketChannel.open() 也可以创建 SocketChannel 对象

核心API

  • configureBlocking(boolean):默认为 true(阻塞模式)
  • read(ByteBuffer):读
  • write(ByteBuffer):写

Selector

提供了一种选择执行已经就绪的任务的能力

允许单线程处理多个通道,大大提高了效率

原理

  1. 每个Channel都在Selector上注册
  2. 注册后会返回一个选择键,选择键代表了当前通道是否已经就绪的信息
  3. 每执行一次select方法,都会更新所有的选择键,然后选择一个已经就绪的通道

选择示意图

注意:

  • 这些 Channel 必须是 SelectableChannel 的子类(比如:FileChannel 就不是其子类,也就不能被选择)

有关的类

  • Selector选择器类负责管理在此注册的通道的集合信息以及他们的就绪状态
  • SelectableChannel可选择通道,是一个抽象类,继承这个类的类为可以进行选择的类(FileChannel 就没有继承这个类)
  • SelectionKey:选择键类,封装了通道与选择器之间的注册关系,含有两个比特集(一个代表注册关系所关心的通道操作,一个代表通道已经就绪的操作)

建立Selector系统

创建 Selector:不能 new,需要调用 Selector.open() 方法

2、设置通道为非阻塞(只有非阻塞的通道才能注册到选择器)

3、通道注册:调用 register(selector, OP) 进行注册,两个参数,第一个为选择器,第二个参数为想让选择器关心的操作,会返回一个 SelectionKey 对象

此处可以设置的关心的操作 OP 有:

1
2
3
4
SelectionKey.OP_READ = 1<<0; // 是否有可读的通道就绪
SelectionKey.OP_WRITE = 1<<2; // 是否有可写的通道就绪
SelectionKey.OP_CONNECT = 1<<2; // 是否有新的通道连接到服务器
SelectionKey.OP_ACCEPT = 1<<3; // 是否有新的连接(只有ServerSocketChannel有这个操作)

如果我们想要关心多个操作,可以通过 | 位或运算符将 OP 连接起来

1
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)

4、select () 轮询:查看是否有就绪的通道,会返回当前就绪通道的数量

Selector三个集合

  • keys():此集合保存已注册的键(此集合不能修改!)
  • selectedKeys():此集合保存关心事件发生的键,此集合我们只能移除,不能添加(添加是自动进行的)
    • 注意这个集合的特点:会自动添加集合,但是不会自动删除集合
    • 因此我们在处理完成后要主动 remove,要不然会报异常
  • 已取消键的集合,使用 cancel() 方法后的键都会放在这里

Selector 的核心就是 select 方法,它的执行过程为:

  1. 检查已取消集合:如果集合非空,就将集合内所有的键从另外两个集合中移除,然后注销其相关的通道
  2. 检查 selectedKeys 集合:确定每个通道所关心的操作是否已经就绪
  3. 返回值:返回上一次调用 select 后进入就绪状态的通道的数量

SelectionKey

选择键: 表示通道与 Selector 之间的注册关系,注册一个通道就会返回一个 SelectionKey

相关 API:

  • channel:返回对应的通道
  • selector:返回通道注册的选择器
  • cancel:取消注册关系
  • isValid():判断注册关系是否有效
  • interestOps():以整数的形式,返回所关心操作的 bit 掩码,可以用此来判断选择器是否关心通道的某个操作
1
2
Boolean isAccept = interestOps & SelectionKey.OP_ACCEPT == SelectionKey.OP_ACCEPT;
// 判断一下与的结果是否与Accept相等
  • isAccept() 等操作:上面的这种方式判断太麻烦了,API 直接就有相关判断方法

关于Selector

select 方法什么时候会不阻塞,向下执行?

总共用如下几种情况:

  1. 发生事件时
    • 客户端发起连接请求,触发 accept 事件
    • 客户端发送数据、正常关闭、异常关闭都会触发 read 事件
    • 如果要发送的数据大于缓冲区,会触发多次 read 事件
    • channel 可写,触发 write 事件
    • 在 Linux 下 nio bug 发生时
  2. 调用 selector.wakeup()
  3. 调用 selector.close()
  4. selector 所在线程 interrupt

事件可以不处理吗?

不可以,事件要么就 remove,要么就 cancel,不可以不进行处理

如果不进行处理,select 每次都会返回这个事件,白白浪费 CPU 的资源

用完 key 为什么要 remove?

elector 维护的集合 SelectedKeys 添加是自动的,删除需要我们手动进行,一旦有我们关心的事件发生,那么 Selector 就会将此 key 添加到这个集合内

如果我们用完 key,没有 remove,那么此集合内就还会存在这个 key,在使用这个 key 进行操作的时候,就可能会出现异常(比如空指针异常)

注意:remove 只是将这个集合内的该键删除了,如果对应的事件还会继续发生,那么这里光删除集合内的 key 是没有用的,应该 cancel 掉对应的事件

处理read事件要注意的事情

【一】处理好客户端的正常与异常断开

正常断开依靠 read 方法返回值

异常断开依靠 catch 抓住异常后,将此 key 取消掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try{
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);//如果客户端正常断开,这里会返回-1
// 【处理客户端正常断开】
if(read == -1){
key.cancel();
}else {
...
}
}catch (IOException e){
e.printStackTrace();
// 【处理客户端异常断开】
key.cancel();
}

【二】处理好消息边界

例如我们开辟一块 buffer 为 4 字节的缓冲区,客户端传输两个汉字 “中国”,默认字符集 UTF-8 对于 1 个汉字占用 3 个字节,就要处理好消息边界的问题!

(如何解决消息边界问题下一节详细介绍)

处理write事件要注意的事情

我们发送的数据可能不是一次性发送的!

1
2
3
4
5
6
7
8
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
//【1】 write不能保证一次性就将所有的数据写入,有一个返回值,表示实际写入的字节数
while (buffer.hasRemaining()){
// 【2】返回值代表一次性可以发送的位数
// 这里一次性可以发送多少,涉及到了OS对于TCP的发送缓存与接收缓存的实现
int write = sc.write(buffer);
System.out.println(write);
}

这样处理虽然可以保证数据全部发完,但是不符合 NIO 非阻塞的思想

好的处理方式应该是,如果我们一次性发不完,可以先去处理其他事情(避免一直发送导致发送缓冲区满,导致轮询)

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
while (true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
// 向客户端发送消息
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());

sc.write(buffer);
//【1】 一次发送完最好
if (buffer.hasRemaining()){
//【2】 如果发送不完,就关注写事件
// 注意:要拿到原本关注的事件再|上此事件(+ |都可以)
scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
//【3】把没有写完的数据写回,利用附件的形式
scKey.attach(buffer);
}
}else if(key.isWritable()) {
// 【4】从附件取出,继续写
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
sc.write(buffer);
// 【5】如果此次还没有写完,由于我们已经关注了可写事件,所以下一次会继续写
// 【6】清理操作
if(!buffer.hasRemaining()){
// 清掉附件
key.attach(null);
// 去掉关注的事件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}

处理消息边界

比如粘包问题

异常情况

消息边界:发送一个消息可能有三种异常情况

  1. buffer的大小不足以放下一个完整的消息。
    • 这种情况下,只能将 buffer 扩容
  2. buffer 空间足够,但是只放了 1.5 个消息,剩下的半个消息我们需要进行拼接(在 Buffer 一节,我们介绍了一个利用 \n 来截断消息的步骤)

如果 buffer 空间不足(即一条消息的大小就超过了 buffer),nio 是如何进行处理的呢?

例如:buffer 空间为 8 字节,传输一条 10 字节的消息

服务器收到 read 事件,将数据读到 8 字节的缓存中,然后打印输出

然后,服务器会再次收到 read 事件,将剩下 2 个字节读入,打印输出

(即:服务器会变为多次 read 事件读取消息,因此对于这种情况我们需要扩容)

如何扩容buffer

buffer 不能是局部变量

最开始我们的代码是这样的,但是如果我们想要给一个 buffer 扩容,buffer 就不能是一个局部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//收到read事件
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
// 注意这里的buffer是局部变量
int read = channel.read(buffer);
if (read == -1) {
key.cancel();
} else {
//
spilt(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
}

那么,不能是局部变量,我提到最外面可以不可以呢?

不可以,我们要保证一个线程的 buffer 是私有的,如果我们提到最外面,那么多个线程操控一个 buffer,这就乱套了(线程不安全)

因此我们要借助附件 attachment

注意:附件可以保证每一个线程间是私有的

regiser() 传入的第三个参数就是附件,如果要获取附件,可以调用 key 的 attachment() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if (key.isAcceptable()) {
//6.1、accept事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 【1】将buffer以附件的形式存放到key中
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("sc: {}", sc);
} else if (key.isReadable()) {
//6.2、read事件
try {
SocketChannel channel = (SocketChannel) key.channel();
// 【2】从附件中获取buffer
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = channel.read(byteBuffer);
if (read == -1) {
key.cancel();
} else {
spilt(byteBuffer);
byteBuffer.flip();
readBuffer(byteBuffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
}
}

什么时候进行扩容呢?

当然是 buffer 空间不够的时候,之前我们自己实现的 split() 方法最后是使用的 compact() 方法去完成拼接

如果一个消息大于 buffer,那么他的 \n 的索引依然会大于 limit

比如:

1
2
3
4
buffer大小8字节
传输一个数据:0123456789\n(11字节)
----------------->
执行spilt()方法,最后buffer内的数据为01234567,此时position为8,limit也为8

因此,扩容的时机就是 position==limit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SocketChannel channel = (SocketChannel) key.channel();
// 从附件中获取buffer
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = channel.read(byteBuffer);
if (read == -1) {
key.cancel();
} else {
System.out.println(byteBuffer);
// 【1】判断是否需要扩容
if(byteBuffer.position() == byteBuffer.limit()){
//【2】 分配一个double容量的buffer
ByteBuffer newByteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2);
byteBuffer.flip();
newByteBuffer.put(byteBuffer);//【3】 copy
key.attach(newByteBuffer);// 【4】替换原有的byteBuffer
}
}

这样两次 read 事件,就能把消息拼接到一起了!

1
2
3
4
5
6
7
8
9
10
11
12
buffer大小8字节
传输一个数据:0123456789\n(11字节)
---------解释---------
【第一次read事件】:
执行spilt()方法
buffer1内的数据:01234567
此时position=8, limit = 8
因此创建buffer2,容量为8*2字节,并放入附件中
【第二次read事件】:
buffer2数据为:01234567
spilt()将89\n拼接到buffer2
此时buffer2数据为:0123456789\n

此处我们的扩容简单的进行了翻倍扩容,这是比较简单的实现,在 Netty 中,就设计的比较优秀了,此处我们只是抛砖引玉

buffer 大小如何抉择有两种思路:

ByteBuffer 需要设计为可变的 buffer:

  • 思路一:首先分配一个较小的 buffer,如果不够用,再分配一个更大的 buffer,并将原本的 buffer 数据 copy 到新 buffer 内
    • 优点:消息连续好处理
    • 缺点:数据 copy 消耗性能
  • 思路二:用多个数组组成 buffer,一个数组不够,就将多出来的内容写入新的数组
    • 优点:避免了 copy 消耗性能

NIO服务器Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Slf4j
public class Server {

public static void main(String[] args) throws IOException {
//1、创建Selector对象
Selector selector = Selector.open();
ByteBuffer bf = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
//2、注册ssc,返回SelectionKey对应关系
SelectionKey sscKey = ssc.register(selector, 0, null);
//3、SelectionKey设置关心的操作
sscKey.interestOps(SelectionKey.OP_ACCEPT);// 这个键只关心其ACCEPT事件

while (true) {
//4、调用select方法,如果没有事件发生,是阻塞的!如果有事件才会恢复执行
selector.select();
//5、处理事件(因为我们涉及到删除,所以要用迭代器遍历)
// 注意:事件必须要处理,要么remove要么cancel,要不然select会一直有这个事件,浪费CPU资源
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
log.debug("key: {}", key);
// 7、要记得删除此事件!可以一开始就删,也可以最后再删除
// 这里一开始就删除了,特别强调!!一定要删除
iterator.remove();
// 6、区分时间类型进行处理
if (key.isAcceptable()) {
//6.1、accept事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 将buffer以附件的形式存放到key中
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("sc: {}", sc);
} else if (key.isReadable()) {
//6.2、read事件
try {
SocketChannel channel = (SocketChannel) key.channel();
// 从附件中获取buffer
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = channel.read(byteBuffer);
if (read == -1) {
key.cancel();
} else {
System.out.println(byteBuffer);
// 判断是否需要扩容
if(byteBuffer.position() == byteBuffer.limit()){
// 分配一个double容量的buffer
ByteBuffer newByteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2);
byteBuffer.flip();
newByteBuffer.put(byteBuffer);// copy
key.attach(newByteBuffer);// 替换原有的byteBuffer
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
}
} else if (key.isWritable()) {
//6.3、write事件
}

}
}
}

private static void readBuffer(ByteBuffer bf) {
for (int i = bf.position(); i < bf.limit(); i++) {
System.out.print((char) bf.get());
}
System.out.println();
}

private static void spilt(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {// 带索引的get方法不会改变position的值
int len = i + 1 - source.position();
// 计算要分配的缓冲长度
ByteBuffer target = ByteBuffer.allocate(len);
for (int j = 0; j < len; j++) {
target.put(source.get());
}
}
}
source.compact();
// 注意这里不能用clear
}
}

多线程优化

如何优化

前面我们都是使用单线程,即将所有的事件全部放在一个线程内进行操作,不能发挥多线程的优势

所以我们可以进行多线程优化:(这也是 Netty 的实现原则)

优化的手段

可以将 ACCEPT 请求与读写请求分开,如图:

多线程优化

有两类线程:

  • boss 线程:专门负责处理 ACCEPT 事件
  • worker 线程:处理 READWRITE 事件

boss 线程只有一个,而 worker 线程可以根据服务器的 CPU 核心数来确定,并且可以加入负载均衡策略,让每一个 worker 线程一起出力。

优化手段总结:

1、按事件分类,分给不同的线程执行

2、worker 间实现负载均衡

实现逻辑

boss 线程一开始就有,worker 线程是慢慢创建的,具体的逻辑如下

实现逻辑

值得注意的是:要让 select 与注册全让 worker 线程来执行,通过消息队列 +wakeup() 方法 来进行传输消息(即我标为黄色的部分)

这样可以避免出现执行顺序不正确带来的顺序问题。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
// boss负责管理连接事件
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8888));

// 1、创建固定数量的worker并初始化
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// ACCEPT事件
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
log.debug("connected...{}", sc.getRemoteAddress());
sc.configureBlocking(false);
//2、关联selector
// 轮流使用worker,达到负载均衡
workers[index.getAndIncrement() % workers.length].initial(sc);
}
}
}
}

/**
* Worker类负责读写操作
*/
static class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;//保证只创建一个
// 作为消息队列,让boss线程给worker线程传消息
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

// 初始化线程和selector
public void initial(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name);
selector = Selector.open();
start = true;
thread.start();
}
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();
// 唤醒selector,让其可以将READ事件注册
}

@Override
public void run() {
while (true) {
try {
selector.select(); // 阻塞
Runnable task = queue.poll();
if (task != null) {
task.run();
// 真正是在这里执行了注册方法 sc.register(selector, SelectionKey.OP_READ, null);
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
int read = channel.read(buffer);
if(read == -1){
key.cancel();
}
buffer.flip();
log.debug("{} executing...", name);
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

零拷贝(未完)