看见了就想说 發表於 2026-4-29 09:00:00

Tomcat 线程池的设计与实现:StandardThreadExecutor

<h2 id="理解思路">理解思路</h2>
<blockquote>
<p>我们如下几个方面开始引入线程池的,这里主要从上文Service引入,保持上下文之间的衔接,会很好的构筑你的知识体系。</p>
</blockquote>
<ul>
<li>上文中我们了解到,Executor是包含在Service中的,Service中关于Executor的配置和相关代码如下:</li>
</ul>
<p>server.xml中service里包含Executor的配置</p>
<pre><code class="language-xml">&lt;Service name="Catalina"&gt;
&lt;!-- 1. 属性说明
        name:Service的名称
--&gt;

    &lt;!--2. 一个或多个excecutors --&gt; // 看这里
    &lt;!--
    &lt;Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
      maxThreads="150" minSpareThreads="4"/&gt;
    --&gt;
&lt;/Service&gt;   
</code></pre>
<p>Service中executors相关方法</p>
<pre><code class="language-java">/**
* Adds a named executor to the service
* @param ex Executor
*/
@Override
public void addExecutor(Executor ex) {
    synchronized (executors) {
      if (!executors.contains(ex)) {
            executors.add(ex);
            if (getState().isAvailable()) {
                try {
                  ex.start(); // 启动
                } catch (LifecycleException x) {
                  log.error(sm.getString("standardService.executor.start"), x);
                }
            }
      }
    }
}

/**
* Retrieves all executors
* @return Executor[]
*/
@Override
public Executor[] findExecutors() {
    synchronized (executors) {
      Executor[] arr = new Executor;
      executors.toArray(arr);
      return arr;
    }
}


/**
* Retrieves executor by name, null if not found
* @param executorName String
* @return Executor
*/
@Override
public Executor getExecutor(String executorName) {
    synchronized (executors) {
      for (Executor executor: executors) {
            if (executorName.equals(executor.getName()))
                return executor;
      }
    }
    return null;
}

/**
* Removes an executor from the service
* @param ex Executor
*/
@Override
public void removeExecutor(Executor ex) {
    synchronized (executors) {
      if ( executors.remove(ex) &amp;&amp; getState().isAvailable() ) {
            try {
                ex.stop(); // 停止
            } catch (LifecycleException e) {
                log.error(sm.getString("standardService.executor.stop"), e);
            }
      }
    }
}
</code></pre>
<ul>
<li>和Server、Service实现一样,StandardThreadExecutor也是继承LifecycleMBeanBase;然后实现Executor的接口。</li>
</ul>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202603082033960.jpeg" alt="" loading="lazy"></p>
<ul>
<li>Tomcat关于Executor相关的配置文档</li>
</ul>
<p>http://tomcat.apache.org/tomcat-9.0-doc/config/executor.html</p>
<h2 id="executor接口设计">Executor接口设计</h2>
<p>Executor的设计很简单,在理解的时候需要理解两点:</p>
<ol>
<li>Tomcat希望将Executor也纳入Lifecycle<strong>生命周期管理</strong>,所以让它实现了Lifecycle接口</li>
<li><strong>引入超时机制</strong>:也就是说当work queue满时,会等待指定的时间,如果超时将抛出RejectedExecutionException,所以这里增加了一个<code>void execute(Runnable command, long timeout, TimeUnit unit)</code>方法; 其实本质上,它构造了JUC中ThreadPoolExecutor,通过它调用ThreadPoolExecutor的<code>void execute(Runnable command, long timeout, TimeUnit unit)</code>方法。</li>
</ol>
<pre><code class="language-java">public interface Executor extends java.util.concurrent.Executor, Lifecycle {

    public String getName();

    /**
   * Executes the given command at some time in the future.The command
   * may execute in a new thread, in a pooled thread, or in the calling
   * thread, at the discretion of the &lt;code&gt;Executor&lt;/code&gt; implementation.
   * If no threads are available, it will be added to the work queue.
   * If the work queue is full, the system will wait for the specified
   * time until it throws a RejectedExecutionException
   *
   * @param command the runnable task
   * @param timeout the length of time to wait for the task to complete
   * @param unit    the units in which timeout is expressed
   *
   * @throws java.util.concurrent.RejectedExecutionException if this task
   * cannot be accepted for execution - the queue is full
   * @throws NullPointerException if command or unit is null
   */
    void execute(Runnable command, long timeout, TimeUnit unit);
}
</code></pre>
<p>找到Executor的实现类</p>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202603082035276.jpeg" alt="" loading="lazy"></p>
<h2 id="standardthreadexecutor的实现">StandardThreadExecutor的实现</h2>
<blockquote>
<p>接下来我们看下具体的实现类StandardThreadExecutor。</p>
</blockquote>
<h3 id="理解相关配置参数">理解相关配置参数</h3>
<p>Executor官方配置说明文档</p>
<ul>
<li>公共属性</li>
</ul>
<p>Executor的所有实现都 支持以下属性:</p>
<table>
<thead>
<tr>
<th>属性</th>
<th>描述</th>
</tr>
</thead>
<tbody>
<tr>
<td>className</td>
<td>实现的类。实现必须实现 org.apache.catalina.Executor接口。此接口确保可以通过其name属性引用对象并实现Lifecycle,以便可以使用容器启动和停止对象。className的默认值是org.apache.catalina.core.StandardThreadExecutor</td>
</tr>
<tr>
<td>name</td>
<td>用于在server.xml中的其他位置引用此池的名称。该名称是必需的,必须是唯一的。</td>
</tr>
</tbody>
</table>
<ul>
<li><strong>StandardThreadExecutor属性</strong></li>
</ul>
<p>默认实现支持以下属性:</p>
<table>
<thead>
<tr>
<th>属性</th>
<th>描述</th>
</tr>
</thead>
<tbody>
<tr>
<td>threadPriority</td>
<td>(int)执行程序中线程的线程优先级,默认为 5(Thread.NORM_PRIORITY常量的值)</td>
</tr>
<tr>
<td>daemon</td>
<td>(boolean)线程是否应该是守护程序线程,默认为 true</td>
</tr>
<tr>
<td>namePrefix</td>
<td>(字符串)执行程序创建的每个线程的名称前缀。单个线程的线程名称将是namePrefix+threadNumber</td>
</tr>
<tr>
<td>maxThreads</td>
<td>(int)此池中活动线程的最大数量,默认为 200</td>
</tr>
<tr>
<td>minSpareThreads</td>
<td>(int)最小线程数(空闲和活动)始终保持活动状态,默认为 25</td>
</tr>
<tr>
<td>maxIdleTime</td>
<td>(int)空闲线程关闭之前的毫秒数,除非活动线程数小于或等于minSpareThreads。默认值为60000(1分钟)</td>
</tr>
<tr>
<td>maxQueueSize</td>
<td>(int)在我们拒绝之前可以排队等待执行的可运行任务的最大数量。默认值是Integer.MAX_VALUE</td>
</tr>
<tr>
<td>prestartminSpareThreads</td>
<td>(boolean)是否应该在启动Executor时启动minSpareThreads,默认值为 false</td>
</tr>
<tr>
<td>threadRenewalDelay</td>
<td>(long)如果配置了ThreadLocalLeakPreventionListener,它将通知此执行程序有关已停止的上下文。上下文停止后,池中的线程将被更新。为避免同时更新所有线程,此选项在任意2个线程的续订之间设置延迟。该值以ms为单位,默认值为1000ms。如果值为负,则不会续订线程。</td>
</tr>
</tbody>
</table>
<h3 id="lifecycle模板方法">Lifecycle模板方法</h3>
<p>先看核心变量:</p>
<pre><code class="language-java">// 任务队列
private TaskQueue taskqueue = null;

// 包装了一个ThreadPoolExecutor
protected ThreadPoolExecutor executor = null;
</code></pre>
<ul>
<li><strong>initInternal</strong>和<strong>destroyInternal</strong>默认父类实现</li>
</ul>
<pre><code class="language-java">@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
}
@Override
protected void destroyInternal() throws LifecycleException {
    super.destroyInternal();
}
</code></pre>
<ul>
<li><strong>startInternal方法</strong></li>
</ul>
<p>这个方法中,我们不难看出,就是初始化taskqueue,同时构造ThreadPoolExecutor的实例,后面Tomcat的StandardThreadExecutor的实现本质上通过ThreadPoolExecutor实现的。</p>
<pre><code class="language-java">/**
* Start the component and implement the requirements
* of {@link org.apache.catalina.util.LifecycleBase#startInternal()}.
*
* @exception LifecycleException if this component detects a fatal error
*that prevents this component from being used
*/
@Override
protected void startInternal() throws LifecycleException {

    taskqueue = new TaskQueue(maxQueueSize);
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
    executor.setThreadRenewalDelay(threadRenewalDelay);
    if (prestartminSpareThreads) {
      executor.prestartAllCoreThreads();
    }
    taskqueue.setParent(executor);

    setState(LifecycleState.STARTING);
}
</code></pre>
<ul>
<li><strong>stopInternal方法</strong></li>
</ul>
<p>代码很简单,关闭线程池后置null, 方便GC回收。</p>
<pre><code class="language-java">/**
* Stop the component and implement the requirements
* of {@link org.apache.catalina.util.LifecycleBase#stopInternal()}.
*
* @exception LifecycleException if this component detects a fatal error
*that needs to be reported
*/
@Override
protected void stopInternal() throws LifecycleException {

    setState(LifecycleState.STOPPING);
    if (executor != null) {
      executor.shutdownNow();
    }
    executor = null;
    taskqueue = null;
}
</code></pre>
<h3 id="核心executor方法">核心executor方法</h3>
<p>本质上就是调用ThreadPoolExecutor的实例的相关方法。</p>
<pre><code class="language-java">@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
    if (executor != null) {
      executor.execute(command,timeout,unit);
    } else {
      throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}


@Override
public void execute(Runnable command) {
    if (executor != null) {
      try {
            executor.execute(command);
      } catch (RejectedExecutionException rx) {
            //there could have been contention around the queue
            if (!((TaskQueue) executor.getQueue()).force(command)) {
                throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
            }
      }
    } else {
      throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}
</code></pre>
<h3 id="动态调整线程池">动态调整线程池</h3>
<p>我们还注意到StandardThreadExecutor还实现了ResizeableExecutor,从名称上我们就可知道它是希望实现对线程池的动态调整,所以呢,它封装了一个ResizeableExecutor的接口,看下接口。</p>
<pre><code class="language-java">public interface ResizableExecutor extends Executor {

    /**
   * Returns the current number of threads in the pool.
   *
   * @return the number of threads
   */
    public int getPoolSize();

    public int getMaxThreads();

    /**
   * Returns the approximate number of threads that are actively executing
   * tasks.
   *
   * @return the number of threads
   */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}
</code></pre>
<p>前三个方法比较简单,我们看下后两个方法是如何实现的, 其实也很简单。</p>
<pre><code class="language-java">@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
    if (executor == null)
      return false;

    executor.setCorePoolSize(corePoolSize);
    executor.setMaximumPoolSize(maximumPoolSize);
    return true;
}

// 默认没有实现
@Override
public boolean resizeQueue(int capacity) {
    return false;
}
</code></pre>
<h3 id="补充taskqueue">补充TaskQueue</h3>
<p>我们知道工作队列是有TaskQueue保障的,它集成自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。</p>
<pre><code class="language-java">/**
* As task queue specifically designed to run with a thread pool executor. The
* task queue is optimised to properly utilize threads within a thread pool
* executor. If you use a normal queue, the executor will spawn threads when
* there are idle threads and you wont be able to force items onto the queue
* itself.
*/
public class TaskQueue extends LinkedBlockingQueue&lt;Runnable&gt; {

    private static final long serialVersionUID = 1L;
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;

    private transient volatile ThreadPoolExecutor parent = null;

    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the listeners)
    private int forcedRemainingCapacity = -1;

    public TaskQueue() {
      super();
    }

    public TaskQueue(int capacity) {
      super(capacity);
    }

    public TaskQueue(Collection&lt;? extends Runnable&gt; c) {
      super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
      parent = tp;
    }

    public boolean force(Runnable o) {
      if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
      return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
      if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
      return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
      if (parent==null) return super.offer(o);
      //we are maxed out on threads, simply queue the object
      if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
      //we have idle threads, just add it to the queue
      if (parent.getSubmittedCount()&lt;=(parent.getPoolSize())) return super.offer(o);
      //if we have less threads than maximum force creation of a new thread
      if (parent.getPoolSize()&lt;parent.getMaximumPoolSize()) return false;
      //if we reached here, we need to add it to the queue
      return super.offer(o);
    }


    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
      Runnable runnable = super.poll(timeout, unit);
      if (runnable == null &amp;&amp; parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
      }
      return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
      if (parent != null &amp;&amp; parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                  TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
      }
      return super.take();
    }

    @Override
    public int remainingCapacity() {
      if (forcedRemainingCapacity &gt; DEFAULT_FORCED_REMAINING_CAPACITY) {
            // ThreadPoolExecutor.setCorePoolSize checks that
            // remainingCapacity==0 to allow to interrupt idle threads
            // I don't see why, but this hack allows to conform to this
            // "requirement"
            return forcedRemainingCapacity;
      }
      return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
      this.forcedRemainingCapacity = forcedRemainingCapacity;
    }

    void resetForcedRemainingCapacity() {
      this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
    }

}
</code></pre>
<p>TaskQueue这个任务队列是专门为线程池而设计的。优化任务队列以适当地利用线程池执行器内的线程。</p>
<p>如果你使用一个普通的队列,当有空闲线程executor将产生线程并且你不能强制将任务添加到队列。</p>
<h3 id="为什么不是直接使用threadpoolexecutor">为什么不是直接使用ThreadPoolExecutor</h3>
<p>这里你是否考虑过一个问题,为什么Tomcat会自己构造一个StandardThreadExecutor而不是直接使用ThreadPoolExecutor?</p>
<p>从上面的代码,你会发现这里只是使用executor只是使用了execute的两个主要方法,它希望让调用层屏蔽掉ThreadPoolExecutor的其它方法:</p>
<ul>
<li>
<p>它体现的原则:<strong>最少知识原则</strong>: 只和你的密友谈话。也就是说客户对象所需要交互的对象应当尽可能少</p>
</li>
<li>
<p>它体现的设计模式:结构型 - 外观(Facade):外观模式(Facade pattern),它提供了一个统一的接口,用来访问子系统中的一群接口,从而让子系统更容易使用</p>
</li>
</ul>
   

</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自在线网站:seven的菜鸟成长之路,作者:seven,转载请注明原文链接:www.seven97.top</p><br><br>
来源:https://www.cnblogs.com/sevencoding/p/19927962
頁: [1]
查看完整版本: Tomcat 线程池的设计与实现:StandardThreadExecutor