Java中的管道输入输出流

在 Java 中,PipedInputStream和PipedOutputStream是 java.io 包的一部分,通常用于线程间通信。它为一个线程提供了一种通过管道将数据发送到另一个线程的方法。在这篇博文中,我们将探讨 PipedInputStream和PipedOutputStream 的功能,并提供 10 个不同的代码示例来说明其用法。

PipedInputStream 基础知识
“PipedInputStream”应连接到“PipedOutputStream”以在两个线程之间建立通信链接。写入“PipedOutputStream”的数据可以从连接的“PipedInputStream”中读取。以下是一些需要记住的关键点:

  • - 线程通信:管道流通常用于 Java 应用程序中两个线程之间的通信。
  • - 阻塞:从“PipedInputStream”读取将阻塞,直到数据可用,而写入“PipedOutputStream”将阻塞,直到有数据空间。

现在,让我们深入研究 10 个不同的代码示例来演示“PipedInputStream/PipedOutputStream”的各种用例。

使用connect
Java 会在第一次尝试读取或写入数据时自动调用,以确保输入流和输出流连接。然而,这样的自动连接发生在需要读取或写入数据之前,可能导致潜在的竞争条件,因此在多线程环境中,显式调用 connect 是更安全的做法。

示例 1:基本设置

import java.io.*;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class BasicUsageExample {
    public static void main(String[] args) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        // Connect the input and output streams
          
        pis.connect(pos);

       
// Write data to the output stream
        pos.write(
"Hello, PipedInputStream!".getBytes());

       
// Read data from the input stream
        int data;
        while ((data = pis.read()) != -1) {
            System.out.print((char) data);
        }

       
// Close streams
        pos.close();
        pis.close();
    }
}

该基本设置展示了如何创建一个 `PipedInputStream` 并将其连接到一个 `PipedOutputStream` 。

示例 2:写入管道输出流

import java.io.*;

public class Example2 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        String message = "Hello, PipedInputStream!";
        out.write(message.getBytes());
          
       
// Read from 'in' to retrieve the message
    }
}

在这里,我们向 `PipedOutputStream` 写入一条信息。

例 3:从管道输入流读取写入的数据

import java.io.*;
//先写入再读
public class Example3 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        String message =
"Hello, PipedInputStream!";
        out.write(message.getBytes());

        byte[] buffer = new byte[1024];
        int bytesRead = in.read(buffer);
        String receivedMessage = new String(buffer, 0, bytesRead);
        System.out.println(receivedMessage);
    }
}
//在多线程环境中,显式调用 connect 是更安全的做法。
public class BufferedReaderExample {
    public static void main(String[] args) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
       
//在多线程环境中,显式调用 connect 是更安全的做法。
        pis.connect(pos);

        pos.write(
"Buffered Read Example".getBytes());

        byte[] buffer = new byte[1024];
        int bytesRead;

       
// Read data into a buffer
        while ((bytesRead = pis.read(buffer)) != -1) {
            System.out.print(new String(buffer, 0, bytesRead));
        }

        pos.close();
        pis.close();
    }
}

本例演示如何从 `PipedInputStream` 读取数据。

示例 4:线程通信

import java.io.*;

public class Example4 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Create and start a separate thread for writing
        new Thread(() -> {
            try {
                String message =
"Threaded Communication!";
                out.write(message.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

       
// Read from 'in' in the main thread
        byte[] buffer = new byte[1024];
        int bytesRead = in.read(buffer);
        String receivedMessage = new String(buffer, 0, bytesRead);
        System.out.println(receivedMessage);
    }
}

这里,两个线程演示了使用 `PipedInputStream` 和 `PipedOutputStream` 进行通信。

线程安全的例子:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class MultiThreadedExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a writer thread
            Thread writerThread = new Thread(() -> {
                try {
                    pos.write(
"Multi-threaded Example".getBytes());
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start a reader thread
            Thread readerThread = new Thread(() -> {
                try {
                    int data;
                    while ((data = pis.read()) != -1) {
                        System.out.print((char) data);
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            writerThread.start();
            readerThread.start();

           
// Wait for threads to finish
            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}


例 5:处理 IOException

import java.io.*;

public class Example5 {
    public static void main(String[] args) {
        try (PipedInputStream in = new PipedInputStream();
             PipedOutputStream out = new PipedOutputStream(in)) {

            // Use in and out streams
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

本例演示了使用 try-with-resources 进行适当的资源管理。

模拟线程通信中断:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class InterruptHandlingExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a reader thread
            Thread readerThread = new Thread(() -> {
                try {
                    int data;
                    while ((data = pis.read()) != -1) {
                        System.out.print((char) data);
                       
// Simulate interrupt
                        if (Thread.interrupted()) {
                            System.out.println(
"Reader Thread Interrupted");
                            break;
                        }
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            readerThread.start();

           
// Simulate interrupt after a delay
            try {
                Thread.sleep(1000);
                readerThread.interrupt();
                readerThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            pos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

解释:演示如何在 "PipedInputStream "阅读器线程中优雅地处理中断。

例 6:自定义缓冲区大小

import java.io.*;

public class Example6 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        byte[] customBuffer = new byte[2048];
        int bytesRead = in.read(customBuffer);
        // Process data from the custom buffer
    }
}

您可以根据自己的要求定制缓冲区的大小。

例 7:关闭数据流

import java.io.*;

public class Example7 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Use in and out streams

       
// Close the streams when done
        in.close();
        out.close();
    }
}

切记在不再需要时关闭数据流。

例 8:处理中断

import java.io.*;

public class Example8 {
    public static void main(String[] args) {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();

        // Interrupt handling in case of thread interruption
        try {
            out.connect(in);
        } catch (IOException e) {
            Thread.currentThread().interrupt();
        }
    }
}

连接数据流时优雅地处理中断。

例 9:非阻塞性读取

import java.io.*;

public class Example9 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Non-blocking read
        if (in.available() > 0) {
            byte[] buffer = new byte[in.available()];
            int bytesRead = in.read(buffer);
           
// Process data from the buffer
        }
    }
}

本例演示了使用 `available()` 方法进行非阻塞读取。

例 10:阻塞操作超时

import java.io.*;

public class Example10 {
    public static void main(String[] args) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Timeout for blocking read
        long timeout = 5000;
// 5 seconds
        long startTime = System.currentTimeMillis();
        while (in.available() == 0) {
            if (System.currentTimeMillis() - startTime > timeout) {
               
// Handle timeout
                break;
            }
        }

        if (in.available() > 0) {
            byte[] buffer = new byte[in.available()];
            int bytesRead = in.read(buffer);
           
// Process data from the buffer
        }
    }
}

您可以为阻塞操作实施超时机制。

线程安全超时:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class TimeoutExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a reader thread
            Thread readerThread = new Thread(() -> {
                try {
                    int data;
                    long startTime = System.currentTimeMillis();

                   
// Set a timeout for blocking read
                    while ((data = pis.read()) != -1) {
                        System.out.print((char) data);

                       
// Break if timeout exceeds 2 seconds
                        if (System.currentTimeMillis() - startTime > 2000) {
                            System.out.println(
"\nTimeout Reached");
                            break;
                        }
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            readerThread.start();

           
// Write data after a delay
            try {
                Thread.sleep(1000);
                pos.write(
"Timeout Example".getBytes());
                pos.close();
                readerThread.join();
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

11. 处理 EOF:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class EOFHandlingExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a writer thread
            Thread writerThread = new Thread(() -> {
                try {
                    pos.write(
"EOF Handling Example".getBytes());
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start a reader thread
            Thread readerThread = new Thread(() -> {
                try {
                    int data;
                    while (true) {
                        data = pis.read();
                        if (data == -1) {
                            System.out.println(
"\nEnd of File (EOF) Reached");
                            break;
                        }
                        System.out.print((char) data);
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            writerThread.start();
            readerThread.start();

           
// Wait for threads to finish
            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

解释:演示从 "PipedInputStream "读取数据时如何处理文件结束(EOF)条件。

12.大数据传输:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Arrays;

public class LargeDataTransferExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a writer thread
            Thread writerThread = new Thread(() -> {
                try {
                    byte[] largeData = new byte[1024 * 1024];
// 1 MB
                    Arrays.fill(largeData, (byte) 'A');
                    pos.write(largeData);
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start a reader thread
            Thread readerThread = new Thread(() -> {
                try {
                    byte[] buffer = new byte[1024];
                    int bytesRead;

                    while ((bytesRead = pis.read(buffer)) != -1) {
                       
// Process the data
                        System.out.print(new String(buffer, 0, bytesRead));
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            writerThread.start();
            readerThread.start();

           
// Wait for threads to finish
            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

12.重置流:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class ResetStreamExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a writer thread
            Thread writerThread = new Thread(() -> {
                try {
                    pos.write(
"Reset Stream Example".getBytes());
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start a reader thread
            Thread readerThread = new Thread(() -> {
                try {
                    int data;
                    while ((data = pis.read()) != -1) {
                        System.out.print((char) data);
                    }

                   
// Reset the stream to read again
                    pis.close();
                    pis.connect(new PipedOutputStream());
                    writerThread.join();
                    System.out.println(
"\nStream Reset\n");

                   
// Start a new reader thread
                    Thread newReaderThread = new Thread(() -> {
                        try {
                            int newData;
                            while ((newData = pis.read()) != -1) {
                                System.out.print((char) newData);
                            }
                            pis.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });

                    newReaderThread.start();
                    newReaderThread.join();
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            writerThread.start();
            readerThread.start();

           
// Wait for threads to finish
            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

13.使用 BufferedReader 进行缓冲阅读:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class BufferedReaderExample {
    public static void main(String[] args) {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();

        try {
            pis.connect(pos);

            // Start a writer thread
            Thread writerThread = new Thread(() -> {
                try {
                    pos.write(
"Buffered Reader Example".getBytes());
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start a reader thread with BufferedReader
            Thread readerThread = new Thread(() -> {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(pis))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

           
// Start threads
            writerThread.start();
            readerThread.start();

           
// Wait for threads to finish
            writerThread.join();
            readerThread.join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}