好端端的线程池,怎么就卡死了?
<h1 id="写在前面">写在前面</h1><p>最近,我们的业务收到一项报障,线上某个业务模块偶尔会出现无法正常工作的情况。</p>
<p>经过多方排查,最终确认是线程池使用方式不合理导致的。鉴于线程池使用的普遍性和该类问题的隐秘性,本文将其中涉及的“坑”整理出来,与大家分享。</p>
<p>本文将尽可能淡化业务本身,着重介绍其中的技术问题。</p>
<h1 id="场景说明">场景说明</h1>
<p>该业务链路有三个节点,分别为“演示调度入口”、“获取数据”,以及“推送数据”:</p>
<p><img src="https://files.mdnice.com/user/102576/0e2e7b36-1aaa-47f5-92ea-9a7916c3c0ac.png" alt="" loading="lazy"></p>
<p>其中,三个节点配置了同一个线程池,为呈现更为直观,将实际业务代码做了必要的内联和简化处理,如下:</p>
<pre><code class="language-java">private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
public void handle() {
// 线程延迟调度(第一个节点,调度后续两个节点)
executor.schedule(() -> {
doHandle();
}, 10L, TimeUnit.MILLISECONDS);
}
private List<String> fetchData(){
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
// 模拟数据获取业务逻辑(第二个节点)
}, executor);
return future.get();
}
private void doHandle() {
// 推送数据业务逻辑(第三个节点,需要等第二个节点的数据获取完毕)
List<String> data = fetchData(taskId);
executor.schedule(() -> {
push(data);
}, 50L, TimeUnit.MILLISECONDS);
}
</code></pre>
<p>有经验的同学可以一眼看出,同一个链路上的多个业务节点,若共用同一个线程池,可能会出问题。</p>
<h1 id="知识回顾">知识回顾</h1>
<p>在接着往下讲之前,先来回顾一下基础知识。</p>
<h2 id="java线程池的运行机制">Java线程池的运行机制</h2>
<p>任务提交到线程池中后:</p>
<ol>
<li>如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。</li>
<li>如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。</li>
<li>如果workerCount >= corePoolSize且线程池内的阻塞队列已满,判断 workerCount < maximumPoolSize 是否成立,若成立,则创建并启动一个线程来执行新提交的任务。</li>
<li>如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务。</li>
</ol>
<p>下面两个图很清楚地呈现了这个过程(源自参考资料):</p>
<p><img src="https://files.mdnice.com/user/102576/d9c0f0a4-3aaf-4846-920f-f3a6480234ff.png" alt="" loading="lazy"></p>
<p><img src="https://files.mdnice.com/user/102576/5929236d-8dd4-4548-8376-6242460509e3.png" alt="" loading="lazy"></p>
<p>进一步地,线程池的生命周期定义在java.util.concurrent.ThreadPoolExecutor 类中,这个不是本文重点,有兴趣的同学自行学习即可:</p>
<pre><code class="language-java">// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN =0 << COUNT_BITS;
private static final int STOP =1 << COUNT_BITS;
private static final int TIDYING =2 << COUNT_BITS;
private static final int TERMINATED =3 << COUNT_BITS;
</code></pre>
<p><img src="https://files.mdnice.com/user/102576/0079d76c-b0fb-4e03-8137-000dfe0981f8.png" alt="" loading="lazy"></p>
<h2 id="java线程的生命周期">Java线程的生命周期</h2>
<p>定义在 java.lang.Thread 类中:</p>
<pre><code class="language-java">public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
</code></pre>
<p>状态转换图如下(源自参考资料):</p>
<p><img src="https://files.mdnice.com/user/102576/8caa99b9-08e9-44f5-83e0-8e0a4fb5cd96.png" alt="" loading="lazy"></p>
<p>在我们将要描述的场景中,大量线程就是卡在了waiting状态。</p>
<h1 id="问题推演">问题推演</h1>
<p>我们在代码中定义了一个线程池,其核心线程数为5、配置了无界阻塞队列、最大线程数为 Integer.MAX_VALUE(约21亿):</p>
<pre><code class="language-java">private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
</code></pre>
<p>那么任务并发调用时,问题是如何产生的呢?一个简单的办法是进行推演和模拟。<br>
我们写一段代码进行复现(加了一些日志输出):</p>
<pre><code class="language-java">import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
public class TaskScheduler {
// 增加线程池的线程数量,以便模拟多线程抢占
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
public void handle(int taskId) {
// 线程延迟调度(第一个节点,调度后续两个节点)
executor.schedule(() -> {
System.out.println("任务 " + taskId + " - 第一个节点开始执行,线程编号: " + Thread.currentThread().getId());
try {
doHandle(taskId);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务 " + taskId + " - 第一个节点执行完成,线程编号: " + Thread.currentThread().getId());
}, 10L, TimeUnit.MILLISECONDS);
}
private List<String> fetchData(int taskId) throws ExecutionException, InterruptedException {
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务 " + taskId + " - 第二个节点开始执行,线程编号: " + Thread.currentThread().getId());
// 模拟数据获取业务逻辑(第二个节点)
Random random = new Random();
int delay = 500 + random.nextInt(2000); // 随机延迟时间
try {
Thread.sleep(delay); // 模拟耗时操作
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务 " + taskId + " - 第二个节点执行完成,线程编号: " + Thread.currentThread().getId());
return new ArrayList<String>() {{
add("data1");
add("data2");
add("data3");
}};
}, executor);
return future.get();
}
private void doHandle(int taskId) throws ExecutionException, InterruptedException {
// 推送数据业务逻辑(第三个节点,需要等第二个节点的数据获取完毕)
List<String> data = fetchData(taskId);
System.out.println("任务 " + taskId + " - 第二个节点的数据已获取,线程编号: " + Thread.currentThread().getId());
executor.schedule(() -> {
System.out.println("任务 " + taskId + " - 第三个节点开始执行,线程编号: " + Thread.currentThread().getId());
push(data);
System.out.println("\033[0;31m任务 " + taskId + " - 第三个节点执行完成,线程编号: " + Thread.currentThread().getId() + "\033[0m");
}, 50L, TimeUnit.MILLISECONDS);
}
private void push(List<String> data) {
// 模拟数据推送
System.out.println("推送数据: " + data);
}
/**
* 监控线程池状态
*/
private static void monitorThreadPoolStatus(ScheduledThreadPoolExecutor executor) {
ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor();
monitorExecutor.scheduleAtFixedRate(() -> {
int poolSize = executor.getPoolSize();
int corePoolSize = executor.getCorePoolSize();
int maximumPoolSize = executor.getMaximumPoolSize();
long completedTaskCount = executor.getCompletedTaskCount();
int activeCount = executor.getActiveCount();
int queueSize = executor.getQueue().size();
System.out.println("\n【线程池状态】");
System.out.println("核心线程数: " + corePoolSize);
System.out.println("最大线程数: " + maximumPoolSize);
System.out.println("当前线程数: " + poolSize);
System.out.println("活跃线程数: " + activeCount);
System.out.println("已完成任务数: " + completedTaskCount);
System.out.println("队列中等待的任务数: " + queueSize);
System.out.println("是否关闭: " + executor.isShutdown());
System.out.println("是否终止: " + executor.isTerminated());
}, 0, 1, TimeUnit.SECONDS); // 每秒打印一次
// 在适当的时候关闭监控器(例如主程序结束时)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
monitorExecutor.shutdownNow();
}));
}
// 模拟任务数量(可自由调整)
private static final int TASK_COUNT = 5;
public static void main(String[] args) throws InterruptedException {
TaskScheduler scheduler = new TaskScheduler();
ExecutorService taskExecutor = Executors.newFixedThreadPool(TASK_COUNT); // 创建一个固定大小的线程池
CountDownLatch startSignal = new CountDownLatch(1); // 起跑线
CountDownLatch readySignal = new CountDownLatch(TASK_COUNT); // 用于通知主线程所有线程已就位
// 模拟多个并发任务
for (int i = 1; i <= TASK_COUNT; i++) {
final int taskId = i;
taskExecutor.submit(() -> {
readySignal.countDown(); // 通知主线程当前线程已准备就绪
try {
startSignal.await(); // 等待起跑信号
scheduler.handle(taskId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
System.out.println("等待所有线程就绪...");
readySignal.await(); // 等待所有线程准备完毕
System.out.println("所有线程已准备就绪,开始并发执行!");
startSignal.countDown(); // 放行所有等待的线程
// 启动线程池状态监控
monitorThreadPoolStatus(scheduler.executor);
taskExecutor.shutdown(); // 后续会自动关闭,当所有任务完成时
}
}
</code></pre>
<p>执行之,观察输出日志。</p>
<p>当任务数量小于5时(比如2):</p>
<pre><code class="language-shell">等待所有线程就绪...
所有线程已准备就绪,开始并发执行!
任务 2 - 第一个节点开始执行,线程编号: 25
任务 1 - 第一个节点开始执行,线程编号: 23
任务 1 - 第二个节点开始执行,线程编号: 27
任务 2 - 第二个节点开始执行,线程编号: 28
任务 1 - 第二个节点执行完成,线程编号: 27
任务 1 - 第二个节点的数据已获取,线程编号: 23
任务 1 - 第一个节点执行完成,线程编号: 23
任务 1 - 第三个节点开始执行,线程编号: 27
推送数据:
任务 1 - 第三个节点执行完成,线程编号: 27
任务 2 - 第二个节点执行完成,线程编号: 28
任务 2 - 第二个节点的数据已获取,线程编号: 25
任务 2 - 第一个节点执行完成,线程编号: 25
任务 2 - 第三个节点开始执行,线程编号: 23
推送数据:
任务 2 - 第三个节点执行完成,线程编号: 23
</code></pre>
<p>最终线程池状态为:</p>
<pre><code class="language-shell">核心线程数: 5
最大线程数: 2147483647
当前线程数: 5
活跃线程数: 0
已完成任务数: 6
队列中等待的任务数: 0
是否关闭: false
是否终止: false
</code></pre>
<p>当任务数量大于等于5时(比如5):</p>
<pre><code class="language-shell">等待所有线程就绪...
所有线程已准备就绪,开始并发执行!
任务 5 - 第一个节点开始执行,线程编号: 27
任务 1 - 第一个节点开始执行,线程编号: 29
任务 4 - 第一个节点开始执行,线程编号: 28
任务 3 - 第一个节点开始执行,线程编号: 30
任务 2 - 第一个节点开始执行,线程编号: 31
</code></pre>
<p>最终线程池状态为:</p>
<pre><code class="language-shell">核心线程数: 5
最大线程数: 2147483647
当前线程数: 5
活跃线程数: 5
已完成任务数: 0
队列中等待的任务数: 5
是否关闭: false
是否终止: false
</code></pre>
<p>可以看出,<strong>当任务数量大于线程池的数量时,任务就会卡死,因为后续的节点已经没有线程可用了,对应的任务始终无法完成,因此已经被占用的线程无法释放(因为属于同一个任务)。此后继续提交的新任务会被放到无界阻塞队列中,表现出来就是系统处于假死状态。</strong></p>
<p>以5个任务为例,卡死之后的线程池是这样:</p>
<p><img src="https://files.mdnice.com/user/102576/6cc5946d-fd20-468d-8476-1743d1735b3f.png" alt="" loading="lazy"></p>
<p>以6个任务为例,卡死之后的线程池是这样:</p>
<p><img src="https://files.mdnice.com/user/102576/b31b6ddb-3322-480c-b55d-8edba40e1052.png" alt="" loading="lazy"></p>
<p>以此类推...</p>
<p>再看看线程状态(以5个任务为例):</p>
<p><img src="https://files.mdnice.com/user/102576/b8868896-ddea-4520-8325-68b8b0cf806f.png" alt="" loading="lazy"></p>
<p><img src="https://files.mdnice.com/user/102576/f91e2cf8-0895-4477-a137-711a400a217b.png" alt="" loading="lazy"></p>
<p>可以看到,状态均为WAITING。</p>
<h1 id="刨根问底">刨根问底</h1>
<h2 id="1-为何多年前的雷现在才炸">1. 为何多年前的雷,现在才炸?</h2>
<p>其实不是最近才出问题,而是可能早已出问题了,但无人知晓。相关的几个原因:</p>
<ul>
<li>业务本身流量不高,问题可能要在某些流量高峰期间才被触发。从上面的分析过程来看,只要同一时刻提交的任务数量小于5,就是可以持续运行下去的(因为后续两个节点,一定会被逐步执行完毕,对应的线程就会被释放掉了)。</li>
<li>需求迭代频繁,机器隔几天就会部署重启。即使之前部分机器引起了线程卡死,也会被“不经意间”解决掉(从这个角度看,大促期间扩容还是有必要的/doge)。</li>
<li>业务监控不完善。线程卡死后,后续的业务逻辑即使未执行,也没有被感知到,这反映出:缺乏对应的监控预警信息。</li>
<li>相关功能在此前不够受重视。本次的故障是一个辅助功能,此前可能已经出问题了,但没有收到业务反馈。最近业务方开始关注数据指标,进而发现了异常。</li>
</ul>
<h2 id="2-为什么cpu没有异常">2. 为什么CPU没有异常?</h2>
<p>因为本次故障并不消耗CPU。线程处于等待状态,没有实际“工作”,当然也就不会引发CPU占用率升高。</p>
<p>另外提一句,当时我们还看了火焰图,尝试发现相关的“死锁”,显而易见,并不会有任何收获。</p>
<h2 id="3-为什么内存没有异常">3. 为什么内存没有异常?</h2>
<p>根据上述分析,发生线程池卡死的现象时,后续提交进来的任务均会被放到阻塞队列中,按理会使得内存不断增长,从而引发内存溢出,但我们在排查过程中并未观察到内存异常。那么是什么原因呢?</p>
<p>我们看看提交到阻塞队列中的对象占了多大内存,改造一下handle方法,引入jol工具将其打印出来:</p>
<pre><code class="language-java">public void handle(int taskId) {
Runnable task = () -> {
System.out.println("任务 " + taskId + " - 第一个节点开始执行,线程编号: " + Thread.currentThread().getId());
try {
doHandle(taskId);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务 " + taskId + " - 第一个节点执行完成,线程编号: " + Thread.currentThread().getId());
};
// 使用 JOL 打印该任务对象的内存大小
System.out.println("\n【任务对象内存占用】");
System.out.println(GraphLayout.parseInstance(task).toFootprint());
executor.schedule(task, 10L, TimeUnit.MILLISECONDS);
}
</code></pre>
<p>输出:</p>
<pre><code class="language-shell"> COUNT AVG SUM DESCRIPTION
1 16 16 TaskScheduler
1 24 24 TaskScheduler$$Lambda$5/74534624
</code></pre>
<p>可以看到对象大小为24字节。当然,这是简化后的程序,实际的业务代码还要大一些,保守起见,预估为50字节。<br>
假设每天10万次请求,那么会产生内存约为5M,按照平均每个月发版一次(实际大家的业务应该更为频繁)的节奏,仅会积累150M的内存,这个量级并不算高,也未引起JVM的“重视”。其实,如果把持续运行很久的内存dump下来,是可以发现端倪的。</p>
<blockquote>
<p>内存计算方案有多种,除了JOL,也可以使用这个开源工具类:https://github.com/sunshanpeng/dark_magic</p>
</blockquote>
<h1 id="解决方案">解决方案</h1>
<p>讲完了问题,谈谈如何解决。</p>
<p>对于我们遇到的问题来说,是因为一个任务链路中的多个节点共用了同一个线程池,从而导致多个任务的前置节点把线程消耗完毕,后续资源没有线程去执行。从这个角度来看,可行的解决方案有几种:</p>
<p><strong>方案1.</strong> 假设确实需要共用线程池,可以把线程池的核心线程数调大,比业务高峰期间的流量更高即可。当然,这个方案,并不算很优雅。</p>
<p><strong>方案2.</strong> 任务链路中的多个节点,拆分独立线程池。这种可以从根源上避免线程争用(因为节点3总是会执行完毕的,对应任务占用的线程池2和线程池1会被逐级释放)。如下图:</p>
<p><img src="https://files.mdnice.com/user/102576/4f0cc814-9704-447e-8808-e9177901b9a8.png" alt="" loading="lazy"></p>
<p><strong>方案3.</strong> 重新审视链路中的多层节点,是否必须异步执行,如某些地方其实可以改为同步执行。</p>
<p>最终,我们采用了方案2,改造后的代码类似于:</p>
<pre><code class="language-java">// 独立线程池:用于第一个调度节点(延迟执行 doHandle)
private ScheduledThreadPoolExecutor stage1Pool = new ScheduledThreadPoolExecutor(5);
// 独立线程池:用于数据获取(第二个节点)
private ExecutorService stage2Pool = Executors.newFixedThreadPool(5);
// 独立线程池:用于推送数据(第三个节点)
private ExecutorService stage3Pool = Executors.newFixedThreadPool(10);
public void handle() {
// 线程延迟调度(第一个节点,调度后续两个节点)
stage1Pool.schedule(() -> {
doHandle();
}, 10L, TimeUnit.MILLISECONDS);
}
private List<String> fetchData(){
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
// 模拟数据获取业务逻辑(第二个节点)
}, stage2Pool);
return future.get();
}
private void doHandle() {
// 推送数据业务逻辑(第三个节点,需要等第二个节点的数据获取完毕)
List<String> data = fetchData(taskId);
stage3Pool.schedule(() -> {
push(data);
}, 50L, TimeUnit.MILLISECONDS);
}
</code></pre>
<p>另外还有个新问题:如何合理设置线程池参数?其实这里面也有一套方法论。由于不是本文重点,不再展开,感兴趣的读者请参考此前写的一篇文章:https://www.cnblogs.com/xiaoxi666/p/16755570.html。</p>
<h1 id="经验教训">经验教训</h1>
<h2 id="要有全局观">要有全局观</h2>
<p>本文为了表述方便,对代码做了简化,实际的业务逻辑较长,且为不同时期的历史逻辑,写代码时容易忽略全局,导致同一个线程池配置在同一个链路的多个节点而不自知。这是很有风险的。</p>
<h2 id="要深刻掌握技术才能直击本质">要深刻掌握技术,才能直击本质</h2>
<p>即便看出来了同一个线程池被链路中的多个节点复用,也不一定能意识到可能的风险。我们在排查的过程中就曾忽略这个方向,多花了很多时间(又是查CPU,又是看内存和GC,又是看火焰图,直到发现各项指标都正常时,才回过头重新审视代码,进而找到根因)。</p>
<h2 id="要有意识地逐步重构代码">要有意识地逐步重构代码</h2>
<p>在开发过程中,遇到历史上不合理的逻辑,鼓励大胆提出来,共同探讨出更合适的方案并执行小步重构,防患于未然。</p>
<h2 id="闭环思维">闭环思维</h2>
<p>遇到“诡异”问题,势必要挖掘根因,不能让可能的问题处于悬而未决的状态,可能出问题的地方在将来一定会出问题。</p>
<h1 id="参考文档">参考文档</h1>
<p> Life Cycle of a Thread in Java</p>
<p> Java线程池实现原理及其在美团业务中的实践</p>
</div>
<div id="MySignature" role="contentinfo">
『注:本文来自博客园“小溪的博客”,若非声明均为原创内容,请勿用于商业用途,转载请注明出处http://www.cnblogs.com/xiaoxi666/』<br><br>
来源:https://www.cnblogs.com/xiaoxi666/p/18892107
頁:
[1]