Java NIO之初始

在读完第一遍《Java NIO》之后,对于NIO的相关API有了一定的了解,可是一直不管理同步非阻塞的原因。本打算再读一遍加深理解,可是对其原理一直想不通,所性就先深入学习了一下NIO的同步非阻塞的原理。在不断的深入研究过程中,从API到JVM,再到Linux,一点一点追溯下去,着实很有乐趣。

这里主要说的网络IO,NIO和普通IO的优势主要体现在服务端进行高并发通信上面,在点对点交换数据的时候不见得比传统IO快。

关于文件IO对于阻塞的要求不是很高,所以在是否阻塞方面不用太关心。不过NIO在COPY文件的时候相对传统IO来书实现了内存映射。

阻塞IO

对于Java NIO最主要的一点就是要理解非阻塞的原理。首先我们来看一下普通IO对阻塞问题。
BuffSocketServer类中一共有三个启动服务的方法,分别是server1、server2、server3。

  1. server1方法中,监听了8888端口,每当获取到新的socket连接,就会从该socket中读取InputStream流,并且不断的读取流。这里有几个点说明下:
  • serverSocket.accept()方法是个阻塞方法,当没有连接时会一直阻塞的这儿,当新的连接创建时,返回该连接,程序往下执行。
  • 当获取到一个连接,即socket时,使用输入流相关的API从该socket中读取内容。
  • in.readLine(),这个使用的是readLine方法,这个方法只有碰到换行符或者回车符才返回,否则也阻塞在这儿。

当我们启动server1方法后,使用客户端发送数据时,可以正常接收消息。如果再次运行客户端,服务端却并没有打印第二个客户端发送的数据。这是因为在server1中,serverSocket.accept()只执行了一次,当第二个客户端创建连接时,accept方法却没有机会得到执行,所以也就没法获取到第二个客户端的连接,也就无从接收消息了。所以从始至终,readLine读取的都是从第一个客户端的socket连接中获取的数据。

  1. server2是对server1的“改良”版本,我们把serverSocket.accept()方法放在while(true)中,这样就可以不断的获取新的连接了。但是有个问题就是每当获取一个新的连接,只能从该连接中获取一次数据,即readLine只能执行一次。执行完就进行下一次循环,重新获取连接,可能重新阻塞。

  2. 那有没有什么办法同时处理多个客户端连接呢,这时理所应当的想到线程,也就是server3的实现方式。

那有没有不使用线程的方式来实现多个客户端连接处理呢?说实话,真没有,因为JDK提供的io相关的API接口,限制了住了没法不用线程。除非JDK提供新的API,这也就是接下来要说的NIO接口了。说了这么多,其实就是要把普通IO的局限性描述清楚,只为更好的理解NIO的特性,以及NIO解决了什么问题。

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.buff.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @Author: Buff
* @Description:
* @Date: Created in 2019-11-21 23:08
*/
public class BuffSocketServer {
public static void main(String[] args) {
//server1();
//server2();
server3();
}

private static void server1() {
try {
ServerSocket serverSocket = new ServerSocket(8888);
Socket socket = serverSocket.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while (true) {
String str = in.readLine();
System.out.println(socket + " 收到的请求内容为:" + str);
}
} catch (IOException e) {
e.printStackTrace();
}
}


private static void server2() {
try {
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
Socket socket = serverSocket.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String str = in.readLine();
System.out.println(socket + " 收到的请求内容为:" + str);
}
} catch (IOException e) {
e.printStackTrace();
}
}

private static void server3() {
try {
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
Socket socket = serverSocket.accept();
new Thread(() -> {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while(true) {
String str = in.readLine();
System.out.println(socket + " 收到的请求内容为:" + str);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();

}
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.buff.bio;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;

/**
* @Author: Buff
* @Description:
* @Date: Created in 2019-11-21 23:08
*/
public class BuffSocketClient {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 8888);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
while (true) {
out.println(Math.random());
Thread.sleep(3000);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}

非阻塞IO

这理不会对NIO的相关API作过多的介绍,主要是探究一下NIO的实现原理。下边就是NIO的实现方式,将多个Socket连接以SocketChannel的形式注册到Selector中,每个Socket的数据都是由一个线程进行处理。当然也可以使用多个线程。关键在于一个线程就可以处理。如果是普通IO则无法实现这种需求。

Channel本身持有socket,也可以理解为Channel封装的socket,但是比socket提供更多的操作。其实现是通过底层API支持的。大致流程就是将Channel关心的事件注册到Selector上,然后不断轮询Selector上已经发生的事件,如果事件就绪,则进行相应的数据操作,而数据是通过Channel对应的缓存Buffer进行读写的。

下边是使用NIO的API实现的服务端与客户端,服务器在使用单线程的情况下,可以处理不同的客户端请求,并没有因为多个客户端没有处理完而发生阻塞。
Java NIO服务端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package com.buff.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @Author: Buff
* @Description:
* @Date: Created in 2019-11-22 20:43
*/
public class BuffNIOServer {
private int port;
/*标识数字*/
private int flag = 0;
/*缓冲区大小*/
private int BLOCK = 4096;
/*接受数据缓冲区*/
private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*发送数据缓冲区*/
private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
private Selector selector;

public BuffNIOServer(int port) throws IOException {
this.port = port;
}

public void start() throws IOException {
// 打开服务器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 服务器配置为非阻塞
serverSocketChannel.configureBlocking(false);
// 检索与此通道关联的服务器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
// 进行服务的绑定
serverSocket.bind(new InetSocketAddress(port));
// 通过open()方法找到Selector
selector = Selector.open();
// 注册到selector,等待连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 选择一组键,并且相应的通道已经打开
selector.select();
// 返回此选择器的已选择键集。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
}
}

// 处理请求
private void handleKey(SelectionKey selectionKey) throws IOException {
// 接受请求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count = 0;
// 测试此键的通道是否已准备好接受新的套接字连接。
if (selectionKey.isAcceptable()) {
// 返回为之创建此键的通道。
server = (ServerSocketChannel) selectionKey.channel();
// 接受到此通道套接字的连接。
// 此方法返回的套接字通道(如果有)将处于阻塞模式。
client = server.accept();
// 配置为非阻塞
client.configureBlocking(false);
// 注册到selector,等待连接
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 返回为之创建此键的通道。
client = (SocketChannel) selectionKey.channel();
//将缓冲区清空以备下次读取
receivebuffer.clear();
//读取服务器发送来的数据到缓冲区中
count = client.read(receivebuffer);
if (count > 0) {
receiveText = new String(receivebuffer.array(), 0, count);
System.out.println(client.getRemoteAddress()+"------服务器端接受客户端数据--:" + receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
//将缓冲区清空以备下次写入
sendbuffer.clear();
// 返回为之创建此键的通道。
client = (SocketChannel) selectionKey.channel();
sendText = "message from server--" + flag++;
//向缓冲区中输入数据
sendbuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
//输出到通道
client.write(sendbuffer);
System.out.println("服务器端向客户端发送数据--:" + sendText);
client.register(selector, SelectionKey.OP_READ);
}
}

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
BuffNIOServer server = new BuffNIOServer(8899);
server.start();
}
}

Java NIO客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package com.buff.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @Author: Buff
* @Description:
* @Date: Created in 2019-11-22 20:49
*/
public class BuffNIOClient {
private String hostname;
private int port;

/*标识数字*/
private static int flag = 0;
/*缓冲区大小*/
private static int BLOCK = 4096;
/*接受数据缓冲区*/
private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*发送数据缓冲区*/
private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);

public BuffNIOClient(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}

public void send() throws IOException {
// 打开socket通道
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞方式
socketChannel.configureBlocking(false);
// 打开选择器
Selector selector = Selector.open();
// 注册连接服务端socket动作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接
socketChannel.connect(new InetSocketAddress(
hostname, port));
// 分配缓冲区大小内存
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count = 0;

while (true) {
//选择一组键,其相应的通道已为 I/O 操作准备就绪。
//此方法执行处于阻塞模式的选择操作。
selector.select();
//返回此选择器的已选择键集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判断此通道上是否正在进行连接操作。
// 完成套接字通道的连接过程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("完成连接!");
sendbuffer.clear();
sendbuffer.put("Hello,Server".getBytes());
sendbuffer.flip();
client.write(sendbuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
//将缓冲区清空以备下次读取
receivebuffer.clear();
//读取服务器发送来的数据到缓冲区中
count = client.read(receivebuffer);
if (count > 0) {
receiveText = new String(receivebuffer.array(), 0, count);
System.out.println("客户端接受服务器端数据--:" + receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}

} else if (selectionKey.isWritable()) {
sendbuffer.clear();
client = (SocketChannel) selectionKey.channel();
sendText = "message from client--" + (flag++);
sendbuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
client.write(sendbuffer);
try {
Thread.sleep(3 * 1000);
} catch (InterruptedException e) {
}
System.out.println("客户端向服务器端发送数据--:" + sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
selectionKeys.clear();
}
}

public static void main(String[] args) throws IOException {
BuffNIOClient client = new BuffNIOClient("localhost", 8899);
client.send();
}
}

关键问题

Selector是怎么知道每次是哪个客户端呢

在看服务端代码时,一直不明白Selector是怎么知道每次是哪个客户端呢?这个其实是API维护着一个集合,保存着Channel信息,Channel中一定持有Socket,而每个Socket和文件描述符(FD)一一对应。我们知道在Linux中,一切皆文件。

Seclect内部存在三个集合来管理SelectionKey。监听事件通道集合(publicKeys),通道事件就绪集合(publicSelectedKeys),取消监听通道(cancelKeys)

  1. 通过SelectableChannel.register方法可以将Channel通道封装成SelectionKey对象添加到Seclect内部publicKeys集合中。
  2. 通过Seclect.keys方法获取集合publicKeys集合,但无法手动修改。
  3. 如果某个通道的事件到达,会将通道对应SelectionKey添加到publicSelectedKeys集合中。用户线程观测SelectionKey集合中SelectionKey对已到达的事件作处理。
  4. 通过selectedKeys方法获取publicSelectedKeys集合,每次通道处理需要手动删除。避免重复处理。
  5. 如果通道事件到达,用户未处理就将SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。
  6. 如果通道事件到达,用户处理后会在下一次调用select方法时将通道对应的SelectionKey从publicKeys集合删除

事件是怎么触发的

在代码中可以看到在while(true)中,一直不断运行着selector.select()方法,不断的去获取事件。那这个事件是怎么生产的呢,怎么通过select()方法返回的呢?通过源码可以看到调用栈select()->select(long timeout)->lockAndDoSelect(long timeout)->doSelect(long timeout)->poll()。这个过程中有着复杂的并发处理,这里只看下poll()方法。poll调用的是Native方法,其参数描述如下,可以看到参数是文件描述符相关,文件描述符代表着一个Channel或者说是Socket。返回参数readFds和exceptFds就可以告诉我们哪些Channel有读事件,哪些有写事件。至此我们也就明白了事件的生产来源,他是由本地方法或者说操作系统告诉我们的。

参数 含意
pollAddress FD数组内存起始地址
numfds 待监听的FD数量
readFds 用于接收发生可读事件的FD
exceptFds 用于接收发生可写事件的FD
timeout 超时等待时间

关于文件描述符和poll

关于文件描述符和poll,能力有限就不作介绍了,其中的要了解的东西太多了,越深入越远,太费时间精力了。不过相关的知识多了解一下,对自己理解有非常大的帮助的,就像你看一到一辆汽车,大概知道原理,你能明白为啥能跑起来。但如果问下你鬼是个什么东西,你就会非常懵逼,就是因为我们一都不了解,所以才被问的很茫然。下边提供一些对自己有帮助的博文供大家参考,有JavaNIO相关的源码解读,也有文件描述符、poll/epoll、管道等概念的相关文章。

这里对原理有个基本认识,后续首先学习下NIO的API知识,然后再对源码学习,其实最想了解的是IO底层原理,包括对文件句柄,文件描述符,select/poll/epoll原理,管道,文件系统,这些概念的系统性认识。

[浅谈文件描述符及文件系统] https://blog.csdn.net/lvyibin890/article/details/78814612

[linux中文件描述符fd和文件指针flip的理解]https://www.cnblogs.com/Jezze/archive/2011/12/23/2299861.html

[Linux文件描述符获取方法及详细介绍,这里让你快速学习 ]https://blog.csdn.net/qq_40663274/article/details/83904069

[文件系统编程之文件描述符]https://blog.csdn.net/qq_40663274/article/details/83904069

[poll()函数学习笔记]https://blog.csdn.net/zed_killer/article/details/82771338

[使用eventfd创建一个用于事件通知的文件描述符]https://www.jianshu.com/p/57cc1d7d354f

[“基础 – 事件触发机制”]https://www.cnblogs.com/qq120848369/p/3656644.html

[select、poll和epoll的总结对比]https://blog.csdn.net/qq_35976351/article/details/85228002

[【Java.NIO】Selector,及SelectionKey]https://blog.csdn.net/robinjwong/article/details/41792623

[选择器02 Selector原理和使用]https://www.jianshu.com/p/83aed59b4f92

[Java NIO wakeup实现原理]https://my.oschina.net/7001/blog/1509533

[Selector源码深入分析之Window实现(上篇)]https://my.oschina.net/7001/blog/1590939

[Selector源码深入分析之Window实现(下篇)]https://my.oschina.net/7001/blog/1591042