电你 發表於 2025-5-18 19:53:00

SpringBoot3整合AI

<h1 id="玩一下ai">玩一下AI</h1>
<h2 id="1-sse协议">1. SSE协议</h2>
<p>我们都知道tcp,ip,http,https,websocket等等协议,今天了解一个新的协议SSE协议(Server-Sent Events)</p>
<p><strong>SSE(Server-Sent Events)</strong> 是一种允许<strong>服务器主动向客户端推送数据</strong>的轻量级协议,基于 <strong>HTTP 长连接</strong>,实现 <strong>单向通信</strong>(服务器→客户端)。它是 W3C 标准,浏览器原生支持,无需额外插件(如 <code>EventSource</code> API)</p>
<p><strong>核心特点与工作原理</strong></p>
<ul>
<li><strong>单向通信</strong>:仅服务器向客户端发送数据,适合实时通知、日志流、实时更新等场景。</li>
<li><strong>基于 HTTP</strong>:客户端通过 <code>GET</code> 请求建立连接,服务器返回特殊格式的文本流(<code>text/event-stream</code>),连接保持打开状态,直到服务器主动关闭或超时。</li>
<li><strong>自动重连</strong>:浏览器内置重连机制,连接断开后自动尝试重新连接。</li>
<li><strong>数据格式</strong>:每条消息以 <code>\n</code> 分隔,支持事件类型、数据内容、重试时间等字段,例如:</li>
</ul>
<pre><code class="language-java">data: Hello, SSE!// 数据内容
event: customEvent // 自定义事件类型(可选)
id: 123            // 消息ID(可选)
retry: 5000      // 重连时间(毫秒,可选)
\n
</code></pre>
<p>适用于无需双向通信,仅需服务器单向推送数据。【比如现在的 gpt,豆包这个问答形式】</p>
<p>前端客户端可以使用原生的 <code>EventSource</code> API:</p>
<pre><code class="language-js">// 创建EventSource实例,连接服务器
const eventSource = new EventSource('/sse-endpoint');
// 监听默认事件("message")
eventSource.onmessage = (event) =&gt; {
console.log('Received:', event.data);
};
// 监听自定义事件(如"customEvent")
eventSource.addEventListener('customEvent', (event) =&gt; {
console.log('Custom Event:', event.data);
});
// 处理错误
eventSource.onerror = (error) =&gt; {
console.error('SSE Error:', error);
// 浏览器会自动重连,无需手动处理
};
</code></pre>
<p>服务端可用的就太多了。(本文以SpringBoot3.4.2为例子)</p>
<p>在知道这个协议之前,我们想要达到gpt这种问答形式,输出内容是一点一点拼接的,该怎么弄呢?是不是还可以用websocket。</p>
<table>
<thead>
<tr>
<th>特性</th>
<th>SSE</th>
<th>WebSocket</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>通信方向</strong></td>
<td>单向(服务器→客户端)</td>
<td>双向(全双工)</td>
</tr>
<tr>
<td><strong>协议</strong></td>
<td>基于 HTTP(升级为长连接)</td>
<td>独立协议(ws:// 或 wss://)</td>
</tr>
<tr>
<td><strong>二进制支持</strong></td>
<td>仅文本(<code>text/event-stream</code>)</td>
<td>支持文本和二进制</td>
</tr>
<tr>
<td><strong>自动重连</strong></td>
<td>浏览器内置</td>
<td>需手动实现</td>
</tr>
<tr>
<td><strong>复杂度</strong></td>
<td>简单(服务端实现轻量)</td>
<td>较复杂(需处理握手、心跳)</td>
</tr>
<tr>
<td><strong>适用场景</strong></td>
<td>服务器单向推送数据</td>
<td>双向交互(聊天、实时协作)</td>
</tr>
</tbody>
</table>
<p>下面结合Spring Boot 简单用一下SSE</p>
<pre><code class="language-java"> // sse协议测试
@PostMapping(value = "/chat", produces = "text/event-stream;charset=UTF-8")
public SseEmitter streamSseMvc() {
    // 感谢评论区:键盘三个键 指出该timeout问题。
    // 有一点需要注意的是:这里的time_out参数,是SseEmitter(session会话)的存活时间. 这一点需要注意一下
    SseEmitter emitter = new SseEmitter(30_000L);
    // 模拟发送消息
    System.out.println("SSE connection started");
    ScheduledFuture&lt;?&gt; future = service.scheduleAtFixedRate(() -&gt; {
      try {
            String message = "Message at " + System.currentTimeMillis();
            emitter.send(SseEmitter.event().data(message));
      } catch (IOException e) {
            try {
                emitter.send(SseEmitter.event().name("error").data(Map.of("error", e.getMessage())));
            } catch (IOException ex) {
                // ignore
            }
            emitter.completeWithError(e);
      }
    }, 0, 5, TimeUnit.SECONDS);
    emitter.onCompletion(() -&gt; {
      System.out.println("SSE connection completed");

    });
    emitter.onTimeout(() -&gt; {
      System.out.println("SSE connection timed out");
      emitter.complete();

    });
    emitter.onError((e) -&gt; {
      System.out.println("SSE connection error: " + e.getMessage());
      emitter.completeWithError(e);
    });

    return emitter;
}
</code></pre>
<p>在SpringBoot中,用SseEmitter就可以达到这个效果了,它也和Websocket一样有onXXX这种类似的方法。上面是使用一个周期性的任务,来模拟AI生成对话的效果的。<code>emitter.send(SseEmitter.event().data(message));</code> 这个就是服务端向客户端推送数据。</p>
<h2 id="2-okhttp3ssedeepseek">2. okhttp3+sse+deepseek</h2>
<p><strong>简单示例:就问一句话</strong></p>
<p>申请deepseekKey这里就略过了,各位读者自行去申请。【因为deepseek官网示例是用的okhttp,所以我这里也用okhttp了】</p>
<p>我们先准备一个接口</p>
<pre><code class="language-java">@RestController
@RequestMapping("/deepseek")
public class DeepSeekController {
    @Resource
    private DeepSeekUtil deepSeekUtil;

    /**
   * 访问deepseek-chat
   */
    @PostMapping(value = "/chat", produces = "text/event-stream;charset=UTF-8")
    public SseEmitter chatSSE() throws IOException {
      SseEmitter emitter = new SseEmitter(60000L);
      deepSeekUtil.sendChatReqStream("123456", "你会MySQL数据库吗?", emitter);
      return emitter; // 这里把该sse对象返回了
    }

    private boolean notModel(String model) {
      return !"deepseek-chat".equals(model) &amp;&amp; !"deepseek-reasoner".equals(model);
    }
}
</code></pre>
<p>可以看到我们创建了一个SseEmitter对象,传给了我们自定义的工具</p>
<pre><code class="language-java">@Component
public class DeepSeekUtil {
    public static final String DEEPSEEK_CHAT = "deepseek-chat";
    public static final String DEEPSEEK_REASONER = "deepseek-reasoner";
    public static final String url = "https://api.deepseek.com/chat/completions";
    // 存储每个用户的消息列表
    private static final ConcurrentHashMap&lt;String, List&lt;Message&gt;&gt; msgList = new ConcurrentHashMap&lt;&gt;();

    // 1.调用api,然后以以 SSE(server-sent events)的形式以流式发送消息增量。消息流以 data: 结尾。
    public void sendChatReqStream(String uid, String message, SseEmitter sseEmitter) throws IOException {
      // 1.构建一个普通的聊天body请求
      AccessRequest tRequest = buildNormalChatRequest(uid, message);

      OkHttpClient client = new OkHttpClient().newBuilder()
                .build();
      // 封装请求体参数
      MediaType mediaType = MediaType.parse("application/json; charset=utf-8");
      RequestBody body = RequestBody.create(JSON.toJSONString(tRequest), mediaType);
      // 构建请求和请求头
      Request request = new Request.Builder()
                .url(url)
                .method("POST", body)
                .addHeader("Content-Type", "application/json")
                .addHeader("Accept", "text/event-stream")
                    // 比如你的key是:s-123456
                    // .addHeader("Authorization", "Bearer s-123456")
                .addHeader("Authorization", "Bearer 你的key")
                .build();
                // 创建一个监听器
      SseChatListener listener = new SseChatListener(sseEmitter);
      RealEventSource eventSource = new RealEventSource(request, listener);
      eventSource.connect(client);
    }

    private AccessRequest buildNormalChatRequest(String uid, String message) {
      // 这里,我们messages,添加了一条“你会MySQL数据库吗?",来达到一种对话具有上下文的效果
      List&lt;Message&gt; messages = msgList.computeIfAbsent(uid, k -&gt; new ArrayList&lt;&gt;());
      messages.add(new Message("user", message));
      /*
      [
                {"system", "你好, 我是DeepSeek-AI助手!"},       
                {"user", "你会MySQL数据库吗?"}
      ]
      */
      AccessRequest request = new AccessRequest();
      request.setMessages(messages);
      request.setModel(DEEPSEEK_CHAT);
      request.setResponse_format(Map.of("type", "text"));
      request.setStream(true); // 设置为true
      request.setTemperature(1.0);
      request.setTop_p(1.0);
      return request;
    }

    @PostConstruct
    public void init() {
      List&lt;Message&gt; m = new ArrayList&lt;Message&gt;();
      m.add(new Message("system", "你好, 我是DeepSeek-AI助手!"));
      // 初始化消息列表
      msgList.put("123456", m);
    }
}

// 请求体,参考deepseek官网
public class AccessRequest {
    private List&lt;Message&gt; messages;
    private String model; // 默认模型为deepseek-chat
    private Double frequency_penalty = 0.0;
    private Integer max_tokens;
    private Double presence_penalty = 0.0;
    //{
    //    "type": "text"
    //}
    private Map&lt;String, String&gt; response_format;
    private Object stop = null; // null
    private Boolean stream; //如果设置为 True,将会以 SSE(server-sent events)的形式以流式发送消息增量。消息流以 data: 结尾。
    private Object stream_options = null;

    private Double temperature; // 1
    private Double top_p; // 1
    private Object tools; // null
    private String tool_choice = "none";
    private Boolean logprobs = false;
    private Integer top_logprobs = null;
    // get set
}
</code></pre>
<p>监听器</p>
<pre><code class="language-java">@Slf4j
public class SseChatListener extends EventSourceListener {
    private SseEmitter sseEmitter;
    public SseChatListener( SseEmitter sseEmitter) {
       this.sseEmitter = sseEmitter;
    }
    /**
   * 事件
   */
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
      //log.info("sse数据:{}", data);
      DeepSeekResponse deepSeekResponse = JSON.parseObject(data, DeepSeekResponse.class);
      DeepSeekResponse.Choice[] choices = deepSeekResponse.getChoices();
      try {
            // 发送给前端【客户端】
            sseEmitter.send(SseEmitter.event().data(choices));
      } catch (IOException e) {
            log.error("数据发送异常");
            throw new RuntimeException(e);
      }
    }
    /**
   * 建立sse连接
   */
    @Override
    public void onOpen(final EventSource eventSource, final Response response) {
      log.info("建立sse连接... {}");
    }

    /**
   * sse关闭
   */
    @Override
    public void onClosed(final EventSource eventSource) {
      log.info("sse连接关闭:{}");
    }
    /**
   * 出错了
   */
    @Override
    public void onFailure(final EventSource eventSource, final Throwable t, final Response response) {
      log.error("使用事件源时出现异常......");

    }
}
// DeepSeekResponse.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DeepSeekResponse {
    private String id;
    private String object;
    private Long created;
    private String model;
    private String system_fingerprint;
    private Choice[] choices;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Choice {
      private Integer index;
      private Delta delta;
      private Object logprobs;
      private String finish_reason;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Delta {
      private String content;
    }
}
</code></pre>
<p>然后我们用apifox测试一下:</p>
<p><img src="https://img2023.cnblogs.com/blog/2358057/202505/2358057-20250518183013649-1693503517.png" alt="" loading="lazy"></p>
<p>返回这些信息,然后把ai返回的存起来,具体怎么存,就靠读者自行发挥了,添加到该对话,使该<strong>对话具有上下文</strong>。【DeepSeek <code>/chat/completions</code> API 是一个“无状态” API,即服务端不记录用户请求的上下文,用户在每次请求时,<strong>需将之前所有对话历史拼接好后</strong>,传递给对话 API。】</p>
<pre><code class="language-json">[
    {"system", "你好, 我是DeepSeek-AI助手!"},       
    {"user", "你会MySQL数据库吗?"},
    {"ststem", "是的,我熟悉........"} // 把ai返回的存起来
]
</code></pre>
<p>下一次对话的时候,请求体<code>AccessRequest</code>里面的<code>List&lt;Message&gt; messages</code>就向上面那样,再往后添加用户问的消息。</p>
<p>上面的例子还有一些小问题,比如说什么时候断开连接那些的。</p>
<h2 id="3-springai">3. SpringAI</h2>
<p>Spring AI 是一个专注于 AI 工程的应用框架,其目标是将 Spring 生态的 “POJO 构建块” 和模块化设计引入 AI 场景,简化企业数据与第三方模型的对接和使用。</p>
<p>下面快速接入deepseek</p>
<pre><code class="language-xml">&lt;!--maven的pom.xml--&gt;
&lt;?xml version="1.0" encoding="UTF-8"?&gt;
&lt;project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"&gt;
    &lt;modelVersion&gt;4.0.0&lt;/modelVersion&gt;
    &lt;parent&gt;
      &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
      &lt;artifactId&gt;spring-boot-starter-parent&lt;/artifactId&gt;
      &lt;version&gt;3.4.3&lt;/version&gt;
      &lt;relativePath/&gt; &lt;!-- lookup parent from repository --&gt;
    &lt;/parent&gt;
    &lt;groupId&gt;com.feng.ai&lt;/groupId&gt;
    &lt;artifactId&gt;spring-ai-chat&lt;/artifactId&gt;
    &lt;version&gt;0.0.1-SNAPSHOT&lt;/version&gt;
    &lt;name&gt;spring-ai-chat&lt;/name&gt;
    &lt;description&gt;spring-ai-chat&lt;/description&gt;
   
    &lt;properties&gt;
      &lt;java.version&gt;21&lt;/java.version&gt;
      &lt;spring-ai.version&gt;1.0.0-M6&lt;/spring-ai.version&gt;
    &lt;/properties&gt;
    &lt;dependencies&gt;
      &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
            &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
      &lt;/dependency&gt;
      &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
            &lt;artifactId&gt;spring-boot-starter-webflux&lt;/artifactId&gt;
      &lt;/dependency&gt;
      &lt;!--openAI--&gt;
      &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.ai&lt;/groupId&gt;
            &lt;artifactId&gt;spring-ai-openai-spring-boot-starter&lt;/artifactId&gt;
      &lt;/dependency&gt;

      &lt;dependency&gt;
            &lt;groupId&gt;com.alibaba.fastjson2&lt;/groupId&gt;
            &lt;artifactId&gt;fastjson2&lt;/artifactId&gt;
            &lt;version&gt;2.0.44&lt;/version&gt;
      &lt;/dependency&gt;
    &lt;/dependencies&gt;

    &lt;repositories&gt;
      &lt;repository&gt;
            &lt;id&gt;spring-snapshots&lt;/id&gt;
            &lt;name&gt;Spring Snapshots&lt;/name&gt;
            &lt;url&gt;https://repo.spring.io/snapshot&lt;/url&gt;
            &lt;releases&gt;
                &lt;enabled&gt;false&lt;/enabled&gt;
            &lt;/releases&gt;
            &lt;snapshots&gt;
                &lt;enabled&gt;true&lt;/enabled&gt;
            &lt;/snapshots&gt;
      &lt;/repository&gt;
      &lt;repository&gt;
            &lt;name&gt;Central Portal Snapshots&lt;/name&gt;
            &lt;id&gt;central-portal-snapshots&lt;/id&gt;
            &lt;url&gt;https://central.sonatype.com/repository/maven-snapshots/&lt;/url&gt;
            &lt;releases&gt;
                &lt;enabled&gt;false&lt;/enabled&gt;
            &lt;/releases&gt;
            &lt;snapshots&gt;
                &lt;enabled&gt;true&lt;/enabled&gt;
            &lt;/snapshots&gt;
      &lt;/repository&gt;
    &lt;/repositories&gt;


    &lt;dependencyManagement&gt;
      &lt;dependencies&gt;
            &lt;dependency&gt;
                &lt;groupId&gt;org.springframework.ai&lt;/groupId&gt;
                &lt;artifactId&gt;spring-ai-bom&lt;/artifactId&gt;
                &lt;version&gt;${spring-ai.version}&lt;/version&gt;
                &lt;type&gt;pom&lt;/type&gt;
                &lt;scope&gt;import&lt;/scope&gt;
            &lt;/dependency&gt;
      &lt;/dependencies&gt;
    &lt;/dependencyManagement&gt;
&lt;/project&gt;
</code></pre>
<p>然后是配置文件</p>
<pre><code class="language-yaml">spring:
application:
    name: spring-ai-chat
ai:
    # The DeepSeek API doesn't support embeddings, so we need to disable it.
    openai:
      embedding:
      enabled: false
      base-url: https://api.deepseek.com
      api-key: 你的key
      chat:
      options:
          model: deepseek-reasoner # 使用推理模型
          stream-usage: true
</code></pre>
<p>controller</p>
<pre><code class="language-java">@Slf4j
@RestController
@RequestMapping("/sp/deepseek")
public class SpDeepseekController {

    @Resource( name = "openAiChatModel")
    private OpenAiChatModel deepseekModel;
   
    // 直接回答 --- stream-usage: false
    //@GetMapping("/simpleChat")
    //public R chat() {
    //    String call = deepseekModel.call("你好, 你会java吗?");
    //    return R.success().setData("call", call);
    //}

    // 流式回答
    @PostMapping(value = "/streamChat", produces = "text/event-stream;charset=UTF-8")
    public Flux&lt;SpMessage&gt; streamChat(@RequestBody Map&lt;String, String&gt; p) {
      String userMessage = p.get("userMessage");
      String sessionId = p.get("sessionId");
      
      Prompt prompt = new Prompt(new UserMessage(userMessage));

      StringBuilder modelStr = new StringBuilder();
      return deepseekModel.stream(prompt)
                .doOnSubscribe(subscription -&gt; log.info("SSE 连接已启动: {}", sessionId))
                .doOnComplete(() -&gt; log.info("SSE 连接已关闭: {}", sessionId))
                .doOnCancel(() -&gt; log.info("SSE 连接已取消: {}", sessionId))
                .timeout(Duration.ofSeconds(60)) // 超时设置
                .filter(chatResponse -&gt; chatResponse.getResult().getOutput().getText() != null) // 过滤掉空的响应
                .map(chatResponse -&gt; {
                        //log.info("SSE 响应: {}", chatResponse.getResult().getOutput());
                  modelStr.append(chatResponse.getResult().getOutput().getText());
                        return SpMessage.builder()
                            .role("system")
                            .content(chatResponse.getResult().getOutput().getText())
                            .build();
                  }
                );
    }
}
</code></pre>
<p><strong>TODO:</strong>上面的对话没有记忆,新的请求来了,ai模型并不会带上以前的场景,故需要记忆化。 记忆化的同时还要注意如果把该会话历史中所有的对话全部传给deepseek的话,可能导致 token 超限,故还需要做一个窗口,避免把太多历史对话传过去了。</p>
<h2 id="4-延伸-http远程调用">4. 延伸-Http远程调用</h2>
<p>在不讨论微服务架构模式下,我们平时开发难免会碰到需要远程调用接口的情况,【<strong>比如说上面调用deepseek的服务</strong>】,那么,我们怎么做才是比较好的方式呢?</p>
<p>一次良好的调用过程,我们应该要考虑这几点:<strong>超时处理、重试机制、异常处理、日志记录</strong>;</p>
<p>此外,于性能来说,我们要避免频繁创建连接带来的开销,可以使用<strong>连接池管理</strong>;</p>
<h3 id="-resttemplate">① RestTemplate</h3>
<p><code>RestTemplate</code> 是一个同步的 HTTP 客户端,提供了简单的方法来发送 HTTP 请求并处理响应。它支持常见的 HTTP 方法(GET、POST、PUT、DELETE 等),并能自动处理 JSON/XML 的序列化和反序列化,这个也是我们非常熟悉的。</p>
<p>下面由于是基于SpringBoot3.4.3,所以httpclient的版本是httpclient5.</p>
<pre><code class="language-java">@Configuration
public class RestConfig {
    @Bean("restTemplate")
    public RestTemplate restTemplate() {
      // 使用Apache HttpClient连接池(替代默认的 SimpleClientHttpRequestFactory)
      PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
      connectionManager.setMaxTotal(100);      // 最大连接数
      connectionManager.setDefaultMaxPerRoute(20); // 每个路由的最大连接数

      CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .evictIdleConnections(TimeValue.of(10, TimeUnit.SECONDS))// 清理空闲连接
                .build();

      HttpComponentsClientHttpRequestFactory factory =
                new HttpComponentsClientHttpRequestFactory(httpClient);
      factory.setConnectTimeout(3000);// 连接超时(ms)
      factory.setReadTimeout(5000);// 读取超时(ms)
      RestTemplate restTemplate = new RestTemplate(factory);
                // 添加自定义的错误处理器
      restTemplate.setErrorHandler(new CustomErrorHandler());
      // 添加日志拦截器
      restTemplate.getInterceptors().add(new LoggingInterceptor());
      return restTemplate;
    }
}

@Slf4j
public class LoggingInterceptor implements ClientHttpRequestInterceptor {
    @NotNull
    @Override
    public ClientHttpResponse intercept(HttpRequest request, @NotNull byte[] body, ClientHttpRequestExecution execution) throws IOException {
      log.info("请求地址: {} {}", request.getMethod(), request.getURI());
      log.info("请求头: {}", request.getHeaders());
      log.info("请求体: {}", new String(body, StandardCharsets.UTF_8));
      ClientHttpResponse response = execution.execute(request, body);
      log.info("响应状态码: {}", response.getStatusCode());
      return response;
    }
}

@Slf4j
public class CustomErrorHandler implements ResponseErrorHandler {
    @Override
    public boolean hasError(@NotNull ClientHttpResponse response) throws IOException {
      // 获取 HTTP 状态码
      HttpStatusCode statusCode = response.getStatusCode();
      return statusCode.isError(); // 判断状态码是否为错误状态码 【4xx、5xx是true,执行下面的handleError,其他的就false】
    }

    @Override
    public void handleError(@NotNull URI url, @NotNull HttpMethod method, @NotNull ClientHttpResponse response) throws IOException {
      log.info("请求地址: {}Method: {}",url, method);
      HttpStatusCode code = response.getStatusCode();
      if (code.is4xxClientError()) {
            log.info("客户端错误:{}", code.value());
            // xxx
      } else {
            log.info("服务器错误:{}", code.value());
            // xxx
      }
    }
}
</code></pre>
<p>重试降级机制:</p>
<pre><code class="language-java">@Configuration
@EnableRetry // 开启重试 -- 需要引入AOP
public class RetryConfig {
}

// 在service层调用的时候
@Service
public class OrderService {
    @Resource
    private RestTemplate restTemplate;

    @Retryable(
      maxAttempts = 3,
      backoff = @Backoff(delay = 1000, multiplier = 2), // 重试间隔 1s, 2s, 4s
      retryFor = {Exception.class} // 默认重试所有异常
      //retryFor = {ResourceAccessException.class} // 仅在网络异常时重试
    )
    public String queryOrder(String orderId) {
      return restTemplate.getForObject("/orders/" + orderId, String.class); // 远程调用
    }

    @Recover // 重试全部失败后的降级方法
    public String fallbackQueryOrder(ResourceAccessException e, String orderId) {
      return "默认订单";
    }
}
</code></pre>
<p>当然还可以再远程调用那里try catch起来,有异常的时候,throw出去可以被@Retryable捕获。</p>
<h3 id="-restclient">② RestClient</h3>
<p>Spring Framework 6.1 引入了全新的同步 HTTP 客户端 <strong>RestClient</strong>,它在底层使用了与 <code>RestTemplate</code> 相同的基础设施(比如消息转换器和拦截器),但提供了如同 <code>WebClient</code> 一样的现代、流式(fluent)API,兼顾了简洁性与可复用性。与传统的阻塞式 <code>RestTemplate</code> 相比,<code>RestClient</code> 更加直观易用,同时也保持了对同步调用语境的全量支持</p>
<p><strong>同步调用</strong>:<code>RestClient</code> 是一个阻塞式客户端,每次 HTTP 请求都会阻塞调用线程直到响应完成。</p>
<p><strong>流式 API</strong>:借鉴 <code>WebClient</code> 的设计风格,所有操作均可链式调用,代码更具可读性和可维护性。</p>
<p><strong>复用基础组件</strong>:与 <code>RestTemplate</code> 共用 HTTP 请求工厂、消息转换器、拦截器等组件,便于平滑迁移与统一配置</p>
<pre><code class="language-java">@Configuration
@Slf4j
public class RestClientConfig {

    @Bean("serviceARestClient")
    public RestClient restClientA(@Value("${api-service.a-base-url}") String baseUrl) {
      // 创建连接池
      PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
      manager.setMaxTotal(100);
      manager.setDefaultMaxPerRoute(20);
      // 创建HttpClient
      HttpClient httpClient = HttpClientBuilder.create()
                .setConnectionManager(manager)
                .build();
      // 创建HttpComponentsClientHttpRequestFactory
      HttpComponentsClientHttpRequestFactory factory =
                new HttpComponentsClientHttpRequestFactory(httpClient);
      factory.setConnectTimeout(3000);
      factory.setReadTimeout(5000);
      return RestClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .defaultCookie("myCookie", "1234")
                .requestInterceptor(clientRequestInterceptor())
                .requestFactory(factory) // 连接池与超时
                .build();
    }

    @Bean("serviceBRestClient")
    public RestClient restClientB(@Value("${api-service.b-base-url}") String baseUrl) {
      return RestClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .defaultCookie("myCookie", "1234")
                .requestInterceptor(clientRequestInterceptor())
                .build();
    }

    private ClientHttpRequestInterceptor clientRequestInterceptor() {
      return (request, body, execution) -&gt; {
            // 添加统一请求头(如认证信息)
            request.getHeaders().add("my-head", "head-gggggg");
            // 日志记录
            log.debug("Request: {} {}", request.getMethod(), request.getURI());
            request.getHeaders().forEach((name, values) -&gt;
                  values.forEach(value -&gt; log.debug("Header: {}={}", name, value)));

            ClientHttpResponse response = execution.execute(request, body);
            log.debug("Response status: {}", response.getStatusCode());
            return response;
      };
    }
}
</code></pre>
<p><strong>简单调用:</strong></p>
<pre><code class="language-java">@Service
public class AService {
    @Resource(name = "serviceARestClient")
    private RestClient restClientA;

    public String queryA(String a) {
      return restClientA.get()
                .uri("/api/a?a={a}", a)
                .retrieve()
                .onStatus(HttpStatusCode::is4xxClientError, (request, response) -&gt; {
                  throw new HttpClientErrorException(response.getStatusCode());
                })
                .onStatus(HttpStatusCode::is5xxServerError, (request, response) -&gt; {
                  throw new ServerErrorException(response.getStatusCode().toString(), null);
                })
                .body(String.class);
    }

    // 复杂query参数
    public String queryA(String a, String b) {
      return restClientA.get()
                .uri(uriBuilder -&gt;
                        uriBuilder.path("/api/bbb")
                        .queryParam("a", 25)
                        .queryParam("b", "30")
                        .build()
                )
                .retrieve()
                .onStatus(HttpStatusCode::is4xxClientError, (request, response) -&gt; {
                  throw new HttpClientErrorException(response.getStatusCode());
                })
                .onStatus(HttpStatusCode::is5xxServerError, (request, response) -&gt; {
                  throw new ServerErrorException(response.getStatusCode().toString(), null);
                })
                .body(String.class);
    }

    // post
    public String postA(String a) {
      HashMap&lt;String, Object&gt; map = new HashMap&lt;&gt;();
      map.put("a", a); map.put("page", 1); map.put("size", 10);

      return restClientA.post()
                .uri("/api/post")
                .body(map)
                .retrieve()
                .onStatus(HttpStatusCode::is4xxClientError, (request, response) -&gt; {
                  throw new HttpClientErrorException(response.getStatusCode());
                })
                .onStatus(HttpStatusCode::is5xxServerError, (request, response) -&gt; {
                  throw new ServerErrorException(response.getStatusCode().toString(), null);
                })
                .body(String.class);
    }


}
</code></pre>
<h3 id="-webclient">③ WebClient</h3>
<p>Spring框架中包含的原始web框架Spring web MVC是专门为Servlet API和Servlet容器构建的。响应式堆栈web框架Spring WebFlux是在5.0版本中添加的。它是完全非阻塞的,支持响应式流回压,并运行在诸如Netty、Undertow和Servlet容器之类的服务器上。</p>
<p>这两个web框架都镜像了它们的源模块的名字(<strong>Spring-webmvc和Spring-webflux</strong> 他们的关系图如下,节选自官网),并在Spring框架中共存。每个模块都是可选的。应用程序可以使用其中一个或另一个模块,或者在某些情况下,两者都使用——例如,Spring MVC控制器与响应式WebClient。它对同步和异步以及流方案都有很好的支持。</p>
<p><strong>非阻塞异步模型</strong>:基于 Reactor 库(<code>Mono</code>/<code>Flux</code>)实现异步调用,避免线程阻塞,通过少量线程处理高并发请求,显著提升性能</p>
<p><strong>函数式编程</strong>:支持链式调用(Builder 模式)与 Lambda 表达式,代码更简洁</p>
<p><strong>流式传输</strong>:支持大文件或实时数据的分块传输(Chunked Data),减少内存占用。</p>
<p><img src="https://img2023.cnblogs.com/blog/2358057/202505/2358057-20250518183014348-1543004107.png" alt="" loading="lazy"></p>
<p>这里就不介绍了。</p>
<table>
<thead>
<tr>
<th>特性</th>
<th>RestTemplate</th>
<th>RestClient</th>
<th>WebClient</th>
</tr>
</thead>
<tbody>
<tr>
<td>模型</td>
<td>阻塞,同步</td>
<td>阻塞,同步,流式 API</td>
<td>非阻塞,响应式【学习曲线较为陡峭】</td>
</tr>
<tr>
<td>API 风格</td>
<td>模板方法 (<code>getForObject</code>, <code>exchange</code> 等)</td>
<td>链式流式 (<code>get().uri()...retrieve()</code>)</td>
<td>链式流式,支持 <code>Mono</code>/<code>Flux</code></td>
</tr>
<tr>
<td>可扩展性</td>
<td>依赖大量重载方法</td>
<td>可配置拦截器、初始器,支持自定义消息转换器</td>
<td>强大的过滤器、拦截器与背压支持</td>
</tr>
<tr>
<td>性能</td>
<td>受限于线程池</td>
<td>同 <code>RestTemplate</code>,但更简洁</td>
<td>更佳,适合高并发场景</td>
</tr>
<tr>
<td>迁移成本</td>
<td>低</td>
<td>较低,可自然承接现有 <code>RestTemplate</code> 配置</td>
<td>较高,需要重构为响应式编程</td>
</tr>
</tbody>
</table>
<h2 id="end-参考">end. 参考</h2>
<ol>
<li>https://segmentfault.com/a/1190000021133071 【思否-Spring5的WebClient使用详解】</li>
<li>https://docs.spring.io/spring-framework/reference/integration/rest-clients.html 【Spring官网】</li>
</ol><br><br>
来源:https://www.cnblogs.com/jackjavacpp/p/18882466
頁: [1]
查看完整版本: SpringBoot3整合AI