Java IO机制

Published: 19 Sep 2014 Category: 技术

Java IO有各种各样的类,纷繁复制,但是如果将他们的设计原理弄明白,就不会感觉非常混乱了。不过还是感觉很难用。。。。

Java的IO机制可以大概分为两个部分,一部分是基于流的IO, 一部分是NIO。

基于流的 IO

基于流的IO又分为两个部分,一部分是基于字节的IO,另一部分是基于字符的IO。基于字符的IO是在基于字节的IO基础上,又进行了解码,将 字节转化为字符。

无论这个流是什么类,总要解决下面几个问题。

  • 数据从哪里读取,写到哪里
  • 类有什么功能

字节流

数据来源及去向有很多种,例如:

  • 字节数组:ByteArrayInputStream/ByteArrayOutputStream
  • 字符串:StringBufferInputStream
  • 文件:FileInputStream/FileOutputStream
  • 管道:PipedInputStream/PipedOutputStream
  • Socket: socket.getInputStream()/socket.getOutputStream()

像文件读取这样的操作,最终需要调用操作系统的 API 实现,所以,你如果在JDK的源代码中跟踪下去,会发现,最终读写操作 调用的是 native 函数。

IO类通过继承FilterInputStream,使用装饰器模式,为流添加功能:

  • 按基本数据类型读取:DataInputStream
  • 使用缓冲区读取:BufferedInputStream
  • 跟踪行号:LineNumberInputStream
  • 弹出一个字节缓冲区:PushbackInputStream
  • 压缩:ZipInputStream/ZipOutputStream/GZIPInputStream/GZIPOutputStream

IO类通过 InputStreamReader/InputStreamWriter 将字节流转化成字符流,实际上就是添加了解码的功能。

字符流

数据来源与去向

  • 字符数组:CharArrayReader/CharArrayWriter
  • 字符串:StringReader/StringWriter
  • 文件:FileReader/FileWriter
  • 管道:PipedReader/PipedWriter

字符流新添加功能

  • 缓冲:BufferedReader/BufferedWriter
  • 行号:LineNumberReader
  • 打印:PrintWriter
  • 一字符缓冲:PushbackReader

新 IO

从 JDK1.4 开始,引入了 java.nio 包,在新IO中,引入了几个新的概念。

  • 通道(Channel)
  • 缓冲区(Buffer)
  • 选择器(Selector)

新 IO 中,通过 File, Socket 可以得到 Channel,然后读数据时,将数据从 Channel 读取到 Buffer 中,写数据时,将数据从 Buffer 中 写入到 Channel。我们通过操作 Buffer 来读取,写入数据。

例如,将一个文件中写入一个字节的数据:

    public static void testNIO() throws IOException {
        File file = new File("testNIO.txt");
        FileOutputStream outputStream = new FileOutputStream(file);
        FileChannel channel = outputStream.getChannel();
        ByteBuffer src = ByteBuffer.allocate(2);
        src.put((byte) 65);
        src.flip();
        channel.write(src);
        channel.close();
    }

注意如何获取文件通道,如何操作 Buffer,如何通过 Buffer 和 Channel 向文件中写入数据。

Buffer 结构与原理

另外,要明白 nio 的原理,掌握 Buffer 的结构是很有必要的,Buffer 有三个关键的数据

    private int position = 0;
    private int limit;
    private int capacity;

capacity 是 Buffer 的大小,limit 是当前position可以到达的最大位置,position 是当前要操作的数据的位置。

初始状态

Buffer 的初始状态:

+---------------------------------------+
|   |   |   |   |   |   |   |   |   |   |
+---------------------------------------+
                                    capacity
position                              limit   

写入数据后

向 Buffer 中写入数据后,position 的位置会移动:

    public ByteBuffer put(byte x) {
        hb[ix(nextPutIndex())] = x;
        return this;
    }
    final int nextPutIndex() {                          // package-private
        if (position >= limit)
            throw new BufferOverflowException();
        return position++;
    }

写入数据后,buffer状态 ``` +---------------------------------------+ | * | * | * | * | * | | | | | | +---------------------------------------+

                                 capacity
              position             limit

#### flip 后

这时候,我们需要调用  `flip` 函数,将 limit 设置为 position 当前位置设置, 同时,将 position 清0。

```java
    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

调用 flip 后 Buffer 状态

+---------------------------------------+
| * | * | * | * | * |   |   |   |   |   |
+---------------------------------------+

                   limit                capacity
position                   

读取数据后

这样,从 Buffer 中读取数据时,就可以从 position 开始,一直读取到 limit。

读取数据时, position 的位置也要变化。

    public byte get() {
        return hb[ix(nextGetIndex())];
    }
    final int nextGetIndex() {                          // package-private
        if (position >= limit)
            throw new BufferUnderflowException();
        return position++;
    }

读取数据后,Buffer 状态

+---------------------------------------+
| * | * | * | * | * |   |   |   |   |   |
+---------------------------------------+

                   limit                capacity
                   position                   

clear 后

此后,如果想再向 Buffer 中写入数据,可以调用 clear 函数,将 position 清 0,limit 设置为 capacity。使得 Buffer 回到初态。

+---------------------------------------+
| * | * | * | * | * |   |   |   |   |   |
+---------------------------------------+
                                    capacity
position                              limit
    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

总之,需要时刻明白,读取与写入都是从当前位置 position 开始,一直移动到 limit。同时,需要明白自己调用函数之前和之后, position 与 limit 的位置是如何变化的。

另外,这里的读取与写入一定要明白是指什么,我们对 Buffer 进行 put 时,是对它写,进行 get 时,是对它读。而 channel.write(buf) 时,是对 buf 读,再写入 channel。 channel.read(buf) 时,是从 channel 读,再写入 buf。需要弄清楚读写的主体是什么。

Selector 与 Channel

在传统IO中,由于读写操作是阻塞的,当无法读取或者写入数据时,程序就会阻塞住。为了同时为多个客户端服务,服务器端需要多个 线程,一方面可以使多个客户端得到响应,另一方面,也使得一个线程被阻塞时,整个程序不会停下来。

而在新IO中,通过通道,可以实现非阻塞IO,同时,可以通过一个Selector来管理多个Channel。这样,使用一个线程就可以管理多个 客户端连接。

先创建一个 Selector,

 Selector selector = Selector.open();

然后,将每个Channel设置成非阻塞的。然后调用Channel的register函数,将对应的selector及一个事件代码传入,意思是当这个事件发生 时,我们可以通过 selector 来获取这个 channel。

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
        InetSocketAddress address = new InetSocketAddress(
                InetAddress.getLocalHost(), 10000);
        serverSocketChannel.socket().bind(address);
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

假如已经将多个channel 注册到 selector,那么当其中有几个 channel 已经发生了我们所关心的事件时,如何获取这些 channel呢?

可以通过 select 函数获取现在已经触发事件的 channel 个数。

selector.select()

然后,通过 selectedKeys 函数获取所有发生的事件

Set<SelectionKey> selectedKeys = selector.selectedKeys();

并通过迭代器,访问每个事件,检测他们的事件类型,并作出相应的处理。

Iterator<SelectionKey> it = selectedKeys.iterator(); // 依次进行处理
while (it.hasNext()) {
    SelectionKey selectionKey = it.next();
    if (selectionKey.isAcceptable()) { // 如果是Accept事件
    // 获取注册的ServerSocketChannel
        serverSocketChannel = ((ServerSocketChannel) selectionKey
        .channel());
        SocketChannel socketChannel = serverSocketChannel.accept(); // 建立连接
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ); // 注册该连接的Read事件

    } else if (selectionKey.isReadable()) { // 如果是Read事件
    // 获取注册的SocketChannel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (socketChannel.read(buffer) > 0) { // 读取接收到的数据
            buffer.flip();
            byte[] dst = new byte[buffer.limit()];
            buffer.get(dst);
        }
    }
    it.remove(); // 需要将处理过的事件移除
}

注意,当 ACCEPT 事件发生时,我们获取了新的 channel ,并将 selector 及 OP_READ 事件注册过去,这样,当这个 channel 的 READ 事件发生时,我们就可以通过 selectorKeys 来获取这个事件。

上面的代码来自参考资料1)

通过上面的例子,可以看出,我们使用了一个线程, 一个selector,多个channel,就管理了多个来自客户端的 socket 连接。

除了上面的 Selector, Channel, Buffer 外,新 IO 还提供了内存映射文件,文件锁功能。

内存映射文件

操作系统可以利用虚拟内存实现将文件或者文件的一部分映射到内存中,然后这个文件就可以当作是内存数组一样随机访问。通过 内存映射文件,可以大大提升文件访问的性能,同时可以支持对大文件的访问。我们可以通过 Channel 和 Buffer 来访问内存映射文件的内容。

FileChannel channel = FileChannel.open(Paths.get("test.txt"));
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0,
                channel.size());
for (int i = 0; i < channel.size(); i++) {
    buffer.get(i);
}

文件锁

可以通过给文件加锁,来控制对文件访问的同步。

FileChannel channel = FileChannel.open(Paths.get("test.txt"));
FileLock lock = channel.lock();

// Operation on channel

lock.release();

参考资料

Java NIO (三) 非阻塞模式与Selector