三维 發表於 2025-6-9 11:31:00

封装CompletionService的并发任务分发器(优化版)

<p>这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。<br>
今天找deepseek做了一版优化,优化点:</p>
<ul>
<li>整体的超时控制</li>
<li>超时、异常处理和封装</li>
<li>取消未完成的任务</li>
</ul>
<p>核心代码</p>
<pre><code class="language-java">public class TaskDispatcher&lt;T&gt; {

    private final CompletionService&lt;T&gt; completionService;

    /**
   * 待处理任务
   */
    private final Set&lt;Future&lt;T&gt;&gt; pending = Sets.newHashSet();

    /**
   * 超时时间, 单位: s
   */
    private long timeout = 10000;

    public TaskDispatcher(Executor executor, long timeout) {
      completionService = new ExecutorCompletionService&lt;&gt;(executor);
      if (timeout &gt; 0) {
            this.timeout = timeout;
      }
    }

    public void submit(Callable&lt;T&gt; task) {
      Future&lt;T&gt; future = completionService.submit(task);
      pending.add(future);
    }

    /**
   * 仅获取执行的任务结果
   *
   * @param ignoreException 忽略执行时发生的异常
   * @return
   */
    public List&lt;T&gt; taskCompletedResult(boolean ignoreException) {
      List&lt;TaskResult&lt;T&gt;&gt; taskResultList = taskCompleted();
      List&lt;T&gt; res = Lists.newArrayList();
      if (CollectionUtils.isEmpty(taskResultList)) {
            return res;
      }
      boolean hasError = false;
      for (TaskResult&lt;T&gt; taskResult : taskResultList) {
            if (!taskResult.isTimeout() &amp;&amp; taskResult.getError() == null) {
                res.add(taskResult.getValue());
            } else if (taskResult.isTimeout() &amp;&amp; !ignoreException) {
                LoggerUtils.error("执行任务时超时");
                hasError = true;
            } else if (taskResult.getError() != null &amp;&amp; !ignoreException) {
                LoggerUtils.error("执行任务时发生异常", taskResult.getError());
                hasError = true;
            }
      }
      if (hasError) {
            throw new ZHException("任务并发处理时发生异常");
      }
      return res;
    }

    /**
   * 获取执行的任务
   *
   * @return
   */
    public List&lt;TaskResult&lt;T&gt;&gt; taskCompleted() {
      long deadline = System.currentTimeMillis() + timeout;
      List&lt;TaskResult&lt;T&gt;&gt; results = Lists.newArrayList();
      int totalTasks = pending.size();

      try {
            for (int i = 0; i &lt; totalTasks; i++) {
                long remaining = Math.max(0, deadline - System.currentTimeMillis());
                Future&lt;T&gt; future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
                TaskResult&lt;T&gt; result = new TaskResult&lt;&gt;();
                if (future == null) {
                  result.setTimeout(true);
                } else {
                  pending.remove(future);
                  try {
                        result.setValue(future.get());
                  } catch (ExecutionException e) {
                        result.setError(e.getCause());
                  }
                }
                results.add(result);
            }
      } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("任务结果收集中断", e);
      } finally {
            pending.forEach(f -&gt; f.cancel(true));
            pending.clear();
      }
      return results;
    }

    @Data
    static class TaskResult&lt;T&gt; {
      private T value;
      private Throwable error;
      private boolean isTimeout;
    }
}
</code></pre>
<p>需要自己声明线程池bean,使用方式如下</p>
<pre><code class="language-java">      TaskDispatcher&lt;Integer&gt; taskDispatcher = new TaskDispatcher&lt;Integer&gt;(threadExecutor, TIME_OUT);
      for (long index: indexList) {
            taskDispatcher.submit(() -&gt; xxxService.count(index));
      }

</code></pre>
<p>为了便于在计数求和场景使用,进一步实现了一个子类</p>
<pre><code class="language-java">public class IntSumTaskDispatcher extends TaskDispatcher&lt;Integer&gt; {
    public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
      super(executor, timeout);
    }

    /**
   * 对所有结果求和
   *
   * @return
   */
    public int takeCompletedSum() {
      List&lt;Integer&gt; countResList = taskCompletedResult(true);
      int count = 0;
      for (Integer countSingle : countResList) {
            if (countSingle == null) {
                continue;
            }
            count += countSingle;
      }
      return count;
    }
}

</code></pre>


</div>
<div id="MySignature" role="contentinfo">
    <div id="AllanboltSignature">
      <p id="PSignature" style="border-top: #e0e0e0 1px dashed; border-right: #e0e0e0 1px dashed; border-bottom: #e0e0e0 1px dashed; border-left: #e0e0e0 1px dashed;
            padding-top: 10px;padding-right: 10px;padding-bottom: 10px;padding-left: 60px;
            font-family: 微软雅黑; font-size:11px;">
            <br />
            作者:五岳
            <br />
            出处:http://www.cnblogs.com/wuyuegb2312
            <br />
            对于标题未标注为“转载”的文章均为原创,其版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
      </p>
     </div><br><br>
来源:https://www.cnblogs.com/wuyuegb2312/p/18920444
頁: [1]
查看完整版本: 封装CompletionService的并发任务分发器(优化版)