同路人相信你 發表於 2026-2-2 09:00:00

线程如何停止?线程之间如何协作?线程之间的异常如何处理?

<h2 id="线程停止">线程停止</h2>
<h3 id="stop方法">stop方法</h3>
<p>stop 方法虽然可以停止线程,但它已经是不建议使用的废弃方法了,这一点可以通过 Thread 类中的源码发现,stop 源码如下:</p>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251004858.gif" alt="" loading="lazy"></p>
<p>stop 方法是被 @Deprecated 修饰的不建议使用的过期方法,并且在注释的第一句话就说明了 stop 方法为非安全的方法。</p>
<p>原因在于它在终止一个线程时会强制中断线程的执行,不管run方法是否执行完了,并且还会释放这个线程所持有的所有的锁对象。这一现象会被其它因为请求锁而阻塞的线程看到,使他们继续向下执行。这就会造成数据的不一致。</p>
<p>比如银行转账,从A账户向B账户转账500元,这一过程分为三步,第一步是从A账户中减去500元,假如到这时线程就被stop了,那么这个线程就会释放它所取得锁,然后其他的线程继续执行,这样A账户就莫名其妙的少了500元而B账户也没有收到钱。这就是stop方法的不安全性。</p>
<h3 id="设置标志位">设置标志位</h3>
<p>如果线程的run方法中执行的是一个重复执行的循环,可以提供一个标记来控制循环是否继续</p>
<pre><code class="language-java">class FlagThread extends Thread {
    // 自定义中断标识符
    public volatile boolean isInterrupt = false;
    @Override
    public void run() {
      // 如果为 true -&gt; 中断执行
      while (!isInterrupt) {
            // 业务逻辑处理
      }
    }
}
</code></pre>
<p>但自定义中断标识符的问题在于:线程中断的不够及时。因为线程在执行过程中,无法调用 while(!isInterrupt) 来判断线程是否为终止状态,它只能在下一轮运行时判断是否要终止当前线程,所以它中断线程不够及时,比如以下代码:</p>
<pre><code class="language-java">class InterruptFlag {
    // 自定义的中断标识符
    private static volatile boolean isInterrupt = false;

    public static void main(String[] args) throws InterruptedException {
      // 创建可中断的线程实例
      Thread thread = new Thread(() -&gt; {
            while (!isInterrupt) { // 如果 isInterrupt=true 则停止线程
                System.out.println("thread 执行步骤1:线程即将进入休眠状态");
                try {
                  // 休眠 1s
                  Thread.sleep(1000);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
                System.out.println("thread 执行步骤2:线程执行了任务");
            }
      });
      thread.start(); // 启动线程

      // 休眠 100ms,等待 thread 线程运行起来
      Thread.sleep(100);
      System.out.println("主线程:试图终止线程 thread");
      // 修改中断标识符,中断线程
      isInterrupt = true;
    }
}
</code></pre>
<p>输出:我们期望的是:线程执行了步骤 1 之后,收到中断线程的指令,然后就不要再执行步骤 2 了,但从上述执行结果可以看出,使用自定义中断标识符是没办法实现我们预期的结果的,这就是自定义中断标识符,响应不够及时的问题。</p>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251004853.gif" alt="" loading="lazy"></p>
<h3 id="interrupted中断">interrupted中断</h3>
<p>这种方式需要在while循环中判断使用</p>
<p>使用 interrupt 方法可以给执行任务的线程,发送一个中断线程的指令,它并不直接中断线程,而是发送一个中断线程的信号,把是否正在中断线程的主动权交给代码编写者。相比于自定义中断标识符而然,它能更及时的接收到中断指令,如下代码所示:</p>
<pre><code class="language-java">public static void main(String[] args) throws InterruptedException {
    // 创建可中断的线程实例
    Thread thread = new Thread(() -&gt; {
      while (!Thread.currentThread().isInterrupted()) {
            System.out.println("thread 执行步骤1:线程即将进入休眠状态");
            try {
                // 休眠 1s
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("thread 线程接收到中断指令,执行中断操作");
                // 中断当前线程的任务执行
                break;
            }
            System.out.println("thread 执行步骤2:线程执行了任务");
      }
    });
    thread.start(); // 启动线程

    // 休眠 100ms,等待 thread 线程运行起来
    Thread.sleep(100);
    System.out.println("主线程:试图终止线程 thread");
    // 修改中断标识符,中断线程
    thread.interrupt();
}
</code></pre>
<p>输出:</p>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251004555.gif" alt="" loading="lazy"></p>
<p>从上述结果可以看出,线程在接收到中断指令之后,立即中断了线程,相比于上一种自定义中断标识符的方法来说,它能更及时的响应中断线程指令。</p>
<h3 id="利用interruptedexception">利用interruptedException</h3>
<p>这种方式 不 需要在while循环中判断使用</p>
<p>如果线程因为执行join(),sleep或者wait()而进入阻塞状态,此时想要停止它,可以调用interrupt(),程序会抛出interruptedException异常。可以利用这个异常终止线程</p>
<pre><code class="language-java">public void run() {
    System.out.println(this.getName() + "start");
    int i=0;
    //while (!Thread.interrupted()){
    while (!Thread.currentThread().isInterrupted()){
      try {
            Thread.sleep(10000);
      } catch (InterruptedException e) {
            //e.printStackTrace();
            System.out.println("中断线程");
            break;//通过识别到异常来中断
      }
      System.out.println(this.getName() + " "+ i);
      i++;
    }
    System.out.println(this.getName() + "end");
}
</code></pre>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251004577.gif" alt="" loading="lazy"></p>
<h3 id="executor-的中断操作">Executor 的中断操作</h3>
<p>调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。</p>
<p>以下使用 Lambda 创建线程,相当于创建了一个匿名内部线程。</p>
<pre><code class="language-java">public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -&gt; {
      try {
            Thread.sleep(2000);
            System.out.println("Thread run");
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}
</code></pre>
<pre><code class="language-java">Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
    at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
</code></pre>
<p>如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 <code>Future&lt;?&gt;</code> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。</p>
<pre><code class="language-java">Future&lt;?&gt; future = executorService.submit(() -&gt; {
    // ..
});
future.cancel(true);
</code></pre>
<h2 id="线程之间的协作">线程之间的协作</h2>
<p>当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。</p>
<h3 id="join">join()</h3>
<h4 id="案例">案例</h4>
<p>在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。</p>
<p>对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。</p>
<pre><code class="language-java">public class JoinExample {

    private class A extends Thread {
      @Override
      public void run() {
            System.out.println("A");
      }
    }

    private class B extends Thread {

      private A a;

      B(A a) {
            this.a = a;
      }

      @Override
      public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
      }
    }

    public void test() {
      A a = new A();
      B b = new B(a);
      b.start();
      a.start();
    }
}
</code></pre>
<pre><code class="language-java">public static void main(String[] args) {
    JoinExample example = new JoinExample();
    example.test();
}
</code></pre>
<pre><code class="language-java">A
B
</code></pre>
<h4 id="原理">原理</h4>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251004601.gif" alt="" loading="lazy"></p>
<pre><code class="language-java">public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis &lt; 0) {
      throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
      while (isAlive()) {//检查线程是否存活,只要线程还没结束,主线程就会一直阻塞
            wait(0);//这里的wait调用的本地方法。
      }
    } else {//等待一段指定的时间
      while (isAlive()) {
            long delay = millis - now;
            if (delay &lt;= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
      }
    }
}
</code></pre>
<p>从源码来看,实际上join方法就是调用了wait方法来使得线程阻塞,一直到线程结束运行。注意到,join方法前的synchronized修饰符,它相当于:</p>
<pre><code class="language-java">public final void join(long millis){
synchronized(this){
      //代码块
    }
}
</code></pre>
<p>也就是说加锁的对象即调用这个锁的线程对象,在main()方法中即为t1,持有这个锁的是主线程即main()方法,也就是说代码相当于如下:</p>
<pre><code class="language-java">//t1.join()前的代码
synchronized (t1) {
// 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
while (t1.isAlive()) {
t1.wait(0);
}
}
//t1.join()后的代码
</code></pre>
<p>也因此主线程进入等待队列,直到 t1 线程结束。</p>
<blockquote>
<p>这里可能会有很多人会有疑惑,为什么t1.wait了,阻塞的不是t1,而是主线程?</p>
<p>实际上,如果要阻塞t1,那么就应该在t1的run 方法里进行阻塞,如在run方法里写wait();(当然还有suspend方法,这属于非Java层面,另说)</p>
<p>而这里的 wait 方法被调用以后,是让持有锁的线程进入等待队列,即主线程调用,因此 t1 线程并不会被阻塞,阻塞的是主线程。</p>
</blockquote>
<p>也就是说,join方法是一个同步方法,当主线程调用t1.join()方法时,主线程先获得了t1对象的锁,随后进入方法,调用了t1对象的wait()方法,使主线程进入了t1对象的等待池。</p>
<p>那么问题在于,这里只看到了wait方法,却并没有看到notify或者是notifyAll方法,那么主线程在那里被唤醒呢?</p>
<p>这里参考jvm的代码:</p>
<pre><code class="language-java">static void ensure_join(JavaThread* thread) {

Handle threadObj(thread, thread-&gt;threadObj());

ObjectLocker lock(threadObj, thread);

hread-&gt;clear_pending_exception();

//这一句中的TERMINATED表示这是线程结束以后运行的
java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);

    //这里会清楚native线程,isAlive()方法会返回false
    java_lang_Thread::set_thread(threadObj(), NULL);

//thread就是当前线程,调用这个方法唤醒等待的线程。
lock.notify_all(thread);

hread-&gt;clear_pending_exception();

}
</code></pre>
<p>其实是jvm虚拟机中存在方法lock.notify_all(thread),在t1线程结束以后,会调用该方法,最后唤醒主线程。</p>
<p>所以简化一下,流程即:</p>
<p><img src="https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251004626.gif" alt="" loading="lazy"></p>
<h3 id="wait-notify-notifyall">wait() notify() notifyAll()</h3>
<p>调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。</p>
<p>它们都属于 Object 的一部分,而不属于 Thread。</p>
<p>只能用在同步方法synchronized或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateExeception。</p>
<p>使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。</p>
<pre><code class="language-java">public class WaitNotifyExample {
    public synchronized void before() {
      System.out.println("before");
      notifyAll();
    }

    public synchronized void after() {
      try {
            wait();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      System.out.println("after");
    }
}
</code></pre>
<pre><code class="language-java">public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -&gt; example.after());
    executorService.execute(() -&gt; example.before());
}
</code></pre>
<pre><code class="language-java">before
after
</code></pre>
<p><strong>wait() 和 sleep() 的区别</strong></p>
<ul>
<li>
<p>wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;</p>
</li>
<li>
<p>wait() 会释放锁,sleep() 不会。</p>
</li>
</ul>
<h3 id="await-signal-signalall">await() signal() signalAll()</h3>
<p>java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。</p>
<p>使用 Lock 来获取一个 Condition 对象。</p>
<pre><code class="language-java">public class AwaitSignalExample {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
      lock.lock();
      try {
            System.out.println("before");
            condition.signalAll();
      } finally {
            lock.unlock();
      }
    }

    public void after() {
      lock.lock();
      try {
            condition.await();
            System.out.println("after");
      } catch (InterruptedException e) {
            e.printStackTrace();
      } finally {
            lock.unlock();
      }
    }
}
</code></pre>
<pre><code class="language-java">public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -&gt; example.after());
    executorService.execute(() -&gt; example.before());
}
</code></pre>
<pre><code class="language-java">before
after
</code></pre>
<h2 id="线程中的异常处理">线程中的异常处理</h2>
<h3 id="runnable中异常如何被吞掉">Runnable中异常如何被吞掉</h3>
<p><code>Runnable</code> 接口的 <code>run()</code> 方法不允许抛出任何被检查的异常(checked exceptions),只能处理或抛出运行时异常(unchecked exceptions)。当在 <code>run()</code> 方法内发生异常时,如果没有显式地捕获和处理这些异常,它们通常会在执行该 <code>Runnable</code> 的线程中被“吞掉”,即异常会导致线程终止,但不会影响其他线程的执行。</p>
<pre><code class="language-java">public void uncaughtException(Thread t, Throwable e) {
   if (parent != null) {
      parent.uncaughtException(t, e);
   } else {
      Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
      if (ueh != null) {
            ueh.uncaughtException(t, e);
      } else if (!(e instanceof ThreadDeath)) {
            System.err.print("Exception in thread \""
                           + t.getName() + "\" ");
            e.printStackTrace(System.err);
      }
    }
}
</code></pre>
<p>解决方案:</p>
<ol>
<li>
<p>在run方法中显示的捕获异常</p>
<pre><code class="language-java">public void run() {
    try {
      // 可能抛出异常的代码
    } catch (Exception e) {
      // 记录日志或处理异常
      throw new RuntimeException(e);
    }
}
</code></pre>
</li>
<li>
<p>为创建的线程设置一个<code>UncaughtExceptionHandler</code></p>
<pre><code class="language-java">Thread t = new Thread(() -&gt; {
   int i = 1 / 0;
}, "t1");
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
   @Override
   public void uncaughtException(Thread t, Throwable e) {
      logger.error('---', e);
   }
});
</code></pre>
</li>
<li>
<p>使用<code>Callable</code>代替<code>Runnable</code>,<code>Callable</code>的<code>call</code>方法允许抛出异常,然后可以通过提交给<code>ExecutorService</code>返回的<code>Future</code>来捕获和处理这些异常</p>
<pre><code class="language-java">ExecutorService executor = Executors.newFixedThreadPool(1);
Future&lt;?&gt; future = executor.submit(() -&gt; {
    // 可能抛出异常的代码
});

try {
    future.get(); // 这里会捕获到Callable中的异常
} catch (ExecutionException e) {
    Throwable cause = e.getCause(); // 获取原始异常
}
</code></pre>
</li>
</ol>
<h3 id="callable中异常如何被吞掉">Callable中异常如何被吞掉</h3>
<pre><code class="language-java">class MyCallable implements Callable&lt;String&gt; {
    @Override
    public String call() throws Exception {
      System.out.println("===&gt; 开始执行callable");
      int i = 1 / 0; //异常的地方
      return "callable的结果";
    }
}

public class CallableAndRunnableTest {

    public static void main(String[] args) {
      System.out.println(" =========&gt; main start ");
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(100));
      Future&lt;String&gt; submit = threadPoolExecutor.submit(new MyCallable());
      try {
            TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      System.out.println(" =========&gt; main end ");
    }
}
</code></pre>
<p>运行结果</p>
<pre><code class="language-java"> =========&gt; main start
===&gt; 开始执行callable
=========&gt; main end
</code></pre>
<p>源码如下:</p>
<pre><code class="language-java">public &lt;T&gt; Future&lt;T&gt; submit(Callable&lt;T&gt; task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture&lt;T&gt; ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected &lt;T&gt; RunnableFuture&lt;T&gt; newTaskFor(Callable&lt;T&gt; callable) {
    return new FutureTask&lt;T&gt;(callable);
}
</code></pre>
<p><code>RunableFuture&lt;T&gt;</code> 是个接口,但是它继承了Runnable 接口 , 实现类是 FutureTask ,因此就需要看下 FutureTask里的run方法 是不是和 构造时的Callable 有关系:</p>
<pre><code class="language-java">public void run() {
   // 状态不属于初始状态的情况下,不进行后续逻辑处理
   // 那也就是run 方法只能执行一次
   if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                 null, Thread.currentThread()))
      return;
    try {
      Callable&lt;V&gt; c = callable;
      if (c != null &amp;&amp; state == NEW) {
            V result;
            //
            boolean ran;
            try {
                // 执行 Callable 里的 call 方法 ,将结果存入result变量中
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
               // call 方法异常 , 记录下异常结果
                setException(ex);
            }
            // call 方法正常执行完毕 ,进行结果存储
            if (ran)
                set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s &gt;= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
</code></pre>
<p>接下来就要看,如果存储正常结果的<code>set(result)</code>方法 和存储异常结果的 <code>setException(ex)</code> 方法</p>
<pre><code class="language-java">protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = t;
      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
      finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
}
</code></pre>
<p>这两个代码都做了一个操作,就是将正常结果<code>result</code> 和 异常结果 <code>exception</code> 都赋值给了 <code>outcome</code> 这个变量 。</p>
<p>接着再看下future的get方法</p>
<pre><code class="language-java">//这里有必须看下Task的结束时的状态,如果正常结束,状态为 NORMAL , 异常结果,状态为EXCEPTIONAL 。 看下几个状态的定义,如下:
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL= 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED= 6;

/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // NORMAL(2) 、EXCEPTIONAL(3) 都大于 COMPLETING(1),所以Task结束之后,不会走该if
    if (s &lt;= COMPLETING)
         s = awaitDone(false, 0L);
    // 重点: 返回结果
    return report(s);
}

private V report(int s) throws ExecutionException {
    // 之前正常结果或者异常都存放在Object outcomme 中了
    Object x = outcome;
    // 正常返回
    if (s == NORMAL)
      return (V)x;
    // EXCEPTIONAL(3) 小于 CANCELLED(4) ,所以不会走该if分支,直接后续的throw 抛异常的逻辑
    if (s &gt;= CANCELLED)
      throw new CancellationException();
    // 不等于NORMAL 且 大于等于 CANCELLED,再结合 调用 report(int s ) 之前也做了state 的过滤
    //到这一步,那只能是EXCEPTIONAL(3)
    throw new ExecutionException((Throwable)x);
}
</code></pre>
<p>因此可以通过get方法获取到异常结果</p>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自在线网站:seven的菜鸟成长之路,作者:seven,转载请注明原文链接:www.seven97.top</p><br><br>
来源:https://www.cnblogs.com/sevencoding/p/19557016
頁: [1]
查看完整版本: 线程如何停止?线程之间如何协作?线程之间的异常如何处理?