| int num = channel.read(buf); |
上述方法会返回从 Channel 中读入到 Buffer 的数据大小。
提取 Buffer 中的值前面介绍了写操作,每写入一个值,position 的值都需要加 1,所以 position 最后会指向最后一次写入的位置的后面一个,如果 Buffer 写满了,那么 position 等于 capacity(position 从 0 开始)。
如果要读 Buffer 中的值,需要切换模式,从写入模式切换到读出模式。注意,通常在说 NIO 的读操作的时候,我们说的是从 Channel 中读数据到 Buffer 中,对应的是对 Buffer 的写入操作,初学者需要理清楚这个。
调用Buffer的flip()方法,可以从写模式切换到读模式,其实就是重新设置了一下position和limit的值。
| | public final Buffer flip() {
limit = position; // 将 limit 设置为实际写入的数据数量 position = 0; // 重置 position 为 0 mark = -1; // mark 之后再说 return this; } |
对应写操作的一系列put方法,读操作提供了一系列的get()方法:
| // 根据 position 来获取数据
public abstract byte get(); // 获取指定位置的数据 public abstract byte get(int index); // 将 Buffer 中的数据写入到数组中 public ByteBuffer get(byte[] dst) |
附一个经常使用的方法:
| new String(buffer.array()).trim(); |
除了将数据从Buffer读取出来使用,更常见的操作是将写入的数据输出到Channel中,如通过FileChannel将数据写入到文件中,通过SocketChannel将数据写入到网络发送到远程机器等。对应的,这种操作,我们称之为写操作。
| int num = channel.write(buf); |
mark()、reset()除了position、limit、capacity这三个基本属性外,还有一个常用的属性就是mark。
mark用于临时保存position的值,每次调用mark()方法都会将mark设置为当前的position,便于后学需要的时候使用。
| public final Buffer mark() {
mark = position; return this; } |
那到底什么时候用呢?考虑以下场景,我们在 position 为 5 的时候,先 mark() 一下,然后继续往下读,读到第 10 的时候,我想重新回到 position 为 5 的地方重新来一遍,那只要调一下 reset() 方法,position 就回到 5 了。
| public final Buffer reset() {
int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; } |
rewind()、clear()、compact()rewind():会重置position为0,通常用于从头读写Buffer。
| public final Buffer rewind() {
position = 0; mark = -1; return this; } |
clear():相当于重新实例化。
通常,我们会先填充Buffer,然后从Buffer读取数据,之后再重新往里填充新的数据,我们一般在填充之前先调用clear().
| public final Buffer clear() {
position = 0; limit = capacity; mark = -1; return this; } |
compact():和clear()一样的是都是在准备往Buffer中填充新数据之前调用。
clear()会重置几个属性,但是并不会将Buffer中的数据清空,只不过后面写的时候会覆盖之前的数据。
而compact()方法调用之后,会先处理还没有读取的数据,也就是position到limit直接的数据,先将这些数据都移动到左边,然后在这个基础之上再开始写入。此时,limit还是等于capacity,position指向原来数据的右边。
Channel所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地,主要地,我们将关心 java.nio 包中实现的以下几个 Channel:
FileChannel:文件通道,用于文件的读和写。
DatagramChannel:用于UDP连接的接收和发送
SocketChannel:TCP客户端
ServerSocketChannel:TCP服务端,监听某个端口进来的请求。
Channel 经常翻译为通道,类似 IO 中的流,用于读取和写入。它与前面介绍的 Buffer 打交道,读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。
FileChannel
初始化:
| FileInputStream inputStream = new FileInputStream(new File("/data.txt"));
FileChannel fileChannel = inputStream.getChannel(); |
当然了,也可以从RandomAccessFile类中的getChannel来得到FileChannel。
读取文件内容:
| ByteBuffer buffer = ByteBuffer.allocate(1024);
int num = fileChannel.read(buffer); |
写入文件内容:
| ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("随机写入一些内容到 Buffer 中".getBytes()); // Buffer 切换为读模式 buffer.flip(); while(buffer.hasRemaining()) { // 将 Buffer 中的内容写入文件 fileChannel.write(buffer); } |
SocketChannel打开一个TCP链接:
| SocketChannel socketChannel = SocketChannel<br> .open(newInetSocketAddress("127.0.0.1" , 80)); |
当然了,上面的这行代码等价于下面的两行:
| // 打开一个通道
SocketChannel socketChannel = SocketChannel.open(); // 发起连接 socketChannel.connect(new InetSocketAddress("127.0.0.1", 80)); |
SocketChannel 的读写和 FileChannel 没什么区别,就是操作缓冲区。
| // 读取数据
socketChannel.read(buffer); // 写入数据到网络连接中 while(buffer.hasRemaining()) { socketChannel.write(buffer); } |
ServerSocketChannelServerSocketChannel 用于监听机器端口,管理从这个端口进来的 TCP 连接。
| // 实例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 监听 8080 端口 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); while (true) { // 一旦有一个 TCP 连接进来,就对应创建一个 SocketChannel 进行处理 SocketChannel socketChannel = serverSocketChannel.accept(); } |
这里我们看到了SocketChannel的第二个实例化方式。
到这里,我们应该能理解SocketChannel了,它不仅仅是TCP客户端,它代表的是一个网络通道,可读可写。
ServerSocketChannel不和Buffer打交道了,因为它并不实际处理数据,一旦接到请求,就会实例化一个SocketChannel,之后再这个简介通道上传递的数据它就不管了,它会继续监听端口等待下一个连接。
DatagramChannelUDP 和 TCP 不一样,DatagramChannel 一个类处理了服务端和客户端。
UDP 是面向无连接的,不需要和对方握手,不需要通知对方,就可以直接将数据包投出去,至于能不能送达,它是不知道的.
监听端口:
| DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9090)); ByteBuffer buf = ByteBuffer.allocate(48); channel.receive(buf); |
发送数据:
| String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, newInetSocketAddress("jenkov.com" , 80)); |
SelectorSelector建立在非阻塞的基础之上,大家经常听到的多路复用在java世界中指的就是它,用于实现一个线程管理多个Channel。
开启Selector:
| Selector selector = Selector.open(); |
将 Channel 注册到 Selector 上。前面我们说了,Selector 建立在非阻塞模式之上,所以注册到 Selector 的 Channel 必须要支持非阻塞模式, FileChannel 不支持非阻塞,我们这里讨论最常见的 SocketChannel 和 ServerSocketChannel。
| // 将通道设置为非阻塞模式,因为默认都是阻塞模式的
channel.configureBlocking(false); // 注册 SelectionKey key = channel.register(selector, SelectionKey.OP_READ); |
register 方法的第二个 int 型参数(使用二进制的标记位)用于表明需要监听哪些感兴趣的事件,共以下四种事件:
SelectionKey.OP_READ:对应 00000001,通道中有数据可以进行读取
SelectionKey.OP_WRITE: 对应 00000100,可以往通道中写入数据
SelectionKey.OP_CONNECT: 对应 00001000,成功建立 TCP 连接
SelectionKey.OP_ACCEPT: 对应 00010000,接受 TCP 连接
我们可以同时监听一个 Channel 中的发生的多个事件,比如我们要监听 ACCEPT 和 READ 事件,那么指定参数为二进制的 000 10001 即十进制数值 17 即可。
注册方法返回值是 SelectionKey 实例,它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,即我们设置的我们感兴趣的正在监听的事件集合。
调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。
示例:
| Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
// 判断是否有事件准备好 int readyChannels = selector.select(); if(readyChannels == 0) continue ; // 遍历 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } } |
对于Selector,需要熟悉以下几个方法:
select()
调用此方法,会将上次 select 之后的 准备好的 channel 对应的 SelectionKey 复制到 selected set 中。如果没有任何通道准备好,这个方法会阻塞,直到至少有一个通道准备好。
selectNow()
功能和 select 一样,区别在于如果没有准备好的通道,那么此方法会立即返回 0。
select(long timeout)
看了前面两个,这个应该很好理解了,如果没有通道准备好,此方法会等待一会
wakeup()
这个方法是用来唤醒等待在 select() 和 select(timeout) 上的线程的。如果 wakeup() 先被调用,此时没有线程在 select 上阻塞,那么之后的一个 select() 或 select(timeout) 会立即返回,而不会阻塞,当然,它只会作用一次。
调
用 Buffer 的 flip() 方法,可以从写入模式切换到读取模式。其实这个方法也就是设置了一下 position 和 limit 值罢了