|
PipedReader与PipedInputStream极其相似,PipedWriter与PipedOutputStream也极其相似。如果已经看过Java8 I/O源码-PipedInputStream与PipedOutputStream,可以忽略以下内容。 PipedReader与PipedWriter分别为字符管道输入流和字符管道输出流。管道输入流通过连接到管道输出流实现了类似管道的功能,用于线程之间的通信。 通常,由某个线程向管道输出流中写入数据。根据管道的特性,这些数据会自动发送到与管道输出流对应的管道输入流中。这时其他线程就可以从管道输入流中读取数据,这样就实现了线程之间的通信。 PipedReader为字符管道输入流,用于读取对应的字符管道输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信。 PipedWriter为字符管道输出流、用于将当前线程的指定字符写入到与此线程对应的管道字符输入流中。 PipedReader下面来学习下PipedReader的源码。 public class PipedReader extends Reader { //管道输出流是否关闭 boolean closedByWriter = false; //管道输入流是否关闭 boolean closedByReader = false; //管道输入流是否被连接 boolean connected = false; //从管道中读取数据的线程 Thread readSide; //向管道中写入数据的线程 Thread writeSide; /** * 管道循环输入缓冲区的默认大小。 */ private static final int DEFAULT_PIPE_SIZE = 1024; /** * 放置数据的循环缓冲区。 */ char buffer[]; /** * 缓冲区的位置,当从连接的管道输出流中接收到下一个数据字符时,会将其存储到该位置。 */ int in = -1; /** * 缓冲区的位置,此管道输入流将从该位置读取下一个数据字节。 */ int out = 0; /** * 创建PipedReader,并指定其对应的PipedWriter。 */ public PipedReader(PipedWriter src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } /** * 创建一个PipedReader,使其连接到管道输出流src,并指定管道大小为pipeSize。 * @since 1.6 */ public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } /** * 创建尚未连接的PipedReader。 */ public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); } /** * 创建一个尚未连接的PipedReader,并指定管道大小为pipeSize。 * @since 1.6 */ public PipedReader(int pipeSize) { initPipe(pipeSize); } //创建PipedInputStream时指定其缓冲区大小 private void initPipe(int pipeSize) { //如果参数pipeSize小于等于0,抛出异常。 if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe size <= 0"); } buffer = new char[pipeSize]; } /** * 将PipedReader连接到指定的PipedWriter 。 * * 如果PipedReader已经被连接到了其他PipedWriter ,抛出IOException。 */ public void connect(PipedWriter src) throws IOException { src.connect(this); } /** * 接收一个字符,将其插入到缓冲区。如果没有可用的输入,方法会阻塞。 */ synchronized void receive(int c) throws IOException { //检查PipedReader的状态是否正常。 if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } ///获取将数据写入管道的线程 writeSide = Thread.currentThread(); //如果被写入管道的数据刚好被读完 while (in == out) { if ((readSide != null) && !readSide.isAlive()) { throw new IOException("Pipe broken"); } /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //??? if (in < 0) { in = 0; out = 0; } //将数据字节写入到缓冲区中 buffer[in++] = (char) c; //如果in已经超出了缓冲区的范围,将in置为0,从头开始写 if (in >= buffer.length) { in = 0; } } /** * 接收字节数组中的部分数据,存到缓冲区中。 * 直到输入可用之前,方法会阻塞。 */ synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= 0) { receive(c[off++]); } } /** * 管道输出流关闭时(PipedWriter.close()中会调用此方法),通知其已经关闭。 */ synchronized void receivedLast() { closedByWriter = true; notifyAll(); } /** * 从管道输入流中读取下个数据字节。 * 数据字节作为0~255之间的整数返回。 * 在输入数据可用、检测到流的末尾或者抛出异常前,方法一直阻塞。 * * @return 下一个数据字节;如果已到达流末尾,则返回-1。 * @exception IOException 如果管道未连接、损坏、关闭,或者发生 I/O 错误。 */ public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++]; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } /** * 将最多len个数据字节从此管道输入流读入char数组。 * * 如果已到达数据流的末尾,或者len超出管道缓冲区大小,则读取的字符数将少于len。 * * 如果len为0,则不读取任何字节并返回0; * 否则,在至少1个输入字符可用、检测到流末尾、抛出异常前,该方法将一直阻塞。 */ public synchronized int read(char cbuf[], int off, int len) throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if ((off < 0) || (off > cbuf.length) || (len < 0) || ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } cbuf[off] = (char)c; int rlen = 1; while ((in >= 0) && (--len > 0)) { cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } /** * 告知是否准备读取此流。如果循环缓冲区不为空,则传送字符流已做好被读取的准备。 */ public synchronized boolean ready() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if (in < 0) { return false; } else { return true; } } /** * 关闭此传送流并释放与该流相关的所有系统资源。 */ public void close() throws IOException { in = -1; closedByReader = true; }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
demo请参考Java8 I/O源码-PipedInputStream与PipedOutputStream中的demo。 总结- PipedReader和PipedInputStream的几乎一模一样。区别在于PipedReader操作的是字符,PipedInputStream操作的是字节;PipedReader有ready方法来判断是否可以从PipedReader中读数据,而PipedInputStream则根据available()方法进行判断。
PipedWriterpublic class PipedWriter extends Writer { /* 与PipedWriter相连接的管道输入流*/ private PipedReader sink; /* 标识PipedWriter是否关闭 */ private boolean closed = false; /** * 创建连接到指定输入流的管道输出流。 */ public PipedWriter(PipedReader snk) throws IOException { connect(snk); } /** * 创建没有连接到输入流的管道输出流。 * 在使用前,它必须连接到管道输入流。 */ public PipedWriter() { } /** * 将此管道输出流连接到指定管道输入流 */ public synchronized void connect(PipedReader snk) throws IOException { ////如果指定的输入流为null,抛出异常 if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) {//如果指定的输入流已经连接到输出流,抛出异常 throw new IOException("Already connected"); } else if (snk.closedByReader || closed) {//如果输入流或输出流关闭,抛出异常 throw new IOException("Pipe closed"); } //连接到输入流 sink = snk; //输入流的in指定为-1 snk.in = -1; //输入流的out指定为0 snk.out = 0; //输入流标记为已连接 snk.connected = true; } /** /** * 将指定的字符写入到此管道输出流。 */ public void write(int c) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(c); } /** * 将len个字符从初始偏移量为off的指定cbuf数组写入该管道输出流。 * 在将所有字符写入输出流之前,此方法一直处于阻塞状态。 */ public void write(char cbuf[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) { throw new IndexOutOfBoundsException(); } sink.receive(cbuf, off, len); } /** * 刷新此输出流并强制写出所有缓冲的输出字节。 * 这将通知所有读取数据的线程,告知它们管道中的字符处于等待中。 */ public synchronized void flush() throws IOException { if (sink != null) { if (sink.closedByReader || closed) { throw new IOException("Pipe closed"); } synchronized (sink) { sink.notifyAll(); } } } /** * 关闭此管道输出流并释放与此流有关的所有系统资源。 */ public void close() throws IOException { closed = true; if (sink != null) { sink.receivedLast(); } }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
demo请参考Java8 I/O源码-PipedInputStream与PipedOutputStream中的demo。 总结- PipedWriter和PipedOutputStream的几乎一模一样。区别在于PipedReader操作的是字符,PipedInputStream操作的是字节。
对PipedReader与PipedWriter的介绍就到这里。想了解Java8 I/O源码的更多内容,请参考 版权声明
|