林珍治 發表於 2026-2-19 22:07:00

9、PipedInputStream和PipedOutputStream的源码分析和使用方法详细分析

<p>  在多线程编程中,线程间的数据交换是一个常见需求。Java IO包中的PipedInputStream和PipedOutputStream提供了一种高效的线程间通信机制,允许一批(多个)线程向PipedOutputStream写入数据,另一批(多个)线程从PipedInputStream读取数据。<br>
  但是,同一批(多个)线程相互之间会存在竞争,比如,同一批向PipedOutputStream写入数据的线程会存在竞争,同一批从PipedInputStream读取数据的线程也会存在竞争。因此PipedInputStream和PipedOutputStream中的线程安全需要通过synchronized关键字和wait()/notifyAll()机制实现。不建议在一个线程中同时使用PipedInputStream和PipedOutputStream,因为这样可能会导致这个线程陷入死锁状态。<br>
  PipedInputStream和PipedOutputStream之间的通信本质上是一个生产者-消费者模型,其中PipedOutputStream作为生产者,PipedInputStream作为消费者。两者通过一个循环缓冲区(byte[]数组)进行数据交换,PipedOutputStream将数据缓存在PipedInputStream的数组当中,等待PipedInputStream的读取。<br>
  PipedInputStream和PipedOutputStream的UML关系图,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215754596-156220550.png"></p>
<h4 id="一pipedoutputstream生产者源码向pipedinputstream消费者中的缓冲区byte数组写入字节数据的输出stream生产者">一、PipedOutputStream(生产者)源码——向PipedInputStream(消费者)中的缓冲区(byte[]数组)写入字节数据的输出Stream(生产者)</h4>
<pre><code>package java.io;

import java.io.*;

public
class PipedOutputStream extends OutputStream {
    //与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)
    private PipedInputStream sink;
   
    //构造函数
    public PipedOutputStream(PipedInputStream snk)throws IOException {
      connect(snk);//调用connect()函数,来改变PipedInputStream (消费者)中一些变量的值
    }
   
    //构造函数
    public PipedOutputStream() {
    }
   
    //线程同步函数:用来改变将要关联的PipedInputStream (消费者)中一些变量的值
    public synchronized void connect(PipedInputStream snk) throws IOException {
      if (snk == null) {
            throw new NullPointerException();//如果将要关联的PipedInputStream (消费者)为null,抛出NullPointerException
      } else if (sink != null || snk.connected) {
            //如果与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)!=null或者将要关联的PipedInputStream (消费者)的boolean connected变量为true,则抛出IOException
            throw new IOException("Already connected");
      }
      sink = snk;//将这个PipedOutputStream(生产者)与这个PipedInputStream (消费者)相关联
      snk.in = -1;//改变PipedInputStream (消费者)中的变量int in=-1
      snk.out = 0;//改变PipedInputStream (消费者)中的变量int out=0
      snk.connected = true;//改变PipedInputStream (消费者)中的变量boolean connected=true
    }
   
    //向与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)的缓冲区(byte[]数组)写入1个字节
    public void write(int b)throws IOException {
      if (sink == null) {
             //如果与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)== null,抛出IOException
            throw new IOException("Pipe not connected");
      }
      sink.receive(b);//最终调用的是这个相关联的 PipedInputStream (消费者)的receive(int b)函数
    }
   
    //向与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)的缓冲区(byte[]数组)写入byte[]数组b的[off,off+len)(左闭右开,不包括off+len)索引位置的字节
    public void write(byte b[], int off, int len) throws IOException {
      if (sink == null) {
            //如果与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)== null,抛出IOException
            throw new IOException("Pipe not connected");
      } else if (b == null) {
            throw new NullPointerException();//如果byte[]数组b==null,抛出一个NullPointerException
      } else if ((off &lt; 0) || (off &gt; b.length) || (len &lt; 0) ||
                   ((off + len) &gt; b.length) || ((off + len) &lt; 0)) {//byte[]数组b的[off,off+len)(左闭右开)索引位置是否有越界的检查
            throw new IndexOutOfBoundsException();//越界的话,抛出一个IndexOutOfBoundsException
      } else if (len == 0) {
            return;//如果len==0,结束本次函数调用
      }
      sink.receive(b, off, len);//最终调用的是这个相关联的 PipedInputStream (消费者)的receive(byte b[], int off, int len)函数
    }
   
    //线程同步函数:使用notifyAll()函数唤醒所有与这个PipedOutputStream(生产者)相关联的 PipedInputStream (消费者)线程(这个消费者可以绑定1~多个线程)
    public synchronized void flush() throws IOException {
      if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
      }
    }
    //关闭这个PipedOutputStream(生产者),这个PipedOutputStream(生产者)不能再向与它相关联的PipedInputStream(消费者)中的缓冲区(byte[]数组)写入字节数据
    public void close()throws IOException {
      if (sink != null) {
            sink.receivedLast();
      }
    }
}
</code></pre>
<h4 id="二pipedinputstream消费者源码从自己的缓冲区byte数组读取字节数据的输入stream消费者">二、PipedInputStream(消费者)源码——从自己的缓冲区(byte[]数组)读取字节数据的输入Stream(消费者)</h4>
<pre><code>package java.io;

public class PipedInputStream extends InputStream {
    //标记符:true表示与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)已经关闭,反之,反之
    boolean closedByWriter = false;
    //标记符:true表示当前这个 PipedInputStream (消费者)已经关闭了,反之,反之
    volatile boolean closedByReader = false;
    //标记符:true表示与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)已经持有了这个PipedInputStream (消费者)对象(或者叫已经连接上了),反之,反之
    boolean connected = false;

    Thread readSide;//当前消费的线程
    Thread writeSide;//当前生产者的线程
   
    //默认的PipedInputStream (消费者)的缓冲区(byte[]数组)的长度
    private static final int DEFAULT_PIPE_SIZE = 1024;

    //PipedInputStream (消费者)的缓冲区(byte[]数组)
    protected byte buffer[];
    //缓冲区(byte[]数组)的写指针
    protected int in = -1;
    //缓冲区(byte[]数组)的读指针
    protected int out = 0;
    //构造函数
    public PipedInputStream(PipedOutputStream src) throws IOException {
      this(src, DEFAULT_PIPE_SIZE);//缓冲区(byte[]数组)的长度使用默认值1024
    }

    //构造函数
    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
         initPipe(pipeSize);//缓冲区(byte[]数组)的长度使用指定的长度
         //最终还是调用PipedOutputStream(生产者)的connect()函数,并把自身对象this传递进去,然后在PipedOutputStream(生产者)的connect()函数中,改变自己的3个变量int in=-1、int out=0、boolean connected=true
         connect(src);
    }
   
    //构造函数,缓冲区(byte[]数组)的长度使用默认值1024
    public PipedInputStream() {
      initPipe(DEFAULT_PIPE_SIZE);
    }

    //构造函数,缓冲区(byte[]数组)的长度使用指定的长度
    public PipedInputStream(int pipeSize) {
      initPipe(pipeSize);
    }
   
    //初始化缓冲区(byte[]数组)
    private void initPipe(int pipeSize) {
         if (pipeSize &lt;= 0) {
            throw new IllegalArgumentException("Pipe Size &lt;= 0");
         }
         buffer = new byte;
    }

    public void connect(PipedOutputStream src) throws IOException {
      src.connect(this); //最终还是调用PipedOutputStream(生产者)的connect()函数,并把自身对象this传递进去,然后在PipedOutputStream(生产者)的connect()函数中,改变自己的3个变量int in=-1、int out=0、boolean connected=true
    }
   
    //线程同步函数:该函数只被PipedOutputStream(生产者)的write(int b)函数调用
    protected synchronized void receive(int b) throws IOException {
      checkStateForReceive();//检查PipedInputStream (消费者)的状态
      writeSide = Thread.currentThread();//当前执行该函数的线程,就是生产者线程
      if (in == out)
            //如果缓冲区(byte[]数组)的读指针==缓冲区(byte[]数组)的写指针,唤醒所有消费者线程,自己这个生产者线程调用wait(1000)函数
            awaitSpace();
      if (in &lt; 0) {//缓冲区(byte[]数组)的写指针&lt;0时,设置缓冲区(byte[]数组)的写指针=0,缓冲区(byte[]数组)的读指针=0
            in = 0;
            out = 0;
      }
      buffer = (byte)(b &amp; 0xFF);//向缓冲区的写指针位置写入1个字节
      if (in &gt;= buffer.length) {
            in = 0;//如果缓冲区满了,设置缓冲区的写指针=0
      }
    }

    //线程同步函数:该函数只被PipedOutputStream(生产者)的write(byte b[], int off, int len)函数调用
    synchronized void receive(byte b[], int off, int len)throws IOException {
      checkStateForReceive();//检查PipedInputStream (消费者)的状态
      writeSide = Thread.currentThread();//当前执行该函数的线程,就是生产者线程
      int bytesToTransfer = len;//生产者线程要写入到缓冲区(byte[]数组)中的字节总量
      while (bytesToTransfer &gt; 0) {
            if (in == out)
                //如果缓冲区(byte[]数组)的读指针==缓冲区(byte[]数组)的写指针,唤醒所有消费者线程,自己这个生产者线程调用wait(1000)函数
                awaitSpace();
            int nextTransferAmount = 0;//本次生产者线程要写入到缓冲区(byte[]数组)中的字节数量
            if (out &lt; in) {
                //如果缓冲区的读指针&lt;缓冲区的写指针,本次要写入到缓冲区(byte[]数组)中的字节数量=缓冲区的长度-缓冲区的写指针
                nextTransferAmount = buffer.length - in;
            } else if (in &lt; out) {
                if (in == -1) {
                  in = out = 0;
                  //如果缓冲区的读指针(out)&gt; 缓冲区的写指针(in)并且缓冲区的写指针(in)=-1,先设置缓冲区的读(out)、写(in)指针=0,本次要写入到缓冲区(byte[]数组)中的字节数量=缓冲区的长度
                  nextTransferAmount = buffer.length - in;
                } else {
                  //如果缓冲区的读指针(out)&gt; 缓冲区的写指针(in)并且缓冲区的写指针(in)=-1,本次要写入到缓冲区(byte[]数组)中的字节数量=读指针(out)-写指针(in)
                  nextTransferAmount = out - in;
                }
            }
            //本次生产者线程要写入到缓冲区(byte[]数组)中的字节数量最多为len,下次为len-本次写入到缓冲区(byte[]数组)中的字节数量,也就是每次写入的基于len个字节循环递减上一次写入的
            if (nextTransferAmount &gt; bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount &gt; 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);//向缓冲区(byte[]数组)的数组b中[off,off+nextTransferAmount)索引位置的字节,都是左闭右开。
            bytesToTransfer -= nextTransferAmount;//每一次都基于len个字节循环递减本次写入到缓冲区(byte[]数组)中的字节数量nextTransferAmount
            off += nextTransferAmount;//将下次要从byte[]数组b中取字节的起始索引的位置(偏移量)+本次写入到缓冲区(byte[]数组)中的字节数量nextTransferAmount
            in += nextTransferAmount;//将缓冲区的写指针(in)+本次写入到缓冲区(byte[]数组)中的字节数量nextTransferAmount
            if (in &gt;= buffer.length) {
                in = 0;//如果缓冲区的写指针(in)&gt; 缓冲区(byte[]数组)的长度,设置缓冲区的写指针(in)=0
            }
      }
    }

    //检查PipedInputStream (消费者)的状态
    private void checkStateForReceive() throws IOException {
      if (!connected) {
            throw new IOException("Pipe not connected");
      } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
      } else if (readSide != null &amp;&amp; !readSide.isAlive()) {
            throw new IOException("Read end dead");
      }
    }
   
    //如果缓冲区(byte[]数组)的读指针==缓冲区(byte[]数组)的写指针,唤醒所有消费者线程,自己这个生产者线程调用wait(1000)函数
    private void awaitSpace() throws IOException {
      while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
      }
    }
    //关闭与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)
    synchronized void receivedLast() {
      closedByWriter = true;
      notifyAll();//唤醒所有消费者线程
    }
    //线程同步函数:消费者线程每次从缓冲区(byte[]数组)中读取1个字节
    public synchronized int read()throws IOException {
      if (!connected) {//检查标记符connected,如果为false,抛出IOException
            throw new IOException("Pipe not connected");
      } else if (closedByReader) {//检查标记符closedByReader,如果为true,抛出IOException
            throw new IOException("Pipe closed");
      } else if (writeSide != null &amp;&amp; !writeSide.isAlive()
                   &amp;&amp; !closedByWriter &amp;&amp; (in &lt; 0)) {
         //检查当前这个PipedInputStream (消费者)对象中引用的生产者线程和生产者线程的状态,如果和标记符closedByWriter还有缓冲区(byte[]数组)的写指针(in)不能对应的话,抛出一个IOException
            throw new IOException("Write end dead");
      }

      readSide = Thread.currentThread();//当前执行该函数的线程,就是消费者线程
      int trials = 2;//这是一个多次检测的策略变量,防止生产者线程没有关闭了与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)时便抛出IOException
      //in=-1的情况有种:
      //①、生产者线程还没有向缓冲区(byte[]数组)中写任何字节
      //②、消费者线程从缓冲区(byte[]数组)中读完字节(byte)数据以后读指针(out)=写指针(in),那么,当前消费者线程会设置写指针(in)=-1
      //③、消费者线程执行PipedInputStream 的close()函数后,关闭了这个 PipedInputStream (消费者)
      while (in &lt; 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) &amp;&amp; (!writeSide.isAlive()) &amp;&amp; (--trials &lt; 0)) {
                //多个消费者线程从缓冲区(byte[]数组)中读的时候,并且前一个消费者线程已经把缓冲区(byte[]数组)中写入的字节读完了,并且前一个线程设置了写指针(in)=-1,生产者线程也关闭了与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)时,抛出一个IOException
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();//此处的目的是为了唤醒所有生产者线程
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
      }
      int ret = buffer &amp; 0xFF;//获取缓冲区(byte[]数组)中读指针(out)索引位置的字节,并且将读指针(out)+1
      if (out &gt;= buffer.length) {
            out = 0;//如果读指针(out)&gt;=缓冲区(byte[]数组)的长度,设置读指针(out)=0
      }
      if (in == out) {
            /* now empty */
            in = -1;//如果消费者线程从缓冲区(byte[]数组)中读完字节(byte)数据以后读指针(out)=写指针(in),那么,当前消费者线程会设置写指针(in)=-1
      }

      return ret;
    }

    //线程同步函数:如果缓冲区(byte[]数组)中有足够多的字节的话(数量&gt;len),消费者线程每次从缓冲区(byte[]数组)中读取len个字节放到byte[]数组b的[off, off+len)索引位置(左闭右开,不包括off+len)
    //如果缓冲区(byte[]数组)中字节的数量&lt;len个(比如有in(写指针)-out(读指针)个),消费者线程每次从缓冲区(byte[]数组)中读取(in-out)个字节放到byte[]数组b的[off, off+in-out)索引位置(左闭右开,不包括off+in-out)
    public synchronized int read(byte b[], int off, int len)throws IOException {
      if (b == null) {
            throw new NullPointerException();
      } else if (off &lt; 0 || len &lt; 0 || len &gt; b.length - off) {//byte[]数组b的[off,off+len)(左闭右开)索引位置是否有越界的检查
            throw new IndexOutOfBoundsException();//越界的话,抛出一个IndexOutOfBoundsException
      } else if (len == 0) {
            return 0;//如果len==0,返回0
      }

      /* possibly wait on the first character */
      int c = read();//先调用read()函数试探性从缓冲区(byte[]数组)中读1个字节
      if (c &lt; 0) {
            return -1;//如果试探性的从缓冲区(byte[]数组)中都读不到1个字节,返回-1
      }
      b = (byte) c;//把试探性从缓冲区(byte[]数组)中读到的第1个字节放到byte[]数组b的off索引位置
      int rlen = 1;//累计从缓冲区(byte[]数组)中读到的所有字节数量
      while ((in &gt;= 0) &amp;&amp; (len &gt; 1)) {

            int available;//本次执行System.arraycopy()函数可以从缓冲区(byte[]数组)中读到byte[]数组b中的字节数量

            if (in &gt; out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available &gt; (len - 1)) {//减掉试探性从缓冲区(byte[]数组)中读到的第1个字节
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;//读指针(out)+System.arraycopy()函数从缓冲区(byte[]数组)中读到byte[]数组b中的字节数量
            rlen += available;//累计从缓冲区(byte[]数组)中读到的所有字节数量 + System.arraycopy()函数从缓冲区(byte[]数组)中读到byte[]数组b中的字节数量
            len -= available;//len - System.arraycopy()函数从缓冲区(byte[]数组)中读到byte[]数组b中的字节数量

            if (out &gt;= buffer.length) {
                out = 0;//如果读指针(out)&gt;=缓冲区(byte[]数组)的长度,设置读指针(out)=0
            }
            if (in == out) {
                /* now empty */
                in = -1;//如果消费者线程从缓冲区(byte[]数组)中读完字节(byte)数据以后读指针(out)=写指针(in),那么,当前消费者线程会设置写指针(in)=-1
            }
      }
      return rlen;//返回累计从缓冲区(byte[]数组)中读到的所有字节数量
    }
   
    //线程同步函数:返回缓冲区(byte[]数组)中可以被消费者线程读取的字节数量
    public synchronized int available() throws IOException {
      if(in &lt; 0)
            return 0;
      else if(in == out)
            return buffer.length;
      else if (in &gt; out)
            return in - out;
      else
            return in + buffer.length - out;
    }
   
    //关闭这个 PipedInputStream (消费者),其实就是设置标记符closedByReader=true, 设置写指针(in)=-1
    public void close()throws IOException {
      closedByReader = true;
      synchronized (this) {
            in = -1;
      }
    }
}
</code></pre>
<h4 id="三1个线程向pipedoutputstream生产者写字节数据1个线程从pipedinputstream消费者读取字节数据的过程">三、1个线程向PipedOutputStream(生产者)写字节数据,1个线程从PipedInputStream(消费者)读取字节数据的过程</h4>
<h5 id="31非循环直接写和非循环直接读">3.1、非循环直接写和非循环直接读</h5>
<pre><code>package com.chelong.StreamAndReader;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipedTest {
   public static void main(String[] args) throws IOException {
      final PipedOutputStream output = new PipedOutputStream();
      final PipedInputStream input = new PipedInputStream(output);
      Thread thread1 = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               output.write("Hello world, pipe!".getBytes());//write()函数是阻塞的
            } catch (IOException e) {
            }
         }
      });

      Thread thread2 = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               int data = -1;
               while ((data = input.read()) != -1) {//read()函数是阻塞的
                  System.out.print((char) data);
               }
            } catch (IOException e) {
            }
         }
      });

      thread1.start();
      thread2.start();
   }
}
</code></pre>
<p>程序运行结果,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212445827-714738061.png"></p>
<p>  main线程构造PipedOutputStream(生产者)和PipedInputStream(消费者)的过程如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212510743-1554062823.png"></p>
<p>  向PipedOutputStream(生产者)写字节数据的生产者线程的执行过程如下:</p>
<p><img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212544914-451659960.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212555287-1090346414.png"></p>
<p>  从PipedInputStream(消费者)读取字节数据的消费者线程的执行过程如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212607497-1898462647.png"></p>
<h6 id="311非循环直接写和非循环直接读时1个生产者线程和1个消费者线程处理数据的过程">3.1.1、非循环直接写和非循环直接读时1个生产者线程和1个消费者线程处理数据的过程</h6>
<p>  Java 语言定义了 6 种线程状态, 在任意一个时间点, 一个线程只能有且只有其中的一种状态, 这 6 种状态分别如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212638111-71943307.png"></p>
<p>这 6 种线程状态的简单介绍,如下所示<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212705389-2145559667.png"></p>
<p>  JVM运行时内存结构主要包含了五个部分:程序计数器 (PC寄存器)、 JVM栈、Native方法栈、堆、 方法区。如下图所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212726158-604379458.png"></p>
<p>图中红色部分是线程私有区域,进入这个区域的数据不会出现线程竞争的关系。而绿色区域中的数据则被所有线程共享,其中Java堆中存放的是大量对象,方法区中存放class信息、常量、静态变量等数据。<br>
  每个线程的线程栈中会存放函数(方法)的描述符,成员(本地)变量等,函数(方法)在线程栈中会通过压栈和弹栈来执行,除了8种(byte、short、int、long、float、double、boolean、char)基本的数据类型存储在线程栈中以外,其余的引用数据类型(对象)都存储在堆中,然后通过引用将堆中的对象和线程栈中的变量关联起来(也可以叫线程栈中的引用指向堆中的对象)。<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212846610-2137279183.png"></p>
<p>那么,当使用者执行3.1中的代码时,1个生产者线程和1个消费者线程处理数据的过程如下:<br>
①、main线程初始化一个缓冲区(byte[]数组),长度为1024(默认值),然后生产者线程通过不断的压栈来完成函数之间的调用,最终执行PipedInputStream.class::receive(byte b[], int off, int len)函数来对缓冲区(byte[]数组)进行填充,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212906698-848464368.png"></p>
<p>②、当生产者线程填充完缓冲区之后,写指针变量int in=17,读指针变量int out=0,Thread writeSide = 当前这个生产者线程(Thread)对象,生产者线程会把自己线程栈中修改的变量最终刷新到堆中PipedInputStream对象中,以确保其它消费者线程的线程栈从堆中读取这3个变量时,这3个变量已经为修改后的值,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212918513-1084660758.png"></p>
<p>③、消费者线程读缓冲区(byte[]数组)的过程中会不断地执行out++(读指针)以读取缓冲区(byte[]数组)中的可用字节并返回,直到out(读指针)==in(写指针),修改in(写指针)=-1,并且每次同步执行PipedInputStream.class::read()函数时,都会更新Thread readSide = 当前这个消费者线程(Thread)对象,消费者线程也会把自己线程栈中修改的变量最终刷新到堆中PipedInputStream对象中,以确保其它消费者线程的线程栈从堆中读取这3个变量时,这3个变量已经为修改后的值,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212929105-1277795762.png"></p>
<p>④、更新in(写指针)=-1后,消费者线程再次同步执行PipedInputStream.class::read()函数时,如果PipedInputStream::boolean closedByWriter变量为true,则会返回-1</p>
<h5 id="32加锁循环写和非加锁循环读到byte数组b中再处理">3.2、加锁循环写和非加锁循环读到byte[]数组b中再处理</h5>
<pre><code>package com.chelong.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeForTransferInThread {
   public static void main(String[] args) throws IOException, InterruptedException {
      final PipedOutputStream output = new PipedOutputStream();
      final PipedInputStream input = new PipedInputStream(output);
      //生产者线程
      Thread producer = new Thread(new Runnable() {
         @Override
         public void run() {
            for (int i = 0; i &lt; 3; i++) {
               synchronized (input) {
                  try {
//                  input.wait();
                     output.write("Hello world, pipe!".getBytes());
                     input.wait();//释放锁并无限等待,直到消费者线程consumer 执行notifyAll()函数来唤醒当前阻塞
                  } catch (Exception e) {
                     e.printStackTrace();
                  }
               }
            }
         }
      },"生产者线程");
      
      //消费者线程
      Thread consumer = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               byte[] b = new byte;//1KB
               int readBytes = -1;
               long lastTime = System.currentTimeMillis();
               while ((readBytes = input.read(b, 0, b.length)) != -1) {
                  long curTime = System.currentTimeMillis();
                  System.out.print(Thread.currentThread().getName()+"本次读取花费时间:" + (curTime - lastTime) + "ms,读到的数据是:");
                  lastTime = curTime;
                  for (int i = 0; i &lt; readBytes; i++) {
                     System.out.print((char) b);//模拟处理字节数据
                  }
                  System.out.println();
               }
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      },"消费者线程");
      producer.start();//生产者线程启动
      consumer.start();//消费者线程启动
   }
}
</code></pre>
<p>程序运行结果,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219212957175-20935426.png"></p>
<p>  main线程构造PipedOutputStream(生产者)和PipedInputStream(消费者)的过程可以参考3.1;<br>
  向PipedOutputStream(生产者)写字节数据的生产者线程的执行过程可以参考3.1;<br>
  从PipedInputStream(消费者)读取字节数据的消费者线程的执行过程如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213038694-175651231.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213049017-456869541.png"></p>
<h6 id="321加锁循环写和非加锁循环读到byte数组b中再处理时1个生产者线程和1个消费者线程处理数据的过程">3.2.1、加锁循环写和非加锁循环读到byte[]数组b中再处理时1个生产者线程和1个消费者线程处理数据的过程</h6>
<p>  标题3.2中的代码的整个执行过程如下:<br>
①、main线程初始化一个缓冲区(byte[]数组),长度为1024(默认值),如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213118666-370990115.png"></p>
<p>②、然后生产者线程通过不断的压栈来完成函数之间的调用,最终执行PipedInputStream.class::receive(byte b[], int off, int len)函数来对缓冲区(byte[]数组)进行填充,并且先在自己的线程栈中更新in(写指针)=17,out(读指针)=0,writeSide=当前这个生产者线程(Thread)对象 如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213206234-855863118.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213228794-600203709.png"></p>
<p>当生产者线程对缓冲区(byte[]数组)填充完成之后,再执行标题3.2中的代码</p>
<pre><code>input.wait();
</code></pre>
<p>这行代码会释放锁并让生产者线程进入无限等待,直到消费者线程consumer执行notifyAll()函数来唤醒当前这个生产者线程。在这之前,生产者线程会将自己线程栈中的in(写指针)=17,out(读指针)=0,writeSide=当前这个生产者线程,这3个变量更新到主内存(也就是堆)中的PipedInputStream对象中。<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213330975-716788447.png"></p>
<p>③、消费者线程读缓冲区(byte[]数组)的过程也是通过不断的压栈来完成函数之间的调用,最终执行PipedInputStream::read()函数(试探性的读取1个字节)和PipedInputStream::read(byte b[], int off, int len)函数(读取剩余其它的字节)将步骤②中生产者线程写入到缓冲区(byte[]数组)中的17个字节读取出来<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213355658-1832149818.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213408524-331963725.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213419622-364653237.png"></p>
<blockquote>
<p>附言:最终消费者线程也会将自己线程栈中的in(写指针)= -1,out(读指针)= 17,writeSide=当前这个消费者线程,这3个变量更新到主内存(也就是堆)中的PipedInputStream对象中。</p>
</blockquote>
<p>因此,本次消费者线程从缓冲区(byte[]数组)中读数据的过程中没有执行read()函数中的wait(1000)这一行代码,如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214247483-2022355589.png"></p>
<p>所以,本次消费者线程从缓冲区(byte[]数组)中读取数据到消费者线程中自己创建的byte[]数组中时,只花费了0ms:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214258804-1015500725.png"></p>
<p>接下来,当消费者线程将步骤②中生产者线程写入到缓冲区(byte[]数组)中的17个字节读取出来以后(通过System.arraycopy()函数复制到了消费者线程中自己创建的byte[]数组中),消费者线程会遍历从缓冲区读到的这个byte[]数组,来处理这些数据,如下所示(标题3.2中的代码片段):</p>
<pre><code>                   //标题3.2中的代码片段
                   for (int i = 0; i &lt; readBytes; i++) {
                     System.out.print((char) b);//模拟处理字节数据
                  }
</code></pre>
<p>然后,当消费者线程再次执行</p>
<pre><code>//标题3.2中的代码片段
input.read(b, 0, b.length)
</code></pre>
<p>从缓冲区(byte[]数组)中读数据到自己创建的byte[]数组中时,由于此时in(写指针)=-1,并且当下图中的其它5个条件都不成立时,唤醒执行了</p>
<pre><code>input.wait()
</code></pre>
<p>的生产者线程,然后当前这个正在从缓冲区(byte数组)中读数据的消费者线程执行wait 1000ms ,如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214349541-777000844.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214404547-1644993049.png"></p>
<p>④、当生产者线程被消费者线程执行的</p>
<pre><code>notifyAll();
</code></pre>
<p>唤醒之后,会再次通过不断的压栈来完成函数之间的调用,再次执行PipedInputStream.class::receive(byte b[], int off, int len)函数来对缓冲区(byte[]数组)进行填充,并且先在自己的线程栈中先更新in(写指针)=17,out(读指针)=0,writeSide=当前这个生产者线程(Thread)对象 如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213206234-855863118.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214506624-944592756.png"></p>
<p>当生产者线程对缓冲区(byte[]数组)填充完成之后,再执行标题3.2中的代码</p>
<pre><code>input.wait();
</code></pre>
<p>这行代码会释放锁并让生产者线程进入无限等待,直到消费者线程consumer执行notifyAll()函数来唤醒当前这个生产者线程。在这之前,生产者线程会将自己线程栈中的in(写指针)=17,out(读指针)=0,writeSide=当前这个生产者线程,这3个变量更新到主内存(也就是堆)中的PipedInputStream对象中。如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214533848-1664801751.png"></p>
<p>⑤、消费者线程在第③步执行了</p>
<pre><code>wait(1000);
</code></pre>
<p>在等待了1000ms之后,消费者线程会自动唤醒继续执行,此时自己线程栈中的in(写指针)= -1,out(读指针)= 17已经被第④步中的生产者线程修改为in(写指针)=17,out(读指针)=0(生产者线程不会直接修改消费者线程栈中的变量,生产者线程会先将自己线程栈中in(写指针),out(读指针)变量的值修改到主内存中,然后消费者线程会自己将主内存中的这2个变量值刷新到消费者自己的线程栈中),如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214556548-1757854590.png"></p>
<p>然后执行PipedInputStream::read()函数(试探性的读取1个字节)和PipedInputStream::read(byte b[], int off, int len)函数(读取剩余其它的字节)将步骤④中生产者线程写入到缓冲区(byte[]数组)中的17个字节读取出来<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213355658-1832149818.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213408524-331963725.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214634067-1884703899.png"></p>
<blockquote>
<p>附言:最终消费者线程也会将自己线程栈中的in(写指针)= -1,out(读指针)= 17,writeSide=当前这个消费者线程,这3个变量更新到主内存(也就是堆)中的PipedInputStream对象中。</p>
</blockquote>
<p>由于,本次消费者线程从缓冲区(byte[]数组)中读数据的过程是从步骤③中自动唤醒继续执行的,所以,本次消费者线程从缓冲区(byte[]数组)中读取数据到消费者线程中自己创建的byte[]数组中时,花费了1015ms:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214751875-1048696201.png"></p>
<p>接下来,当消费者线程将步骤④中生产者线程写入到缓冲区(byte[]数组)中的17个字节读取出来以后(通过System.arraycopy()函数复制到了消费者线程中自己创建的byte[]数组中),消费者线程会遍历从缓冲区读到的这个byte[]数组,来处理这些数据,如下所示(标题3.2中的代码片段):</p>
<pre><code>                   //标题3.2中的代码片段
                   for (int i = 0; i &lt; readBytes; i++) {
                     System.out.print((char) b);//模拟处理字节数据
                  }
</code></pre>
<p>然后,当消费者线程再次执行</p>
<pre><code>//标题3.2中的代码片段
input.read(b, 0, b.length)
</code></pre>
<p>从缓冲区(byte[]数组)中读数据到自己创建的byte[]数组中时,由于此时in(写指针)=-1,并且当下图中的其它5个条件都不成立时,唤醒执行了</p>
<pre><code>input.wait()
</code></pre>
<p>的生产者线程,然后当前这个正在从缓冲区(byte[]数组)中读数据的消费者线程执行wait 1000ms ,如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214349541-777000844.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214404547-1644993049.png"></p>
<p>⑥、当生产者线程被消费者线程执行的</p>
<pre><code>notifyAll();
</code></pre>
<p>唤醒之后,会再次通过不断的压栈来完成函数之间的调用,再次执行PipedInputStream.class::receive(byte b[], int off, int len)函数来对缓冲区(byte[]数组)进行填充,并且先在自己的线程栈中先更新in(写指针)=17,out(读指针)=0,writeSide=当前这个生产者线程(Thread)对象 如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213206234-855863118.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214506624-944592756.png"></p>
<p>当生产者线程对缓冲区(byte[]数组)填充完成之后,再执行标题3.2中的代码</p>
<pre><code>input.wait();
</code></pre>
<p>这行代码会释放锁并让生产者线程进入无限等待,直到消费者线程consumer执行notifyAll()函数来唤醒当前这个生产者线程。在这之前,生产者线程会将自己线程栈中的in(写指针)=17,out(读指针)=0,writeSide=当前这个生产者线程,这3个变量更新到主内存(也就是堆)中的PipedInputStream对象中。如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214533848-1664801751.png"></p>
<p>⑦、消费者线程在第⑤步执行了</p>
<pre><code>wait(1000);
</code></pre>
<p>在等待了1000ms之后,消费者线程会自动唤醒继续执行,此时自己线程栈中的in(写指针)= -1,out(读指针)= 17已经被第⑥步中的生产者线程修改为in(写指针)=17,out(读指针)=0(生产者线程不会直接修改消费者线程栈中的变量,生产者线程会先将自己线程栈中in(写指针),out(读指针)变量的值修改到主内存中,然后消费者线程会自己将主内存中的这2个变量值刷新到消费者自己的线程栈中),然后执行PipedInputStream::read()函数(试探性的读取1个字节)和PipedInputStream::read(byte b[], int off, int len)函数(读取剩余其它的字节)将步骤⑥中生产者线程写入到缓冲区(byte[]数组)中的17个字节读取出来<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213355658-1832149818.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219213408524-331963725.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214634067-1884703899.png"></p>
<blockquote>
<p>附言:最终消费者线程也会将自己线程栈中的in(写指针)= -1,out(读指针)= 17,writeSide=当前这个消费者线程,这3个变量更新到主内存(也就是堆)中的PipedInputStream对象中。</p>
</blockquote>
<p>由于,本次消费者线程从缓冲区(byte[]数组)中读数据的过程是从步骤⑤中自动唤醒继续执行的,所以,本次消费者线程从缓冲区(byte[]数组)中读取数据到消费者线程中自己创建的byte[]数组中时,花费了1017ms:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215231062-1134536045.png"></p>
<p>接下来,当消费者线程将步骤⑥中生产者线程写入到缓冲区(byte[]数组)中的17个字节读取出来以后(通过System.arraycopy()函数复制到了消费者线程中自己创建的byte[]数组中),消费者线程会遍历从缓冲区读到的这个byte[]数组,来处理这些数据,如下所示(标题3.2中的代码片段):</p>
<pre><code>                   //标题3.2中的代码片段
                   for (int i = 0; i &lt; readBytes; i++) {
                     System.out.print((char) b);//模拟处理字节数据
                  }
</code></pre>
<p>然后,当消费者线程再次执行</p>
<pre><code>//标题3.2中的代码片段
input.read(b, 0, b.length)
</code></pre>
<p>从缓冲区(byte[]数组)中读数据到自己创建的byte[]数组中时,由于此时in(写指针)=-1,并且当下图中的其它5个条件都不成立时,唤醒执行了</p>
<pre><code>input.wait()
</code></pre>
<p>的生产者线程,然后当前这个正在从缓冲区(byte[]数组)中读数据的消费者线程执行wait 1000ms ,如下:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214349541-777000844.png"><br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219214404547-1644993049.png"></p>
<p>⑧、当生产者线程被消费者线程执行的</p>
<pre><code>notifyAll();
</code></pre>
<p>唤醒之后,会跳出for循环,结束生产者线程的生命周期,之后,该线程对象会被操作系统回收。<br>
⑨、消费者线程在第⑦步执行了</p>
<pre><code>wait(1000);
</code></pre>
<p>在等待了1000ms之后,消费者线程会自动唤醒继续执行,此时自己线程栈中的in(写指针)= -1,out(读指针)= 17,并且从</p>
<pre><code>wait(1000);
</code></pre>
<p>的代码之后,继续执行,执行过程如下(从下图的紫色流程继续执行):<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215457614-1446643501.png"></p>
<p>在执行了2个循环后,直到int trials = 0时,执行到判断(writeSide != null) &amp;&amp; (!writeSide.isAlive()) &amp;&amp; (--trials &lt; 0)这个条件时就会为true(下图的红色流程)<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215508348-1020804433.png"></p>
<p>然后,抛出了一个IOException("Pipe broken"),因此,可以得出int trials变量的含义:这个变量是一个多次检测的策略变量,当生产者线程没有关闭了与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)时,并且writeSide变量指向的当前生产者线程已经被操作系统回收时(此时当前生产者线程对象的isAlive()函数会返回false),消费者线程会抛出1个IOException("Pipe broken"),并结束while循环,进而结束消费者线程的生命周期。之后,该线程对象也会被操作系统回收。如下图所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215557745-511787897.png"></p>
<h6 id="322怎样防止321中第步的生产者线程抛出ioexceptionpipe-broken">3.2.2、怎样防止3.2.1中第⑨步的生产者线程抛出IOException("Pipe broken")</h6>
<p>  回顾3.2.1中第⑨步中的消费者线程抛出IOException("Pipe broken")的产生过程:当执行到判断(writeSide != null) &amp;&amp; (!writeSide.isAlive()) &amp;&amp; (--trials &lt; 0)这个条件时就会为true(下图的红色流程)<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215508348-1020804433.png"></p>
<p>那么,使用者就可以将上图中红色流程的前一步变成true即可,如下代码所示(只修改了生产者线程中的代码,消费者线程中的代码没有变化):</p>
<pre><code>package com.chelong.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
   public static void main(String[] args) throws IOException, InterruptedException {
      final PipedOutputStream output = new PipedOutputStream();
      final PipedInputStream input = new PipedInputStream(output);
      //生产者线程
      Thread producer = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               for (int i = 0; i &lt; 3; i++) {
                  synchronized (input) {
//                  input.wait();
                     output.write("Hello world, pipe!".getBytes());
                     input.wait();//释放锁并无限等待,直到消费者线程thread2执行notifyAll()函数来唤醒当前阻塞
                  }
               }
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               try {
                  if (output != null) output.close();//调用close()函数关闭生产者对象
               } catch (IOException e) {
                  e.printStackTrace();
               }
            }
         }
      }, "生产者线程");

      //消费者线程
      Thread consumer = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               byte[] b = new byte;//1KB
               int readBytes = -1;
               long lastTime = System.currentTimeMillis();
               while ((readBytes = input.read(b, 0, b.length)) != -1) {
                  long curTime = System.currentTimeMillis();
                  System.out.print(Thread.currentThread().getName() + "本次读取花费时间:" + (curTime - lastTime) + "ms,读到的数据是:");
                  lastTime = curTime;
                  for (int i = 0; i &lt; readBytes; i++) {
                     System.out.print((char) b);//模拟处理字节数据
                  }
                  System.out.println();
               }
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      }, "消费者线程");
      producer.start();//生产者线程启动
      consumer.start();//消费者线程启动
   }
}
</code></pre>
<p>程序运行结果,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202602/2485827-20260219215642548-1984110125.png"></p>
<p>  通过PipedOutputStream.class::close()的源码可以看到这样修改后消费者线程不再抛出IOException("Pipe broken")原因:<br>
PipedOutputStream.class(生产者类)的源码</p>
<pre><code>package java.io;

import java.io.*;

public
class PipedOutputStream extends OutputStream {
    ...省略部分代码...
    //关闭这个PipedOutputStream(生产者),这个PipedOutputStream(生产者)不能再向与它相关联的PipedInputStream(消费者)中的缓冲区(byte[]数组)写入字节数据
    public void close()throws IOException {
      if (sink != null) {
            sink.receivedLast();//调用PipedInputStream.class::receivedLast()函数
      }
    }
}
</code></pre>
<p>PipedInputStream .class(消费者类)的源码</p>
<pre><code>package java.io;

public class PipedInputStream extends InputStream {
    //标记符:true表示与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)已经关闭,反之,反之
    boolean closedByWriter = false;
    ...省略部分代码...
    //关闭与这个 PipedInputStream (消费者)相关联的PipedOutputStream(生产者)
    synchronized void receivedLast() {
      closedByWriter = true;//关闭后消费者再从缓冲区(byte[])数组中读取字节数据时,会返回-1,不会抛出IOException了
      notifyAll();//唤醒所有消费者线程
    }
    ...省略部分代码...
</code></pre>
<h4 id="四多个线程向pipedoutputstream生产者写字节数据多个线程从pipedinputstream消费者读取字节数据的过程">四、多个线程向PipedOutputStream(生产者)写字节数据,多个线程从PipedInputStream(消费者)读取字节数据的过程</h4>
<p>  略(待补充)</p><br><br>
来源:https://www.cnblogs.com/Carey-ccl/p/19625392
頁: [1]
查看完整版本: 9、PipedInputStream和PipedOutputStream的源码分析和使用方法详细分析