A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

PipedInputStream与PipedOutputStream分别为管道输入流和管道输出流。管道输入流通过连接到管道输出流实现了类似管道的功能,用于线程之间的通信。

通常,由某个线程向管道输出流中写入数据。根据管道的特性,这些数据会自动发送到与管道输出流对应的管道输入流中。这时其他线程就可以从管道输入流中读取数据,这样就实现了线程之间的通信。

不建议对这两个流对象尝试使用单个线程,因为这样可能死锁线程。

PipedInputStreamoutline定义
public class PipedInputStream extends InputStream
字段
字段
说明

boolean closedByWriter = false;
管道输出流是否关闭

volatile boolean closedByReader = false;
管道输入流是否关闭

boolean connected = false;
管道输入流是否被连接

Thread readSide;
从管道中读取数据的线程

Thread writeSide;
向管道中写入数据的线程

private static final int DEFAULT_PIPE_SIZE = 1024;
管道循环输入缓冲区的默认大小。

protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
管道循环输入缓冲区的默认大小。

protected byte buffer[];
放置数据的循环缓冲区。

protected int in = -1;
缓冲区的位置,当从连接的管道输出流中接收到下一个数据字节时,会将其存储到该位置。

protected int out = 0;
缓冲区的位置,此管道输入流将从该位置读取下一个数据字节。
构造方法
构造方法
说明

public PipedInputStream(PipedOutputStream src) throws IOException {…}
创建PipedInputStream,使其连接到管道输出流src。

public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { …}
创建一个PipedInputStream,使其连接到管道输出流src,并对管道缓冲区使用指定的管道大小。

public PipedInputStream() {…}
创建尚未连接的PipedInputStream。

public PipedInputStream(int pipeSize) {…}
创建一个尚未连接的PipedInputStream,并对管道缓冲区使用指定的管道大小。
方法
修饰符
字段

private void initPipe(int pipeSize) {…}
创建PipedInputStream时指定缓冲区大小

public void connect(PipedOutputStream src) throws IOException {…}
将PipedInputStream连接到指定的PipedOutputStream

protected synchronized void receive(int b) throws IOException {…}
接收数据字节到缓冲区中

synchronized void receive(byte b[], int off, int len) throws IOException {…}
从此byte数组中从位置off开始接收最多len个数据字节。

private void checkStateForReceive() throws IOException {…}
检查PipedInputStream是否可以接收数据。

private void awaitSpace() throws IOException {…}
等待。

synchronized void receivedLast() {…}
当所有数据被接收完,关闭PipedOutputStream,唤醒所有等待的线程

public synchronized int read() throws IOException {…}
读取此管道输入流中的下一个数据字节。

public synchronized int read(byte b[], int off, int len) throws IOException {
将最多len个数据字节从此管道输入流读入byte 数组。

public synchronized int available() throws IOException {…}
返回可以不受阻塞地从此输入流中读取的字节数。

public void close() throws IOException {…}
关闭此管道输入流并释放与该流相关的所有系统资源。
构造方法PipedInputStream( PipedOutputStream src)/** * 创建PipedInputStream,并指定其对应的PipedOutputStream */public PipedInputStream(PipedOutputStream src) throws IOException {    this(src, DEFAULT_PIPE_SIZE);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
PipedInputStream( PipedOutputStream src, int pipeSize)/** * 创建PipedInputStream,并指定其对应的PipedOutputStream 和缓冲区大小。 */public PipedInputStream(PipedOutputStream src, int pipeSize)        throws IOException {     initPipe(pipeSize);     connect(src);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
PipedInputStream()/** * 创建PipedInputStream,并指定其缓冲区大小为默认大小。 */public PipedInputStream() {    initPipe(DEFAULT_PIPE_SIZE);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
PipedInputStream( int pipeSize)/** * 创建PipedInputStream,并指定其缓冲区大小。 */public PipedInputStream(int pipeSize) {    initPipe(pipeSize);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
方法initPipe( int pipeSize)//创建PipedInputStream时指定其缓冲区大小private void initPipe(int pipeSize) {     //如果参数pipeSize小于等于0,抛出异常。     if (pipeSize <= 0) {        throw new IllegalArgumentException("Pipe Size <= 0");     }     buffer = new byte[pipeSize];}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
connect( PipedOutputStream src)/** * 将PipedInputStream连接到指定的PipedOutputStream。 *  * 如果PipedInputStream已经被连接到了其他PipedOutputStream, * 抛出IOException。 */public void connect(PipedOutputStream src) throws IOException {    src.connect(this);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
receive( int b)/** * 接收一个数据字节,将其插入到缓冲区。如果没有可用的输入,方法会阻塞。 *  * @param b 接收的字节 * @exception IOException 如果管道损坏、未连接、关闭,或者发生I/O错误。 * @since     JDK1.1 */protected synchronized void receive(int b) throws IOException {    //检查PipedInputStream的状态是否正常。    checkStateForReceive();    //获取将数据写入管道的线程    writeSide = Thread.currentThread();    //如果被写入管道的数据刚好被读完    if (in == out)        //等待        awaitSpace();    //?    if (in < 0) {        in = 0;        out = 0;    }    //将数据字节写入到缓冲区中,位置为in,然后in+1    buffer[in++] = (byte)(b & 0xFF);    //如果in已经超出了缓冲区的范围,将in置为0,从头开始写    if (in >= buffer.length) {        in = 0;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
receive( byte b[], int off, int len)/** * 接收字节数组中的部分数据,存到缓冲区中。 * 直到输入可用之前,方法会阻塞。 *  * @param b 字节数组 * @param off 读取数据的起始偏移量(从off处开始读取) * @param len 最多可以接收的数据字节个数 * @exception IOException 如果管道损坏、未连接、关闭,或者发生I/O错误。 */synchronized void receive(byte b[], int off, int len) throws IOException {    //检查PipedInputStream状态,如果不正常,则抛出异常。    checkStateForReceive();    //获取将数据写入管道的线程    writeSide = Thread.currentThread();    //需要接收的数据量,初始值为len    int bytesToTransfer = len;    //如果需要接收的数据>0,即还有需要接收的数据    while (bytesToTransfer > 0) {        //如果写入管道的数据已经被读完,则等待        if (in == out)            awaitSpace();        //下次要传输的字节数,初始值为0        int nextTransferAmount = 0;        //如果缓冲区未满(满后,in会重置为0)        if (out < in) {            //计算下次要传输的字节数。值为缓冲区还剩余的空间(即in之前的位置都满了,只有in到末尾的位置可用)            nextTransferAmount = buffer.length - in;        } else if (in < out) {//?            if (in == -1) {//?                in = out = 0;                nextTransferAmount = buffer.length - in;            } else {//如果缓冲区已满(此时out之后和in之前的位置已满,只有in到out之间的位置可用)                nextTransferAmount = out - in;            }        }        //如果下次传输的字节数大于需要传输的字节数,这时只需要bytesToTransfer大小的空间就可以了        if (nextTransferAmount > bytesToTransfer)            nextTransferAmount = bytesToTransfer;        assert(nextTransferAmount > 0);        //将nextTransferAmount个字节从b中复制到缓冲区数组中        System.arraycopy(b, off, buffer, in, nextTransferAmount);        //重新计算需要传输的字节数,已经传输了nextTransferAmount字节,所以减去nextTransferAmount        bytesToTransfer -= nextTransferAmount;        //b中已经复制的部分以后不能再复制了,所以偏移量off后移nextTransferAmount个位置,防止重复复制        off += nextTransferAmount;        //将in后移nextTransferAmount,防止之前复制的数据被覆盖        in += nextTransferAmount;        //如果in已经超出缓冲区大小,将in置0,从头开始写        if (in >= buffer.length) {            in = 0;        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
checkStateForReceive()//检查PipedInputStream是否可以接收数据private void checkStateForReceive() throws IOException {    //如果没有连接输出流,抛出异常    if (!connected) {        throw new IOException("Pipe not connected");    } else if (closedByWriter || closedByReader) {//如果输入流或者输出流关闭,抛出异常        throw new IOException("Pipe closed");    } else if (readSide != null && !readSide.isAlive()) {//如果从管道中读数据的线程死亡,抛出异常        throw new IOException("Read end dead");    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
awaitSpace()//等待。如果写入管道的数据正好全部被读完,则执行awaitSpace()操作。//目的是唤醒线程,让管道中有被新写入的数据。private void awaitSpace() throws IOException {    //如果写入管道的数据正好全部被读完    while (in == out) {        checkStateForReceive();        /* full: kick any waiting readers */        notifyAll();        try {            wait(1000);        } catch (InterruptedException ex) {            throw new java.io.InterruptedIOException();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
receivedLast()/** * 管道输出流关闭时(PipedOutputStream.close()中会调用此方法),通知其已经关闭。 */synchronized void receivedLast() {    closedByWriter = true;    notifyAll();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
read()/** * 从管道输入流中读取下个数据字节。 * 数据字节作为0~255之间的整数返回。 * 在输入数据可用、检测到流的末尾或者抛出异常前,方法一直阻塞。 * * @return     下一个数据字节;如果已到达流末尾,则返回-1。 * @exception  IOException  如果管道未连接、损坏、关闭,或者发生 I/O 错误。 */public synchronized int read()  throws IOException {    //检查PipedInputStream状态,如果不可用,抛出异常    if (!connected) {        throw new IOException("Pipe not connected");    } else if (closedByReader) {        throw new IOException("Pipe closed");    } else if (writeSide != null && !writeSide.isAlive()               && !closedByWriter && (in < 0)) {        throw new IOException("Write end dead");    }    //获取从管道中读取数据的线程    readSide = Thread.currentThread();    //?    int trials = 2;    //?    while (in < 0) {        //如果管道输出流没有关闭        if (closedByWriter) {            /* closed by writer, return EOF */            //返回-1            return -1;        }        //如果写入数据的线程不为null且不活跃且trials<=0,说明管道损坏,抛出异常        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {            throw new IOException("Pipe broken");        }        /* might be a writer waiting */        notifyAll();        try {            wait(1000);        } catch (InterruptedException ex) {            throw new java.io.InterruptedIOException();        }    }    //读取下个字节    int ret = buffer[out++] & 0xFF;    //如果out已经超出缓冲区范围,将out置为0,从头开始读    if (out >= buffer.length) {        out = 0;    }    //如果写入缓冲区中的数据已经读完,返回-1    if (in == out) {        /* now empty */        in = -1;    }    //返回读取到的字节    return ret;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
read( byte b[], int off, int len)/** * 将最多len个数据字节从此管道输入流读入byte数组。 *  * 如果已到达数据流的末尾,或者len超出管道缓冲区大小,则读取的字节数将少于len。 *  * 如果len为 0,则不读取任何字节并返回0; * 否则,在至少1个输入字节可用、检测到流末尾、抛出异常前,该方法将一直阻塞。 * * @param      b     读入数据的缓冲区 * @param      off   目标数组b中的起始偏移量 * @param      len   最多读取的字节数 * @return     读入缓冲区的总字节数;如果由于已到达流末尾而不再有数据,则返回 -1。 * @exception  NullPointerException 如果b为null。 * @exception  IndexOutOfBoundsException 如果off为负,len为负,或者len大于b.length - off * @exception  IOException 如果管道损坏、未连接、关闭,或者发生 I/O 错误。 */public synchronized int read(byte b[], int off, int len)  throws IOException {    //判断参数是否合法    if (b == null) {        throw new NullPointerException();    } else if (off < 0 || len < 0 || len > b.length - off) {        throw new IndexOutOfBoundsException();    } else if (len == 0) {        return 0;    }    //读取第一个字节    int c = read();    //如果到达缓冲区末尾,返回-1    if (c < 0) {        return -1;    }    //将读到的数据存入b中    b[off] = (byte) c;    //记录读取到的字节数    int rlen = 1;    //当输入流正常且要读取的字节数>1    while ((in >= 0) && (len > 1)) {        //记录可读的字节数        int available;        //如果缓冲区未满        if (in > out) {            //计算可读字节数。            available = Math.min((buffer.length - out), (in - out));        } else {//如果缓冲区已满,计算可读字节数            available = buffer.length - out;        }        // 如果可读字节数大于len-1,说明可以读取len个字节。否则说明没有len个字节可以读了。为什么是len-1呢?因为在计算可读字节数前,已经读取了一个字节了。        if (available > (len - 1)) {            available = len - 1;        }        //将available个字节从缓冲区中读取b中        System.arraycopy(buffer, out, b, off + rlen, available);        //将out后移available个位置        out += available;        //读取到的字节数+available        rlen += available;        //len-available        len -= available;        //如果out已经超出缓冲区范围,将out置为0,从头开始读        if (out >= buffer.length) {            out = 0;        }        //如果已经读完了,即读到了缓冲区末尾,返回-1        if (in == out) {            /* now empty */            in = -1;        }    }    //返回实际读取的字节数    return rlen;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
available()/** * 返回可以不受阻塞地从此输入流中读取的字节数。 * * @return 可以不受阻塞地从此输入流读取的字节数;如果已经调用close()方法关闭此输入流、管道未连接或已损坏,则返回0。 */public synchronized int available() throws IOException {    //如果已经调用close()方法关闭此输入流、管道未连接或已损坏,则返回0    if(in < 0)        return 0;    else if(in == out)//如果写入的数据被读完了,返回缓冲区的大小        return buffer.length;    else if (in > out)//如果写入的数据没被读完,返回写入的数据-已读的数据        return in - out;    else//如果in < out,这种情况为什么会出现,为什么是这个值?        return in + buffer.length - out;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
close()/** * 关闭此管道输入流,并释放与该流相关的所有系统资源。 */public void close()  throws IOException {    closedByReader = true;    //    synchronized (this) {        in = -1;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
PipedOutputStreamoutline定义
public class PipedOutputStream extends OutputStream
字段
字段
说明

private PipedInputStream sink;
与PipedOutputStream相连接的管道输入流
构造方法
字段
说明

public PipedOutputStream(PipedInputStream snk) throws IOException {…}

public PipedOutputStream() {}
方法字段
说明

public synchronized void connect(PipedInputStream snk) throws IOException {…}
将此管道输出流连接到管道输入流。

public void write(int b) throws IOException {…}
将指定数据字节写入管道输出流。

public void write(byte b[], int off, int len) throws IOException {…}

public synchronized void flush() throws IOException {…}
将len个字节从初始偏移量为off的指定byte数组写入管道输出流。

public void close() throws IOException {…}
关闭此管道输出流并释放与此流有关的所有系统资源。
构造方法PipedOutputStream( PipedInputStream snk)/** * 创建连接到指定输入流的管道输出流。 */public PipedOutputStream(PipedInputStream snk)  throws IOException {    connect(snk);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
PipedOutputStream()/** * 创建没有连接到输入流的管道输出流。 * 在使用前,它必须连接到管道输入流。 */public PipedOutputStream() {}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
connect( PipedInputStream snk)/** * 将此管道输出流连接到指定管道输入流 */public synchronized void connect(PipedInputStream snk) throws IOException {    //如果指定的输入流为null,抛出异常    if (snk == null) {        throw new NullPointerException();    } else if (sink != null || snk.connected) {//如果已经连接到输入流,抛出异常        throw new IOException("Already connected");    }    //连接到输入流    sink = snk;    //输入流的in指定为-1    snk.in = -1;    //输入流的out指定为0    snk.out = 0;    //输入流标记为已连接    snk.connected = true;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
write( int b)/** * 将指定的字节写入到此管道输出流。 * * @param      b   the <code>byte</code> to be written. * @exception IOException 如果管道处于毁坏或未连接状态,或者发生 I/O 错误。 */public void write(int b)  throws IOException {    if (sink == null) {        throw new IOException("Pipe not connected");    }    //将字节数组写入到缓冲区    sink.receive(b);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
write( byte b[], int off, int len)/** * 将len字节从初始偏移量为off的指定byte数组写入该管道输出流。 * 在将所有字节写入输出流之前,此方法一直处于阻塞状态。 * * @param      b     the data. * @param      off   the start offset in the data. * @param      len   the number of bytes to write. * @exception IOException 如果管道处于毁坏或未连接状态,或者发生 I/O 错误。 */public void write(byte b[], int off, int len) throws IOException {    //如果管道处于毁坏或未连接状态,或者发生 I/O 错误,抛出异常    if (sink == null) {        throw new IOException("Pipe not connected");    } else if (b == null) {        throw new NullPointerException();    } else if ((off < 0) || (off > b.length) || (len < 0) ||               ((off + len) > b.length) || ((off + len) < 0)) {        throw new IndexOutOfBoundsException();    } else if (len == 0) {        return;    }    //将数据写入到缓冲区    sink.receive(b, off, len);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
flush()/** * 刷新此输出流并强制写出所有缓冲的输出字节。 * 这将通知所有读取数据的线程,告知它们管道中的字符处于等待中。 */public synchronized void flush() throws IOException {    if (sink != null) {        synchronized (sink) {            sink.notifyAll();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
close()/** * 关闭此管道输出流并释放与此流有关的所有系统资源。 */public void close()  throws IOException {    if (sink != null) {        //写线程关闭        sink.receivedLast();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
demo

下面是使用PipedInputStream与PipedOutputStream来实现线程之间通信的一个简单实例。

一共有三个类:

  • Sender.java。发送者线程,信息的发送者。将信息写入管道输出流中
  • Receiver.java。接收者线程,信息的接收者。将信息从管道输入流中读取出来。
  • PipedStreamTest。测试类。演示线程之间的通信。

Sender.java

import java.io.IOException;import java.io.PipedOutputStream;/** * 发送者线程 */public class Sender extends Thread {    private PipedOutputStream out = new PipedOutputStream();    public PipedOutputStream getOutputStream() {        return out;    }    @Override    public void run() {        writeMessage();    }    // 向管道输出流中写入信息    private void writeMessage() {        String strInfo = "Hello World!";        try {            // 向管道输入流中写入数据            out.write(strInfo.getBytes());            // 释放资源            out.close();        } catch (IOException e) {            e.printStackTrace();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

Receiver.java

import java.io.IOException;import java.io.PipedInputStream;/** * 接收者线程 */public class Receiver extends Thread {    private PipedInputStream in = new PipedInputStream();    public PipedInputStream getInputStream() {        return in;    }    @Override    public void run() {        readMessage();    }    // 从管道输入流中读取数据    public void readMessage() {        byte[] buf = new byte[1024];        try {            //从缓冲区中读取数据到buf中            int len = in.read(buf);            //打印读取到的内容            System.out.println(new String(buf, 0, len));            //释放资源            in.close();        } catch (IOException e) {            e.printStackTrace();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

PipedStreamTest.java

import java.io.IOException;import java.io.PipedInputStream;import java.io.PipedOutputStream;/** * 测试类 */public class PipedStreamTest {    public static void main(String[] args) {        Sender sender = new Sender();        Receiver receiver = new Receiver();        PipedOutputStream out = sender.getOutputStream();        PipedInputStream in = receiver.getInputStream();        try {            // 连接输入流和输出流。下面两条语句的效果是一样。            // out.connect(in);            in.connect(out);            sender.start();            receiver.start();        } catch (IOException e) {            e.printStackTrace();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

运行测试类,控制台会打印出Hello World!。这是信息发送者线程发送的信息,被接收者线程接收到后打印的。

总结
  • PipedInputStream与PipedOutputStream分别为管道输入流和管道输出流。管道输入流通过连接到管道输出流实现了类似管道的功能,用于线程之间的通信。
  • 通常,由某个线程向管道输出流中写入数据。根据管道的特性,这些数据会自动发送到与管道输出流对应的管道输入流中。这时其他线程就可以从管道输入流中读取数据,这样就实现了线程之间的通信。
  • 不建议对这两个流对象尝试使用单个线程,因为这样可能死锁线程。
  • PipedOutputStream是数据的发送者;PipedInputStream是数据的接收者。
  • PipedInputStream缓冲区大小默认为1024,写入数据时写入到这个缓冲区的,读取数据也是从这个缓冲区中读取的。
  • PipedInputStream通过read方法读取数据。PipedInputStream通过write方法写入数据,write方法其实是调用PipedInputStream中的receive方法来实现将数据写入缓冲区的的,因为缓冲区是在PipedInputStream中的。
  • PipedOutputStream和PipedInputStream之间通过connect()方法连接。
  • 使用后要关闭输入输出流。

对PipedInputStream与PipedOutputStream的介绍就到这里。想了解Java8 I/O源码的更多内容,请参考

版权声明


1 个回复

倒序浏览

很不错,受教了
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马