在 Java 中,PipedInputStream和PipedOutputStream是 java.io 包的一部分,通常用于线程间通信。它为一个线程提供了一种通过管道将数据发送到另一个线程的方法。在这篇博文中,我们将探讨 PipedInputStream和PipedOutputStream 的功能,并提供 10 个不同的代码示例来说明其用法。
PipedInputStream 基础知识
“PipedInputStream”应连接到“PipedOutputStream”以在两个线程之间建立通信链接。写入“PipedOutputStream”的数据可以从连接的“PipedInputStream”中读取。以下是一些需要记住的关键点:
- 线程通信:管道流通常用于 Java 应用程序中两个线程之间的通信。
- 阻塞:从“PipedInputStream”读取将阻塞,直到有数据可读;而写入“PipedOutputStream”将阻塞,直到管道缓冲区中有可用空间。
现在,让我们深入研究 10 个不同的代码示例来演示“PipedInputStream/PipedOutputStream”的各种用例。
使用connect
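管道流不会自动连接:必须通过构造函数(例如 new PipedOutputStream(pis))或显式调用 connect() 将输入流和输出流连接起来,否则第一次读取或写入时会抛出 IOException(“Pipe not connected”)。同一对流只能连接一次,重复连接会抛出 IOException(“Already connected”)。在多线程环境中,应当在启动读写线程之前完成连接,以避免潜在的竞争条件。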
示例 1:基本设置
import java.io.*;
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * Example 1: the smallest possible pipe round-trip, performed on a single thread.
 *
 * The message is written, the write end is closed, and only then is the read
 * end drained. Closing first matters: {@code read()} returns -1 only after the
 * writer has closed the pipe, so draining until -1 while the write end is still
 * open on the same thread would block forever.
 */
public class BasicUsageExample {
    /**
     * Sends one short message through a connected pipe and echoes it to standard output.
     *
     * @param args unused
     * @throws IOException if connecting, writing, reading or closing the pipe fails
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        // Connect the input and output streams
        pis.connect(pos);

        // Write data to the output stream. The message is far smaller than the
        // pipe's default 1024-byte buffer, so this write cannot block even
        // though nobody is reading yet.
        pos.write("Hello, PipedInputStream!".getBytes());

        // Close the write end BEFORE reading so the read loop below can see
        // end-of-stream instead of waiting for more data indefinitely.
        pos.close();

        // Read data from the input stream until end-of-stream
        int data;
        while ((data = pis.read()) != -1) {
            System.out.print((char) data);
        }

        // Close the read end
        pis.close();
    }
}
|
该基本设置展示了如何创建一个 `PipedInputStream` 并将其连接到一个 `PipedOutputStream` 。
示例 2:写入管道输出流
import java.io.*;
/**
 * Example 2: pushing bytes into the write end of a connected pipe.
 *
 * Only the write side is shown here; nothing is read back and nothing is printed.
 */
public class Example2 {
    /**
     * Creates a connected pipe pair and writes one message into it.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected or written to
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream source = new PipedInputStream();
        PipedOutputStream sink = new PipedOutputStream(source);

        // Encode the message first, then hand the bytes to the pipe.
        byte[] payload = "Hello, PipedInputStream!".getBytes();
        sink.write(payload);
        // A consumer would now read from 'source' to retrieve the message.
    }
}
|
在这里,我们向 `PipedOutputStream` 写入一条信息。
例 3:从管道输入流读取写入的数据
import java.io.*;

/**
 * Example 3: write first, then read the written data back on the same thread.
 *
 * This works single-threaded because the message fits in the pipe's internal
 * buffer and exactly one read is issued, so nothing ever has to wait.
 */
public class Example3 {
    /**
     * Writes one message into the pipe and prints what a single bulk read returns.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected, written to or read from
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        String message = "Hello, PipedInputStream!";
        out.write(message.getBytes());

        byte[] buffer = new byte[1024];
        int bytesRead = in.read(buffer);
        String receivedMessage = new String(buffer, 0, bytesRead);
        System.out.println(receivedMessage);

        // Release both ends once the round-trip is complete.
        out.close();
        in.close();
    }
}

/**
 * Buffered-read variant: connects the two ends with an explicit
 * {@code connect} call and drains the pipe in chunks until end-of-stream.
 *
 * Declared package-private so that it can share one listing with
 * {@code Example3}; it is still runnable through its public {@code main}.
 */
class BufferedReaderExample {
    /**
     * Writes one message, closes the write end, then prints everything read from the pipe.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected, written to or read from
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        // Connect the two ends before any read or write; an unconnected pipe
        // rejects both with an IOException.
        pis.connect(pos);

        pos.write("Buffered Read Example".getBytes());

        // Close the write end before draining: the loop below ends only when
        // read() reports end-of-stream, which requires the writer to be closed.
        pos.close();

        byte[] buffer = new byte[1024];
        int bytesRead;

        // Read data into a buffer
        while ((bytesRead = pis.read(buffer)) != -1) {
            System.out.print(new String(buffer, 0, bytesRead));
        }

        pis.close();
    }
}
|
本例演示如何从 `PipedInputStream` 读取数据。
示例 4:线程通信
import java.io.*;
/**
 * Example 4: one thread writes into the pipe while the main thread reads from it.
 *
 * The writer closes its end when it is done, and the reader keeps reading until
 * end-of-stream, so the result does not depend on a single read happening to
 * return the whole message.
 */
public class Example4 {
    /**
     * Starts a writer thread, collects everything it sends and prints it as one line.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected or read from
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Create and start a separate thread for writing. try-with-resources
        // closes the write end even if write() fails, which is what lets the
        // reader below observe end-of-stream.
        new Thread(() -> {
            try (PipedOutputStream sink = out) {
                String message = "Threaded Communication!";
                sink.write(message.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        // Read from 'in' in the main thread until the writer closes the pipe.
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try (PipedInputStream source = in) {
            byte[] buffer = new byte[1024];
            int bytesRead;
            while ((bytesRead = source.read(buffer)) != -1) {
                received.write(buffer, 0, bytesRead);
            }
        }

        String receivedMessage = new String(received.toByteArray());
        System.out.println(receivedMessage);
    }
}
|
这里,两个线程演示了使用 `PipedInputStream` 和 `PipedOutputStream` 进行通信。
线程安全的例子:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * Producer/consumer demo: one thread writes a message into a pipe and closes it,
 * another thread echoes every byte to standard output until end-of-stream.
 */
public class MultiThreadedExample {
    /**
     * Connects the pipe, runs the writer and reader threads and waits for both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            // The pipe is connected before either thread exists.
            pis.connect(pos);

            Thread writerThread = new Thread(() -> produce(pos));
            Thread readerThread = new Thread(() -> consume(pis));

            writerThread.start();
            readerThread.start();

            // Block until both sides have finished.
            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Writes the demo message into the pipe and closes the write end. */
    private static void produce(PipedOutputStream sink) {
        try {
            byte[] payload = "Multi-threaded Example".getBytes();
            sink.write(payload);
            sink.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints each byte from the pipe as a character, then closes the read end. */
    private static void consume(PipedInputStream source) {
        try {
            for (int next = source.read(); next != -1; next = source.read()) {
                System.out.print((char) next);
            }
            source.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
例 5:处理 IOException
import java.io.*;
/**
 * Example 5: letting try-with-resources own both ends of the pipe so that they
 * are closed automatically, with any IOException reported in one place.
 */
public class Example5 {
    /**
     * Opens a connected pipe pair inside a try-with-resources statement.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (PipedInputStream source = new PipedInputStream();
             PipedOutputStream sink = new PipedOutputStream(source)) {
            // The connected pair is ready for use inside this block.
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
本例演示了使用 try-with-resources 进行适当的资源管理。
模拟线程通信中断:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * Shows how a reader thread that is blocked on a pipe reacts to being interrupted.
 *
 * An interrupt that arrives while the thread is waiting inside {@code read()}
 * surfaces as an {@code InterruptedIOException}, so that exception is handled
 * as the normal "interrupted" outcome rather than reported as an I/O failure.
 */
public class InterruptHandlingExample {
    /**
     * Starts a reader on an idle pipe, interrupts it after one second and waits for it to stop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a reader thread. try-with-resources closes the read end on
            // every exit path, including the interrupted one.
            Thread readerThread = new Thread(() -> {
                try (PipedInputStream source = pis) {
                    int data;
                    while ((data = source.read()) != -1) {
                        System.out.print((char) data);
                        // Covers an interrupt that arrives between two reads.
                        if (Thread.interrupted()) {
                            System.out.println("Reader Thread Interrupted");
                            break;
                        }
                    }
                } catch (java.io.InterruptedIOException e) {
                    // Covers an interrupt that arrives while blocked in read().
                    System.out.println("Reader Thread Interrupted");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

            // Start threads
            readerThread.start();

            // Simulate interrupt after a delay
            try {
                Thread.sleep(1000);
                readerThread.interrupt();
                readerThread.join();
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever runs this thread; closing
                // the write end below still lets the reader terminate.
                Thread.currentThread().interrupt();
            }

            pos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
解释:演示如何在 `PipedInputStream` 读取线程中优雅地处理中断。
例 6:自定义缓冲区大小
import java.io.*;
/**
 * Example 6: choosing the size of the pipe's internal buffer.
 *
 * The capacity is fixed when the {@code PipedInputStream} is constructed. A
 * larger pipe lets a writer get further ahead of the reader before
 * {@code write} has to block.
 */
public class Example6 {
    /** Capacity of the pipe's internal buffer, in bytes (the default is 1024). */
    private static final int PIPE_SIZE = 2048;

    /** Payload size chosen to exceed the default capacity but fit in PIPE_SIZE. */
    private static final int PAYLOAD_SIZE = 1500;

    /**
     * Pushes a payload larger than the default pipe capacity through an enlarged pipe.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected, written to or read from
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream(PIPE_SIZE);
        PipedOutputStream out = new PipedOutputStream(in);

        // On a single thread this write completes only because the whole
        // payload fits into the enlarged pipe; with the default capacity it
        // would wait for a reader that never comes.
        out.write(new byte[PAYLOAD_SIZE]);
        out.close();

        // Data was written above, so this read returns immediately.
        byte[] customBuffer = new byte[2048];
        int bytesRead = in.read(customBuffer);
        // Process data from the custom buffer

        in.close();
    }
}
|
您可以根据自己的要求定制缓冲区的大小。
例 7:关闭数据流
import java.io.*;
/**
 * Example 7: explicitly closing both ends of a pipe once they are no longer needed.
 */
public class Example7 {
    /**
     * Creates a connected pipe pair and closes the read end, then the write end.
     *
     * @param args unused
     * @throws IOException if connecting or closing the pipe fails
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream source = new PipedInputStream();
        PipedOutputStream sink = new PipedOutputStream(source);

        // ... work with 'source' and 'sink' here ...

        // Done with the pipe: release the read end first, then the write end.
        source.close();
        sink.close();
    }
}
|
切记在不再需要时关闭数据流。
例 8:处理中断
import java.io.*;
/**
 * Example 8: telling an interrupted I/O operation apart from an ordinary I/O failure
 * while connecting the two ends of a pipe.
 */
public class Example8 {
    /**
     * Connects a pipe pair and handles the two failure kinds separately.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();

        try {
            out.connect(in);
        } catch (InterruptedIOException e) {
            // Only an interrupted operation should restore the interrupt flag,
            // so that code further up the stack can react to it.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // Any other failure (for example, an end that is already connected)
            // is a real error and must not be disguised as an interrupt.
            e.printStackTrace();
        }
    }
}
|
连接数据流时优雅地处理中断。
例 9:非阻塞性读取
import java.io.*;
/**
 * Example 9: avoiding a blocking read by asking the pipe how many bytes are
 * buffered before reading.
 */
public class Example9 {
    /**
     * Reads from the pipe only when {@code available()} reports buffered bytes.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected, queried or read from
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream source = new PipedInputStream();
        PipedOutputStream sink = new PipedOutputStream(source);

        // Nothing buffered means a read would have to wait, so skip it.
        int pending = source.available();
        if (pending <= 0) {
            return;
        }

        // Size the array to exactly what is buffered right now.
        byte[] chunk = new byte[pending];
        int count = source.read(chunk);
        // Process the 'count' bytes now held in 'chunk'.
    }
}
|
本例演示了使用 `available()` 方法进行非阻塞读取。
例 10:阻塞操作超时
import java.io.*;
/**
 * Example 10: giving up on a read when no data shows up within a time limit.
 *
 * The pipe is polled with {@code available()}. The loop sleeps briefly between
 * polls so that waiting does not keep a CPU core busy.
 */
public class Example10 {
    /** Pause between two polls of the pipe, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 50;

    /**
     * Waits up to five seconds for data to appear in the pipe, then reads whatever is buffered.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be connected, queried or read from
     */
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Timeout for blocking read
        long timeout = 5000; // 5 seconds
        long startTime = System.currentTimeMillis();
        while (in.available() == 0) {
            if (System.currentTimeMillis() - startTime > timeout) {
                // Handle timeout
                break;
            }
            try {
                // Yield the CPU between polls instead of spinning.
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Stop waiting and keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                break;
            }
        }

        if (in.available() > 0) {
            byte[] buffer = new byte[in.available()];
            int bytesRead = in.read(buffer);
            // Process data from the buffer
        }
    }
}
|
您可以为阻塞操作实施超时机制。
线程安全超时:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * Reader-with-deadline demo: a reader thread echoes bytes from the pipe and
 * stops early once more than two seconds have passed since it started.
 *
 * The elapsed-time check runs only after a byte has been read, so it cannot cut
 * short a {@code read()} call that is still waiting for data.
 */
public class TimeoutExample {
    /**
     * Starts the reader, writes the message after a one-second delay and waits for the reader.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            Thread readerThread = new Thread(() -> readUntilEofOrDeadline(pis));
            readerThread.start();

            // Deliver the data late so the reader has to wait for it.
            try {
                Thread.sleep(1000);
                byte[] payload = "Timeout Example".getBytes();
                pos.write(payload);
                pos.close();
                readerThread.join();
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Echoes bytes from the pipe until end-of-stream or until a byte arrives
     * more than two seconds after this method started; then closes the read end.
     */
    private static void readUntilEofOrDeadline(PipedInputStream source) {
        try {
            final long startedAt = System.currentTimeMillis();

            int next = source.read();
            while (next != -1) {
                System.out.print((char) next);

                long elapsed = System.currentTimeMillis() - startedAt;
                if (elapsed > 2000) {
                    System.out.println("\nTimeout Reached");
                    break;
                }
                next = source.read();
            }
            source.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
11. 处理 EOF:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * End-of-stream demo: the writer sends a message and closes the pipe; the
 * reader echoes the bytes and announces when {@code read()} returns -1.
 */
public class EOFHandlingExample {
    /**
     * Connects the pipe, runs the writer and reader threads and waits for both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            Thread writerThread = new Thread(() -> sendMessage(pos));
            Thread readerThread = new Thread(() -> echoUntilEof(pis));

            writerThread.start();
            readerThread.start();

            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Writes the demo message and closes the write end, which signals end-of-stream. */
    private static void sendMessage(PipedOutputStream sink) {
        try {
            byte[] payload = "EOF Handling Example".getBytes();
            sink.write(payload);
            sink.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Echoes bytes until -1 is returned, reports end-of-stream, then closes the read end. */
    private static void echoUntilEof(PipedInputStream source) {
        try {
            for (int next = source.read(); next != -1; next = source.read()) {
                System.out.print((char) next);
            }
            // Reaching this point means read() returned -1.
            System.out.println("\nEnd of File (EOF) Reached");
            source.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
解释:演示从 `PipedInputStream` 读取数据时如何处理文件结束(EOF)条件。
12.大数据传输:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.Arrays;
/**
 * Large-transfer demo: one thread pushes 1 MB of {@code 'A'} bytes through the
 * pipe while another drains it in 1024-byte chunks and prints each chunk.
 */
public class LargeDataTransferExample {
    /**
     * Connects the pipe, runs the writer and reader threads and waits for both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            Thread writerThread = new Thread(() -> sendLargePayload(pos));
            Thread readerThread = new Thread(() -> drainAndPrint(pis));

            writerThread.start();
            readerThread.start();

            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Builds the 1 MB payload, writes it in a single call and closes the write end. */
    private static void sendLargePayload(PipedOutputStream sink) {
        try {
            byte[] payload = new byte[1024 * 1024]; // 1 MB
            Arrays.fill(payload, (byte) 'A');
            sink.write(payload);
            sink.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Reads chunk by chunk until end-of-stream, printing each chunk as text. */
    private static void drainAndPrint(PipedInputStream source) {
        try {
            byte[] chunk = new byte[1024];
            int count = source.read(chunk);
            while (count != -1) {
                String text = new String(chunk, 0, count);
                System.out.print(text);
                count = source.read(chunk);
            }
            source.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
12.重置流:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * "Reset" demo: piped streams cannot be reconnected, so reading again means
 * building a fresh pipe pair.
 *
 * Calling {@code connect} on a stream that has already been connected is
 * rejected with an {@code IOException}, even after the stream was closed.
 * This example therefore runs two independent transfers, each with its own
 * newly created and connected pair.
 */
public class ResetStreamExample {
    /** Message sent through the pipe in each round. */
    private static final String MESSAGE = "Reset Stream Example";

    /**
     * Runs one transfer, announces the reset, then runs a second transfer on a new pipe.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            transferOnce();
            System.out.println("\nStream Reset\n");
            transferOnce();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Sends MESSAGE through a brand-new pipe pair and echoes it to standard output.
     *
     * @throws IOException if the new pair cannot be connected
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void transferOnce() throws IOException, InterruptedException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        pis.connect(pos);

        // Writer: try-with-resources closes the write end even when write()
        // fails, so the reader always gets to see end-of-stream.
        Thread writerThread = new Thread(() -> {
            try (PipedOutputStream sink = pos) {
                sink.write(MESSAGE.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        // Reader: echo every byte until the writer closes the pipe.
        Thread readerThread = new Thread(() -> {
            try (PipedInputStream source = pis) {
                int data;
                while ((data = source.read()) != -1) {
                    System.out.print((char) data);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        // Start threads
        writerThread.start();
        readerThread.start();

        // Wait for threads to finish
        writerThread.join();
        readerThread.join();
    }
}
|
13.使用 BufferedReader 进行缓冲读取:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PipedInputStream; import java.io.PipedOutputStream;
/**
 * Line-oriented reading: the read end of the pipe is wrapped in a
 * {@code BufferedReader} so the consumer can pull whole lines of text.
 */
public class BufferedReaderExample {
    /**
     * Connects the pipe, runs the writer and the line-reading thread and waits for both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            Thread writerThread = new Thread(() -> sendText(pos));
            Thread readerThread = new Thread(() -> printLines(pis));

            writerThread.start();
            readerThread.start();

            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Writes the demo text and closes the write end. */
    private static void sendText(PipedOutputStream sink) {
        try {
            byte[] payload = "Buffered Reader Example".getBytes();
            sink.write(payload);
            sink.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints every line read from the pipe; the reader is closed automatically. */
    private static void printLines(PipedInputStream source) {
        try (BufferedReader lines = new BufferedReader(new InputStreamReader(source))) {
            for (String text = lines.readLine(); text != null; text = lines.readLine()) {
                System.out.println(text);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|