聽風思雨 發表於 2025-4-28 20:24:00

并发编程--上篇

<h1 id="java并发探索--上篇">Java并发探索--上篇</h1>
<h2 id="1基本概念">1.基本概念</h2>
<ul>
<li><strong>线程与进程</strong>:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。</li>
<li><strong>并发与并行</strong>:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。</li>
<li><strong>同步与异步</strong>:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。</li>
</ul>
<p>“Java并发探索--下篇” --- 在下面找</p>
<p>【博客园】</p>
<p>https://www.cnblogs.com/jackjavacpp</p>
<p>【CSDN】</p>
<p>https://blog.csdn.net/okok__TXF</p>
<h2 id="2探索线程的创建">2.探索线程的创建</h2>
<h3 id="线程的状态">①线程的状态</h3>
<p>从<code>Thread</code>源码里面看出</p>
<pre><code class="language-java">public enum State {
        // 尚未启动的线程的线程状态。
    NEW,
        // 就绪
    RUNNABLE,
        // 等待监视器锁的线程的线程状态
    BLOCKED,
        /*
        等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
        Object.wait() 没有超时
        Thread.join() 没有超时
        LockSupport.park()
        */
    WAITING,
        /*
        指定等待时间的等待线程的线程状态
        线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
        Thread.sleep
    Object.wait with timeout
    Thread.join with timeout
    LockSupport.parkNanos
    LockSupport.parkUntil
        */
    TIMED_WAITING,
        //终止线程的线程状态。线程已完成执行。
    TERMINATED;
}
</code></pre>
<p>下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202155489-1446648903.png" style="zoom: 67%">
<p>在Java中,一个Thread有大致六个状态。</p>
<p>线程创建之后(new Thread)它将处于 <strong>NEW(新建)</strong> 状态,调用 <code>start()</code> 方法后开始运行,线程这时候处于 <strong>RUNNABLE(就绪)</strong> 状态。可运行状态的线程获得了 CPU 时间片后就处于 <strong>RUNNING(运行)</strong> 状态。</p>
<p>明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。</p>
<h3 id="线程创建">②线程创建</h3>
<h4 id="1两种基本方式">1)两种基本方式</h4>
<ul>
<li>继承Thread类,重写run方法</li>
</ul>
<pre><code class="language-java">public class MyThread1 extends Thread {
    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + ": hello world");
    }
}
public class JUCMain {
    public static void main(String[] args) {
      new MyThread1().start();
    }
}
</code></pre>
<ul>
<li>实现Runnable接口,传入Thread</li>
</ul>
<pre><code class="language-java">public class Runnable1 implements Runnable{
    @Override
    public void run() {
      System.out.println("hello world, Runnable");
    }
}
public class JUCMain {
    public static void main(String[] args) {
      new Thread(new Runnable1()).start();
    }
}
</code></pre>
<p>网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。</p>
<p>首先从<code>start()</code>方法看起(这个方式<strong>属于Thread类</strong>的)。调用<code>start()</code>后,JVM会创建一个新线程并执行该线程的<code>run()</code>方法。<strong>注意</strong>:直接调用<code>run()</code>不会启动新线程,而是在当前线程中执行。</p>
<pre><code class="language-java">// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {
    // 零状态值对应于状态 “NEW”
    // 线程想要start,必须是为0的状态
    if (threadStatus != 0)
      throw new IllegalThreadStateException();
    /*
    group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
    同时线程组的未启动线程计数会减1。
    */
    group.add(this);
    boolean started = false;
    try {
      start0(); //关键!调用本地方法(native)
      started = true;
    } finally {
      try {
            if (!started) { //启动失败时回滚
                //如果 started 为 false,说明线程启动失败,
                //调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
                group.threadStartFailed(this);
            }
      } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
      }
    }
}
//========== native
private native void start0();
</code></pre>
<p>那么执行的是run()方法,run方法里面是啥呢</p>
<pre><code class="language-java">private Runnable target; // target是Runnable类型

@Override
public void run() {
    if (target != null) {
      target.run();
    }
}
</code></pre>
<p>如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。</p>
<p>如果是实现的Runnable接口,<code>new Thread(new Runnable1())</code>的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。</p>
<h4 id="2-其他创建方式">2) 其他创建方式</h4>
<h5 id="lambda">.lambda</h5>
<ul>
<li><strong>lambda表达式创建</strong>:这个仅仅是写法不同而已。因为Runnable是个函数式接口</li>
</ul>
<pre><code class="language-java">@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
</code></pre>
<h5 id="callable">.callable</h5>
<ul>
<li><strong>Callable创建的方式</strong></li>
</ul>
<pre><code class="language-java">public class MyCall implements Callable&lt;String&gt; {
    @Override
    public String call() throws Exception {
      Thread.sleep(2000);
      return "Hello Callable";
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask&lt;String&gt; task = new FutureTask&lt;&gt;(new MyCall());
    new Thread(task).start();
    System.out.println(task.get());
}
</code></pre>
<p><code>new Thread(Runnable runnable)</code>要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202156126-1269832798.png" style="zoom: 80%">
<p>从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。</p>
<p><code>Future&lt;V&gt;</code></p>
<p><code>Future</code> 接口是 Java 并发编程中的一个重要接口,位于 <code>java.util.concurrent</code> 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 <code>Future</code> 对象获取。</p>
<pre><code class="language-java">// 这里使用了泛型 &lt;V&gt;,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future&lt;V&gt; {

    //尝试取消异步任务的执行。
    /*
    如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
    如果任务成功取消,则返回 true。
    */
    boolean cancel(boolean mayInterruptIfRunning);

    //如果任务在完成之前被取消,则返回 true;否则返回 false。
    boolean isCancelled();

    //如果任务已经完成,则返回 true;否则返回 false。
    boolean isDone();

    //获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
    V get() throws InterruptedException, ExecutionException;

    //获取异步任务的计算结果,并且可以指定一个超时时间。
    //如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
    V get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException;
}
</code></pre>
<p><code>RunnableFuture</code></p>
<pre><code class="language-java">public interface RunnableFuture&lt;V&gt; extends Runnable, Future&lt;V&gt; {
    // 很简单嘛,这个是来自Runnable的
    void run();
}
</code></pre>
<p>这个接口就相当于组合了Runnable和Future,能够获取到返回值了。</p>
<p><code>FutureTask&lt;V&gt;</code> 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。</p>
<pre><code class="language-java">public class FutureTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {
    // 基本属性
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL= 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED= 6;
    /** The underlying callable; nulled out after running */
    private Callable&lt;V&gt; callable;
    /** 结果 */
    private Object outcome;
    /** The thread running the callable; CAS ed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
   
    // 看它的构造函数1
    public FutureTask(Callable&lt;V&gt; callable) {
      if (callable == null)
            throw new NullPointerException();
      this.callable = callable; // 赋值callable========
      this.state = NEW; // ensure visibility of callable
    }
    // 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
    public FutureTask(Runnable runnable, V result) {
      this.callable = Executors.callable(runnable, result);
      this.state = NEW;       // ensure visibility of callable
    }
    /*
    Executors::callable(xx, xx)方法==========
    public static &lt;T&gt; Callable&lt;T&gt; callable(Runnable task, T result) {
      if (task == null)
            throw new NullPointerException();
      return new RunnableAdapter&lt;T&gt;(task, result);
    }
    static final class RunnableAdapter&lt;T&gt; implements Callable&lt;T&gt; {
      final Runnable task;
      final T result;
      RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
      }
      public T call() {
            task.run(); // 调用Runnable的run()
            return result;
      }
    }
    */
   
    // run()方法 ---------------
    // new Thread(new FutureTask&lt;&gt;(new MyCall()))
    public void run() {
      if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                       null, Thread.currentThread()))
            return;
      try {
            Callable&lt;V&gt; c = callable;
            if (c != null &amp;&amp; state == NEW) {
                V result;
                boolean ran;
                try {
                  //====调用callable.call()
                  result = c.call();
                  ran = true;
                } catch (Throwable ex) {
                   .........
                }
                // 如果运行OK了,设置结果!
                if (ran) set(result);
            }
      } finally {
            .............
      }
    }
   
    // 设置结果outcome
    protected void set(V v) {
      // https://www.cnblogs.com/jackjavacpp/p/18787832
      // 使用CAS --- 【见上一篇文章 java map &amp; CAS &amp; AQS】
      if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v; // 这里
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
      }
    }
   
    // 比较核心的get方法================start
    public V get() throws InterruptedException, ExecutionException {
      int s = state;
      if (s &lt;= COMPLETING) // 如果状态不是完成
            s = awaitDone(false, 0L); // 等待完成
      return report(s); // 返回结果
    }
    private int awaitDone(boolean timed, long nanos)
      throws InterruptedException {
                // 1.计算超时截止时间
      final long deadline = timed ? System.nanoTime() + nanos : 0L;
      WaitNode q = null;
      boolean queued = false;
      for (;;) { // 2.自旋循环等待任务完成
            // 2.1如果该线程中断了
            if (Thread.interrupted()) {
                removeWaiter(q);// 从等待队列中移除当前节点
                throw new InterruptedException();
            }
            // 2.2检查状态
            int s = state;
            // 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
            if (s &gt; COMPLETING) {
                if (q != null)
                  q.thread = null;
                return s;// 返回最终状态
            }
            // 2.3若任务状态等于 COMPLETING,表明任务正在完成,
            // 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued) //将节点加入等待队列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                   q.next = waiters, q);
            else if (timed) { // 2.4如果是有时限的get()
                nanos = deadline - System.nanoTime();
                if (nanos &lt;= 0L) {
                  removeWaiter(q);
                  return state; // 返回状态
                }
                LockSupport.parkNanos(this, nanos);
            }
            else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
                LockSupport.park(this);
      }
    }
    private V report(int s) throws ExecutionException {
      Object x = outcome;
      if (s == NORMAL)
            return (V)x; // 返回outcome
       ......
    }
    //==================================end
}
</code></pre>
<p><code>awaitDone</code> 方法的核心功能是让<strong>当前线程等待异步任务完成,它会持续检查任务的状态,根据不同的状态采取相应的处理措施</strong>,同时支持设置超时时间。在等待过程中,若线程被中断,会抛出 <code>InterruptedException</code> 异常。</p>
<p>通过上面的分析,Callable这种方式实际上本质还是Runnable嚯。使用<strong>FutureTask将Future和Runnable</strong>结合起来,功能更加丰富。</p>
<h5 id="线程池threadpoolexecutor">.线程池ThreadPoolExecutor</h5>
<ul>
<li><strong>线程池创建线程</strong></li>
</ul>
<p>如下使用方式。</p>
<pre><code class="language-java">public class PoolMain {
    public static void main(String[] args) {
      // 创建一个线程池
      ExecutorService pool = Executors.newFixedThreadPool(1);
      long start = System.currentTimeMillis();
      // execute=============
      pool.execute(() -&gt; {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("execute pool创建启动线程!");
      });
      // submit==============
      Future&lt;Integer&gt; future = pool.submit(() -&gt; {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("submit pool创建启动线程!");
            return 100;
      });
      try {
            System.out.println(future.get());
      } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
      }
      System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
      pool.shutdown();
    }
}
</code></pre>
<p>从上面的例子可以看出,大致有<code>ExecutorService</code>,<code>Executors</code>,newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个<code>ThreadPoolExecutor</code>类。</p>
<p>接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,<code>Executors</code>只是一个工具类。</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202156530-45308.png" style="zoom: 67%">
<p><code>Executor</code>是顶级接口</p>
<pre><code class="language-java">public interface Executor {
        // 只定义了一个方法
    void execute(Runnable command);
}
</code></pre>
<p><code>ExecutorService</code>:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法</p>
<pre><code class="language-java">public interface ExecutorService extends Executor {
    void shutdown();
    List&lt;Runnable&gt; shutdownNow();
    &lt;T&gt; Future&lt;T&gt; submit(Callable&lt;T&gt; task);
    &lt;T&gt; Future&lt;T&gt; submit(Runnable task, T result);
    Future&lt;?&gt; submit(Runnable task);
    //....
    &lt;T&gt; List&lt;Future&lt;T&gt;&gt; invokeAll(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks)
      throws InterruptedException;
    &lt;T&gt; T invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks)
      throws InterruptedException, ExecutionException;
}
</code></pre>
<p><code>AbstractExecutorService</code>ExecutorService执行方法的默认实现,发现下面的<strong>submit()</strong>底层实际执行的是execute(ftask)方法<strong>【Executor接口的execute()方法,在这个抽象类里面没有具体实现,到具体子类ThreadPoolExecutor在可以看到】</strong>。</p>
<pre><code class="language-java">public abstract class AbstractExecutorService implements ExecutorService {
    // 这里重点只看一下submit方法的默认实现
    // 优点1: 可以有Future返回值
        public Future&lt;?&gt; submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture&lt;Void&gt; ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
    }
    public &lt;T&gt; Future&lt;T&gt; submit(Runnable task, T result) {
      if (task == null) throw new NullPointerException();
      RunnableFuture&lt;T&gt; ftask = newTaskFor(task, result);
      execute(ftask);
      return ftask;
    }
    // 优点2: 支持Callable参数
    public &lt;T&gt; Future&lt;T&gt; submit(Callable&lt;T&gt; task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture&lt;T&gt; ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
    }
}
</code></pre>
<p><code>ThreadPoolExecutor</code>:线程池,可以通过调用Executors静态工厂方法来创建线程池并返回一个ExecutorService对象</p>
<pre><code class="language-java">public class ThreadPoolExecutor extends AbstractExecutorService {
   /**
   * 七大参数!!!!======
   */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue&lt;Runnable&gt; workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,定制策略来处理任务
                               ) {
      if (corePoolSize &lt; 0 || maximumPoolSize &lt;= 0 ||
            maximumPoolSize &lt; corePoolSize || keepAliveTime &lt; 0)
            throw new IllegalArgumentException();
      if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
      this.corePoolSize = corePoolSize;
      this.maximumPoolSize = maximumPoolSize;
      this.workQueue = workQueue;
      this.keepAliveTime = unit.toNanos(keepAliveTime);
      this.threadFactory = threadFactory;
      this.handler = handler;
    }
}
</code></pre>
<p>回到这一小节最开始的时候,例子中的线程池有两种提交并运行线程的方式<code>execute</code>和<code>submit</code>两个方法。现在来看一下<code>ThreadPoolExecutor</code>中的<code>execute()</code>方法是什么样子的。submit()我们已经知道了是在AbstractExecutorService中有默认实现的。</p>
<pre><code class="language-java">// ThreadPoolExecutor::execute(Runnable command)
public void execute(Runnable command) {
    if (command == null)
      throw new NullPointerException();
    int c = ctl.get();
    // 1.若当前工作线程数小于核心线程数(corePoolSize),尝试创建新的核心线程
    // 这里是用的位运算的,我没有深究
    if (workerCountOf(c) &lt; corePoolSize) {
      if (addWorker(command, true)) //
            return;
      // 创建新线程失败,重新获取ctl
      c = ctl.get();
    }
    // 2.任务入队
    // 线程池处于运行状态(isRunning(c))
    // 且任务成功加入阻塞队列(workQueue.offer(command))
    if (isRunning(c) &amp;&amp; workQueue.offer(command)) {
      int recheck = ctl.get();
      // 2.2 双重检查
      /*
      2.2.1再次检查线程池状态(可能在此期间线程池被关闭)。
      2.2.2若线程池已关闭,尝试从队列中移除任务,移除成功则拒绝任务(reject(command))。
      2.2.3若线程池仍运行但无活跃线程(workerCountOf(recheck) == 0),
      添加一个非核心线程(addWorker(null, false)),该线程会从队列中拉取任务执行。
      */
      if (! isRunning(recheck) &amp;&amp; remove(command))
            reject(command);
      else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.若任务无法入队(队列已满),尝试创建非核心线程(addWorker(command, false))
    else if (!addWorker(command, false))
      //若创建失败(线程数已达maximumPoolSize或线程池已关闭),
      //执行拒绝策略(reject(command))
      reject(command);

}
/*
execute(command)

├─ 检查command非空

├─ 当前线程数 &lt; corePoolSize?
│   ├─ 是 → 创建核心线程 → 成功则返回
│   └─ 否 → 进入下一步

├─ 线程池是否RUNNING且任务入队成功?
│   ├─ 是 → 双重检查状态
│   │   ├─ 线程池已关闭 → 移除任务并拒绝
│   │   └─ 线程池仍运行且无活跃线程 → 创建非核心线程
│   │
│   └─ 否 → 尝试创建非核心线程
│       ├─ 成功 → 处理任务
│       └─ 失败 → 拒绝任务
*/

为什么需要二次检查线程池状态?

- 任务入队后,其他线程可能关闭了线程池(如调用shutdown())。
- 处理:
- 若线程池已关闭,需移除已入队任务并拒绝。
- 若线程池仍运行但无活跃线程(如核心线程数为0且任务在队列中),需创建新线程处理队列任务。
</code></pre>
<p><strong>场景1:核心线程数未满</strong></p>
<ul>
<li>线程池处于 <code>RUNNING</code>,当前线程数 2(<code>corePoolSize=5</code>)。</li>
<li><code>addWorker(task, true)</code> 成功创建核心线程并执行任务。</li>
</ul>
<p><strong>场景2:队列已满,创建临时线程</strong></p>
<ul>
<li>核心线程满载,队列已满,线程数未达 <code>maximumPoolSize</code>。</li>
<li><code>addWorker(task, false)</code> 创建临时线程处理任务。</li>
</ul>
<p><strong>场景3:SHUTDOWN 状态处理剩余任务</strong></p>
<ul>
<li>线程池调用 <code>shutdown()</code>,状态变为 <code>SHUTDOWN</code>。</li>
<li>已有任务在队列中,<code>addWorker(null, true)</code> 创建线程处理队列任务。</li>
</ul>
<p><code>ThreadPoolExecutor</code>设计思想:</p>
<ul>
<li><strong>核心线程优先</strong>:优先使用核心线程处理任务。</li>
<li><strong>队列缓冲</strong>:核心线程满载后,任务入队等待。</li>
<li><strong>非核心线程应急</strong>:队列满后,创建临时线程处理任务(不超过<code>maximumPoolSize</code>)</li>
</ul>
<p><mark><strong>addWorker 创建工作线程</strong></mark></p>
<p><code>addWorker</code> 方法通过精细的状态检查和并发控制,确保线程池在动态扩缩容时保持线程安全。【方便理解,可以把这个方法看作是创建线程】其核心在于:</p>
<ul>
<li><strong>双重循环</strong>:外层处理状态变化,内层处理线程数修改。</li>
<li><strong>锁与原子操作结合</strong>:保证 <code>workers</code> 集合和 <code>workerCount</code> 的一致性。</li>
<li><strong>异常回滚机制</strong>:确保资源不会泄漏(如线程数虚增或 Worker 未清理)。</li>
</ul>
<p>firstTask为我们最开始写的Runnable。【记一个代号叫做 “<strong>我的任务</strong>” 】</p>
<blockquote>
<pre><code class="language-java">// ======== firstTask
pool.execute(() -&gt; {
   try {
       Thread.sleep(1000);
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   }
   System.out.println("execute pool创建启动线程!");
});
</code></pre>
</blockquote>
<pre><code class="language-java">// addWorker(runnable, core)
//标记是否以核心线程数(corePoolSize)为上限创建线程。
//若为 false,则使用最大线程数(maximumPoolSize)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
      ........
      int rs = runStateOf(c);// 获取线程池状态
      // 检查是否允许添加Worker
      if (rs &gt;= SHUTDOWN &amp;&amp;
            !(rs == SHUTDOWN &amp;&amp;
               firstTask == null &amp;&amp;
               !workQueue.isEmpty()))
            return false;

      for (;;) {
            .........
            //CAS 增加线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;// 成功增加,跳出循环
            c = ctl.get();// Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
      }
    }
    ...........
    try {
      // 把“我的任务”传进去了
      w = new Worker(firstTask); // 创建的一个Worker
      final Thread t = w.thread;
      if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁保护 workers 集合
            mainLock.lock();
            try {
                // 再次检查状态(防止在加锁前状态变化)
                int rs = runStateOf(ctl.get());
                if (rs &lt; SHUTDOWN ||
                  (rs == SHUTDOWN &amp;&amp; firstTask == null)) {
                  if (t.isAlive())
                     //======抛出异常....
                  workers.add(w); // 将 Worker 加入集合
                  int s = workers.size();
                  if (s &gt; largestPoolSize)
                        largestPoolSize = s;
                  workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 【Worker类里面的thread】
                // 启动线程=========重点【见下面的分析】
                t.start();
                workerStarted = true;
            }
      }
    } finally {
      if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
</code></pre>
<p>Worker是ThreadPoolExecutor的内部类,可以看出是一个Runnable,那么肯定重写了run()方法</p>
<pre><code class="language-java">private final class Worker
      extends AbstractQueuedSynchronizer implements Runnable
{
    final Thread thread;
    Runnable firstTask; //"我的任务"到这里来了
    // 构造函数
    Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      // 利用线程工厂创建了一个线程
      /*
      如果final Thread t = w.thread;
      t.start();启动的话,执行的是这个内部类的run()
      */
      this.thread = getThreadFactory().newThread(this);
    }
    // run就这一行
    public void run() {runWorker(this);}
    //到这里了
    final void runWorker(Worker w) {
      ..
      //"我的任务"
      Runnable task = w.firstTask;
      ...
      try {
          //getTask()从等待队列里面取出Runnable
          while (task != null || (task = getTask()) != null) {
            ....
            task.run();//==========
          }
      }......
      ..
      //// 无任务时回收线程
         processWorkerExit(w, completedAbruptly);
      
    }
}
</code></pre>
<p><code>ThreadPoolExecutor的静态内部类</code> :: jdk自带的<strong>四种拒绝策略</strong>。</p>
<pre><code class="language-java">public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 1.直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
    .........
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
// 2.直接丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {
    .....
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
// 3.丢弃等待队列的第一个
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    ........
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                //如果线程池未关闭,就弹出队列头部的元素,然后尝试执行
      if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
      }
    }
}
// 4.调用者运行,直接执行run()方法里面的逻辑。
// 只要线程池没有关闭,就由提交任务的当前线程处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    ......
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
            r.run();
      }
    }
}
</code></pre>
<p><strong>总结</strong>一下,在了解到了ThreadPoolExecutor的一些类间关系、以及一些基本流程、属性之后。接下来我们来梳理一遍,线程池的运行方式。</p>
<ol>
<li>
<p><strong>创建线程池(七大参数、四大拒绝策略)</strong></p>
</li>
<li>
<p><strong>任务提交</strong></p>
</li>
</ol>
<blockquote>
<pre><code class="language-java">//2.1
executor.execute(() -&gt; {
    // 任务逻辑
});
</code></pre>
<p><strong>2.2任务处理决策链:::</strong></p>
<p>2.2.1<strong>尝试创建核心线程</strong>:当前工作线程数 &lt; <code>corePoolSize</code>,直接创建新核心线程执行任务</p>
<pre><code class="language-java">if (workerCount &lt; corePoolSize) {
    addWorker(command, true); // true表示检查corePoolSize
    return;
}
</code></pre>
<p>2.2.2<strong>任务入队</strong>: 若核心线程已满,任务尝试加入工作队列。</p>
<pre><code class="language-java">if (workQueue.offer(command)) {
    // 双重检查线程池状态
    if (线程池已关闭) 移除任务并拒绝;
    else if (当前无活跃线程) 创建非核心线程处理队列任务;
}
</code></pre>
<p>2.2.3<strong>尝试创建非核心线程</strong>: 若队列已满且线程数 &lt; <code>maximumPoolSize</code>,创建非核心线程。</p>
<pre><code class="language-java">else if (!addWorker(command, false)) { // false表示检查maximumPoolSize
    reject(command); // 触发拒绝策略
}
</code></pre>
<p>2.2.4<strong>拒绝任务</strong></p>
</blockquote>
<ol start="3">
<li><strong>工作线程执行任务</strong></li>
</ol>
<blockquote>
<p>3.1<strong>Worker初始化</strong>:每个<code>Worker</code>绑定一个线程和初始任务(<code>firstTask</code>)。</p>
<pre><code class="language-java">Worker w = new Worker(firstTask);
final Thread t = w.thread;
t.start(); // 启动线程执行runWorker()
</code></pre>
<p>3.2<strong>任务执行循环(<code>runWorker</code>方法)</strong>:</p>
<pre><code class="language-java">while (task != null || (task = getTask()) != null) {
    try {
      task.run(); // 执行任务
    } finally {
      task = null; // 清理任务引用
    }
}
</code></pre>
<p>3.3<strong>从队列获取任务(<code>getTask</code>方法)</strong>:</p>
<ul>
<li><strong>阻塞模式</strong>:若为核心线程或允许核心线程超时,调用<code>workQueue.take()</code>永久阻塞。</li>
<li><strong>超时模式</strong>:若非核心线程,调用<code>workQueue.poll(keepAliveTime)</code>超时等待。</li>
</ul>
<pre><code class="language-java">Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
</code></pre>
</blockquote>
<ol start="4">
<li><strong>线程回收与资源释放</strong></li>
</ol>
<blockquote>
<pre><code class="language-java">private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 异常终止时补偿workerCount
      decrementWorkerCount();
    mainLock.lock();
    try {
      workers.remove(w); // 从集合中移除Worker
      if (线程池仍在运行 &amp;&amp; 队列非空)
            addWorker(null, false); // 补充线程处理队列任务
    } finally {
      mainLock.unlock();
    }
}
</code></pre>
</blockquote>
<p>线程池本质也是Runnable!</p>
<p>一张好图:【来自:【线程池工作原理】https://blog.csdn.net/fighting_yu/article/details/89473175】</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202156926-2070083376.png" style="zoom: 60%">
<h2 id="3探索锁">3.探索”锁“</h2>
<p>上面探索了线程以及线程池的创建,发现其源码中存在这种代码;</p>
<pre><code class="language-java">//1.Thread的start方法
public synchronized void start()

//2.线程池部分源码addWorker()方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

//3.
LockSupport.park(this);
</code></pre>
<p>这些是什么呢?这就是锁。。</p>
<p>确保线程安全最常见的做法是利用锁机制(<code>Lock</code>、<code>sychronized</code>)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。</p>
<h3 id="-synchronized">① synchronized</h3>
<p><code>synchronized</code> 是 Java 中最基本的同步机制,它可以修饰方法或代码块,确保同一时刻只有一个线程可以执行被修饰的代码。</p>
<pre><code class="language-java">public class SynchronizedTest {
    public static void main(String[] args) {
      SynchronizedTest lock1 = new SynchronizedTest();
      new Thread(lock1::test).start();
      new Thread(lock1::test2).start();
      new Thread(lock1::testStatic).start();
      new Thread(lock1::testFs).start();
    }
    public void testStatic() {
      // 锁的是Class对象
      synchronized (SynchronizedTest.class){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("testStatic()");
      }
    }

    // 锁的是一个实例对象
    public void test(){
      synchronized (this){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("test()");
      }
    }

    public synchronized void test2(){
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      }
      System.out.println("test2()");
    }

    public void testFs(){
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      }
      System.out.println("testFs()");
    }
}
</code></pre>
<p>上面只有test() 和 test2() 是互斥的。也就是1秒过后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起输出打印。</p>
<p><strong>修饰代码块、修饰方法:锁的是该对象;</strong></p>
<p><strong>修饰静态成员:锁的是该类的Class对象;</strong>这种方式可以确保对静态变量的访问是线程安全的</p>
<p><strong>还可以锁任意对象。</strong></p>
<p>其实主要弄清楚各自锁的是什么对象就行了,看是否需要的是一个锁,就可以判断是否互斥了;</p>
<pre><code class="language-java">//锁的是Class对象
public static synchronized void testStatic1() {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    System.out.println("testStaticMethod()");
}
public void testStatic() {
    synchronized (SynchronizedTest.class){
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      }
      System.out.println("testStatic()");
    }
}
public synchronized void test2(){
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    System.out.println("test2()");
}
</code></pre>
<p>如上述代码示例,testStatic1和testStatic需要持有的对象是同一个,故这二者会产生互斥,test2需要持有的是该类的一个实例对象,故不会与这二者产生互斥。</p>
<p><strong>需要注意的是: synchronized 并不属于方法定义的一部分,故synchronized 关键字不能被继承</strong>。如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方 法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。<br>
来看看如下示例:</p>
<pre><code class="language-java">public class Father {
    public synchronized void method1(){
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      }
      System.out.println("Father method1");
    }
}
//
public class Son extends Father{
    public void syncSon() {
      super.method1(); // 调用的父类的同步方法
    }
    public void syncSon1() {
      super.method1();
    }
    public void method1() { // 重写了,但是synchronized并不会继承过来
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      }
      System.out.println("Son method1");
    }

    public void sonHaha() { method1(); }
    public void sonHehe() { method1(); }

    public static void main(String[] args) {
      Son son = new Son();
      // new Thread(son::syncSon).start();
      // new Thread(son::syncSon1).start(); // 会互斥
      new Thread(son::sonHaha).start();
      new Thread(son::sonHehe).start(); // 不会
    }
}
</code></pre>
<p>synchronized【隐式锁】的底层原理涉及到 Java 对象头(Object Header)和 Monitor(监视器)两个关键概念。</p>
<p>每个 Java 对象在内存中分为三部分:</p>
<ol>
<li><strong>对象头(Header)</strong>
<ul>
<li><strong>Mark Word</strong>(标记字段):存储哈希码、GC 分代年龄、锁状态等。</li>
<li><strong>Klass Pointer</strong>(类型指针):指向类的元数据。</li>
</ul>
</li>
<li><strong>实例数据(Instance Data)</strong></li>
<li><strong>对齐填充(Padding)</strong></li>
</ol>
<p><strong>Java 对象头</strong>:在 Java 虚拟机中,每个对象都有一个对象头,用于存储对象的元数据信息,包括对象的哈希码、GC 相关信息、锁状态等。对象头通常包含一个<strong>标记字段(Mark Word),用于标识对象的锁状态</strong>。</p>
<p><strong>Monitor(监视器)</strong>:Monitor 是一种同步机制,负责管理对象的锁。每个<strong>对象都与一个 Monitor</strong> 相关联。当一个线程尝试进入一个被synchronized修饰的代码块或方法时,它会尝试获取对象的 Monitor。如果 Monitor 处于无锁状态,则当前线程会尝试将其锁定;如果 Monitor 已经被其他线程锁定,则当前线程会进入阻塞状态,直到持有锁的线程释放锁。</p>
<pre><code class="language-c++">// C++ 实现(HotSpot 源码)
class ObjectMonitor {
    void*   _header;       // Mark Word
    void*   _owner;      // 持有锁的线程
    volatile intptr_t_count;   // 重入次数
    ObjectWaiter* _WaitSet;      // 等待队列(调用 wait() 的线程)
    ObjectWaiter* _EntryList;      // 阻塞队列(竞争锁失败的线程)
    // ...
};
</code></pre>
<p>查看本小节开头示例的test()方法的字节码:</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202157418-696507534.png" style="zoom: 50%">
<p><code>synchronized</code> 同步语句块的实现使用的是 <code>monitorenter</code> 和 <code>monitorexit</code> 指令,当执行 <code>monitorenter</code> 指令时,线程试图获取锁也就是获取 <strong>对象监视器 <code>monitor</code></strong> 的持有权。第一个monitorexit正常退出同步块, 第二个是异常退出同步块。</p>
<h4 id="synchronized优化"><strong>synchronized优化:</strong></h4>
<p>锁升级(JDK 6+)</p>
<p><strong>3.0 无锁</strong></p>
<ul>
<li>无锁:<strong>当第一个线程第一次访问一个对象的同步块时</strong>,JVM会在对象头中设置该线程的ID,并将对象头的状态位设置为“偏向锁”。这个过程称为“偏向”,表示对象当前偏向于第一个访问它的线程。</li>
</ul>
<p><strong>3.1 偏向锁(Biased Locking)</strong></p>
<ul>
<li><strong>原理</strong>:第一个获取锁的线程将线程 ID 写入 Mark Word,后续无需 CAS。这样如果该线程再来的时候,由于是已经设置了锁偏向该线程,故只需比对一下对象头里面的Mark Word就行了。</li>
<li><strong>触发条件</strong>:JVM 启用了偏向锁(默认开启),且对象未处于锁定状态。</li>
<li><strong>撤销</strong>:当其他线程尝试竞争时,撤销偏向锁并升级为轻量级锁。</li>
</ul>
<p><strong>3.2 轻量级锁(Lightweight Locking)</strong></p>
<ul>
<li><strong>加锁流程</strong>:
<ol>
<li>在当前线程栈帧中创建 <strong>Lock Record</strong>。</li>
<li>通过 CAS 将 Mark Word 复制到 Lock Record,并尝试将 Mark Word 指向 Lock Record。</li>
</ol>
</li>
</ul>
<p>轻量级锁在以下场景会升级为重量级锁:</p>
<ol>
<li><strong>自旋失败</strong>:竞争线程自旋一定次数后仍未获取锁。</li>
<li><strong>竞争加剧</strong>:等待锁的线程数超过 <strong>JVM 自适应的阈值</strong>。</li>
</ol>
<p><strong>3.3 重量级锁(Heavyweight Locking)</strong></p>
<ul>
<li><strong>实现</strong>:通过操作系统提供的互斥量(Mutex)和条件变量实现,线程竞争失败后进入阻塞状态。</li>
<li><strong>性能问题</strong>:涉及用户态到内核态的切换,开销较大。</li>
</ul>
<p><strong>synchronized优化:</strong></p>
<p>锁会升级,从低到高升级,反着降级不可以:<strong>无锁状态-&gt;偏向锁状态 -&gt; 轻量级锁状态 -&gt; 重量级锁状态</strong></p>
<table>
<thead>
<tr>
<th style="text-align: left"><strong>锁类型</strong></th>
<th style="text-align: left"><strong>实现机制</strong></th>
<th style="text-align: left"><strong>适用场景</strong></th>
<th style="text-align: left"><strong>性能开销</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: left"><strong>偏向锁</strong></td>
<td style="text-align: left">通过 Mark Word 记录线程 ID</td>
<td style="text-align: left">单线程重复访问同步块</td>
<td style="text-align: left">最低</td>
</tr>
<tr>
<td style="text-align: left"><strong>轻量级锁</strong></td>
<td style="text-align: left">CAS + 自旋(适应性自旋)</td>
<td style="text-align: left">低竞争、短时间同步</td>
<td style="text-align: left">中等</td>
</tr>
<tr>
<td style="text-align: left"><strong>重量级锁</strong></td>
<td style="text-align: left">操作系统互斥量(Mutex) + Monitor</td>
<td style="text-align: left">高竞争、长时间同步</td>
<td style="text-align: left">最高</td>
</tr>
</tbody>
</table>
<p>【节选自   锁升级】 :https://blog.csdn.net/weixin_45433817/article/details/132216383</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202157805-1500515569.png" style="zoom: 67%">
<p><strong>问:</strong>synchronized是公平锁吗?</p>
<p>首先要知道公平锁和非公平锁的概念:</p>
<ul>
<li><strong>公平锁</strong>:公平锁指的是多个线程按照申请锁的顺序来获取锁,先到先得。当一个线程请求锁时,如果该锁当前处于可用状态,并且在该线程之前已经有其他线程在等待该锁,那么这个线程会被放入等待队列的尾部,等待前面的线程依次获取并释放锁后,它才能获取锁。</li>
<li><strong>非公平锁</strong>:非公平锁则不保证线程获取锁的顺序与申请锁的顺序一致。当一个线程请求锁时,即使有其他线程在等待该锁,它也会先尝试直接获取锁,如果获取成功就可以直接执行,而不用排队等待。</li>
</ul>
<p>那么,<code>synchronized</code> 基于对象头的 Mark Word 和监视器(Monitor)实现。当一个线程进入同步块时,它会尝试获取对象的监视器。如果监视器处于空闲状态,该线程会直接获取监视器,而不会考虑是否有其他线程已经在等待这个监视器。例如,当一个线程释放了 <code>synchronized</code> 修饰的同步块的锁后,新到来的线程有很大机会直接获取到锁,而不是等待那些在等待队列中的线程,这就体现了其非公平性。</p>
<pre><code class="language-java">class SynchronizedNonFairExample {
    private static final Object lock = new Object();
    private static int counter = 0;
    public static void main(String[] args) {
      for (int i = 0; i &lt; 5; i++) {
            new Thread(() -&gt; {
                while (true) {
                  synchronized (lock) {
                        counter++;
                        System.out.println(Thread.currentThread().getName() + " 获取到锁,计数: " + counter);
                        try {
                            // 模拟执行任务
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                  }
                }
            }, "Thread-" + i).start();
      }
    }
}
</code></pre>
<p>在上述代码中,多个线程竞争 <code>lock</code> 对象的锁。运行代码时,你会发现线程获取锁的顺序并不是按照线程启动的顺序,这就说明了 <code>synchronized</code> 是非公平锁。</p>
<h3 id="-lock">② Lock</h3>
<p>上一节的<code>synchronized</code>是jdk内置的关键字,属于隐式锁、也叫内置锁。现在这一节来探索一下显式锁,其<strong>提供更细粒度的控制</strong>(如可中断、超时、公平性),核心实现为 <code>ReentrantLock</code>。</p>
<pre><code class="language-java">public interface Lock {
    //获取锁。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并在获取锁之前处于休眠状态。
    void lock();
    void lockInterruptibly() throws InterruptedException;
    //如果锁可用,则获取锁,并立即返回值为 true。如果锁不可用,则此方法将立即返回值 false。
        boolean tryLock();
    /*
    如果在给定的等待时间内有空闲,并且当前线程尚未中断,则获取该锁。
    如果锁可用,则此方法立即返回值 true。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,
    直到发生以下三种情况之一:锁由当前线程获取;或者其他线程中断当前线程,支持中断锁获取;或指定的等待时间已用
    */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    //释放锁
        void unlock();
    //返回绑定到此 Lock 实例的新 Condition 实例。在等待条件之前,锁必须由当前线程持有
        Condition newCondition();
}
</code></pre>
<p><img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202154852-778971061.png" alt="" loading="lazy"></p>
<p>从上图中,我们可以得知juc包下的几个主要的实现类,绿色圈圈连接的是里面的内部类。</p>
<h4 id="1-reentrantlock">1) ReentrantLock</h4>
<p>接下来从我们最熟知的<code>ReentrantLock</code>开始看起吧,他的简单使用:</p>
<pre><code class="language-java">public class LockTest {
    Lock lock = new ReentrantLock();
    int count = 0;
    public static void main(String[] args) throws InterruptedException {
      LockTest test = new LockTest();
      for (int i = 1; i &lt;= 2; i++) {
            new Thread(test::add).start();
      }
      Thread.sleep(2000);
      /*
      如果不加锁的话,结果就不一定是两万了
      */
      System.out.println(test.count);
    }

    public void add() {
      // 标准写法 try加锁 finally释放锁
      try {
            lock.lock();
            for (int i = 0; i &lt; 10000; i++) {
                count++;
            }
      } finally {
            lock.unlock();
      }
    }
}
</code></pre>
<p>我们分析一下,首先是调用了其无参构造函数创建了一个对象,里面是new了一个看名字是非公平同步标记的对象,那他肯定会有公平的同步标记。</p>
<pre><code class="language-java">// 下面都是在ReentrantLock.java里面
private final Sync sync;
// 无参构造
public ReentrantLock() {
    sync = new NonfairSync();
}
// 有参构造
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
// 然后调用lock.lock()实际是调用的sync.lock();
public void lock() {
    sync.lock();
}
// 然后调用lock.unlock()实际是调用的sync.release(1);;
public void unlock() {
    sync.release(1);
}
// 是ReentrantLock的静态内部类
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
      if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
      else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
    }
}
// 是ReentrantLock的静态内部类
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
      acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
            if (!hasQueuedPredecessors() &amp;&amp;
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
      }
      else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc &lt; 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
      }
      return false;
    }
}
// Sync继承了AbstractQueuedSynchronizer:【AQS】
abstract static class Sync extends AbstractQueuedSynchronizer {
    .....
}
</code></pre>
<p>从上面额关系我们可以看出一切源头就是<code>AbstractQueuedSynchronizer</code>,也就是我们熟悉的AQS。在这篇文章的“补充知识点”环节中,对AQS做了一个简单的介绍及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487</p>
<p>【博客园】:https://blog.csdn.net/okok__TXF/article/details/146455487</p>
<p>它是是 Java 并发包 <code>java.util.concurrent.locks</code> 下的一个核心类,是构建锁和其他同步工具(如 <code>ReentrantLock</code>、<code>Semaphore</code>、<code>CountDownLatch</code> 等)的基础框架。</p>
<p>里面定义了两种资源共享模式:</p>
<ul>
<li><strong>独占模式(Exclusive)</strong>:同一时刻只有一个线程能获取资源,如 <code>ReentrantLock</code>。</li>
</ul>
<p>在独占模式的时候,<code>tryAcquire(int)</code>:尝试获取资源,成功返回 <code>true</code>,失败返回 <code>false</code>;<code>tryRelease(int)</code>:尝试释放资源,成功返回 <code>true</code>,失败返回 <code>false</code>。</p>
<ul>
<li><strong>共享模式(Share)</strong>:多个线程可同时获取资源,如 <code>Semaphore</code>(信号量)、<code>CountDownLatch</code>(倒计时 latch)。</li>
</ul>
<p>在共享模式的时候,<code>tryAcquireShared(int)</code>:尝试获取共享资源,负数表示失败;0 表示成功但无剩余资源;正数表示成功且有剩余资源。<code>tryReleaseShared(int)</code>:尝试释放共享资源,释放后若允许唤醒后续等待节点,返回 <code>true</code>,否则 <code>false</code>。</p>
<h5 id="--非公平锁">- 非公平锁</h5>
<p>回到ReentrantLock中,我们以<strong>lock()方法</strong>举例子:【lock是无参构造的】<mark>非公平锁</mark></p>
<pre><code class="language-java">//ReentrantLock.java
public void lock() {
    sync.lock(); // ===========1
}
//内部的抽象类Sync
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { // ===========7
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
      }
    }
    else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc &lt; 0) // overflow
            throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
}

//具体实现类NonfairSync
final void lock() { // ===========2
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1); // ===========3
}
protected final boolean tryAcquire(int acquires) { // ===========6
    return nonfairTryAcquire(acquires); // 这个是抽象类Sync的
}

//AbstractQueuedSynchronizer.java --- acquire(1)
public final void acquire(int arg) { // ===========4
    // 这里的tryAcquire是NonfairSync.java里面的
    if (!tryAcquire(arg) &amp;&amp; // ===========5
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
            final Node p = node.predecessor();
            if (p == head &amp;&amp; tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &amp;&amp;
                parkAndCheckInterrupt())
                interrupted = true;
      }
    } finally {
      if (failed)
            cancelAcquire(node);
    }
}
</code></pre>
<p>已经按顺序将1234567标注在了上面,模板方法的设计模式有时候真的会让人晕头转向。。</p>
<p>一句简简单单的lock.lock()方法做了什么呢?</p>
<p><strong>首先,</strong>会直接尝试CAS获取锁【<code>compareAndSetState(0, 1)</code> 会通过 CAS 操作尝试将 AQS 中的 <code>state</code> 状态从 0 改为 1】,如果成功的话成功则设置当前线程为锁持有者,否则进入AQS的获取流程;【在这里,当线程调用 <code>lock()</code> 时,会先通过 <code>CAS</code> 操作尝试将 <code>AQS</code> 的 <code>state</code> 从 <code>0</code> 改为 <code>1</code>。<strong>此时不会检查等待队列中是否有其他线程在排队</strong>,只要 <code>CAS</code> 成功,就直接获取锁,体现了 “插队” 的特性。】</p>
<p><strong>其次,</strong>进入aqs的<code>acquire流程</code> ,</p>
<pre><code class="language-java">1.tryAcquire(arg)   2.addWaiter(Node.EXCLUSIVE)   3.acquireQueued(xxx)
</code></pre>
<p>第一个方法tryAcquire再次尝试获取锁(非公平锁的 <code>tryAcquire</code> 即 <code>nonfairTryAcquire(acquires)</code>),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次尝试 CAS 抢占(体现非公平性,不检查队列);如果不是0,说明被抢占了,判断持有锁的线程是不是当前线程,如果是(体现可重入性),更新state,如果不是返回false。</p>
<p><strong>然后,</strong> 若<code>tryAcquire</code>失败【返回false】,调用<code>addWaiter(Node.EXCLUSIVE)</code> 会将当前线程包装成一个独占模式的 <code>Node</code> 节点加入 AQS 队列。接着 <code>acquireQueued</code> 会使线程在队列中自旋等待,不断尝试获取锁或被唤醒后尝试获取,直到成功。</p>
<p>接下来分析一下<code>lock.unlock()</code>方法,这个就在代码里面注释解释了</p>
<pre><code class="language-java">//ReentrantLock.java
public void unlock() {
    //调用其内部同步器 sync 的 release 方法:
    sync.release(1); // ===========1
}
// 内部抽象类Sync
protected final boolean tryRelease(int releases) { // ===========3
    int c = getState() - releases; // 减少同步状态值(释放一次锁,`state` 减 1)
    // 检查当前线程是否为锁的持有者,不是则抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {// 当 `state` 减为 0 时,完全释放锁
      free = true;
      setExclusiveOwnerThread(null);// 清除独占锁的线程引用
    }
    setState(c);// 更新 `state` 值
    return free;// 返回是否完全释放锁
}

//AbstractQueuedSynchronizer.java --- acquire(1)
public final boolean release(int arg) {
    //tryRelease 尝试释放锁,由子类实现具体逻辑
    if (tryRelease(arg)) { // ===========2
      Node h = head;// 获取等待队列头节点
      if (h != null &amp;&amp; h.waitStatus != 0)
            unparkSuccessor(h);// 唤醒后继节点 ===========4
      return true;
    }
    return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {===========5
    int ws = node.waitStatus;
    if (ws &lt; 0) // 将头节点的 `waitStatus` 设为 0(取消之前的状态)
      compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next; // 找到头节点的后继节点
    if (s == null || s.waitStatus &gt; 0) { // 若后继节点为空或已取消,从队尾向前找第一个非取消节点
      s = null;
      for (Node t = tail; t != null &amp;&amp; t != node; t = t.prev)
            if (t.waitStatus &lt;= 0)
                s = t;
    }
    if (s != null) // 唤醒找到的节点对应的线程
      LockSupport.unpark(s.thread);
}
</code></pre>
<p>在内部抽象类Sync中的tryRelease中:</p>
<ol>
<li><strong>减少 <code>state</code> 值</strong>:<code>ReentrantLock</code> 支持重入,<code>state</code> 记录锁的重入次数。每次调用 <code>unlock()</code>,<code>state</code> 减 1。</li>
<li><strong>检查线程所有权</strong>:确保只有锁的持有者才能释放锁,否则抛出 <code>IllegalMonitorStateException</code>。</li>
<li><strong>完全释放锁</strong>:当 <code>state</code> 减为 0 时,将 <code>setExclusiveOwnerThread(null)</code>,表示锁已完全释放,返回 <code>true</code>。</li>
</ol>
<p><code>ReentrantLock</code>支持重入:【其实从名字就可以看出来了 -- Reentrant(再进去的、就是可重入嘛)】</p>
<pre><code class="language-java">ReentrantLock lock = new ReentrantLock();
lock.lock();// state=1,线程持有锁
lock.lock();// state=2(重入)
lock.unlock(); // state=1(未完全释放)
lock.unlock(); // state=0,释放锁并唤醒等待线程
</code></pre>
<h5 id="--公平锁">- 公平锁</h5>
<p>那么ReentrantLock的公平锁是什么样子的呢?其实大致步骤都差不多,主要是在<code>FairSync.java</code></p>
<pre><code class="language-java">protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      //公平锁会先检查等待队列是否有前驱节点,若有则不能抢锁
      if (!hasQueuedPredecessors() &amp;&amp;
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
      }
    }
    else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc &lt; 0)
            throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
}
public final boolean hasQueuedPredecessors() {
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &amp;&amp;
      ((s = h.next) == null || s.thread != Thread.currentThread());
}
</code></pre>
<p>公平锁在 <code>state == 0</code> 时,会先通过 <code>hasQueuedPredecessors</code> 检查等待队列。若有其他线程在排队,则当前线程不能抢占,必须入队等待,保证 “先来先得”,体现了公平性。而非公平锁跳过这一步,直接抢锁,这就是非公平性的核心体现。</p>
<h4 id="2-读写锁">2) 读写锁</h4>
<pre><code class="language-java">//ReadWriteLock 维护一对关联的锁,一个用于只读作,一个用于写入。只要没有写入器,多个读取器线程就可以同时持有读取锁。
//写锁是独占的。读写锁允许在访问共享数据时实现比互斥锁允许的更高级别的并发。它利用了这样一个事实,
//即虽然一次只有一个线程(写入线程)可以修改共享数据,但在许多情况下,任意数量的线程都可以同时读取数据(因此是读取线程)。
//从理论上讲,与使用互斥锁相比,使用读写锁允许的并发性增加将导致性能改进。
public interface ReadWriteLock {
    //返回用于读取的锁。
    Lock readLock();
        //返回用于写入的锁。
    Lock writeLock();
}
</code></pre>
<p><strong>读写锁是否会比使用互斥锁提高性能</strong>取决于读取数据的频率与修改数据的频率、读取和写入作的持续时间以及数据的争用 - 即尝试同时读取或写入数据的线程数。例如,最初填充数据,此后不经常修改的集合;经常搜索(例如某种目录)是使用读写锁的理想候选者。但是,如果更新变得频繁,则数据将花费大部分时间进行独占锁定,并发性几乎没有增加。只有分析和测量才能确定使用读写锁是否适合您的应用程序。</p>
<p>读写锁允许多个线程同时读(没有写入时,多个线程允许同时读(提高性能)),但只要有一个线程在写,其他线程就必须等待(只允许一个线程写入(其他线程既不能写入也不能读取))。也就是读读不冲突、读写就要冲突了。</p>
<h5 id="--readwritelock">- ReadWriteLock</h5>
<p>下面给出一个简单示例:ReadWriteLock</p>
<pre><code class="language-java">public class ReadWriteLockTest2 {
    private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static final Lock readLock = readWriteLock.readLock();
    private static final Lock writeLock = readWriteLock.writeLock();
    private static int[] a = new int;
    public static void main(String[] args) throws InterruptedException {
      // 1个线程写
      new Thread(ReadWriteLockTest2::write).start();
      for (int i = 0; i &lt; 9; i++)// 10个线程读
            new Thread(()-&gt; System.out.println(get())).start();
      Thread.sleep(2000);
    }
    public static Object get() {
       readLock.lock();
      try {
            Thread.sleep(100);
            return a;
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      } finally {
            readLock.unlock();
      }
    }
    public static void write() {
      writeLock.lock();
      try {
            a++;
            System.out.println("写进行~~~");
            Thread.sleep(1000);
            System.out.println("写ok~~~");
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      } finally {
            writeLock.unlock();
      }
    }
}
</code></pre>
<p>写操作在执行的时候,读线程是会阻塞的。但是10个读线程之间并没有阻塞</p>
<h5 id="--stampedlock">- StampedLock</h5>
<p><code>StampedLock</code>对比 <code>ReentrantReadWriteLock</code> 有所增强,在原先读写锁的基础之上新增了乐观读的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。(乐观锁和悲观锁)</p>
<p>StampedLock具有三种控制读/写访问的模式:</p>
<p>1、写入(Writing):方法writeLock可能阻塞等待独占访问,并返回一个戳,该戳可在方法unlockWrite中使用以释放锁。还提供了tryWriteLock的非定时和定时版本。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。</p>
<p>2、读取(Reading):方法readLock可能会阻塞等待非独占访问,并返回一个戳,该戳可在方法unlockRead中使用以释放锁。还提供了tryReadLock的非定时和定时版本。</p>
<p>3、乐观读取(Optimistic Reading):tryOptimisticRead方法返回一个非0的stamp,只有当前同步状态没有被写模式所占有是才能获取到。他是在获取stamp值后对数据进行读取操作,最后验证该stamp值是否发生变化,如果发生变化则读取无效,代表有数据写入。这种方式能够降低竞争和提高吞吐量。</p>
<p>简单示例:</p>
<pre><code class="language-java">public class StampedLockTest {
    private static final StampedLock stampedLock = new StampedLock();
    private static double x = 1.0;
    private static double y = 1.0;
    public static void main(String[] args) {
      // 1. 一个线程写
      new Thread(() -&gt; addXY(1, 1)).start();
      // 2. 10个线程读
      for (int i = 0; i &lt; 10; i++) {
            new Thread(() -&gt; System.out.println(getSArea())).start();
      }
    }
    private static double getSArea() {
      // 乐观读
      long stamp = stampedLock.tryOptimisticRead();
      double s1 = x * y;
      // 验证一下
      if (!stampedLock.validate(stamp)) { // 验证失败
            stamp = stampedLock.readLock(); // 升级为读锁
            try {
                s1 = x * y;
            } finally {
                stampedLock.unlockRead(stamp);
            }
      }
      return s1;
    }
    private static void addXY(double a, double b) {
      long stamp = stampedLock.writeLock();
      try {
            System.out.println("写进行~~");
            x += a;
            y += b;
            Thread.sleep(1000);
            System.out.println("写ok~~");
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      } finally {
            stampedLock.unlockWrite(stamp);
      }
    }
}

</code></pre>
<p><strong>写操作:</strong><code>writeLock()</code> 返回一个 <code>stamp</code>(时间戳),表示获取写锁成功。写锁是独占的,获取时会阻塞所有读锁和其他写锁(除了乐观读)。通过 <code>unlockWrite(stamp)</code> 释放写锁,<code>stamp</code> 必须与获取时的一致,否则抛出异常。</p>
<p><strong>乐观读:</strong> <strong><code>tryOptimisticRead()</code></strong>:获取一个 <strong>乐观读时间戳</strong>,不实际加锁,直接读取数据(成本极低),然后<strong><code>validate(stamp)</code></strong>:检查该时间戳对应的读操作期间是否有写操作发生。若 <code>stamp</code> 有效(无写操作),则数据一致;否则需要升级为读锁。锁升级:若验证失败,说明数据可能被修改,通过 <code>readLock()</code> 获取读锁(阻塞直到写锁释放),确保后续读取的数据是最新的。</p>
<h5 id="--可重入性探讨">- 可重入性探讨</h5>
<p>JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做<strong>可重入锁</strong>。</p>
<p>本小节看一下上面两种读写锁的可重入性,首先是ReadWriteLock,从他的实现类来看就是可重入的了【ReentrantReadWriteLock】</p>
<p>在<code>ReentrantReadWriteLock</code>中也有<code>Sync</code>的抽象内部类,当调用写锁的lock时,实际是会经过里面重写的<code>tryAcquire</code>,从下面的代码可以知道<strong>同一线程可多次获取写锁</strong>:当线程获取写锁后,再次调用 <code>writeLock()</code> 会直接成功(无需等待),因为内部维护了一个 <strong>重入计数器</strong>(类似 <code>ReentrantLock</code>)。每次获取写锁时计数器加 1,释放时减 1,计数器为 0 时才真正释放锁。</p>
<pre><code class="language-java">protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
      if (w == 0 || current != getExclusiveOwnerThread())
            return false;
      if (w + exclusiveCount(acquires) &gt; MAX_COUNT)
         ....
      // Reentrant acquire
      setState(c + acquires);
      return true;
    }
    ....
    return true;
}
</code></pre>
<p><code>ReentrantReadWriteLock</code> 基于 <strong>AQS(AbstractQueuedSynchronizer)</strong> 实现,通过 <code>state</code> 变量的高 16 位和低 16 位分别记录 <strong>读锁的共享次数</strong> 和 <strong>写锁的重入次数</strong>:</p>
<ul>
<li><strong>写锁(独占模式)</strong>:使用低 16 位记录当前线程的重入次数(和 <code>ReentrantLock</code> 类似)。</li>
<li><strong>读锁(共享模式)</strong>:使用高 16 位记录所有线程的读锁获取次数,但会通过线程本地变量(<code>ThreadLocal</code>)记录当前线程的读锁重入次数,避免不同线程的计数干扰。</li>
</ul>
<p>这种设计使得同一线程多次获取写锁或在读锁 / 写锁之间按规则重入时,不会出现死锁,符合可重入锁的定义。</p>
<pre><code class="language-java">public static void write() {
    writeLock.lock();
    try {
      writeLock.lock();
      a++;
      System.out.println("写进行~~~");
      Thread.sleep(1000);
      System.out.println("写ok~~~");
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      writeLock.unlock(); // 可重入
      writeLock.unlock();
    }
}
</code></pre>
<p><strong>StampedLock是不可重入的</strong>,为什么呢?</p>
<ol>
<li>
<p>没有锁计数机制:<code>StampedLock</code> 并没有像 <code>ReentrantLock</code> 那样维护一个锁的重入计数。在 <code>ReentrantLock</code> 中,<code>state</code> 变量用于记录锁的重入次数,每次获取锁时 <code>state</code> 加 1,释放锁时 <code>state</code> 减 1。而 <code>StampedLock</code> 中的 <code>state</code> 变量主要用于表示锁的状态和版本信息,并非用于记录重入次数。</p>
</li>
<li>
<p>如果一个线程已经持有了 <code>StampedLock</code> 的写锁或读锁,再次尝试获取相同类型的锁时,会出现以下情况:</p>
<ul>
<li><strong>写锁情况</strong>:如果线程已经持有写锁,再次调用 <code>writeLock()</code> 方法,由于写锁是独占的,该线程会被阻塞,因为它会等待自己释放写锁后才能再次获取,这显然会导致死锁。</li>
<li><strong>读锁情况</strong>:如果线程已经持有读锁,再次调用 <code>readLock()</code> 方法,虽然读锁是共享的,但 <code>StampedLock</code> 并不会像可重入锁那样允许线程多次获取而不产生问题。而且如果在持有读锁的情况下尝试获取写锁,会导致死锁,因为写锁需要独占资源,而当前线程已经持有了读锁。</li>
</ul>
</li>
</ol>
<pre><code class="language-java">private static void addXY(double a, double b) {
    long stamp = stampedLock.writeLock();
    try {
      long lock = stampedLock.writeLock(); // 不可重入
      System.out.println("写进行~~");
      x += a;
      y += b;
      Thread.sleep(1000);
      System.out.println("写ok~~");
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      stampedLock.unlockWrite(stamp);
    }
}
</code></pre>
<h3 id="-锁案例">③ 锁案例</h3>
<p>上面只掌握了一丢丢的理论,没有实践怎么行呢?</p>
<h4 id="1-交替打印">1) 交替打印</h4>
<p>第一个,我们来实现一下三个线程交替打印A B C试试,第一个线程打印A,第二个B,第三个C</p>
<pre><code class="language-java">// synchronized实现
public class PrintABCSynchronized {
    private int now = 1;
    public static void main(String[] args) {
      PrintABCSynchronized obj = new PrintABCSynchronized();
      new Thread(obj::printA).start();
      new Thread(obj::printB).start();
      new Thread(obj::printC).start();
    }
    public void printA() {
      for (int i = 0; i &lt; 10; i++) {
            synchronized (this) {
                while ( now != 1 ) { // 为什么用while,不用if?留给读者思考
                  try {this.wait();}
                  catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                }
                System.out.println("A"); now = 2;
                this.notifyAll();
            }
      }
    }
    public void printB() {
      for (int i = 0; i &lt; 10; i++) {
            synchronized (this) {
                while ( now != 2 ) {
                  try {this.wait();}
                  catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                }
                System.out.println("B"); now = 3;
                this.notifyAll();
            }
      }
    }
    public void printC() {
      for (int i = 0; i &lt; 10; i++) {
            synchronized (this) {
                while ( now != 3 ) {
                  try {this.wait();}
                  catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                }
                System.out.println("C"); now = 1;
                this.notifyAll();
            }
      }
    }
}
</code></pre>
<p>wait - notify 【这个只能用在synchronized同步代码块中,是属于Object的方法】上面的缺陷很严重,那就是一下子就唤醒了所有挂起的线程,其实有的线程根本就不用唤醒,有没有一种办法,就是我想唤醒谁就唤醒谁呢?</p>
<pre><code class="language-java">public class PrintABCLock {
    private Lock lock = new ReentrantLock();
    private Condition a = lock.newCondition();
    private Condition b = lock.newCondition();
    private Condition c = lock.newCondition();
    private int flag = 1;
    public static void main(String[] args) {
      PrintABCLock obj = new PrintABCLock();
      new Thread(obj::printA).start();
      new Thread(obj::printB).start();
      new Thread(obj::printC).start();
    }
    public void printA(){
      lock.lock();
      try {
            for (int i = 0; i &lt; 10; i++) {
                while ( flag != 1 ) a.await();
                System.out.println('A'); flag = 2;
                b.signal();
            }
      } catch ( InterruptedException e) {
            e.printStackTrace();
      }
      finally {
            lock.unlock();
      }
    }
    public void printB() {
      lock.lock();
      try {
            for (int i = 0; i &lt; 10; i++) {
                while ( flag != 2 ) b.await();
                System.out.println('B'); flag = 3;
                c.signal();
            }
      } catch ( InterruptedException e) {
            e.printStackTrace();
      }
      finally {
            lock.unlock();
      }
    }
    public void printC() {
      lock.lock();
      try {
            for (int i = 0; i &lt; 10; i++) {
                while ( flag != 3 ) c.await();
                System.out.println('C'); flag = 1;
                a.signal();
            }
      } catch ( InterruptedException e) {
            e.printStackTrace();
      }
      finally {
            lock.unlock();
      }
    }
}
</code></pre>
<h4 id="2-阻塞队列">2) 阻塞队列</h4>
<p>下面模仿jdk中ArrayBlockingQueue的源码,给了一个简洁的阻塞队列</p>
<pre><code class="language-java">class TBlockedQueue&lt;T&gt; {
    private final Lock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    private final int capacity;
    private final LinkedList&lt;T&gt; list;
    public TBlockedQueue(int capacity) {
      if (capacity &lt;= 0)
            throw new IllegalArgumentException("Capacity 不能小于1");
      this.capacity = capacity;
      list = new LinkedList&lt;&gt;();
      lock = new ReentrantLock();
      notEmpty = lock.newCondition();
      notFull = lock.newCondition();
    }
    public void add( T t ) {
      list.addLast(t);
    }

    // 1. 入队 -- 当队列已满时,向队列中添加元素的操作会被阻塞,直到队列有空间可用。
    public void put( T t ) {
      if (t == null) throw new NullPointerException();
      lock.lock();
      try {
            while (list.size() == capacity) notFull.await();
            list.addLast(t);
            notEmpty.signal();
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      } finally {
            lock.unlock();
      }
    }
    // 2. 出队 -- 当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新元素加入
    public T take() {
      lock.lock();
      try {
            while (list.isEmpty()) notEmpty.await();
            T t = list.removeFirst();
            notFull.signal();
            return t;
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      } finally {
            lock.unlock();
      }
    }
}
</code></pre>
<p>这里自定义的 <code>TBlockedQueue</code> 是一个典型的 <strong>有界阻塞队列</strong>,其核心思路是通过 <strong>锁(Lock)和条件变量(Condition)</strong> 实现线程间的同步与协调,确保在多线程环境下对队列的操作是安全的。通过 <code>lock.lock()</code> 和 <code>lock.unlock()</code> 包裹对共享资源 <code>list</code> 的操作,确保同一时刻只有一个线程修改队列。同时,使用 <code>while</code> 循环检查条件(如 <code>list.size() == capacity</code>),防止 <strong>虚假唤醒</strong>导致条件不满足时错误地继续执行。</p>
<ul>
<li><code>notEmpty</code>:当队列为空时,<code>take</code> 操作会等待此条件;当有元素入队时,通过 <code>signal()</code> 唤醒等待的消费者线程。</li>
<li><code>notFull</code>:当队列已满时,<code>put</code> 操作会等待此条件;当有元素出队时,通过 <code>signal()</code> 唤醒等待的生产者线程。</li>
</ul>
<h4 id="3-aqs自定义锁">3) AQS自定义锁</h4>
<p>前面分析可以知道ReentrantLock是以AQS为基础框架来实现的,那么,此节我们自定义来实现一个锁。</p>
<p>见 “Java并发探索--下篇”</p>
<h2 id="4探索并发工具">4.探索并发工具</h2>
<h2 id="5虚拟线程">5.虚拟线程</h2>
<p>见 “Java并发探索--下篇” --- 在下面找</p>
<p>【博客园】</p>
<p>https://www.cnblogs.com/jackjavacpp</p>
<p>【CSDN】</p>
<p>https://blog.csdn.net/okok__TXF</p>
<h2 id="end参考">end.参考</h2>
<ol>
<li>https://blog.csdn.net/agonie201218/article/details/128712507</li>
<li>https://blog.csdn.net/xu_yong_lin/article/details/117521773</li>
<li>https://www.cnblogs.com/java-bible/p/13930006.html</li>
<li>https://blog.csdn.net/fighting_yu/article/details/89473175</li>
<li>https://tech.meituan.com/2018/11/15/java-lock.html</li>
<li>https://blog.csdn.net/weixin_44772566/article/details/137398521</li>
<li>https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized详解】</li>
<li>https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方网站--- 神中神】</li>
</ol><br><br>
来源:https://www.cnblogs.com/jackjavacpp/p/18852416
頁: [1]
查看完整版本: 并发编程--上篇