封装CompletionService的并发任务分发器(优化版)
<p>这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。<br>今天找deepseek做了一版优化,优化点:</p>
<ul>
<li>整体的超时控制</li>
<li>超时、异常处理和封装</li>
<li>取消未完成的任务</li>
</ul>
<p>核心代码</p>
<pre><code class="language-java">public class TaskDispatcher<T> {
private final CompletionService<T> completionService;
/**
* 待处理任务
*/
private final Set<Future<T>> pending = Sets.newHashSet();
/**
* 超时时间, 单位: s
*/
private long timeout = 10000;
public TaskDispatcher(Executor executor, long timeout) {
completionService = new ExecutorCompletionService<>(executor);
if (timeout > 0) {
this.timeout = timeout;
}
}
public void submit(Callable<T> task) {
Future<T> future = completionService.submit(task);
pending.add(future);
}
/**
* 仅获取执行的任务结果
*
* @param ignoreException 忽略执行时发生的异常
* @return
*/
public List<T> taskCompletedResult(boolean ignoreException) {
List<TaskResult<T>> taskResultList = taskCompleted();
List<T> res = Lists.newArrayList();
if (CollectionUtils.isEmpty(taskResultList)) {
return res;
}
boolean hasError = false;
for (TaskResult<T> taskResult : taskResultList) {
if (!taskResult.isTimeout() && taskResult.getError() == null) {
res.add(taskResult.getValue());
} else if (taskResult.isTimeout() && !ignoreException) {
LoggerUtils.error("执行任务时超时");
hasError = true;
} else if (taskResult.getError() != null && !ignoreException) {
LoggerUtils.error("执行任务时发生异常", taskResult.getError());
hasError = true;
}
}
if (hasError) {
throw new ZHException("任务并发处理时发生异常");
}
return res;
}
/**
* 获取执行的任务
*
* @return
*/
public List<TaskResult<T>> taskCompleted() {
long deadline = System.currentTimeMillis() + timeout;
List<TaskResult<T>> results = Lists.newArrayList();
int totalTasks = pending.size();
try {
for (int i = 0; i < totalTasks; i++) {
long remaining = Math.max(0, deadline - System.currentTimeMillis());
Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
TaskResult<T> result = new TaskResult<>();
if (future == null) {
result.setTimeout(true);
} else {
pending.remove(future);
try {
result.setValue(future.get());
} catch (ExecutionException e) {
result.setError(e.getCause());
}
}
results.add(result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务结果收集中断", e);
} finally {
pending.forEach(f -> f.cancel(true));
pending.clear();
}
return results;
}
@Data
static class TaskResult<T> {
private T value;
private Throwable error;
private boolean isTimeout;
}
}
</code></pre>
<p>需要自己声明线程池bean,使用方式如下</p>
<pre><code class="language-java"> TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
for (long index: indexList) {
taskDispatcher.submit(() -> xxxService.count(index));
}
</code></pre>
<p>为了便于在计数求和场景使用,进一步实现了一个子类</p>
<pre><code class="language-java">public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
super(executor, timeout);
}
/**
* 对所有结果求和
*
* @return
*/
public int takeCompletedSum() {
List<Integer> countResList = taskCompletedResult(true);
int count = 0;
for (Integer countSingle : countResList) {
if (countSingle == null) {
continue;
}
count += countSingle;
}
return count;
}
}
</code></pre>
</div>
<div id="MySignature" role="contentinfo">
<div id="AllanboltSignature">
<p id="PSignature" style="border-top: #e0e0e0 1px dashed; border-right: #e0e0e0 1px dashed; border-bottom: #e0e0e0 1px dashed; border-left: #e0e0e0 1px dashed;
padding-top: 10px;padding-right: 10px;padding-bottom: 10px;padding-left: 60px;
font-family: 微软雅黑; font-size:11px;">
<br />
作者:五岳
<br />
出处:http://www.cnblogs.com/wuyuegb2312
<br />
对于标题未标注为“转载”的文章均为原创,其版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
</p>
</div><br><br>
来源:https://www.cnblogs.com/wuyuegb2312/p/18920444
頁:
[1]