并发编程--上篇
<h1 id="java并发探索--上篇">Java并发探索--上篇</h1><h2 id="1基本概念">1.基本概念</h2>
<ul>
<li><strong>线程与进程</strong>:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。</li>
<li><strong>并发与并行</strong>:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。</li>
<li><strong>同步与异步</strong>:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。</li>
</ul>
<p>“Java并发探索--下篇” --- 在下面找</p>
<p>【博客园】</p>
<p>https://www.cnblogs.com/jackjavacpp</p>
<p>【CSDN】</p>
<p>https://blog.csdn.net/okok__TXF</p>
<h2 id="2探索线程的创建">2.探索线程的创建</h2>
<h3 id="线程的状态">①线程的状态</h3>
<p>从<code>Thread</code>源码里面看出</p>
<pre><code class="language-java">public enum State {
// 尚未启动的线程的线程状态。
NEW,
// 就绪
RUNNABLE,
// 等待监视器锁的线程的线程状态
BLOCKED,
/*
等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
Object.wait() 没有超时
Thread.join() 没有超时
LockSupport.park()
*/
WAITING,
/*
指定等待时间的等待线程的线程状态
线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
Thread.sleep
Object.wait with timeout
Thread.join with timeout
LockSupport.parkNanos
LockSupport.parkUntil
*/
TIMED_WAITING,
//终止线程的线程状态。线程已完成执行。
TERMINATED;
}
</code></pre>
<p>下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202155489-1446648903.png" style="zoom: 67%">
<p>在Java中,一个Thread有大致六个状态。</p>
<p>线程创建之后(new Thread)它将处于 <strong>NEW(新建)</strong> 状态,调用 <code>start()</code> 方法后开始运行,线程这时候处于 <strong>RUNNABLE(就绪)</strong> 状态。可运行状态的线程获得了 CPU 时间片后就处于 <strong>RUNNING(运行)</strong> 状态。</p>
<p>明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。</p>
<h3 id="线程创建">②线程创建</h3>
<h4 id="1两种基本方式">1)两种基本方式</h4>
<ul>
<li>继承Thread类,重写run方法</li>
</ul>
<pre><code class="language-java">public class MyThread1 extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": hello world");
}
}
public class JUCMain {
public static void main(String[] args) {
new MyThread1().start();
}
}
</code></pre>
<ul>
<li>实现Runnable接口,传入Thread</li>
</ul>
<pre><code class="language-java">public class Runnable1 implements Runnable{
@Override
public void run() {
System.out.println("hello world, Runnable");
}
}
public class JUCMain {
public static void main(String[] args) {
new Thread(new Runnable1()).start();
}
}
</code></pre>
<p>网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。</p>
<p>首先从<code>start()</code>方法看起(这个方式<strong>属于Thread类</strong>的)。调用<code>start()</code>后,JVM会创建一个新线程并执行该线程的<code>run()</code>方法。<strong>注意</strong>:直接调用<code>run()</code>不会启动新线程,而是在当前线程中执行。</p>
<pre><code class="language-java">// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {
// 零状态值对应于状态 “NEW”
// 线程想要start,必须是为0的状态
if (threadStatus != 0)
throw new IllegalThreadStateException();
/*
group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
同时线程组的未启动线程计数会减1。
*/
group.add(this);
boolean started = false;
try {
start0(); //关键!调用本地方法(native)
started = true;
} finally {
try {
if (!started) { //启动失败时回滚
//如果 started 为 false,说明线程启动失败,
//调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//========== native
private native void start0();
</code></pre>
<p>那么执行的是run()方法,run方法里面是啥呢</p>
<pre><code class="language-java">private Runnable target; // target是Runnable类型
@Override
public void run() {
if (target != null) {
target.run();
}
}
</code></pre>
<p>如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。</p>
<p>如果是实现的Runnable接口,<code>new Thread(new Runnable1())</code>的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。</p>
<h4 id="2-其他创建方式">2) 其他创建方式</h4>
<h5 id="lambda">.lambda</h5>
<ul>
<li><strong>lambda表达式创建</strong>:这个仅仅是写法不同而已。因为Runnable是个函数式接口</li>
</ul>
<pre><code class="language-java">@FunctionalInterface
public interface Runnable {
public abstract void run();
}
</code></pre>
<h5 id="callable">.callable</h5>
<ul>
<li><strong>Callable创建的方式</strong></li>
</ul>
<pre><code class="language-java">public class MyCall implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "Hello Callable";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new MyCall());
new Thread(task).start();
System.out.println(task.get());
}
</code></pre>
<p><code>new Thread(Runnable runnable)</code>要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202156126-1269832798.png" style="zoom: 80%">
<p>从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。</p>
<p><code>Future<V></code></p>
<p><code>Future</code> 接口是 Java 并发编程中的一个重要接口,位于 <code>java.util.concurrent</code> 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 <code>Future</code> 对象获取。</p>
<pre><code class="language-java">// 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future<V> {
//尝试取消异步任务的执行。
/*
如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
如果任务成功取消,则返回 true。
*/
boolean cancel(boolean mayInterruptIfRunning);
//如果任务在完成之前被取消,则返回 true;否则返回 false。
boolean isCancelled();
//如果任务已经完成,则返回 true;否则返回 false。
boolean isDone();
//获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
V get() throws InterruptedException, ExecutionException;
//获取异步任务的计算结果,并且可以指定一个超时时间。
//如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
</code></pre>
<p><code>RunnableFuture</code></p>
<pre><code class="language-java">public interface RunnableFuture<V> extends Runnable, Future<V> {
// 很简单嘛,这个是来自Runnable的
void run();
}
</code></pre>
<p>这个接口就相当于组合了Runnable和Future,能够获取到返回值了。</p>
<p><code>FutureTask<V></code> 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。</p>
<pre><code class="language-java">public class FutureTask<V> implements RunnableFuture<V> {
// 基本属性
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL= 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED= 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** 结果 */
private Object outcome;
/** The thread running the callable; CAS ed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 看它的构造函数1
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 赋值callable========
this.state = NEW; // ensure visibility of callable
}
// 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
/*
Executors::callable(xx, xx)方法==========
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run(); // 调用Runnable的run()
return result;
}
}
*/
// run()方法 ---------------
// new Thread(new FutureTask<>(new MyCall()))
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//====调用callable.call()
result = c.call();
ran = true;
} catch (Throwable ex) {
.........
}
// 如果运行OK了,设置结果!
if (ran) set(result);
}
} finally {
.............
}
}
// 设置结果outcome
protected void set(V v) {
// https://www.cnblogs.com/jackjavacpp/p/18787832
// 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 这里
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 比较核心的get方法================start
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如果状态不是完成
s = awaitDone(false, 0L); // 等待完成
return report(s); // 返回结果
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 1.计算超时截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) { // 2.自旋循环等待任务完成
// 2.1如果该线程中断了
if (Thread.interrupted()) {
removeWaiter(q);// 从等待队列中移除当前节点
throw new InterruptedException();
}
// 2.2检查状态
int s = state;
// 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;// 返回最终状态
}
// 2.3若任务状态等于 COMPLETING,表明任务正在完成,
// 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued) //将节点加入等待队列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 2.4如果是有时限的get()
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state; // 返回状态
}
LockSupport.parkNanos(this, nanos);
}
else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x; // 返回outcome
......
}
//==================================end
}
</code></pre>
<p><code>awaitDone</code> 方法的核心功能是让<strong>当前线程等待异步任务完成,它会持续检查任务的状态,根据不同的状态采取相应的处理措施</strong>,同时支持设置超时时间。在等待过程中,若线程被中断,会抛出 <code>InterruptedException</code> 异常。</p>
<p>通过上面的分析,Callable这种方式实际上本质还是Runnable嚯。使用<strong>FutureTask将Future和Runnable</strong>结合起来,功能更加丰富。</p>
<h5 id="线程池threadpoolexecutor">.线程池ThreadPoolExecutor</h5>
<ul>
<li><strong>线程池创建线程</strong></li>
</ul>
<p>如下使用方式。</p>
<pre><code class="language-java">public class PoolMain {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(1);
long start = System.currentTimeMillis();
// execute=============
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("execute pool创建启动线程!");
});
// submit==============
Future<Integer> future = pool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("submit pool创建启动线程!");
return 100;
});
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
pool.shutdown();
}
}
</code></pre>
<p>从上面的例子可以看出,大致有<code>ExecutorService</code>,<code>Executors</code>,newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个<code>ThreadPoolExecutor</code>类。</p>
<p>接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,<code>Executors</code>只是一个工具类。</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202156530-45308.png" style="zoom: 67%">
<p><code>Executor</code>是顶级接口</p>
<pre><code class="language-java">public interface Executor {
// 只定义了一个方法
void execute(Runnable command);
}
</code></pre>
<p><code>ExecutorService</code>:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法</p>
<pre><code class="language-java">public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//....
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
}
</code></pre>
<p><code>AbstractExecutorService</code>ExecutorService执行方法的默认实现,发现下面的<strong>submit()</strong>底层实际执行的是execute(ftask)方法<strong>【Executor接口的execute()方法,在这个抽象类里面没有具体实现,到具体子类ThreadPoolExecutor在可以看到】</strong>。</p>
<pre><code class="language-java">public abstract class AbstractExecutorService implements ExecutorService {
// 这里重点只看一下submit方法的默认实现
// 优点1: 可以有Future返回值
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 优点2: 支持Callable参数
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
</code></pre>
<p><code>ThreadPoolExecutor</code>:线程池,可以通过调用Executors静态工厂方法来创建线程池并返回一个ExecutorService对象</p>
<pre><code class="language-java">public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 七大参数!!!!======
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,定制策略来处理任务
) {
if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
</code></pre>
<p>回到这一小节最开始的时候,例子中的线程池有两种提交并运行线程的方式<code>execute</code>和<code>submit</code>两个方法。现在来看一下<code>ThreadPoolExecutor</code>中的<code>execute()</code>方法是什么样子的。submit()我们已经知道了是在AbstractExecutorService中有默认实现的。</p>
<pre><code class="language-java">// ThreadPoolExecutor::execute(Runnable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.若当前工作线程数小于核心线程数(corePoolSize),尝试创建新的核心线程
// 这里是用的位运算的,我没有深究
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //
return;
// 创建新线程失败,重新获取ctl
c = ctl.get();
}
// 2.任务入队
// 线程池处于运行状态(isRunning(c))
// 且任务成功加入阻塞队列(workQueue.offer(command))
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.2 双重检查
/*
2.2.1再次检查线程池状态(可能在此期间线程池被关闭)。
2.2.2若线程池已关闭,尝试从队列中移除任务,移除成功则拒绝任务(reject(command))。
2.2.3若线程池仍运行但无活跃线程(workerCountOf(recheck) == 0),
添加一个非核心线程(addWorker(null, false)),该线程会从队列中拉取任务执行。
*/
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.若任务无法入队(队列已满),尝试创建非核心线程(addWorker(command, false))
else if (!addWorker(command, false))
//若创建失败(线程数已达maximumPoolSize或线程池已关闭),
//执行拒绝策略(reject(command))
reject(command);
}
/*
execute(command)
│
├─ 检查command非空
│
├─ 当前线程数 < corePoolSize?
│ ├─ 是 → 创建核心线程 → 成功则返回
│ └─ 否 → 进入下一步
│
├─ 线程池是否RUNNING且任务入队成功?
│ ├─ 是 → 双重检查状态
│ │ ├─ 线程池已关闭 → 移除任务并拒绝
│ │ └─ 线程池仍运行且无活跃线程 → 创建非核心线程
│ │
│ └─ 否 → 尝试创建非核心线程
│ ├─ 成功 → 处理任务
│ └─ 失败 → 拒绝任务
*/
为什么需要二次检查线程池状态?
- 任务入队后,其他线程可能关闭了线程池(如调用shutdown())。
- 处理:
- 若线程池已关闭,需移除已入队任务并拒绝。
- 若线程池仍运行但无活跃线程(如核心线程数为0且任务在队列中),需创建新线程处理队列任务。
</code></pre>
<p><strong>场景1:核心线程数未满</strong></p>
<ul>
<li>线程池处于 <code>RUNNING</code>,当前线程数 2(<code>corePoolSize=5</code>)。</li>
<li><code>addWorker(task, true)</code> 成功创建核心线程并执行任务。</li>
</ul>
<p><strong>场景2:队列已满,创建临时线程</strong></p>
<ul>
<li>核心线程满载,队列已满,线程数未达 <code>maximumPoolSize</code>。</li>
<li><code>addWorker(task, false)</code> 创建临时线程处理任务。</li>
</ul>
<p><strong>场景3:SHUTDOWN 状态处理剩余任务</strong></p>
<ul>
<li>线程池调用 <code>shutdown()</code>,状态变为 <code>SHUTDOWN</code>。</li>
<li>已有任务在队列中,<code>addWorker(null, true)</code> 创建线程处理队列任务。</li>
</ul>
<p><code>ThreadPoolExecutor</code>设计思想:</p>
<ul>
<li><strong>核心线程优先</strong>:优先使用核心线程处理任务。</li>
<li><strong>队列缓冲</strong>:核心线程满载后,任务入队等待。</li>
<li><strong>非核心线程应急</strong>:队列满后,创建临时线程处理任务(不超过<code>maximumPoolSize</code>)</li>
</ul>
<p><mark><strong>addWorker 创建工作线程</strong></mark></p>
<p><code>addWorker</code> 方法通过精细的状态检查和并发控制,确保线程池在动态扩缩容时保持线程安全。【方便理解,可以把这个方法看作是创建线程】其核心在于:</p>
<ul>
<li><strong>双重循环</strong>:外层处理状态变化,内层处理线程数修改。</li>
<li><strong>锁与原子操作结合</strong>:保证 <code>workers</code> 集合和 <code>workerCount</code> 的一致性。</li>
<li><strong>异常回滚机制</strong>:确保资源不会泄漏(如线程数虚增或 Worker 未清理)。</li>
</ul>
<p>firstTask为我们最开始写的Runnable。【记一个代号叫做 “<strong>我的任务</strong>” 】</p>
<blockquote>
<pre><code class="language-java">// ======== firstTask
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("execute pool创建启动线程!");
});
</code></pre>
</blockquote>
<pre><code class="language-java">// addWorker(runnable, core)
//标记是否以核心线程数(corePoolSize)为上限创建线程。
//若为 false,则使用最大线程数(maximumPoolSize)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
........
int rs = runStateOf(c);// 获取线程池状态
// 检查是否允许添加Worker
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
.........
//CAS 增加线程数
if (compareAndIncrementWorkerCount(c))
break retry;// 成功增加,跳出循环
c = ctl.get();// Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
...........
try {
// 把“我的任务”传进去了
w = new Worker(firstTask); // 创建的一个Worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁保护 workers 集合
mainLock.lock();
try {
// 再次检查状态(防止在加锁前状态变化)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
//======抛出异常....
workers.add(w); // 将 Worker 加入集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 【Worker类里面的thread】
// 启动线程=========重点【见下面的分析】
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
</code></pre>
<p>Worker是ThreadPoolExecutor的内部类,可以看出是一个Runnable,那么肯定重写了run()方法</p>
<pre><code class="language-java">private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
Runnable firstTask; //"我的任务"到这里来了
// 构造函数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 利用线程工厂创建了一个线程
/*
如果final Thread t = w.thread;
t.start();启动的话,执行的是这个内部类的run()
*/
this.thread = getThreadFactory().newThread(this);
}
// run就这一行
public void run() {runWorker(this);}
//到这里了
final void runWorker(Worker w) {
..
//"我的任务"
Runnable task = w.firstTask;
...
try {
//getTask()从等待队列里面取出Runnable
while (task != null || (task = getTask()) != null) {
....
task.run();//==========
}
}......
..
//// 无任务时回收线程
processWorkerExit(w, completedAbruptly);
}
}
</code></pre>
<p><code>ThreadPoolExecutor的静态内部类</code> :: jdk自带的<strong>四种拒绝策略</strong>。</p>
<pre><code class="language-java">public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 1.直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
.........
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 2.直接丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {
.....
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 3.丢弃等待队列的第一个
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
........
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//如果线程池未关闭,就弹出队列头部的元素,然后尝试执行
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
// 4.调用者运行,直接执行run()方法里面的逻辑。
// 只要线程池没有关闭,就由提交任务的当前线程处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
......
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
</code></pre>
<p><strong>总结</strong>一下,在了解到了ThreadPoolExecutor的一些类间关系、以及一些基本流程、属性之后。接下来我们来梳理一遍,线程池的运行方式。</p>
<ol>
<li>
<p><strong>创建线程池(七大参数、四大拒绝策略)</strong></p>
</li>
<li>
<p><strong>任务提交</strong></p>
</li>
</ol>
<blockquote>
<pre><code class="language-java">//2.1
executor.execute(() -> {
// 任务逻辑
});
</code></pre>
<p><strong>2.2任务处理决策链:::</strong></p>
<p>2.2.1<strong>尝试创建核心线程</strong>:当前工作线程数 < <code>corePoolSize</code>,直接创建新核心线程执行任务</p>
<pre><code class="language-java">if (workerCount < corePoolSize) {
addWorker(command, true); // true表示检查corePoolSize
return;
}
</code></pre>
<p>2.2.2<strong>任务入队</strong>: 若核心线程已满,任务尝试加入工作队列。</p>
<pre><code class="language-java">if (workQueue.offer(command)) {
// 双重检查线程池状态
if (线程池已关闭) 移除任务并拒绝;
else if (当前无活跃线程) 创建非核心线程处理队列任务;
}
</code></pre>
<p>2.2.3<strong>尝试创建非核心线程</strong>: 若队列已满且线程数 < <code>maximumPoolSize</code>,创建非核心线程。</p>
<pre><code class="language-java">else if (!addWorker(command, false)) { // false表示检查maximumPoolSize
reject(command); // 触发拒绝策略
}
</code></pre>
<p>2.2.4<strong>拒绝任务</strong></p>
</blockquote>
<ol start="3">
<li><strong>工作线程执行任务</strong></li>
</ol>
<blockquote>
<p>3.1<strong>Worker初始化</strong>:每个<code>Worker</code>绑定一个线程和初始任务(<code>firstTask</code>)。</p>
<pre><code class="language-java">Worker w = new Worker(firstTask);
final Thread t = w.thread;
t.start(); // 启动线程执行runWorker()
</code></pre>
<p>3.2<strong>任务执行循环(<code>runWorker</code>方法)</strong>:</p>
<pre><code class="language-java">while (task != null || (task = getTask()) != null) {
try {
task.run(); // 执行任务
} finally {
task = null; // 清理任务引用
}
}
</code></pre>
<p>3.3<strong>从队列获取任务(<code>getTask</code>方法)</strong>:</p>
<ul>
<li><strong>阻塞模式</strong>:若为核心线程或允许核心线程超时,调用<code>workQueue.take()</code>永久阻塞。</li>
<li><strong>超时模式</strong>:若非核心线程,调用<code>workQueue.poll(keepAliveTime)</code>超时等待。</li>
</ul>
<pre><code class="language-java">Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
</code></pre>
</blockquote>
<ol start="4">
<li><strong>线程回收与资源释放</strong></li>
</ol>
<blockquote>
<pre><code class="language-java">private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 异常终止时补偿workerCount
decrementWorkerCount();
mainLock.lock();
try {
workers.remove(w); // 从集合中移除Worker
if (线程池仍在运行 && 队列非空)
addWorker(null, false); // 补充线程处理队列任务
} finally {
mainLock.unlock();
}
}
</code></pre>
</blockquote>
<p>线程池本质也是Runnable!</p>
<p>一张好图:【来自:【线程池工作原理】https://blog.csdn.net/fighting_yu/article/details/89473175】</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202156926-2070083376.png" style="zoom: 60%">
<h2 id="3探索锁">3.探索”锁“</h2>
<p>上面探索了线程以及线程池的创建,发现其源码中存在这种代码;</p>
<pre><code class="language-java">//1.Thread的start方法
public synchronized void start()
//2.线程池部分源码addWorker()方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//3.
LockSupport.park(this);
</code></pre>
<p>这些是什么呢?这就是锁。。</p>
<p>确保线程安全最常见的做法是利用锁机制(<code>Lock</code>、<code>sychronized</code>)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。</p>
<h3 id="-synchronized">① synchronized</h3>
<p><code>synchronized</code> 是 Java 中最基本的同步机制,它可以修饰方法或代码块,确保同一时刻只有一个线程可以执行被修饰的代码。</p>
<pre><code class="language-java">public class SynchronizedTest {
public static void main(String[] args) {
SynchronizedTest lock1 = new SynchronizedTest();
new Thread(lock1::test).start();
new Thread(lock1::test2).start();
new Thread(lock1::testStatic).start();
new Thread(lock1::testFs).start();
}
public void testStatic() {
// 锁的是Class对象
synchronized (SynchronizedTest.class){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStatic()");
}
}
// 锁的是一个实例对象
public void test(){
synchronized (this){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test()");
}
}
public synchronized void test2(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test2()");
}
public void testFs(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testFs()");
}
}
</code></pre>
<p>上面只有test() 和 test2() 是互斥的。也就是1秒过后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起输出打印。</p>
<p><strong>修饰代码块、修饰方法:锁的是该对象;</strong></p>
<p><strong>修饰静态成员:锁的是该类的Class对象;</strong>这种方式可以确保对静态变量的访问是线程安全的</p>
<p><strong>还可以锁任意对象。</strong></p>
<p>其实主要弄清楚各自锁的是什么对象就行了,看是否需要的是一个锁,就可以判断是否互斥了;</p>
<pre><code class="language-java">//锁的是Class对象
public static synchronized void testStatic1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStaticMethod()");
}
public void testStatic() {
synchronized (SynchronizedTest.class){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStatic()");
}
}
public synchronized void test2(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test2()");
}
</code></pre>
<p>如上述代码示例,testStatic1和testStatic需要持有的对象是同一个,故这二者会产生互斥,test2需要持有的是该类的一个实例对象,故不会与这二者产生互斥。</p>
<p><strong>需要注意的是: synchronized 并不属于方法定义的一部分,故synchronized 关键字不能被继承</strong>。如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方 法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。<br>
来看看如下示例:</p>
<pre><code class="language-java">public class Father {
public synchronized void method1(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Father method1");
}
}
//
public class Son extends Father{
public void syncSon() {
super.method1(); // 调用的父类的同步方法
}
public void syncSon1() {
super.method1();
}
public void method1() { // 重写了,但是synchronized并不会继承过来
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Son method1");
}
public void sonHaha() { method1(); }
public void sonHehe() { method1(); }
public static void main(String[] args) {
Son son = new Son();
// new Thread(son::syncSon).start();
// new Thread(son::syncSon1).start(); // 会互斥
new Thread(son::sonHaha).start();
new Thread(son::sonHehe).start(); // 不会
}
}
</code></pre>
<p>synchronized【隐式锁】的底层原理涉及到 Java 对象头(Object Header)和 Monitor(监视器)两个关键概念。</p>
<p>每个 Java 对象在内存中分为三部分:</p>
<ol>
<li><strong>对象头(Header)</strong>
<ul>
<li><strong>Mark Word</strong>(标记字段):存储哈希码、GC 分代年龄、锁状态等。</li>
<li><strong>Klass Pointer</strong>(类型指针):指向类的元数据。</li>
</ul>
</li>
<li><strong>实例数据(Instance Data)</strong></li>
<li><strong>对齐填充(Padding)</strong></li>
</ol>
<p><strong>Java 对象头</strong>:在 Java 虚拟机中,每个对象都有一个对象头,用于存储对象的元数据信息,包括对象的哈希码、GC 相关信息、锁状态等。对象头通常包含一个<strong>标记字段(Mark Word),用于标识对象的锁状态</strong>。</p>
<p><strong>Monitor(监视器)</strong>:Monitor 是一种同步机制,负责管理对象的锁。每个<strong>对象都与一个 Monitor</strong> 相关联。当一个线程尝试进入一个被synchronized修饰的代码块或方法时,它会尝试获取对象的 Monitor。如果 Monitor 处于无锁状态,则当前线程会尝试将其锁定;如果 Monitor 已经被其他线程锁定,则当前线程会进入阻塞状态,直到持有锁的线程释放锁。</p>
<pre><code class="language-c++">// C++ 实现(HotSpot 源码)
class ObjectMonitor {
void* _header; // Mark Word
void* _owner; // 持有锁的线程
volatile intptr_t_count; // 重入次数
ObjectWaiter* _WaitSet; // 等待队列(调用 wait() 的线程)
ObjectWaiter* _EntryList; // 阻塞队列(竞争锁失败的线程)
// ...
};
</code></pre>
<p>查看本小节开头示例的test()方法的字节码:</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202157418-696507534.png" style="zoom: 50%">
<p><code>synchronized</code> 同步语句块的实现使用的是 <code>monitorenter</code> 和 <code>monitorexit</code> 指令,当执行 <code>monitorenter</code> 指令时,线程试图获取锁也就是获取 <strong>对象监视器 <code>monitor</code></strong> 的持有权。第一个monitorexit正常退出同步块, 第二个是异常退出同步块。</p>
<h4 id="synchronized优化"><strong>synchronized优化:</strong></h4>
<p>锁升级(JDK 6+)</p>
<p><strong>3.0 无锁</strong></p>
<ul>
<li>无锁:<strong>当第一个线程第一次访问一个对象的同步块时</strong>,JVM会在对象头中设置该线程的ID,并将对象头的状态位设置为“偏向锁”。这个过程称为“偏向”,表示对象当前偏向于第一个访问它的线程。</li>
</ul>
<p><strong>3.1 偏向锁(Biased Locking)</strong></p>
<ul>
<li><strong>原理</strong>:第一个获取锁的线程将线程 ID 写入 Mark Word,后续无需 CAS。这样如果该线程再来的时候,由于是已经设置了锁偏向该线程,故只需比对一下对象头里面的Mark Word就行了。</li>
<li><strong>触发条件</strong>:JVM 启用了偏向锁(默认开启),且对象未处于锁定状态。</li>
<li><strong>撤销</strong>:当其他线程尝试竞争时,撤销偏向锁并升级为轻量级锁。</li>
</ul>
<p><strong>3.2 轻量级锁(Lightweight Locking)</strong></p>
<ul>
<li><strong>加锁流程</strong>:
<ol>
<li>在当前线程栈帧中创建 <strong>Lock Record</strong>。</li>
<li>通过 CAS 将 Mark Word 复制到 Lock Record,并尝试将 Mark Word 指向 Lock Record。</li>
</ol>
</li>
</ul>
<p>轻量级锁在以下场景会升级为重量级锁:</p>
<ol>
<li><strong>自旋失败</strong>:竞争线程自旋一定次数后仍未获取锁。</li>
<li><strong>竞争加剧</strong>:等待锁的线程数超过 <strong>JVM 自适应的阈值</strong>。</li>
</ol>
<p><strong>3.3 重量级锁(Heavyweight Locking)</strong></p>
<ul>
<li><strong>实现</strong>:通过操作系统提供的互斥量(Mutex)和条件变量实现,线程竞争失败后进入阻塞状态。</li>
<li><strong>性能问题</strong>:涉及用户态到内核态的切换,开销较大。</li>
</ul>
<p><strong>synchronized优化:</strong></p>
<p>锁会升级,从低到高升级,反着降级不可以:<strong>无锁状态->偏向锁状态 -> 轻量级锁状态 -> 重量级锁状态</strong></p>
<table>
<thead>
<tr>
<th style="text-align: left"><strong>锁类型</strong></th>
<th style="text-align: left"><strong>实现机制</strong></th>
<th style="text-align: left"><strong>适用场景</strong></th>
<th style="text-align: left"><strong>性能开销</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: left"><strong>偏向锁</strong></td>
<td style="text-align: left">通过 Mark Word 记录线程 ID</td>
<td style="text-align: left">单线程重复访问同步块</td>
<td style="text-align: left">最低</td>
</tr>
<tr>
<td style="text-align: left"><strong>轻量级锁</strong></td>
<td style="text-align: left">CAS + 自旋(适应性自旋)</td>
<td style="text-align: left">低竞争、短时间同步</td>
<td style="text-align: left">中等</td>
</tr>
<tr>
<td style="text-align: left"><strong>重量级锁</strong></td>
<td style="text-align: left">操作系统互斥量(Mutex) + Monitor</td>
<td style="text-align: left">高竞争、长时间同步</td>
<td style="text-align: left">最高</td>
</tr>
</tbody>
</table>
<p>【节选自 锁升级】 :https://blog.csdn.net/weixin_45433817/article/details/132216383</p>
<img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202157805-1500515569.png" style="zoom: 67%">
<p><strong>问:</strong>synchronized是公平锁吗?</p>
<p>首先要知道公平锁和非公平锁的概念:</p>
<ul>
<li><strong>公平锁</strong>:公平锁指的是多个线程按照申请锁的顺序来获取锁,先到先得。当一个线程请求锁时,如果该锁当前处于可用状态,并且在该线程之前已经有其他线程在等待该锁,那么这个线程会被放入等待队列的尾部,等待前面的线程依次获取并释放锁后,它才能获取锁。</li>
<li><strong>非公平锁</strong>:非公平锁则不保证线程获取锁的顺序与申请锁的顺序一致。当一个线程请求锁时,即使有其他线程在等待该锁,它也会先尝试直接获取锁,如果获取成功就可以直接执行,而不用排队等待。</li>
</ul>
<p>那么,<code>synchronized</code> 基于对象头的 Mark Word 和监视器(Monitor)实现。当一个线程进入同步块时,它会尝试获取对象的监视器。如果监视器处于空闲状态,该线程会直接获取监视器,而不会考虑是否有其他线程已经在等待这个监视器。例如,当一个线程释放了 <code>synchronized</code> 修饰的同步块的锁后,新到来的线程有很大机会直接获取到锁,而不是等待那些在等待队列中的线程,这就体现了其非公平性。</p>
<pre><code class="language-java">class SynchronizedNonFairExample {
private static final Object lock = new Object();
private static int counter = 0;
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
synchronized (lock) {
counter++;
System.out.println(Thread.currentThread().getName() + " 获取到锁,计数: " + counter);
try {
// 模拟执行任务
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "Thread-" + i).start();
}
}
}
</code></pre>
<p>在上述代码中,多个线程竞争 <code>lock</code> 对象的锁。运行代码时,你会发现线程获取锁的顺序并不是按照线程启动的顺序,这就说明了 <code>synchronized</code> 是非公平锁。</p>
<h3 id="-lock">② Lock</h3>
<p>上一节的<code>synchronized</code>是jdk内置的关键字,属于隐式锁、也叫内置锁。现在这一节来探索一下显式锁,其<strong>提供更细粒度的控制</strong>(如可中断、超时、公平性),核心实现为 <code>ReentrantLock</code>。</p>
<pre><code class="language-java">public interface Lock {
//获取锁。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并在获取锁之前处于休眠状态。
void lock();
void lockInterruptibly() throws InterruptedException;
//如果锁可用,则获取锁,并立即返回值为 true。如果锁不可用,则此方法将立即返回值 false。
boolean tryLock();
/*
如果在给定的等待时间内有空闲,并且当前线程尚未中断,则获取该锁。
如果锁可用,则此方法立即返回值 true。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,
直到发生以下三种情况之一:锁由当前线程获取;或者其他线程中断当前线程,支持中断锁获取;或指定的等待时间已用
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//返回绑定到此 Lock 实例的新 Condition 实例。在等待条件之前,锁必须由当前线程持有
Condition newCondition();
}
</code></pre>
<p><img src="https://img2023.cnblogs.com/blog/2358057/202504/2358057-20250428202154852-778971061.png" alt="" loading="lazy"></p>
<p>从上图中,我们可以得知juc包下的几个主要的实现类,绿色圈圈连接的是里面的内部类。</p>
<h4 id="1-reentrantlock">1) ReentrantLock</h4>
<p>接下来从我们最熟知的<code>ReentrantLock</code>开始看起吧,他的简单使用:</p>
<pre><code class="language-java">public class LockTest {
Lock lock = new ReentrantLock();
int count = 0;
public static void main(String[] args) throws InterruptedException {
LockTest test = new LockTest();
for (int i = 1; i <= 2; i++) {
new Thread(test::add).start();
}
Thread.sleep(2000);
/*
如果不加锁的话,结果就不一定是两万了
*/
System.out.println(test.count);
}
public void add() {
// 标准写法 try加锁 finally释放锁
try {
lock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} finally {
lock.unlock();
}
}
}
</code></pre>
<p>我们分析一下,首先是调用了其无参构造函数创建了一个对象,里面是new了一个看名字是非公平同步标记的对象,那他肯定会有公平的同步标记。</p>
<pre><code class="language-java">// 下面都是在ReentrantLock.java里面
private final Sync sync;
// 无参构造
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 然后调用lock.lock()实际是调用的sync.lock();
public void lock() {
sync.lock();
}
// 然后调用lock.unlock()实际是调用的sync.release(1);;
public void unlock() {
sync.release(1);
}
// 是ReentrantLock的静态内部类
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 是ReentrantLock的静态内部类
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// Sync继承了AbstractQueuedSynchronizer:【AQS】
abstract static class Sync extends AbstractQueuedSynchronizer {
.....
}
</code></pre>
<p>从上面额关系我们可以看出一切源头就是<code>AbstractQueuedSynchronizer</code>,也就是我们熟悉的AQS。在这篇文章的“补充知识点”环节中,对AQS做了一个简单的介绍及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487</p>
<p>【博客园】:https://blog.csdn.net/okok__TXF/article/details/146455487</p>
<p>它是是 Java 并发包 <code>java.util.concurrent.locks</code> 下的一个核心类,是构建锁和其他同步工具(如 <code>ReentrantLock</code>、<code>Semaphore</code>、<code>CountDownLatch</code> 等)的基础框架。</p>
<p>里面定义了两种资源共享模式:</p>
<ul>
<li><strong>独占模式(Exclusive)</strong>:同一时刻只有一个线程能获取资源,如 <code>ReentrantLock</code>。</li>
</ul>
<p>在独占模式的时候,<code>tryAcquire(int)</code>:尝试获取资源,成功返回 <code>true</code>,失败返回 <code>false</code>;<code>tryRelease(int)</code>:尝试释放资源,成功返回 <code>true</code>,失败返回 <code>false</code>。</p>
<ul>
<li><strong>共享模式(Share)</strong>:多个线程可同时获取资源,如 <code>Semaphore</code>(信号量)、<code>CountDownLatch</code>(倒计时 latch)。</li>
</ul>
<p>在共享模式的时候,<code>tryAcquireShared(int)</code>:尝试获取共享资源,负数表示失败;0 表示成功但无剩余资源;正数表示成功且有剩余资源。<code>tryReleaseShared(int)</code>:尝试释放共享资源,释放后若允许唤醒后续等待节点,返回 <code>true</code>,否则 <code>false</code>。</p>
<h5 id="--非公平锁">- 非公平锁</h5>
<p>回到ReentrantLock中,我们以<strong>lock()方法</strong>举例子:【lock是无参构造的】<mark>非公平锁</mark></p>
<pre><code class="language-java">//ReentrantLock.java
public void lock() {
sync.lock(); // ===========1
}
//内部的抽象类Sync
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { // ===========7
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//具体实现类NonfairSync
final void lock() { // ===========2
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // ===========3
}
protected final boolean tryAcquire(int acquires) { // ===========6
return nonfairTryAcquire(acquires); // 这个是抽象类Sync的
}
//AbstractQueuedSynchronizer.java --- acquire(1)
public final void acquire(int arg) { // ===========4
// 这里的tryAcquire是NonfairSync.java里面的
if (!tryAcquire(arg) && // ===========5
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
</code></pre>
<p>已经按顺序将1234567标注在了上面,模板方法的设计模式有时候真的会让人晕头转向。。</p>
<p>一句简简单单的lock.lock()方法做了什么呢?</p>
<p><strong>首先,</strong>会直接尝试CAS获取锁【<code>compareAndSetState(0, 1)</code> 会通过 CAS 操作尝试将 AQS 中的 <code>state</code> 状态从 0 改为 1】,如果成功的话成功则设置当前线程为锁持有者,否则进入AQS的获取流程;【在这里,当线程调用 <code>lock()</code> 时,会先通过 <code>CAS</code> 操作尝试将 <code>AQS</code> 的 <code>state</code> 从 <code>0</code> 改为 <code>1</code>。<strong>此时不会检查等待队列中是否有其他线程在排队</strong>,只要 <code>CAS</code> 成功,就直接获取锁,体现了 “插队” 的特性。】</p>
<p><strong>其次,</strong>进入aqs的<code>acquire流程</code> ,</p>
<pre><code class="language-java">1.tryAcquire(arg) 2.addWaiter(Node.EXCLUSIVE) 3.acquireQueued(xxx)
</code></pre>
<p>第一个方法tryAcquire再次尝试获取锁(非公平锁的 <code>tryAcquire</code> 即 <code>nonfairTryAcquire(acquires)</code>),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次尝试 CAS 抢占(体现非公平性,不检查队列);如果不是0,说明被抢占了,判断持有锁的线程是不是当前线程,如果是(体现可重入性),更新state,如果不是返回false。</p>
<p><strong>然后,</strong> 若<code>tryAcquire</code>失败【返回false】,调用<code>addWaiter(Node.EXCLUSIVE)</code> 会将当前线程包装成一个独占模式的 <code>Node</code> 节点加入 AQS 队列。接着 <code>acquireQueued</code> 会使线程在队列中自旋等待,不断尝试获取锁或被唤醒后尝试获取,直到成功。</p>
<p>接下来分析一下<code>lock.unlock()</code>方法,这个就在代码里面注释解释了</p>
<pre><code class="language-java">//ReentrantLock.java
public void unlock() {
//调用其内部同步器 sync 的 release 方法:
sync.release(1); // ===========1
}
// 内部抽象类Sync
protected final boolean tryRelease(int releases) { // ===========3
int c = getState() - releases; // 减少同步状态值(释放一次锁,`state` 减 1)
// 检查当前线程是否为锁的持有者,不是则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 当 `state` 减为 0 时,完全释放锁
free = true;
setExclusiveOwnerThread(null);// 清除独占锁的线程引用
}
setState(c);// 更新 `state` 值
return free;// 返回是否完全释放锁
}
//AbstractQueuedSynchronizer.java --- acquire(1)
public final boolean release(int arg) {
//tryRelease 尝试释放锁,由子类实现具体逻辑
if (tryRelease(arg)) { // ===========2
Node h = head;// 获取等待队列头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 唤醒后继节点 ===========4
return true;
}
return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {===========5
int ws = node.waitStatus;
if (ws < 0) // 将头节点的 `waitStatus` 设为 0(取消之前的状态)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到头节点的后继节点
if (s == null || s.waitStatus > 0) { // 若后继节点为空或已取消,从队尾向前找第一个非取消节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 唤醒找到的节点对应的线程
LockSupport.unpark(s.thread);
}
</code></pre>
<p>在内部抽象类Sync中的tryRelease中:</p>
<ol>
<li><strong>减少 <code>state</code> 值</strong>:<code>ReentrantLock</code> 支持重入,<code>state</code> 记录锁的重入次数。每次调用 <code>unlock()</code>,<code>state</code> 减 1。</li>
<li><strong>检查线程所有权</strong>:确保只有锁的持有者才能释放锁,否则抛出 <code>IllegalMonitorStateException</code>。</li>
<li><strong>完全释放锁</strong>:当 <code>state</code> 减为 0 时,将 <code>setExclusiveOwnerThread(null)</code>,表示锁已完全释放,返回 <code>true</code>。</li>
</ol>
<p><code>ReentrantLock</code>支持重入:【其实从名字就可以看出来了 -- Reentrant(再进去的、就是可重入嘛)】</p>
<pre><code class="language-java">ReentrantLock lock = new ReentrantLock();
lock.lock();// state=1,线程持有锁
lock.lock();// state=2(重入)
lock.unlock(); // state=1(未完全释放)
lock.unlock(); // state=0,释放锁并唤醒等待线程
</code></pre>
<h5 id="--公平锁">- 公平锁</h5>
<p>那么ReentrantLock的公平锁是什么样子的呢?其实大致步骤都差不多,主要是在<code>FairSync.java</code></p>
<pre><code class="language-java">protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平锁会先检查等待队列是否有前驱节点,若有则不能抢锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
</code></pre>
<p>公平锁在 <code>state == 0</code> 时,会先通过 <code>hasQueuedPredecessors</code> 检查等待队列。若有其他线程在排队,则当前线程不能抢占,必须入队等待,保证 “先来先得”,体现了公平性。而非公平锁跳过这一步,直接抢锁,这就是非公平性的核心体现。</p>
<h4 id="2-读写锁">2) 读写锁</h4>
<pre><code class="language-java">//ReadWriteLock 维护一对关联的锁,一个用于只读作,一个用于写入。只要没有写入器,多个读取器线程就可以同时持有读取锁。
//写锁是独占的。读写锁允许在访问共享数据时实现比互斥锁允许的更高级别的并发。它利用了这样一个事实,
//即虽然一次只有一个线程(写入线程)可以修改共享数据,但在许多情况下,任意数量的线程都可以同时读取数据(因此是读取线程)。
//从理论上讲,与使用互斥锁相比,使用读写锁允许的并发性增加将导致性能改进。
public interface ReadWriteLock {
//返回用于读取的锁。
Lock readLock();
//返回用于写入的锁。
Lock writeLock();
}
</code></pre>
<p><strong>读写锁是否会比使用互斥锁提高性能</strong>取决于读取数据的频率与修改数据的频率、读取和写入作的持续时间以及数据的争用 - 即尝试同时读取或写入数据的线程数。例如,最初填充数据,此后不经常修改的集合;经常搜索(例如某种目录)是使用读写锁的理想候选者。但是,如果更新变得频繁,则数据将花费大部分时间进行独占锁定,并发性几乎没有增加。只有分析和测量才能确定使用读写锁是否适合您的应用程序。</p>
<p>读写锁允许多个线程同时读(没有写入时,多个线程允许同时读(提高性能)),但只要有一个线程在写,其他线程就必须等待(只允许一个线程写入(其他线程既不能写入也不能读取))。也就是读读不冲突、读写就要冲突了。</p>
<h5 id="--readwritelock">- ReadWriteLock</h5>
<p>下面给出一个简单示例:ReadWriteLock</p>
<pre><code class="language-java">public class ReadWriteLockTest2 {
private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static final Lock readLock = readWriteLock.readLock();
private static final Lock writeLock = readWriteLock.writeLock();
private static int[] a = new int;
public static void main(String[] args) throws InterruptedException {
// 1个线程写
new Thread(ReadWriteLockTest2::write).start();
for (int i = 0; i < 9; i++)// 10个线程读
new Thread(()-> System.out.println(get())).start();
Thread.sleep(2000);
}
public static Object get() {
readLock.lock();
try {
Thread.sleep(100);
return a;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
readLock.unlock();
}
}
public static void write() {
writeLock.lock();
try {
a++;
System.out.println("写进行~~~");
Thread.sleep(1000);
System.out.println("写ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock();
}
}
}
</code></pre>
<p>写操作在执行的时候,读线程是会阻塞的。但是10个读线程之间并没有阻塞</p>
<h5 id="--stampedlock">- StampedLock</h5>
<p><code>StampedLock</code>对比 <code>ReentrantReadWriteLock</code> 有所增强,在原先读写锁的基础之上新增了乐观读的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。(乐观锁和悲观锁)</p>
<p>StampedLock具有三种控制读/写访问的模式:</p>
<p>1、写入(Writing):方法writeLock可能阻塞等待独占访问,并返回一个戳,该戳可在方法unlockWrite中使用以释放锁。还提供了tryWriteLock的非定时和定时版本。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。</p>
<p>2、读取(Reading):方法readLock可能会阻塞等待非独占访问,并返回一个戳,该戳可在方法unlockRead中使用以释放锁。还提供了tryReadLock的非定时和定时版本。</p>
<p>3、乐观读取(Optimistic Reading):tryOptimisticRead方法返回一个非0的stamp,只有当前同步状态没有被写模式所占有是才能获取到。他是在获取stamp值后对数据进行读取操作,最后验证该stamp值是否发生变化,如果发生变化则读取无效,代表有数据写入。这种方式能够降低竞争和提高吞吐量。</p>
<p>简单示例:</p>
<pre><code class="language-java">public class StampedLockTest {
private static final StampedLock stampedLock = new StampedLock();
private static double x = 1.0;
private static double y = 1.0;
public static void main(String[] args) {
// 1. 一个线程写
new Thread(() -> addXY(1, 1)).start();
// 2. 10个线程读
for (int i = 0; i < 10; i++) {
new Thread(() -> System.out.println(getSArea())).start();
}
}
private static double getSArea() {
// 乐观读
long stamp = stampedLock.tryOptimisticRead();
double s1 = x * y;
// 验证一下
if (!stampedLock.validate(stamp)) { // 验证失败
stamp = stampedLock.readLock(); // 升级为读锁
try {
s1 = x * y;
} finally {
stampedLock.unlockRead(stamp);
}
}
return s1;
}
private static void addXY(double a, double b) {
long stamp = stampedLock.writeLock();
try {
System.out.println("写进行~~");
x += a;
y += b;
Thread.sleep(1000);
System.out.println("写ok~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(stamp);
}
}
}
</code></pre>
<p><strong>写操作:</strong><code>writeLock()</code> 返回一个 <code>stamp</code>(时间戳),表示获取写锁成功。写锁是独占的,获取时会阻塞所有读锁和其他写锁(除了乐观读)。通过 <code>unlockWrite(stamp)</code> 释放写锁,<code>stamp</code> 必须与获取时的一致,否则抛出异常。</p>
<p><strong>乐观读:</strong> <strong><code>tryOptimisticRead()</code></strong>:获取一个 <strong>乐观读时间戳</strong>,不实际加锁,直接读取数据(成本极低),然后<strong><code>validate(stamp)</code></strong>:检查该时间戳对应的读操作期间是否有写操作发生。若 <code>stamp</code> 有效(无写操作),则数据一致;否则需要升级为读锁。锁升级:若验证失败,说明数据可能被修改,通过 <code>readLock()</code> 获取读锁(阻塞直到写锁释放),确保后续读取的数据是最新的。</p>
<h5 id="--可重入性探讨">- 可重入性探讨</h5>
<p>JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做<strong>可重入锁</strong>。</p>
<p>本小节看一下上面两种读写锁的可重入性,首先是ReadWriteLock,从他的实现类来看就是可重入的了【ReentrantReadWriteLock】</p>
<p>在<code>ReentrantReadWriteLock</code>中也有<code>Sync</code>的抽象内部类,当调用写锁的lock时,实际是会经过里面重写的<code>tryAcquire</code>,从下面的代码可以知道<strong>同一线程可多次获取写锁</strong>:当线程获取写锁后,再次调用 <code>writeLock()</code> 会直接成功(无需等待),因为内部维护了一个 <strong>重入计数器</strong>(类似 <code>ReentrantLock</code>)。每次获取写锁时计数器加 1,释放时减 1,计数器为 0 时才真正释放锁。</p>
<pre><code class="language-java">protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
....
// Reentrant acquire
setState(c + acquires);
return true;
}
....
return true;
}
</code></pre>
<p><code>ReentrantReadWriteLock</code> 基于 <strong>AQS(AbstractQueuedSynchronizer)</strong> 实现,通过 <code>state</code> 变量的高 16 位和低 16 位分别记录 <strong>读锁的共享次数</strong> 和 <strong>写锁的重入次数</strong>:</p>
<ul>
<li><strong>写锁(独占模式)</strong>:使用低 16 位记录当前线程的重入次数(和 <code>ReentrantLock</code> 类似)。</li>
<li><strong>读锁(共享模式)</strong>:使用高 16 位记录所有线程的读锁获取次数,但会通过线程本地变量(<code>ThreadLocal</code>)记录当前线程的读锁重入次数,避免不同线程的计数干扰。</li>
</ul>
<p>这种设计使得同一线程多次获取写锁或在读锁 / 写锁之间按规则重入时,不会出现死锁,符合可重入锁的定义。</p>
<pre><code class="language-java">public static void write() {
writeLock.lock();
try {
writeLock.lock();
a++;
System.out.println("写进行~~~");
Thread.sleep(1000);
System.out.println("写ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock(); // 可重入
writeLock.unlock();
}
}
</code></pre>
<p><strong>StampedLock是不可重入的</strong>,为什么呢?</p>
<ol>
<li>
<p>没有锁计数机制:<code>StampedLock</code> 并没有像 <code>ReentrantLock</code> 那样维护一个锁的重入计数。在 <code>ReentrantLock</code> 中,<code>state</code> 变量用于记录锁的重入次数,每次获取锁时 <code>state</code> 加 1,释放锁时 <code>state</code> 减 1。而 <code>StampedLock</code> 中的 <code>state</code> 变量主要用于表示锁的状态和版本信息,并非用于记录重入次数。</p>
</li>
<li>
<p>如果一个线程已经持有了 <code>StampedLock</code> 的写锁或读锁,再次尝试获取相同类型的锁时,会出现以下情况:</p>
<ul>
<li><strong>写锁情况</strong>:如果线程已经持有写锁,再次调用 <code>writeLock()</code> 方法,由于写锁是独占的,该线程会被阻塞,因为它会等待自己释放写锁后才能再次获取,这显然会导致死锁。</li>
<li><strong>读锁情况</strong>:如果线程已经持有读锁,再次调用 <code>readLock()</code> 方法,虽然读锁是共享的,但 <code>StampedLock</code> 并不会像可重入锁那样允许线程多次获取而不产生问题。而且如果在持有读锁的情况下尝试获取写锁,会导致死锁,因为写锁需要独占资源,而当前线程已经持有了读锁。</li>
</ul>
</li>
</ol>
<pre><code class="language-java">private static void addXY(double a, double b) {
long stamp = stampedLock.writeLock();
try {
long lock = stampedLock.writeLock(); // 不可重入
System.out.println("写进行~~");
x += a;
y += b;
Thread.sleep(1000);
System.out.println("写ok~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(stamp);
}
}
</code></pre>
<h3 id="-锁案例">③ 锁案例</h3>
<p>上面只掌握了一丢丢的理论,没有实践怎么行呢?</p>
<h4 id="1-交替打印">1) 交替打印</h4>
<p>第一个,我们来实现一下三个线程交替打印A B C试试,第一个线程打印A,第二个B,第三个C</p>
<pre><code class="language-java">// synchronized实现
public class PrintABCSynchronized {
private int now = 1;
public static void main(String[] args) {
PrintABCSynchronized obj = new PrintABCSynchronized();
new Thread(obj::printA).start();
new Thread(obj::printB).start();
new Thread(obj::printC).start();
}
public void printA() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 1 ) { // 为什么用while,不用if?留给读者思考
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("A"); now = 2;
this.notifyAll();
}
}
}
public void printB() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 2 ) {
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("B"); now = 3;
this.notifyAll();
}
}
}
public void printC() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 3 ) {
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("C"); now = 1;
this.notifyAll();
}
}
}
}
</code></pre>
<p>wait - notify 【这个只能用在synchronized同步代码块中,是属于Object的方法】上面的缺陷很严重,那就是一下子就唤醒了所有挂起的线程,其实有的线程根本就不用唤醒,有没有一种办法,就是我想唤醒谁就唤醒谁呢?</p>
<pre><code class="language-java">public class PrintABCLock {
private Lock lock = new ReentrantLock();
private Condition a = lock.newCondition();
private Condition b = lock.newCondition();
private Condition c = lock.newCondition();
private int flag = 1;
public static void main(String[] args) {
PrintABCLock obj = new PrintABCLock();
new Thread(obj::printA).start();
new Thread(obj::printB).start();
new Thread(obj::printC).start();
}
public void printA(){
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 1 ) a.await();
System.out.println('A'); flag = 2;
b.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 2 ) b.await();
System.out.println('B'); flag = 3;
c.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 3 ) c.await();
System.out.println('C'); flag = 1;
a.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
</code></pre>
<h4 id="2-阻塞队列">2) 阻塞队列</h4>
<p>下面模仿jdk中ArrayBlockingQueue的源码,给了一个简洁的阻塞队列</p>
<pre><code class="language-java">class TBlockedQueue<T> {
private final Lock lock;
private final Condition notEmpty;
private final Condition notFull;
private final int capacity;
private final LinkedList<T> list;
public TBlockedQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException("Capacity 不能小于1");
this.capacity = capacity;
list = new LinkedList<>();
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void add( T t ) {
list.addLast(t);
}
// 1. 入队 -- 当队列已满时,向队列中添加元素的操作会被阻塞,直到队列有空间可用。
public void put( T t ) {
if (t == null) throw new NullPointerException();
lock.lock();
try {
while (list.size() == capacity) notFull.await();
list.addLast(t);
notEmpty.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
// 2. 出队 -- 当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新元素加入
public T take() {
lock.lock();
try {
while (list.isEmpty()) notEmpty.await();
T t = list.removeFirst();
notFull.signal();
return t;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
</code></pre>
<p>这里自定义的 <code>TBlockedQueue</code> 是一个典型的 <strong>有界阻塞队列</strong>,其核心思路是通过 <strong>锁(Lock)和条件变量(Condition)</strong> 实现线程间的同步与协调,确保在多线程环境下对队列的操作是安全的。通过 <code>lock.lock()</code> 和 <code>lock.unlock()</code> 包裹对共享资源 <code>list</code> 的操作,确保同一时刻只有一个线程修改队列。同时,使用 <code>while</code> 循环检查条件(如 <code>list.size() == capacity</code>),防止 <strong>虚假唤醒</strong>导致条件不满足时错误地继续执行。</p>
<ul>
<li><code>notEmpty</code>:当队列为空时,<code>take</code> 操作会等待此条件;当有元素入队时,通过 <code>signal()</code> 唤醒等待的消费者线程。</li>
<li><code>notFull</code>:当队列已满时,<code>put</code> 操作会等待此条件;当有元素出队时,通过 <code>signal()</code> 唤醒等待的生产者线程。</li>
</ul>
<h4 id="3-aqs自定义锁">3) AQS自定义锁</h4>
<p>前面分析可以知道ReentrantLock是以AQS为基础框架来实现的,那么,此节我们自定义来实现一个锁。</p>
<p>见 “Java并发探索--下篇”</p>
<h2 id="4探索并发工具">4.探索并发工具</h2>
<h2 id="5虚拟线程">5.虚拟线程</h2>
<p>见 “Java并发探索--下篇” --- 在下面找</p>
<p>【博客园】</p>
<p>https://www.cnblogs.com/jackjavacpp</p>
<p>【CSDN】</p>
<p>https://blog.csdn.net/okok__TXF</p>
<h2 id="end参考">end.参考</h2>
<ol>
<li>https://blog.csdn.net/agonie201218/article/details/128712507</li>
<li>https://blog.csdn.net/xu_yong_lin/article/details/117521773</li>
<li>https://www.cnblogs.com/java-bible/p/13930006.html</li>
<li>https://blog.csdn.net/fighting_yu/article/details/89473175</li>
<li>https://tech.meituan.com/2018/11/15/java-lock.html</li>
<li>https://blog.csdn.net/weixin_44772566/article/details/137398521</li>
<li>https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized详解】</li>
<li>https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方网站--- 神中神】</li>
</ol><br><br>
来源:https://www.cnblogs.com/jackjavacpp/p/18852416
頁:
[1]