PipedReader与PipedInputStream极其相似,PipedWriter与PipedOutputStream也极其相似。如果已经看过Java8 I/O源码-PipedInputStream与PipedOutputStream,可以忽略以下内容。
PipedReader与PipedWriter分别为字符管道输入流和字符管道输出流。管道输入流通过连接到管道输出流实现了类似管道的功能,用于线程之间的通信。
通常,由某个线程向管道输出流中写入数据。根据管道的特性,这些数据会自动发送到与管道输出流对应的管道输入流中。这时其他线程就可以从管道输入流中读取数据,这样就实现了线程之间的通信。
PipedReader为字符管道输入流,用于读取对应的字符管道输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信。
PipedWriter为字符管道输出流、用于将当前线程的指定字符写入到与此线程对应的管道字符输入流中。
PipedReader下面来学习下PipedReader的源码。
public class PipedReader extends Reader { //管道输出流是否关闭 boolean closedByWriter = false; //管道输入流是否关闭 boolean closedByReader = false; //管道输入流是否被连接 boolean connected = false; //从管道中读取数据的线程 Thread readSide; //向管道中写入数据的线程 Thread writeSide; /** * 管道循环输入缓冲区的默认大小。 */ private static final int DEFAULT_PIPE_SIZE = 1024; /** * 放置数据的循环缓冲区。 */ char buffer[]; /** * 缓冲区的位置,当从连接的管道输出流中接收到下一个数据字符时,会将其存储到该位置。 */ int in = -1; /** * 缓冲区的位置,此管道输入流将从该位置读取下一个数据字节。 */ int out = 0; /** * 创建PipedReader,并指定其对应的PipedWriter。 */ public PipedReader(PipedWriter src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } /** * 创建一个PipedReader,使其连接到管道输出流src,并指定管道大小为pipeSize。 * @since 1.6 */ public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } /** * 创建尚未连接的PipedReader。 */ public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); } /** * 创建一个尚未连接的PipedReader,并指定管道大小为pipeSize。 * @since 1.6 */ public PipedReader(int pipeSize) { initPipe(pipeSize); } //创建PipedInputStream时指定其缓冲区大小 private void initPipe(int pipeSize) { //如果参数pipeSize小于等于0,抛出异常。 if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe size <= 0"); } buffer = new char[pipeSize]; } /** * 将PipedReader连接到指定的PipedWriter 。 * * 如果PipedReader已经被连接到了其他PipedWriter ,抛出IOException。 */ public void connect(PipedWriter src) throws IOException { src.connect(this); } /** * 接收一个字符,将其插入到缓冲区。如果没有可用的输入,方法会阻塞。 */ synchronized void receive(int c) throws IOException { //检查PipedReader的状态是否正常。 if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } ///获取将数据写入管道的线程 writeSide = Thread.currentThread(); //如果被写入管道的数据刚好被读完 while (in == out) { if ((readSide != null) && !readSide.isAlive()) { throw new IOException("Pipe broken"); } /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //??? if (in < 0) { in = 0; out = 0; } //将数据字节写入到缓冲区中 buffer[in++] = (char) c; //如果in已经超出了缓冲区的范围,将in置为0,从头开始写 if (in >= buffer.length) { in = 0; } } /** * 接收字节数组中的部分数据,存到缓冲区中。 * 直到输入可用之前,方法会阻塞。 */ synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= 0) { receive(c[off++]); } } /** * 管道输出流关闭时(PipedWriter.close()中会调用此方法),通知其已经关闭。 */ synchronized void receivedLast() { closedByWriter = true; notifyAll(); } /** * 从管道输入流中读取下个数据字节。 * 数据字节作为0~255之间的整数返回。 * 在输入数据可用、检测到流的末尾或者抛出异常前,方法一直阻塞。 * * @return 下一个数据字节;如果已到达流末尾,则返回-1。 * @exception IOException 如果管道未连接、损坏、关闭,或者发生 I/O 错误。 */ public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++]; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } /** * 将最多len个数据字节从此管道输入流读入char数组。 * * 如果已到达数据流的末尾,或者len超出管道缓冲区大小,则读取的字符数将少于len。 * * 如果len为0,则不读取任何字节并返回0; * 否则,在至少1个输入字符可用、检测到流末尾、抛出异常前,该方法将一直阻塞。 */ public synchronized int read(char cbuf[], int off, int len) throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if ((off < 0) || (off > cbuf.length) || (len < 0) || ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } cbuf[off] = (char)c; int rlen = 1; while ((in >= 0) && (--len > 0)) { cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } /** * 告知是否准备读取此流。如果循环缓冲区不为空,则传送字符流已做好被读取的准备。 */ public synchronized boolean ready() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if (in < 0) { return false; } else { return true; } } /** * 关闭此传送流并释放与该流相关的所有系统资源。 */ public void close() throws IOException { in = -1; closedByReader = true; }}请参考Java8 I/O源码-PipedInputStream与PipedOutputStream中的demo。
总结请参考Java8 I/O源码-PipedInputStream与PipedOutputStream中的demo。
总结对PipedReader与PipedWriter的介绍就到这里。想了解Java8 I/O源码的更多内容,请参考
| 欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) | 黑马程序员IT技术论坛 X3.2 |