Java 原生异步编程与Spring 异步编程 详解
<h3 id="简介">简介</h3><p><code>Java</code> 异步编程是现代高性能应用开发的核心技术之一,它允许程序在执行耗时操作(如网络请求、文件 <code>IO</code>)时不必阻塞主线程,从而提高系统吞吐量和响应性。</p>
<h3 id="异步-vs-同步">异步 vs 同步</h3>
<ul>
<li>同步:任务按顺序执行,后续任务需等待前任务完成。</li>
</ul>
<pre><code class="language-java">public String syncTask() {
// 模拟耗时操作
Thread.sleep(1000);
return "Result";
}
</code></pre>
<ul>
<li>异步:任务并行或在后台执行,主线程立即返回。</li>
</ul>
<pre><code class="language-java">public CompletableFuture<String> asyncTask() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Result";
});
}
</code></pre>
<h3 id="java-原生异步支持">Java 原生异步支持</h3>
<h4 id="手动创建线程">手动创建线程</h4>
<p>最基本的异步方式是创建 <code>Thread</code> 或实现 <code>Runnable</code>。</p>
<ul>
<li>缺点:管理线程池困难,资源浪费,难以复用,缺乏结果处理机制。</li>
</ul>
<pre><code class="language-java">public class BasicAsync {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("Task completed");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
System.out.println("Main thread continues");
}
}
</code></pre>
<h4 id="使用-executorservice">使用 ExecutorService</h4>
<ul>
<li>
<p>优点:提供线程池管理,复用线程,减少创建开销</p>
</li>
<li>
<p>缺点:<code>Future.get()</code> 是阻塞的,难以链式调用</p>
</li>
</ul>
<pre><code class="language-java">import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
Thread.sleep(1000);
System.out.println("Task 1 completed");
});
executor.submit(() -> {
Thread.sleep(500);
System.out.println("Task 2 completed");
});
executor.shutdown();
}
}
</code></pre>
<h5 id="常用方法">常用方法:</h5>
<ul>
<li>
<p><code>submit(Runnable)</code>:提交无返回值的任务。</p>
</li>
<li>
<p><code>submit(Callable)</code>:提交有返回值的任务,返回 <code>Future</code>。</p>
</li>
<li>
<p><code>shutdown()</code>:关闭线程池,不接受新任务。</p>
</li>
</ul>
<h5 id="线程池类型">线程池类型:</h5>
<ul>
<li>
<p><code>Executors.newFixedThreadPool(n)</code>:固定大小线程池。</p>
</li>
<li>
<p><code>Executors.newCachedThreadPool()</code>:动态调整线程数。</p>
</li>
<li>
<p><code>Executors.newSingleThreadExecutor()</code>:单线程执行。</p>
</li>
</ul>
<p>线程池类型对比:</p>
<table>
<thead>
<tr>
<th>类型</th>
<th>特性</th>
<th>适用场景</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>FixedThreadPool</code></td>
<td>固定线程数,无界队列</td>
<td>负载稳定的长期任务</td>
</tr>
<tr>
<td><code>CachedThreadPool</code></td>
<td>自动扩容,60秒闲置回收</td>
<td>短时突发任务</td>
</tr>
<tr>
<td><code>ScheduledThreadPool</code></td>
<td>支持定时/周期性任务</td>
<td>心跳检测、定时报表</td>
</tr>
<tr>
<td><code>WorkStealingPool</code></td>
<td>使用 <code>ForkJoinPool</code>,任务窃取算法</td>
<td>计算密集型并行任务</td>
</tr>
</tbody>
</table>
<h4 id="futurejava-5">Future(Java 5+)</h4>
<pre><code class="language-java">import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Task completed";
});
// 主线程继续
System.out.println("Doing other work");
// 阻塞获取结果
String result = future.get(); // 等待任务完成
System.out.println(result);
executor.shutdown();
}
}
</code></pre>
<h5 id="方法">方法</h5>
<ul>
<li>
<p><code>get()</code>:阻塞获取结果。</p>
</li>
<li>
<p><code>isDone()</code>:检查任务是否完成。</p>
</li>
<li>
<p><code>cancel(boolean)</code>:取消任务。</p>
</li>
</ul>
<h5 id="缺点">缺点</h5>
<ul>
<li>
<p><code>get()</code> 是阻塞的,不利于非阻塞编程。</p>
</li>
<li>
<p>难以组合多个异步任务。</p>
</li>
</ul>
<h4 id="completablefuturejava-8">CompletableFuture(Java 8+)</h4>
<p>支持链式调用,真正现代化异步编程方式。</p>
<pre><code class="language-java">import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Task result";
})
.thenApply(result -> result.toUpperCase()) // 转换结果
.thenAccept(result -> System.out.println(result)) // 消费结果
.exceptionally(throwable -> {
System.err.println("Error: " + throwable.getMessage());
return null;
});
System.out.println("Main thread continues");
}
}
</code></pre>
<h4 id="虚拟线程java-21project-loom">虚拟线程(Java 21+,Project Loom)</h4>
<p>虚拟线程是 <code>Java 21</code> 引入的轻量级线程,适合高并发 I/O 密集型任务。</p>
<pre><code class="language-java">public class VirtualThreadExample {
public static void main(String[] args) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
try {
Thread.sleep(1000);
System.out.println("Task completed in virtual thread");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("Main thread continues");
}
}
</code></pre>
<p><strong>优势</strong></p>
<ul>
<li>
<p>轻量级,创建开销极低(相比传统线程)。</p>
</li>
<li>
<p>适合 I/O 密集型任务(如 HTTP 请求、数据库查询)。</p>
</li>
</ul>
<p><strong>注意</strong></p>
<ul>
<li>
<p>不适合 <code>CPU</code> 密集型任务(可能导致线程饥饿)。</p>
</li>
<li>
<p><code>Spring Boot 3.2+</code> 支持虚拟线程(需配置)。</p>
</li>
</ul>
<h4 id="阻塞-vs-非阻塞">阻塞 vs 非阻塞</h4>
<table>
<thead>
<tr>
<th>类型</th>
<th>是否阻塞</th>
<th>获取结果方式</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>Future<T></code></td>
<td>✅ 是</td>
<td><code>future.get()</code>(阻塞)</td>
</tr>
<tr>
<td><code>CompletableFuture<T></code></td>
<td>✅(get) ❌(then)</td>
<td>支持非阻塞链式处理</td>
</tr>
<tr>
<td><code>@Async + Future/CompletableFuture</code></td>
<td>✅</td>
<td><code>get()</code> 或回调</td>
</tr>
<tr>
<td><code>WebFlux</code></td>
<td>❌ 完全非阻塞</td>
<td>响应式 <code>Mono</code> / <code>Flux</code></td>
</tr>
</tbody>
</table>
<h4 id="futuret-vs-completablefuturet核心对比"><code>Future<T></code> vs <code>CompletableFuture<T></code>:核心对比</h4>
<table>
<thead>
<tr>
<th>功能</th>
<th><code>Future<T></code></th>
<th><code>CompletableFuture<T></code></th>
</tr>
</thead>
<tbody>
<tr>
<td>Java 版本</td>
<td>Java 5+</td>
<td>Java 8+</td>
</tr>
<tr>
<td>是否可组合</td>
<td>❌ 不支持</td>
<td>✅ 支持链式组合、并行执行</td>
</tr>
<tr>
<td>支持异步回调</td>
<td>❌ 无</td>
<td>✅ 有 <code>.thenApply()</code>、<code>.thenAccept()</code> 等</td>
</tr>
<tr>
<td>支持异常处理</td>
<td>❌ 无</td>
<td>✅ 有 <code>.exceptionally()</code> 等</td>
</tr>
<tr>
<td>可取消</td>
<td>✅ 支持 <code>cancel()</code></td>
<td>✅ 也支持</td>
</tr>
<tr>
<td>阻塞获取</td>
<td>✅ <code>get()</code> 阻塞</td>
<td>✅ <code>get()</code> 阻塞(也可非阻塞)</td>
</tr>
<tr>
<td>使用场景</td>
<td>简单线程任务</td>
<td>多异步任务组合、复杂控制流</td>
</tr>
</tbody>
</table>
<h3 id="spring-异步编程基于-async">Spring 异步编程(基于 @Async)</h3>
<h4 id="配置类或启动类启用异步支持">配置类或启动类启用异步支持</h4>
<pre><code class="language-java">@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
</code></pre>
<pre><code class="language-java">@Configuration
@EnableAsync
public class AsyncConfig {
}
</code></pre>
<h4 id="无返回值用法">无返回值用法</h4>
<pre><code class="language-java">// 无返回值的异步方法
@Async
public void sendEmail(String to) {
System.out.println("异步发送邮件给: " + to);
try { Thread.sleep(2000); } catch (InterruptedException e) {}
System.out.println("邮件发送完成");
}
</code></pre>
<h4 id="使用-futuret">使用 <code>Future<T></code></h4>
<p><strong>创建异步方法</strong></p>
<pre><code class="language-java">@Service
public class AsyncService {
@Async
public Future<String> processTask() {
// 模拟耗时操作
return new AsyncResult<>("Task completed");
}
}
</code></pre>
<p><strong>调用并获取结果:</strong></p>
<pre><code class="language-java">@Autowired
private AsyncService asyncService;
public void executeTask() throws Exception {
Future<String> future = asyncService.processTask();
String result = future.get(); // 阻塞等待结果
}
</code></pre>
<h4 id="使用-completablefuturet">使用 <code>CompletableFuture<T></code></h4>
<p><strong>创建异步方法</strong></p>
<pre><code class="language-java">@Async
public CompletableFuture<String> asyncMethod() {
return CompletableFuture.completedFuture("Async Result");
}
</code></pre>
<p><strong>调用方式:</strong></p>
<pre><code class="language-java">CompletableFuture<String> result = asyncService.asyncMethod();
// 非阻塞,可以做其他事
String value = result.get(); // 阻塞获取
</code></pre>
<h4 id="线程池配置">线程池配置</h4>
<h5 id="使用自定义配置类">使用自定义配置类</h5>
<pre><code class="language-java">@Configuration
public class AsyncConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(20); // 最大线程数
executor.setQueueCapacity(100); // 队列容量
executor.setKeepAliveSeconds(30); // 空闲线程存活时间
executor.setThreadNamePrefix("async-task-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
// 指定线程池
@Async("taskExecutor")
public Future<String> customPoolTask() { ... }
</code></pre>
<h5 id="使用配置文件">使用配置文件</h5>
<pre><code class="language-yml"># application.yml
spring:
task:
execution:
pool:
core-size: 5
max-size: 20
queue-capacity: 100
thread-name-prefix: async-
shutdown:
await-termination: true
terminate-on-timeout: true
</code></pre>
<h4 id="spring-webflux-示例">Spring WebFlux 示例</h4>
<pre><code class="language-java">@Service
public class UserService {
public Mono<String> getUser() {
return Mono.just("用户信息").delayElement(Duration.ofSeconds(2));
}
public Flux<String> getAllUsers() {
return Flux.just("用户1", "用户2", "用户3").delayElements(Duration.ofSeconds(1));
}
}
</code></pre>
<pre><code class="language-java">@RestController
@RequestMapping("/users")
public class UserController {
@Autowired
private UserService userService;
@GetMapping("/one")
public Mono<String> getUser() {
return userService.getUser();
}
@GetMapping("/all")
public Flux<String> getAllUsers() {
return userService.getAllUsers();
}
}
</code></pre>
<p>调用时非阻塞行为体现</p>
<ul>
<li>
<p><code>Mono<String></code> 表示未来异步返回一个值;</p>
</li>
<li>
<p><code>Flux<String></code> 表示异步返回多个值;</p>
</li>
<li>
<p>请求立即返回 <code>Publisher</code>,只有订阅时才开始执行(懒执行、非阻塞);</p>
</li>
<li>
<p>它不占用线程,不会“卡死线程”等待值返回。</p>
</li>
</ul>
<h4 id="springboot-集成示例">SpringBoot 集成示例</h4>
<ul>
<li>标记 <code>@Async</code> 注解:</li>
</ul>
<p><code>@Async</code> 标记方法为异步执行,<code>Spring</code> 在线程池中运行该方法。</p>
<pre><code class="language-java">import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
@Async
public CompletableFuture<String> doAsyncTask() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture("Task completed");
}
}
</code></pre>
<ul>
<li>启用异步</li>
</ul>
<p>在主类或配置类上添加 <code>@EnableAsync</code>。</p>
<pre><code class="language-java">@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
</code></pre>
<ul>
<li>控制器调用异步方法</li>
</ul>
<pre><code class="language-java">@RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public String triggerAsync() {
asyncService.doAsyncTask().thenAccept(result -> System.out.println(result));
return "Task triggered";
}
}
</code></pre>
<ul>
<li>自定义线程池</li>
</ul>
<p><code>Spring</code> 默认使用 <code>SimpleAsyncTaskExecutor</code>,不适合生产环境。推荐配置自定义线程池。</p>
<pre><code class="language-java">import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AsyncConfig {
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
}
</code></pre>
<ul>
<li>指定线程池:</li>
</ul>
<pre><code class="language-java">@Async("taskExecutor")
public CompletableFuture<String> doAsyncTask() {
// 异步逻辑
}
</code></pre>
<ul>
<li>为 <code>@Async</code> 方法定义全局异常处理器</li>
</ul>
<pre><code class="language-java">@Component
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
System.err.println("Async error: " + ex.getMessage());
}
}
</code></pre>
<ul>
<li><code>Spring Boot</code> 测试:</li>
</ul>
<pre><code class="language-java">@SpringBootTest
public class AsyncServiceTest {
@Autowired
private AsyncService asyncService;
@Test
void testAsync() throws Exception {
CompletableFuture<String> future = asyncService.doAsyncTask();
assertEquals("Task completed", future.get(2, TimeUnit.SECONDS));
}
}
</code></pre>
<h4 id="并行调用多个服务示例">并行调用多个服务示例</h4>
<p>并行调用 <code>getUser</code> 和 <code>getProfile</code>,总耗时接近较慢的任务(~1s)。</p>
<pre><code class="language-java">@Service
public class UserService {
@Async
public CompletableFuture<User> getUser(Long id) {
return CompletableFuture.supplyAsync(() -> {
// 模拟远程调用
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new User(id, "User" + id);
});
}
@Async
public CompletableFuture<Profile> getProfile(Long id) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new Profile(id, "Profile" + id);
});
}
}
@RestController
public class UserController {
@Autowired
private UserService userService;
@GetMapping("/user/{id}")
public CompletableFuture<UserProfile> getUserProfile(@PathVariable Long id) {
return userService.getUser(id)
.thenCombine(userService.getProfile(id),
(user, profile) -> new UserProfile(user, profile));
}
}
</code></pre>
<h4 id="异步批量处理示例">异步批量处理示例</h4>
<p>并行处理 10 个任务,显著减少总耗时。</p>
<pre><code class="language-java">@Service
public class BatchService {
@Async
public CompletableFuture<Void> processItem(int item) {
return CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
System.out.println("Processed item: " + item);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
@RestController
public class BatchController {
@Autowired
private BatchService batchService;
@PostMapping("/batch")
public CompletableFuture<Void> processBatch() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
futures.add(batchService.processItem(i));
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture));
}
}
</code></pre>
<h4 id="响应式-webflux-示例">响应式 WebFlux 示例</h4>
<pre><code class="language-java">@Service
public class ReactiveService {
public Mono<String> fetchData() {
return Mono.just("Data")
.delayElement(Duration.ofSeconds(1));
}
}
@RestController
public class ReactiveController {
@Autowired
private ReactiveService reactiveService;
@GetMapping("/data")
public Mono<String> getData() {
return reactiveService.fetchData();
}
}
</code></pre>
<h4 id="spring-data-jpa-集成示例">Spring Data JPA 集成示例</h4>
<p><code>JPA</code> 默认阻塞操作,可通过 <code>@Async</code> 包装异步调用。</p>
<pre><code class="language-java">@Repository
public interface UserRepository extends JpaRepository<User, Long> {}
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
@Async
public CompletableFuture<User> findUser(Long id) {
return CompletableFuture.supplyAsync(() -> userRepository.findById(id).orElse(null));
}
}
</code></pre>
<h4 id="mybatis-plus-集成示例">MyBatis Plus 集成示例</h4>
<p><code>MyBatis Plus</code> 默认阻塞,可通过 <code>@Async</code> 或线程池异步化。</p>
<pre><code class="language-java">@Mapper
public interface UserMapper extends BaseMapper<User> {}
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Async
public CompletableFuture<User> getUser(Long id) {
return CompletableFuture.supplyAsync(() -> userMapper.selectById(id));
}
}
</code></pre>
<h4 id="注意事项">注意事项</h4>
<ul>
<li>
<p><code>@Async</code> 方法必须是 <code>public</code> 的。</p>
</li>
<li>
<p>不能在同一类内调用 <code>@Async</code> 方法(因 <code>Spring AOP</code> 代理机制)。</p>
</li>
<li>
<p>默认线程池由 <code>Spring</code> 提供,可自定义。</p>
</li>
</ul>
<h3 id="completablefuture-所有核心-api">CompletableFuture 所有核心 API</h3>
<ul>
<li>
<p><code>supplyAsync()</code>:异步执行任务,返回值</p>
</li>
<li>
<p><code>runAsync()</code>:异步执行任务,无返回值</p>
</li>
<li>
<p><code>thenApply()</code>:接收前面任务结果并返回新结果</p>
</li>
<li>
<p><code>thenAccept()</code>:接收结果但无返回</p>
</li>
<li>
<p><code>thenRun()</code>:不接收结果也不返回,仅执行</p>
</li>
<li>
<p><code>thenCompose()</code>:嵌套异步任务</p>
</li>
<li>
<p><code>thenCombine()</code>:两个任务都完成后,合并结果</p>
</li>
<li>
<p><code>allOf()</code>:等多个任务全部完成</p>
</li>
<li>
<p><code>anyOf()</code>:任一任务完成即继续</p>
</li>
<li>
<p><code>exceptionally()</code>:捕获异常并处理</p>
</li>
<li>
<p><code>whenComplete()</code>:无论成功失败都执行</p>
</li>
<li>
<p><code>handle()</code>:可处理正常或异常结果</p>
</li>
</ul>
<h3 id="completablefuturet-用法详解"><code>CompletableFuture<T></code> 用法详解</h3>
<h4 id="创建异步任务">创建异步任务</h4>
<h5 id="supplyasync基本异步任务执行"><code>supplyAsync</code>:基本异步任务执行</h5>
<pre><code class="language-java">CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "Result");
</code></pre>
<h5 id="runasync异步执行任务无返回值"><code>runAsync</code>:异步执行任务,无返回值</h5>
<pre><code class="language-java">CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("Async run"));
</code></pre>
<h4 id="任务转换">任务转换</h4>
<h5 id="thenapplyfunction转换结果对结果加工"><code>thenApply(Function)</code>:转换结果,对结果加工</h5>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "data")
.thenApply(data -> data.toUpperCase());
System.out.println(future.get()); // DATA
</code></pre>
<h5 id="thencomposefunction扁平化链式异步"><code>thenCompose(Function)</code>:扁平化链式异步</h5>
<pre><code class="language-java">CompletableFuture<String> composed = CompletableFuture
.supplyAsync(() -> "A")
.thenCompose(a -> CompletableFuture.supplyAsync(() -> a + "B"));
composed.thenAccept(System.out::println); // 输出 AB
</code></pre>
<h5 id="thencombinecompletionstage-bifunction两个任务完成后合并结果"><code>thenCombine(CompletionStage, BiFunction)</code>:两个任务完成后合并结果</h5>
<pre><code class="language-java">CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "World");
cf1.thenCombine(cf2, (a, b) -> a + " " + b).thenAccept(System.out::println);
</code></pre>
<pre><code class="language-java">CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> result = f1.thenCombine(f2, (a, b) -> a + b);
System.out.println(result.get()); // AB
</code></pre>
<h4 id="消费结果">消费结果</h4>
<h5 id="thenacceptconsumer消费结果"><code>thenAccept(Consumer)</code>:消费结果</h5>
<pre><code class="language-java">CompletableFuture
.supplyAsync(() -> "Result")
.thenAccept(result -> System.out.println("Received: " + result));
</code></pre>
<h5 id="thenrunrunnable继续执行下一个任务无需前面结果"><code>thenRun(Runnable)</code>:继续执行下一个任务,无需前面结果</h5>
<pre><code class="language-java">CompletableFuture
.supplyAsync(() -> "X")
.thenRun(() -> System.out.println("Next step executed"));
</code></pre>
<h4 id="异常处理">异常处理</h4>
<h5 id="exceptionallyfunctionthrowable-t异常处理"><code>exceptionally(Function<Throwable, T>)</code>:异常处理</h5>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Oops!");
return "ok";
}).exceptionally(ex -> "Fallback: " + ex.getMessage());
System.out.println(future.get());
</code></pre>
<h5 id="handlebifunctiont-throwable-r同时处理正常与异常结果"><code>handle(BiFunction<T, Throwable, R>)</code>:同时处理正常与异常结果</h5>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Error!");
}).handle((result, ex) -> {
if (ex != null) return "Handled: " + ex.getMessage();
return result;
});
System.out.println(future.get());
</code></pre>
<h5 id="whencompletebiconsumert-throwable类似-finally"><code>whenComplete(BiConsumer<T, Throwable>)</code>:类似 finally</h5>
<ul>
<li>
<p>在 <code>CompletableFuture</code> 执行完毕后执行一个回调,无论是成功还是异常。</p>
</li>
<li>
<p>不会改变原来的结果或异常,仅用于处理副作用(如日志)。</p>
</li>
</ul>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Final Result")
.whenComplete((result, ex) -> {
System.out.println("Completed with: " + result);
});
</code></pre>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "成功";
}).whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("发生异常:" + exception.getMessage());
} else {
System.out.println("执行结果:" + result);
}
});
</code></pre>
<h4 id="并发组合">并发组合</h4>
<h5 id="allof--anyof组合任务">allOf / anyOf:组合任务</h5>
<pre><code class="language-java">CompletableFuture<Void> all = CompletableFuture.allOf(task1, task2);
CompletableFuture<Object> any = CompletableFuture.anyOf(task1, task2);
</code></pre>
<h5 id="allof等待全部任务完成">allOf(...):等待全部任务完成</h5>
<p>需要单独从每个任务中再 <code>.get()</code> 拿到结果</p>
<pre><code class="language-java">CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.thenRun(() -> System.out.println("All done")).get();
</code></pre>
<pre><code class="language-java">CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> fetchUser());
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrder());
// 两个任务都完成后执行
CompletableFuture<Void> bothDone = CompletableFuture.allOf(userFuture, orderFuture);
bothDone.thenRun(() -> {
try {
String user = userFuture.get();
String order = orderFuture.get();
System.out.println("用户: " + user + ", 订单: " + order);
} catch (Exception e) {
e.printStackTrace();
}
});
</code></pre>
<h5 id="anyof任一完成即触发">anyOf(...):任一完成即触发</h5>
<pre><code class="language-java">CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "fast";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "slow");
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
System.out.println(any.get()); // 输出最快那个
</code></pre>
<h4 id="超时控制">超时控制</h4>
<h5 id="ortimeoutlong-timeout-timeunit-unit超时异常"><code>orTimeout(long timeout, TimeUnit unit)</code>:超时异常</h5>
<p>如果在指定时间内没有完成,就抛出 <code>TimeoutException</code> 异常。</p>
<pre><code class="language-java">CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); } catch (Exception e) {}
return "late result";
}).orTimeout(1, TimeUnit.SECONDS);
try {
System.out.println(f.get());
} catch (Exception e) {
System.out.println("Timeout: " + e.getMessage());
}
</code></pre>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "执行完成";
}).orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> "捕获到异常:" + ex.getClass().getSimpleName());
System.out.println("结果:" + future.join()); // 打印“捕获到异常:TimeoutException”
</code></pre>
<h5 id="completeontimeoutt-value-long-timeout-timeunit-unit超时默认值"><code>completeOnTimeout(T value, long timeout, TimeUnit unit)</code>:超时默认值</h5>
<p>如果在指定时间内没有完成,则返回一个默认值,并完成该任务。</p>
<pre><code class="language-java">CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); } catch (Exception e) {}
return "slow";
}).completeOnTimeout("timeout default", 1, TimeUnit.SECONDS);
System.out.println(f.get()); // timeout default
</code></pre>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000); // 模拟耗时任务
} catch (InterruptedException e) {
e.printStackTrace();
}
return "正常返回结果";
}).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
System.out.println("最终结果:" + future.join()); // 会打印“超时默认值”
</code></pre>
<h4 id="自定义线程池">自定义线程池</h4>
<pre><code class="language-java">ExecutorService pool = Executors.newFixedThreadPool(2);
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "pooled", pool);
System.out.println(f.get());
pool.shutdown();
</code></pre>
<h4 id="异步任务--消费结果">异步任务 + 消费结果</h4>
<pre><code class="language-java">CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> "hello")
.thenAccept(result -> System.out.println("结果是:" + result));
</code></pre>
<h4 id="异步任务--转换结果链式调用">异步任务 + 转换结果(链式调用)</h4>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "5")
.thenApply(Integer::parseInt)
.thenApply(num -> num * 2)
.thenApply(Object::toString);
</code></pre>
<h4 id="异常处理-1">异常处理</h4>
<pre><code class="language-java">CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了!");
return "success";
})
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "默认值";
});
</code></pre>
<h4 id="多任务并发组合allof--anyof">多任务并发组合(allOf / anyOf)</h4>
<pre><code class="language-java">CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
// 等待全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join();
System.out.println("结果:" + f1.join() + ", " + f2.join());
</code></pre>
<h4 id="合并两个任务结果">合并两个任务结果</h4>
<pre><code class="language-java">CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 200);
CompletableFuture<Integer> result = f1.thenCombine(f2, Integer::sum);
System.out.println(result.get()); // 输出 300
</code></pre>
<h4 id="自定义线程池-1">自定义线程池</h4>
<pre><code class="language-java">ExecutorService pool = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "线程池中的任务";
}, pool);
System.out.println(future.get());
pool.shutdown();
</code></pre>
<h4 id="链式异步处理">链式异步处理</h4>
<pre><code class="language-java">CompletableFuture.supplyAsync(() -> "Step 1")
.thenApply(s -> s + " -> Step 2")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " -> Step 3"))
.thenAccept(System.out::println)
.exceptionally(ex -> {
ex.printStackTrace();
return null;
});
</code></pre>
<h4 id="订单处理示例">订单处理示例</h4>
<pre><code class="language-java">public class OrderSystem {
@Async("dbExecutor")
public CompletableFuture<Order> saveOrder(Order order) {
// 数据库写入操作
return CompletableFuture.completedFuture(order);
}
@Async("httpExecutor")
public CompletableFuture<String> notifyLogistics(Order order) {
// 调用物流API
return CompletableFuture.completedFuture("SUCCESS");
}
public void processOrder(Order order) {
CompletableFuture<Order> saveFuture = saveOrder(order);
saveFuture.thenCompose(savedOrder ->
notifyLogistics(savedOrder)
).exceptionally(ex -> {
log.error("物流通知失败", ex);
return "FALLBACK";
});
}
}
</code></pre>
<h4 id="总结图谱">总结图谱</h4>
<pre><code class="language-scss">CompletableFuture
├─ 创建任务
│├─ runAsync() -> 无返回值
│└─ supplyAsync() -> 有返回值
├─ 处理结果
│├─ thenApply() -> 转换
│├─ thenAccept() -> 消费
│├─ thenRun() -> 执行新任务
│├─ thenCombine() -> 合并结果
│└─ thenCompose() -> 链式调用
├─ 异常处理
│├─ exceptionally()
│├─ handle()
│└─ whenComplete()
├─ 组合任务
│├─ allOf()
│└─ anyOf()
└─ 超时控制
├─ orTimeout()
└─ completeOnTimeout()
</code></pre>
<h3 id="什么场景适合用-java-异步async--completablefuture">什么场景适合用 Java 异步(@Async / CompletableFuture)?</h3>
<table>
<thead>
<tr>
<th>场景</th>
<th>是否适合异步?</th>
</tr>
</thead>
<tbody>
<tr>
<td>调用多个远程服务并行</td>
<td>✅ 很适合</td>
</tr>
<tr>
<td>复杂 CPU 运算耗时任务</td>
<td>✅ 可以放到异步线程池</td>
</tr>
<tr>
<td>简单业务逻辑、数据库操作</td>
<td>❌ 不建议,同步更可控</td>
</tr>
<tr>
<td>非主流程的日志、打点操作</td>
<td>✅ 合适异步处理</td>
</tr>
</tbody>
</table>
<h3 id="java-和-net-异步处理对比">Java 和 .NET 异步处理对比</h3>
<blockquote>
<p>并行调用两个服务,提高响应速度</p>
</blockquote>
<h4 id="spring-boot-示例async--completablefuture">Spring Boot 示例(@Async + CompletableFuture)</h4>
<p><strong>项目结构</strong></p>
<pre><code class="language-java">└── src
└── main
├── java
│ ├── demo
│ │ ├── controller
│ │ │ └── AggregateController.java
│ │ ├── service
│ │ │ ├── RemoteService.java
│ │ │ └── RemoteServiceImpl.java
│ │ └── DemoApplication.java
</code></pre>
<p><strong>RemoteService.java</strong></p>
<pre><code class="language-java">public interface RemoteService {
@Async
CompletableFuture<String> getUserInfo();
@Async
CompletableFuture<String> getAccountInfo();
}
</code></pre>
<p><strong>RemoteServiceImpl.java</strong></p>
<pre><code class="language-java">@Service
public class RemoteServiceImpl implements RemoteService {
@Override
public CompletableFuture<String> getUserInfo() {
try {
Thread.sleep(2000); // 模拟耗时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture("UserInfo");
}
@Override
public CompletableFuture<String> getAccountInfo() {
try {
Thread.sleep(3000); // 模拟耗时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture("AccountInfo");
}
}
</code></pre>
<p><strong>AggregateController.java</strong></p>
<pre><code class="language-java">@RestController
@RequestMapping("/api")
public class AggregateController {
@Autowired
private RemoteService remoteService;
@GetMapping("/aggregate")
public ResponseEntity<String> aggregate() throws Exception {
CompletableFuture<String> userFuture = remoteService.getUserInfo();
CompletableFuture<String> accountFuture = remoteService.getAccountInfo();
// 等待所有完成
CompletableFuture.allOf(userFuture, accountFuture).join();
// 获取结果
String result = userFuture.get() + " + " + accountFuture.get();
return ResponseEntity.ok(result);
}
}
</code></pre>
<p><strong>DemoApplication.java</strong></p>
<pre><code class="language-java">@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
</code></pre>
<h4 id="net-示例asyncawait">.NET 示例(async/await)</h4>
<p><strong>项目结构</strong></p>
<pre><code class="language-csharp">└── Controllers
└── AggregateController.cs
└── Services
└── IRemoteService.cs
└── RemoteService.cs
</code></pre>
<p><strong>IRemoteService.cs</strong></p>
<pre><code class="language-csharp">public interface IRemoteService {
Task<string> GetUserInfoAsync();
Task<string> GetAccountInfoAsync();
}
</code></pre>
<p><strong>RemoteService.cs</strong></p>
<pre><code class="language-csharp">public class RemoteService : IRemoteService {
public async Task<string> GetUserInfoAsync() {
await Task.Delay(2000); // 模拟耗时
return "UserInfo";
}
public async Task<string> GetAccountInfoAsync() {
await Task.Delay(3000); // 模拟耗时
return "AccountInfo";
}
}
</code></pre>
<p><strong>AggregateController.cs</strong></p>
<pre><code class="language-csharp">
")]
public class AggregateController : ControllerBase {
private readonly IRemoteService _remoteService;
public AggregateController(IRemoteService remoteService) {
_remoteService = remoteService;
}
public async Task<IActionResult> Aggregate() {
var userTask = _remoteService.GetUserInfoAsync();
var accountTask = _remoteService.GetAccountInfoAsync();
await Task.WhenAll(userTask, accountTask);
var result = $"{userTask.Result} + {accountTask.Result}";
return Ok(result);
}
}
</code></pre>
<h4 id="java-vs-net-异步用法对比总结">Java vs .NET 异步用法对比总结</h4>
<table>
<thead>
<tr>
<th>方面</th>
<th>Java(Spring Boot)</th>
<th>.NET Core(ASP.NET)</th>
</tr>
</thead>
<tbody>
<tr>
<td>异步声明方式</td>
<td><code>@Async</code> + <code>CompletableFuture</code></td>
<td><code>async/await</code></td>
</tr>
<tr>
<td>返回值类型</td>
<td><code>CompletableFuture<T></code></td>
<td><code>Task<T></code></td>
</tr>
<tr>
<td>等待多个任务</td>
<td><code>CompletableFuture.allOf()</code></td>
<td><code>Task.WhenAll()</code></td>
</tr>
<tr>
<td>是否阻塞</td>
<td><code>.get()</code> 会阻塞,链式不阻塞</td>
<td><code>await</code> 非阻塞</td>
</tr>
<tr>
<td>简洁性</td>
<td>稍复杂(需要注解和线程池配置)</td>
<td>极简、天然异步支持</td>
</tr>
</tbody>
</table><br><br>
来源:https://www.cnblogs.com/TangQF/p/18871568
頁:
[1]