Java中I/O流:阻塞和非阻塞范例

I/O 流是输入输出操作的核心。这些是数据在源和目的地之间流动的路径。

  • 输入流:程序或应用程序使用这些流从文件、连接、键盘等源读取数据。
  • 输出流:程序或应用程序使用这些流将数据写入目标。

阻塞和非阻塞 I/O
基本 I/O 操作本质上通常是阻塞的,即它们会阻塞线程执行,直到有一些数据可供读取。

例如,在正常的 HTTP 请求中:

  • 客户端向绑定到某个端口(HTTP 的端口 80)的应用程序发出请求,
  • 它们首先在它们之间建立套接字连接。
  • 连接建立后,服务器等待客户端发出请求,
  • 然后通过同一个套接字发送响应。

在普通的 Socket 连接中,我们希望客户端和服务器之间能够持续通信,而不需要一次又一次地经历昂贵的 HTTP 请求过程,因此我们保持套接字连接打开以进行通信。服务器等待客户端发送一些内容来响应它。

这意味着线程将被阻塞,直到客户端说些什么。这个 SocketServer 可以用 Java 来创建,如下所示:

ServerSocket serverSocket = new ServerSocket(port)
while (true) {
  Socket socket = serverSocket.accept();
  System.out.println("New client connected: " + socket);

 
//ClientHandler.run 是一个函数,用于无限监听 
 
// 并处理客户端消息,直至连接关闭。
  ClientHandler clientHandler = new ClientHandler(socket);
  clientHandler.run();
}

解释这种情况的一个例子是:假设两个朋友正在玩一个游戏,他们有一个管道可以互相通信。一位朋友(客户端)说了些什么,另一位朋友(服务器)记下消息并响应“ack”。

这对于只有两个朋友想玩游戏的情况来说是很好的,即只有一个朋友在说话,另一个朋友在听或等待他连续说话。

当多个朋友想要聊天和玩游戏时,即多个朋友带着各自的管道到达并希望在另一端写下他们的消息时,就会出现问题。但由于只有一个人正在收听(即friend-2),因此他无法等待每个管道的一端来收听消息。对他来说,一个显而易见的解决方案是雇用多人(每个管道一个人)并将他们分配给管道

这个概念就是多线程。在此,您为每个新连接生成一个新线程。

ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerSocket serverSocket = new ServerSocket(port)
while (true) {
  Socket socket = serverSocket.accept();
  System.out.println("New client connected: " + socket);

 
// ClientHandler.run 是一个无限监听 和处理客户端消息的函数,直到连接关闭。
  ClientHandler clientHandler = new ClientHandler(socket);
  executorService.submit(clientHandler::run);
}

不过,这种方法也有很大的局限性。作为服务器方的朋友 2 不可能为每个玩游戏的客户朋友继续雇人。如果朋友们继续来玩,那么到了某个时候,朋友 2 就会没钱了,也就是说,服务器不可能产生无限多的线程。这个问题需要一个更好的解决方案。

问题在于每个连接都会阻塞一个线程。这就是为什么我们需要为每个新连接生成一个新线程。为了解决这个问题,非阻塞 I/O 出现了。这意味着一个连接不会阻塞一个线程,而且只要连接想发送消息,就会有某种机制去监听它。

在 Java 中,一种流行的实现方式是使用通道和选择器
在我们当前的例子中,我们的想法是安装一个记录器,分别记录来自每个管道的消息。现在,朋友 2  只需遍历所有消息,逐一回应并处理它们(写在他的笔记本上)。这样就不需要雇人了,朋友 2 就可以独自监听和处理来自他的每个朋友的消息。

Java NIO 库为输入流提供了通道类。在我们当前的用例中,即 Socket 连接,它有 ServerSocketChannel 类。通道只是数据传输的途径(即管道)。ServerSocketChannel 类允许我们将连接定义为非阻塞连接,并将其注册到选择器(记录器)。

  1. 客户端
    • 客户端将消息数据写入其出站缓冲区。
    • 然后,客户端尝试通过通道将数据通过网络发送到服务器。
  • 网络传输
    • 数据从客户端的出站缓冲区中取出并通过网络发送到服务器。
  • 服务器端
    • 传输的数据到达服务器的网络接口,并被放入与服务器连接相关的网络输入缓冲区中。
    • 服务器的操作系统管理此输入缓冲区并使数据可供服务器的应用程序读取。
  • 服务器应用程序:
    • 当服务器的应用程序准备好从通道读取时(通过调用channel.read()),它从网络输入缓冲区读取数据。
    • 如果数据正在输入缓冲区中等待,服务器会将其读入其应用程序级缓冲区进行处理。

    定义 ServerSocketChannel 并将其注册到选择器,如下所示:

    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 配置无阻塞性
    serverSocketChannel.configureBlocking(false);
    InetSocketAddress inetSocketAddress = new InetSocketAddress(8000);
    serverSocketChannel.bind(inetSocketAddress);
    System.out.println(
    "Socket Server started on port 8000");

    // 现在,您可以用所需的兴趣键在选择器上注册频道(将进一步解释)。
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


    Interest Key(兴趣键)是您对通道感兴趣的操作的键值。在当前上下文中,当我们创建一个新的套接字服务器时,我们对新连接感兴趣。该操作的兴趣键是 SelectionKey.OP_ACCEPT。

    创建 ServerSocketChanel、将其绑定到端口 8000 并用选择器注册后,我们就可以开始处理连接了。

    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // configure non blocking nature
    serverSocketChannel.configureBlocking(false);
    InetSocketAddress inetSocketAddress = new InetSocketAddress(8000);
    serverSocketChannel.bind(inetSocketAddress);
    System.out.println(
    "Socket Server started on port 8000");

    // 现在,您可以使用所需的兴趣键(将进一步解释)通过选择器注册通道。
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while(true) {
     
    //这是我们的选择器中已准备好进行 
     
    //处理的键,即已准备好接受 (OP_ACCEPT) 的新连接、已准备好读取 (OP_READ) 的连接消息等。

     int readyCount = selector.select();
     if(readyCount==0){
     
    // skip when there is no key to be processed
      continue;
     }
     
    // get the ready keys set
     Set<SelectionKey> readyKeysSet = selector.selectedKeys();
     
    // iterate over the readyKeySet
     Iterator iterator = readyKeysSet.iterator();
     
    // 检查是否有任何连接请求,服务器是否准备好接受连接,即 OP_ACCEPT 已注册。
     while(iterator.hasNext()) {
      SelectionKey key = (SelectionKey) iterator.next();
      iterator.remove();
      if (key.isAcceptable()) {
       System.out.println(
    "Accepting connection");
       
    // 获取客户端连接的通道
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
       
    // accept the connection
       SocketChannel client = server.accept();
       
    // 为该通道配置非阻塞行为
       client.configureBlocking(false);
       
    // 用相同的选择器注册客户端通道。我们感兴趣的操作是只读,即我们只想监听客户端消息。选择键为 OP_READ。
       SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ); 
      }

     
    //检查是否有任何客户端要发送消息 
     
    // (只有当有任何消息且 
     
    // OP_READ 已注册时才为真)。
      if(key.isReadable()) {
       SocketChannel client = (SocketChannel) key.channel();
       int BUFFER_SIZE = 1024;
       
    // create a buffer
       ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
       try {
       
    // read data and store it into created buffer
        int bytesRead = client.read(buffer);
        if (bytesRead == -1) {
         System.out.println(
    "Connection close");
         
    // Connection closed by client
         key.cancel();
         client.close();
         continue;
        }
       
    //将缓冲区从写入模式翻转为读取模式
        buffer.flip();
       
    // 定义一个字节数组,其大小为缓冲区的字节数。
        byte[] receivedBytes = new byte[buffer.remaining()];
        buffer.get(receivedBytes);
       
    // print the length of byte array
        System.out.println(receivedBytes.length);
       
    // 现在,在收到消息后,我们要立即将回执写入客户端。因此,我们现在要注册 OP_WRITE,这样服务器就可以向出站缓冲区写入信息了。
        key.interestOpsOr(SelectionKey.OP_WRITE);
       } catch (SocketException e) {
         e.printStackTrace();
         key.cancel();
         client.close();
         continue;
        } catch (Exception e) {
            e.printStackTrace();
        }
      }
      if(key.isWritable()) {
       SocketChannel client = (SocketChannel) key.channel();
       
    // 写入与该客户端连接相关的出站缓冲区
       client.write(
    "ack");
       
    // 立即删除 OP_WRITE 息键,这样服务器就不会准备好写入。
       key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
      }
     }
    }

    这样,我们就能在一个线程中处理多个客户端连接及其读写。由于我们希望扩展并增加处理时间,因此应将处理读取信息等阻塞任务(处理过程包括向数据库保存内容等)卸载到单独的线程中。这样,一旦这些任务结束,线程就会被杀死。为了进一步优化,我们可以为任务的上下文创建线程池。

    ExecutorService executorService = Executors.newFixedThreadPool(n);

    这样就创建了一个最多有 n 个可用线程的线程池。如果没有可用线程,任务将在队列中等待,直到有线程可用。

    为了进一步优化,我们还可以创建一个客户端池,为每个池分配一个单独的选择器,并在单独的线程中运行对每个池的处理。这样,我们就可以增加处理量,从而延长客户端的响应时间。

    该方法的全部 Java 代码如下:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    public class MultiThreadedServer {

        private static final int PORT = 8080;
        private static final int MAX_CLIENTS_PER_POOL = 20;
        private static final int MIN_NUM_POOLS = 5; // Number of separate pools

        public static void main(String[] args) throws IOException {
            ExecutorService poolExecutor = Executors.newFixedThreadPool(MIN_NUM_POOLS);

           
    // Create and open a server socket channel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);

           
    // Create a single selector for the server socket channel
            Selector serverSelector = Selector.open();
            serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

           
    // Array to hold selectors for client channels
            Selector[] clientSelectors = new Selector[MIN_NUM_POOLS];
            for (int i = 0; i < MIN_NUM_POOLS; i++) {
                clientSelectors[i] = Selector.open();
                ClientPoolHandler clientPoolHandler = new ClientPoolHandler(clientSelectors[i]);
                poolExecutor.submit(clientPoolHandler::run);
            }

           
    // Accept and handle client connections in separate threads for each pool
            while (true) {
                serverSelector.select();
                Set<SelectionKey> selectedKeys = serverSelector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();

                    if (!key.isValid()) {
                        continue;
                    }

                    if (key.isAcceptable()) {
                       
    // Accept the connection
                        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                        SocketChannel clientChannel = serverChannel.accept();
                        boolean clientAdded = false;

                       
    // Check if any selector has capacity, otherwise create a new one
                        for (Selector selector : clientSelectors) {
                            int numClients = selector.keys().size() - 1;
    // Subtract 1 for the server channel
                            if (numClients < MAX_CLIENTS_PER_POOL) {
                                clientChannel.configureBlocking(false);
                                clientChannel.register(selector, SelectionKey.OP_READ);
                                clientAdded = true;
                                break;
                            }
                        }

                       
    // If no selector has capacity, create a new one and spawn a new thread
                        if (!clientAdded) {
                            Selector newSelector = Selector.open();
                            clientChannel.configureBlocking(false);
                            clientChannel.register(newSelector, SelectionKey.OP_READ);
                            ClientPoolHandler clientPoolHandler = new ClientPoolHandler(newSelector); 
                            poolExecutor.submit(clientPoolHandler::run);
                        }
                    }
                }
            }
        }

        private static class ClientPoolHandler{
            private Selector selector;

            public ClientPoolHandler(Selector selector) {
                this.selector = selector;
            }

            public void run() {
                try {
                    while (true) {
                        selector.select();
                       
    // Handle selected keys
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    在这里,我们维持最低数量的池运行,以保证应用程序在需要时随时可用。

    结论
    基本 I/O 操作通常是阻塞式的,会导致线程等待数据可用。然而,随着可扩展性需求的出现,特别是在有多个并发连接的情况下,由于资源限制,阻塞 I/O 变得不切实际。

    在 Java NIO(非阻塞 I/O)等技术的推动下,向非阻塞 I/O 的转变提供了更具扩展性的解决方案。通过将连接与线程解耦,并采用通道和选择器等机制,非阻塞 I/O 允许服务器在不耗尽系统资源的情况下处理大量连接。

    从本质上讲,非阻塞 I/O 使服务器能够异步管理多个连接,从而提高性能和可扩展性。通过采用这种方法,我们可以构建稳健高效的系统,既能处理高负载,又能保持响应速度。