技术面:Java并发(线程同步、死锁、多线程编排)
<h2 id="线程同步的方式有哪些">线程同步的方式有哪些?</h2><h3 id="线程同步">线程同步</h3>
<p><strong>线程同步</strong>,是多线程编程中的一种机制,用于协调多个线程的执行顺序,确保它们在共享资源或关键操作上按照预定的规则运行,避免因并发访问导致的数据不一致、竞态条件(Race Condition)等问题。</p>
<h3 id="线程同步的方式有哪些-1">线程同步的方式有哪些?</h3>
<ol>
<li><code>synchronized</code> 关键字,通过 JVM 内置的锁机制实现线程同步,确保同一时刻只有一个线程访问共享资源。既可以修饰<strong>实例方法</strong>也可以修饰<strong>静态方法</strong>,也可以锁<strong>代码块</strong>和锁住某个具体的<strong>实例对象</strong>。</li>
</ol>
<pre><code class="language-java">public synchronized void method() { ... } // 实例方法锁(this)
public static synchronized void method() { ... } // 类方法锁(Class 对象)
// 锁实例对象
synchronized (lockObject) {
// 同步代码块
}
</code></pre>
<ol start="2">
<li><code>ReentrantLock</code> 基于 <code>java.util.concurrent.locks.Lock</code> 接口实现的可重入互斥锁,需显式调用 <code>lock()</code> 和 <code>unlock()</code>进行加锁和解锁。<br>
支持<strong>公平锁、可中断锁、超时锁以及多条件变量(Condition)</strong>,相比 synchronized 提供了更高的灵活性。</li>
</ol>
<pre><code class="language-java">ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 同步代码
} finally {
lock.unlock();
}
</code></pre>
<ol start="3">
<li><code>Semaphore</code>(信号量),允许多个线程同时访问资源,但是限制访问线程的数量。</li>
</ol>
<pre><code class="language-java">Semaphore semaphore = new Semaphore(3); // 初始许可数为3
semaphore.acquire(); // 获取许可
try {
// 同步代码
} finally {
semaphore.release(); // 释放许可
}
</code></pre>
<ol start="4">
<li><code>CountDownLatch</code>,允许多个线程等待其他线程执行完毕之后再执行,用于线程间的协作。</li>
</ol>
<pre><code class="language-java">public class LatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 1; i <= threadCount; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 初始化完成");
latch.countDown(); // 子线程完成,计数器减 1
}, "线程-" + i).start();
}
latch.await(); // 主线程等待所有子线程完成
System.out.println("所有子线程完成,主线程继续执行");
}
}
</code></pre>
<ol start="5">
<li><code>CyclicBarrier</code>,多个线程互相等待,所有线程都到到屏障点后,再继续执行。线程计数器可重置。</li>
</ol>
<pre><code class="language-java">import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
// 总和变量(线程安全)
private static int sum = 0;
// 线程池
private static final ExecutorService executor = Executors.newFixedThreadPool(5);
public static void main(String[] args) {
// 定义需要等待的线程数量(5个)
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
// 所有线程到达屏障后执行的回调(汇总结果)
System.out.println("所有线程已完成计算,总和为: " + sum);
});
// 启动5个线程
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
try {
// 模拟线程计算
int value = (int) (Math.random() * 100);
System.out.println(Thread.currentThread().getName() + " 计算值: " + value);
// 将计算结果累加到总和中
sum += value;
// 调用await()等待其他线程到达屏障
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
}
}
</code></pre>
<ol start="6">
<li><code>Phaser</code>,和<code>CyclicBarrier</code>类似,但是支持更灵活的屏障操作,适用于复杂多阶段任务,支持动态注册/注销参与者,并可控制各参与者的阶段进度,并可以控制各个参与者的到达和离开。</li>
</ol>
<pre><code class="language-java">Phaser phaser = new Phaser(1); // 初始参与线程
phaser.bulkRegister(3); // 动态注册3个工作线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
for (int phase = 0; phase < 2; phase++) {
phaser.arriveAndAwaitAdvance(); // 阶段1:等待所有线程完成阶段1
// 执行阶段2任务
}
phaser.arriveAndDeregister(); // 完成并注销
}).start();
}
// 主线程等待所有线程完成
phaser.arriveAndAwaitAdvance();
</code></pre>
<ol start="7">
<li><strong>其他</strong>,另外还有<code>volatile</code>,这种能保证可见性和有序性,但不能保证原子性的关键字。以及基于CAS实现的<code>Atomic</code> 类(无锁同步),但是仅适用于简单数据类型和部分操作(如 getAndAdd)。</li>
</ol>
<h3 id="死锁">死锁</h3>
<p>死锁,通常是指在两个或多个进程(或线程、事务)在执行过程中,因争夺资源而陷入相互等待的状态,导致所有进程都无法继续执行。</p>
<h4 id="什么情况下会产生死锁">什么情况下会产生死锁?</h4>
<p>产生死锁的四个必要条件</p>
<ul>
<li><strong>互斥(Mutual Exclusion</strong>),一个资源只能被一个进程占用,其他进程必须等待其释放。</li>
<li><strong>占有并等待(Hold and Wait)</strong>,进程在持有资源的同时,申请新的资源。</li>
<li><strong>不可剥夺(No Preemption)</strong>,资源只能由持有它的进程主动释放,不能被强制剥夺。</li>
<li><strong>循环等待(Circular Wait)</strong>,存在一个进程环,每个进程都在等待下一个进程所持有的资源。</li>
</ul>
<p><img src="https://img2024.cnblogs.com/blog/772743/202509/772743-20250910102253836-484663043.png" alt="image" width="679" height="286" loading="lazy"></p>
<h4 id="如何解决死锁">如何解决死锁?</h4>
<p>上面我们已经知道产生死锁有四个必要条件,那解决死锁,只需要破坏死锁的这些必要条件即可。<br>
一般从以下几方面入手即可:</p>
<ul>
<li>
<p>破坏“占有并等待”条件</p>
<ol>
<li>进程或线程一次性申请所需的所有资源,否则不分配任何资源。(<code>可能导致资源利用率低,进程长期等待资源</code>)</li>
<li>进程或线程申请资源时,必须释放已持有的所有资源。(<code>可能导致频繁的资源释放和重新申请,增加系统开销</code>)</li>
</ol>
</li>
<li>
<p>破坏“不可剥夺”条件,允许系统强制回收资源。(<code>可能中断进程的正常执行,导致数据不一致</code>)</p>
</li>
<li>
<p>破坏“循环等待”条件,要求进程或线程按顺序申请资源。保证多个进程(线程)的执行顺序相同即可避免循环等待。<strong>这是最常用的解决死锁的方法。</strong></p>
<p>例如:事务1的执行顺序是:A->B->C,事务2的执行顺序是:C->D->A,这种情况下就容易产生死锁。因为事务1占用了A,等待C,但是事务2占用了C但是等待A。因此只需要把事务2的执行顺序改成:A->D->C,这样事务2在执行时,会发现事务1占用着A呢,因此事务会先不执行,等待事务1释放A。<br>
<img src="https://img2024.cnblogs.com/blog/772743/202509/772743-20250910102335872-1151925212.png" alt="image" width="1235" height="393" loading="lazy"></p>
</li>
</ul>
<h4 id="死锁如何恢复">死锁如何恢复?</h4>
<ul>
<li><strong><code>回滚进程或线程</code></strong>,可以执行一个或多个进程(或线程)回滚到安全状态,释放资源。一般回滚时,要遵循<strong>按优先级选择(优先级低的进程先回滚)</strong> 。<strong>按资源占用时间选择(占用时间短的进程先回滚)。</strong></li>
<li><strong><code>终止进程或线程</code></strong>,直接终止全部或部分死锁进程(或线程),释放资源。(可能导致数据丢失或事务不完整)</li>
<li><strong><code>资源剥夺</code></strong>,从某些进程或线程中强制回收资源分配给其他进程。</li>
<li><strong><code>超时机制</code></strong>,为进程或线程设置等待资源的超时时间,若超时则自动放弃请求并释放已占资源。</li>
</ul>
<h4 id="数据库中的死锁">数据库中的死锁</h4>
<p>在操作数据库时,如果有多个事务并发执行,也是可能发生死锁的。当事务1持有资源A的锁,但是尝试获取资源B的锁,而事务2持有资源B的锁,尝试获取资源A的锁的时候,这时候就会发生死锁的情况。</p>
<p>当数据库发生死锁的时候,会报出来如下的错误:</p>
<pre><code class="language-powershell">Error updating database. Cause: ERR-CODE:
Deadlock found when trying to get lock;
The error occurred while setting parameters### SQL:
update test_table set updated=now(),type_state = ? where test_num = 123
</code></pre>
<h4 id="数据库操作中如何避免死锁">数据库操作中如何避免死锁?</h4>
<p>一般对于数据库的死锁,<strong>主要是避免发生并发更新同一资源的操作。或者可以考虑保证操作的顺序,比如多个事务都是先操作资源A、再操作资源B,这样就能有效的避免死锁。</strong></p>
<p>还有一些其他优化措施:<br>
<strong>减少事务持有锁的时间</strong>:尽快提交或回滚事务。<br>
<strong>锁粒度控制</strong>:使用行级锁而非表级锁,减少资源竞争。<br>
<strong>避免嵌套事务</strong>:减少循环等待的可能性。</p>
<h3 id="多线程编排">多线程编排</h3>
<p>在 Java 中,多线程的编排可以通过多种方式实现,主要涉及 <strong>线程池</strong>、<strong>同步机制</strong>、<strong>并发工具类</strong> 以及 任务协调工具(如 <code>Future</code>、<code>CompletableFuture</code>)等。</p>
<h4 id="completablefuture怎么实现多线程异步编排">CompletableFuture怎么实现多线程异步编排?</h4>
<p><code>CompletableFuture</code>,提供了非常强大的<code>Future</code>的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合<code>CompletableFuture</code>的方法。<br>
我在【上一篇文章】提过<code>CompletableFuture</code>底层就是用<code>ForkJoinPool</code>来实现,那么<code>CompletableFuture</code>如何使用来实现多线程任务编排的呢?</p>
<h5 id="单个任务">单个任务</h5>
<h6 id="runasync无返回值"><code>runAsync</code>:无返回值</h6>
<pre><code class="language-java">CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("无返回值任务执行中");
});
</code></pre>
<h6 id="supplyasync有返回值"><code>supplyAsync</code>:有返回值</h6>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "异步任务结果";
});
System.out.println(future.get()); // 输出: 异步任务结果
</code></pre>
<h6 id="指定线程池">指定线程池</h6>
<p>因为<code>CompletableFuture</code>默认底层是使用的<code>ForkJoinPool.commonPool()</code>,但是也是可以自定义线程池,配置线程的一些指定信息。</p>
<pre><code class="language-java">ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "自定义线程池任务";
}, executor);
</code></pre>
<h5 id="两个任务编排">两个任务编排</h5>
<p><strong><code>thenApplyAsync</code></strong>:能接收上一次的执行结果,还可以有返回值</p>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(result -> result + " World");// 运行结果 Hello World
</code></pre>
<p><strong><code>thenRunAsync</code></strong>:不能接收上一次的执行结果,并且也没返回值</p>
<pre><code class="language-java">CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}).thenRunAsync(() -> {
System.out.println("after Hello World");
});
</code></pre>
<h5 id="组合多个任务编排">组合多个任务编排</h5>
<ol>
<li><strong>串行组合(<code>thenCompose</code>)</strong>,可以将前一个任务的结果传递给下一个任务(链式依赖)</li>
</ol>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));
System.out.println(future.get()); // 输出: Hello World
</code></pre>
<ol start="2">
<li><strong><code>并行组合(thenCombine)</code></strong>,将两个独立任务的结果进行合并</li>
</ol>
<pre><code class="language-java">CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2)
.thenAccept(System.out::println); // 输出: Hello World
</code></pre>
<ol start="3">
<li><strong><code>多任务并行(allOf / anyOf)</code></strong></li>
</ol>
<ul>
<li><code>allOf</code>:等待所有任务完成</li>
</ul>
<pre><code class="language-java">CompletableFuture<Void> allFutures = CompletableFuture.allOf(
CompletableFuture.runAsync(() -> System.out.println("Task 1")),
CompletableFuture.runAsync(() -> System.out.println("Task 2"))
);
allFutures.get(); // 等待所有任务完成
</code></pre>
<ul>
<li><code>anyOf</code>:任一任务完成即触发</li>
</ul>
<pre><code class="language-java">CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> "Task 1"),
CompletableFuture.supplyAsync(() -> "Task 2")
);
System.out.println(anyFuture.get()); // 输出: Task 1 或 Task 2(取决于哪个先完成)
</code></pre>
<h5 id="任务编排异常处理">任务编排异常处理</h5>
<ol>
<li><strong>捕获异常(<code>exceptionally</code>)</strong>,在任务抛出异常时提供默认信息。</li>
</ol>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("失败");
return "成功";
}).exceptionally(ex -> {
System.out.println("异常处理: " + ex.getMessage());
return "默认值";
});
System.out.println(future.get());
</code></pre>
<ol start="2">
<li><strong>全局处理(<code>handle / whenComplete</code>)</strong></li>
</ol>
<ul>
<li><code>handle</code>:处理异常并返回新结果</li>
</ul>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("失败");
return "成功";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("异常处理: " + ex.getMessage());
return "默认值";
}
return result;
});
</code></pre>
<ul>
<li><code>whenComplete</code>:无论成功或失败均执行(不可中断)</li>
</ul>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("失败");
return "成功";
}).whenComplete((result, ex) -> {
if (ex != null) System.out.println("任务失败");
else System.out.println("任务成功: " + result);
});
</code></pre>
</div>
<div id="MySignature" role="contentinfo">
<div style="float:left;letter-spacing: 1px;font-size: 13px;"><p><strong>作者:</strong>纪莫
<br>欢迎任何形式的转载,但请务必注明出处。<br>
限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。</p>
<!-- 分享图标开始 -->
<p>欢迎扫描二维码关注公众号:<strong>Jimoer</strong></p>
<p>文章会同步到公众号上面,大家一起成长,共同提升技术能力。</p>
<p><strong>声援博主:</strong>如果您觉得文章对您有帮助,可以点击文章右下角【推荐】一下。</p>
<p>您的鼓励是博主的最大动力!</p>
</div>
<div style="float:right;">
<img src="https://img2018.cnblogs.com/blog/772743/201909/772743-20190904004009398-659676330.png" alt="微信公众号" ></div>
<!-- 分享图标结束 --><br><br>
来源:https://www.cnblogs.com/jimoer/p/19083170
頁:
[1]