Java中多线程异步读写文件

当涉及到 Java 中的文件处理时,管理大文件而不引起性能问题可能具有挑战性。通过使用两个线程,我们可以在不阻塞主线程的情况下高效地读与写文件。

通过多线程同时执行多个文件操作,利用多核处理器以及将 I/O 操作与计算重叠。这种并发性可以更好地利用系统资源并减少总体执行时间。

1、读取文件
在单独的一个线程中读取文件,我们可以创建一个新线程并传递一个读取文件的Runnable对象。FileReader类用于读取文件。此外,为了增强文件读取过程,我们使用 BufferedReader来高效地逐行读取文件:

Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});
thread.start();

2、写入文件
创建另一个新线程并使用FileWriter类将数据写入文件:

Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
        try (FileWriter fileWriter = new FileWriter(filePath)) {
            fileWriter.write("Hello, world!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});
thread.start();

这种方法允许读取和写入同时运行,这意味着它们可以在单独的线程中同时发生。当一项操作不依赖于另一项操作的完成时,这尤其有用。

处理并发
多个线程对文件的并发访问需要特别注意,以避免数据损坏和意外行为。在前面的代码中,两个线程是同时启动的。这意味着两者可以同时执行,并且不能保证它们的操作交错的顺序。如果读取器线程在写入操作仍在进行时尝试访问文件,则可能最终会读取不完整或部分写入的数据。 这可能会导致处理过程中出现误导性信息或错误,从而可能影响依赖准确数据的下游操作。

此外,如果两个写入线程同时尝试将数据写入文件,它们的写入可能会交错并覆盖彼此的部分数据。如果没有正确的同步处理,这可能会导致信息损坏或不一致。

生产者-消费者模型
为了解决这个问题,一种常见的方法是使用生产者-消费者模型。一个或多个生产者线程读取文件并将其添加到队列中,一个或多个消费者线程处理队列中的文件。这种方法使我们能够根据需要添加更多生产者或消费者来轻松扩展我们的应用程序。


使用BlockingQueue进行并发文件处理
带有队列的生产者-消费者模型协调操作,确保读写顺序一致。为了实现这个模型,我们可以使用线程安全的队列数据结构,例如BlockingQueue。生产者可以使用Offer()方法将文件添加到队列中,消费者可以使用poll()方法检索文件。

每个BlockingQueue实例都有一个内部锁,用于管理对其内部数据结构(链表、数组等)的访问。当线程尝试执行像Offer()或poll()这样的操作时,它首先获取此锁。这可确保一次只有一个线程可以访问队列,从而防止同时修改和数据损坏。

通过使用BlockingQueue,我们将生产者和消费者解耦,允许他们按照自己的节奏工作,而无需直接相互等待。这可以提高整体性能。

1. 创建文件生产者
我们首先创建FileProducer类,代表负责从输入文件读取行并将其添加到共享队列的生产者线程。此类利用BlockingQueue在生产者线程和消费者线程之间进行协调。它接受BlockingQueue作为行的同步存储,确保消费者线程可以访问它们。

以下是FileProducer类的示例:

class FileProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String inputFileName;
    public FileProducer(BlockingQueue<String> queue, String inputFileName) {
        this.queue = queue;
        this.inputFileName = inputFileName;
    }
    // ...
}

接下来,在run()方法中,我们使用BufferedReader打开文件以进行高效的行读取。我们还包括对文件操作期间可能发生的潜在IOException的错误处理。

@Override
public void run() {
    try (BufferedReader reader = new BufferedReader(new FileReader(inputFileName))) {
        String line;
        // ...
    } catch (IOException e) {
        e.printStackTrace();
    }
}

打开文件后,代码进入循环,从文件中读取行并同时使用Offer()方法将它们添加到队列中:

while ((line = reader.readLine()) != null) {
    queue.offer(line);
}

2. 创建文件消费者
接下来,我们介绍FileConsumer类,它代表消费者线程,其任务是从队列中检索行并将其写入输出文件。此类接受BlockingQueue作为从生产者线程接收行的输入:

class FileConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String outputFileName;
    public FileConsumer(BlockingQueue queue, String outputFileName) {
        this.queue = queue;
        this.outputFileName = outputFileName;
    }
    
    // ...
}

接下来,在run()方法中我们使用BufferedWriter来促进高效写入输出文件:

@Override
public void run() {
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName))) {
        String line;
        // ...
    } catch (IOException e) {
        e.printStackTrace();
    }
}

打开输出文件后,代码进入连续循环,使用poll()方法从队列中检索行。如果某行可用,则将该行写入文件。当poll()返回null时,循环终止,这表明生产者已完成写入行并且没有更多行需要处理:

while ((line = queue.poll()) != null) {
    writer.write(line);
    writer.newLine();
}

3. 线程编排器
最后,我们将所有内容包装在主程序中。首先,我们创建一个LinkedBlockingQueue实例作为生产者线程和消费者线程之间线路的中介。该队列建立了用于通信和协调的同步通道。

BlockingQueue<String> queue = new LinkedBlockingQueue<>();

接下来,我们创建两个线程:一个FileProducer线程,负责从输入文件中读取行并将它们添加到队列中。我们还创建一个FileConsumer线程,其任务是从队列中检索行并熟练地处理它们的处理并将其输出到指定的输出文件:

String fileName = "input.txt";
String outputFileName = "output.txt"
Thread producerThread = new Thread(new FileProducer(queue, fileName));
Thread consumerThread = new Thread(new FileConsumer(queue, outputFileName);

随后,我们使用 start() 方法启动它们的执行。我们利用join()方法来确保两个线程在程序退出之前正常完成其工作:

producerThread.start();
consumerThread.start();
try {
    producerThread.join();
    consumerThread1.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}

现在,让我们创建一个输入文件,然后运行该程序:

Hello,
Baeldung!
Nice to meet you!


运行程序后,我们可以检查输出文件。我们应该看到输出文件包含与输入文件相同的行:

Hello,
Baeldung!
Nice to meet you!

在提供的示例中,生产者在循环中将行添加到队列中,而消费者在循环中从队列中检索行。这意味着队列中可以同时存在多行,并且即使生产者仍在添加更多行,消费者也可以处理队列中的行。