元春 發表於 2025-9-15 16:29:00

wso2~把事件处理的思想应用到spring框架

<p>理解你对于WSO2 APIM中事件处理组件以及在Spring Boot中实现类似功能的兴趣。我会为你梳理WSO2 APIM中四个事件核心组件的作用和关系,并提供在Spring Boot中实现类似事件处理模块的思路和示例。</p>
<p>WSO2 APIM(API Manager)中的事件处理核心组件,主要用于实时流处理(Stream Processing)和复杂事件处理(Complex Event Processing, CEP)。这些组件协同工作,构成了一个<strong>事件处理管道(Event Processing Pipeline)</strong>。</p>
<p>为了更直观地展示这四个核心组件之间的关系,请看下面的流程图:</p>
<div class="mermaid">flowchart TD
A[外部数据源&lt;br&gt;Kafka/HTTP/JMS等] --&gt;|推送原始事件| B
B --&gt;|注入标准化事件| C
C --&gt;|被消费处理| D
D --&gt;|产生新事件| E
E --&gt;|输出结果| F
F --&gt;|发布最终结果| G[外部系统&lt;br&gt;数据库、消息队列、API等]
</div><p>上图展示了数据在这四个组件间的流动过程,它是一个<strong>单向的、管道式的处理流程</strong>。</p>
<h1 id="wso2-apim-事件处理核心组件详解">WSO2 APIM 事件处理核心组件详解</h1>
<p>下面我们详细了解一下每个组件的作用。</p>
<h2 id="1-事件接收器-event-receivers">1. 事件接收器 (Event Receivers)</h2>
<p><strong>作用</strong>:事件处理管道的<strong>入口</strong>,负责与外部数据源对接。</p>
<ul>
<li><strong>连接与适配</strong>:监听和接收来自各种外部源(如 Kafka、JMS、HTTP、TCP/UDP、数据库等)的原始事件数据。</li>
<li><strong>数据解析与转换</strong>:将接收到的不同格式(如 JSON、XML、 CSV)的原始数据解析并<strong>映射</strong>到内部 Event Stream 定义的统一格式。这通常通过 <code>@map</code> 等注解配置映射规则。</li>
<li><strong>事件注入</strong>:将转换后的标准化事件对象发布到指定的内部 Event Stream 中,供后续处理。</li>
</ul>
<p><strong>简单来说,Event Receivers 是平台的“感官”,负责从外部世界获取原始数据并翻译成系统能理解的“语言”。</strong></p>
<h2 id="2-事件流-event-streams">2. 事件流 (Event Streams)</h2>
<p><strong>作用</strong>:事件数据的<strong>结构定义和传输载体</strong>。</p>
<ul>
<li><strong>数据模型</strong>:明确规定事件流的元数据,即事件包含哪些属性(字段)以及每个属性的数据类型(如 string, int, float, bool等)。</li>
<li><strong>唯一标识</strong>:每个流通过名称(Stream ID)和版本(Stream Version)进行唯一标识(如 <code>StockTickStream:1.0.0</code>)。</li>
<li><strong>数据通道</strong>:实际的事件数据按照定义的结构在系统中流动。它连接了 Event Receivers、Execution Plans 和 Event Publishers,是组件间解耦通信的契约。</li>
</ul>
<p><strong>可以将 Event Streams 理解为一张数据库表的表结构定义,或者一份规定了字段和类型的消息契约。</strong></p>
<h2 id="3-执行计划-execution-plans">3. 执行计划 (Execution Plans)</h2>
<p><strong>作用</strong>:事件处理管道的<strong>大脑</strong>,包含核心业务逻辑。</p>
<ul>
<li><strong>处理逻辑容器</strong>:包含一个或多个 <strong>Siddhi 查询(SiddhiQL Queries)</strong>。SiddhiQL 是一种类似于 SQL 的流处理语言。</li>
<li><strong>复杂计算</strong>:对输入事件流中的数据执行各种操作,包括:
<ul>
<li><strong>过滤和投影</strong>:<code>select symbol, price from InputStream where price &gt; 100</code></li>
<li><strong>窗口操作</strong>:基于时间或长度进行聚合(如计算滚动平均价)。</li>
<li><strong>模式匹配</strong>:检测特定的事件序列(如5秒内价格暴涨10%)。</li>
<li><strong>关联连接</strong>:将不同流的事件基于某个条件连接起来。</li>
<li><strong>调用函数</strong>:使用内置或自定义函数进行异常检测等。</li>
</ul>
</li>
<li><strong>输出生成</strong>:处理的结果会以新事件的形式写入到<strong>新的输出事件流</strong>中。</li>
</ul>
<p><strong>Execution Plans 是定义“如何对数据流进行计算和转换”的地方。</strong></p>
<h2 id="4-事件发布器-event-publishers">4. 事件发布器 (Event Publishers)</h2>
<p><strong>作用</strong>:事件处理管道的<strong>出口</strong>,负责与下游系统对接。</p>
<ul>
<li><strong>连接下游</strong>:从内部的 Event Streams 中读取处理完成的事件,并将其转换并传输到各种外部接收系统(Sinks),如数据库、消息队列(Kafka)、HTTP 端点、邮件等。</li>
<li><strong>协议与格式适配</strong>:将内部事件格式<strong>映射并序列化</strong>成下游系统要求的格式(如 JSON、XML)和协议。</li>
<li><strong>可靠传输</strong>:尽可能可靠地将数据发送到目标系统。</li>
</ul>
<p><strong>Event Publishers 是平台的“双手”,负责将处理好的结果交付给外部系统。</strong></p>
<h1 id="在-spring-boot-中实现类似事件模块">在 Spring Boot 中实现类似事件模块</h1>
<p>在 Spring Boot 中构建类似的事件驱动系统,可以利用其丰富的生态组件。虽然不像 WSO2 那样开箱即用,但可以更灵活地定制。下图展示了一种基于 Spring Boot 构建事件处理模块的可行架构:</p>
<div class="mermaid">flowchart LR
A[外部数据源] --&gt;|通过HTTP/消息监听器| B[模拟 Event Receivers&lt;br&gt;@RestController/@KafkaListener]
B --&gt;|发布到内部总线| C
C --&gt;|监听并触发| D[模拟 Execution Plans&lt;br&gt;@Service @Async 或 Stream Processor]
D --&gt;|处理结果作为新事件发布| C
C --&gt;|被下游监听器捕获| E[模拟 Event Publishers&lt;br&gt;@EventListener 或消息发送模板]
E --&gt;|调用客户端发送数据| F[外部下游系统]
subgraph G
    B
    C
    D
    E
end
</div><p>下面我们分步骤实现:</p>
<h2 id="1-定义事件流event-streams">1. 定义事件流(Event Streams)</h2>
<p>使用 Java 类或接口来定义数据的结构(POJO)。</p>
<pre><code class="language-java">// 1. 定义事件流:股票行情流 (StockTickStream)
@Data // Lombok 注解,简化 getter/setter 等
@NoArgsConstructor
@AllArgsConstructor
public class StockTickEvent {
    private String symbol;
    private double price;
    private long timestamp;
}

// 定义事件流:告警流 (SpikeAlertStream)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SpikeAlertEvent {
    private String symbol;
    private double startPrice;
    private double endPrice;
    private double increasePct;
}
</code></pre>
<h2 id="2-实现事件接收器event-receivers">2. 实现事件接收器(Event Receivers)</h2>
<p>使用 <strong>Spring MVC</strong> 接收 HTTP 事件,或使用 <strong>Spring Cloud Stream</strong>、<strong>@KafkaListener</strong> 消费消息。</p>
<pre><code class="language-java">@RestController
@RequestMapping("/api/events")
public class EventReceiverController {

    // 内部事件总线,用于将接收到的事件转发给处理器
    // 也可使用ApplicationEventPublisher
    private final StreamBridge streamBridge;

    public EventReceiverController(StreamBridge streamBridge) {
      this.streamBridge = streamBridge;
    }

    // 模拟 HTTP Event Receiver
    @PostMapping("/stock")
    public ResponseEntity&lt;String&gt; receiveStockTick(@RequestBody StockTickEvent stockTick) {
      // 将接收到的数据转换为标准事件对象
      // 然后发布到内部通道,模拟注入Event Stream
      streamBridge.send("stockTickStream-in-0", stockTick);
      return ResponseEntity.ok("Event received");
    }
}
</code></pre>
<pre><code class="language-java">@Component
public class KafkaEventReceiver {
    // 模拟从Kafka接收事件
    @KafkaListener(topics = "external-stock-topic", groupId = "my-group")
    public void receiveFromKafka(StockTickEvent stockTick) {
      // 同样发布到内部通道
      streamBridge.send("stockTickStream-in-0", stockTick);
    }
}
</code></pre>
<h2 id="3-实现执行逻辑execution-plans">3. 实现执行逻辑(Execution Plans)</h2>
<p>这是核心处理逻辑。可以使用 <strong>普通Spring Bean</strong>、<strong>Spring Cloud Stream 处理器</strong>、<strong>或专业流处理库</strong>(如 Kafka Streams)来实现。</p>
<h3 id="方案一使用-spring-cloud-stream-函数式编程模型推荐">方案一:使用 Spring Cloud Stream 函数式编程模型(推荐)</h3>
<p><code>application.yml</code></p>
<pre><code class="language-yaml">spring:
cloud:
    stream:
      bindings:
      stockTickStream-in-0: # 输入通道
          destination: stockTickTopic
      spikeAlertStream-out-0: # 输出通道
          destination: spikeAlertTopic
      function:
      definition: processStockTick
</code></pre>
<p><code>Java代码</code>:</p>
<pre><code class="language-java">@Component
public class StockEventProcessor {

    @Bean
    public Function&lt;Flux&lt;StockTickEvent&gt;, Flux&lt;SpikeAlertEvent&gt;&gt; processStockTick() {
      return stockTickFlux -&gt; stockTickFlux
                .window(Duration.ofSeconds(5)) // 5秒窗口
                .flatMap(window -&gt; window
                        .buffer(2, 1) // 重叠缓冲区,用于比较前后数据
                        .filter(buffer -&gt; buffer.size() == 2)
                        .map(buffer -&gt; {
                            StockTickEvent e1 = buffer.get(0);
                            StockTickEvent e2 = buffer.get(1);
                            double increasePct = (e2.getPrice() - e1.getPrice()) / e1.getPrice();
                            if (increasePct &gt; 0.10) { // 10%暴涨
                              return new SpikeAlertEvent(
                                        e2.getSymbol(),
                                        e1.getPrice(),
                                        e2.getPrice(),
                                        increasePct
                              );
                            } else {
                              return null;
                            }
                        })
                        .filter(Objects::nonNull)
                );
    }
}
</code></pre>
<h3 id="方案二在普通service中使用事件监听和异步处理">方案二:在普通Service中使用事件监听和异步处理</h3>
<pre><code class="language-java">@Service
public class SimpleStockProcessor {

    private static final Map&lt;String, StockTickEvent&gt; LAST_EVENTS = new ConcurrentHashMap&lt;&gt;();
    private final ApplicationEventPublisher publisher;

    public SimpleStockProcessor(ApplicationEventPublisher publisher) {
      this.publisher = publisher;
    }

    @EventListener
    @Async // 异步处理
    public void handleStockTick(StockTickEvent event) {
      String symbol = event.getSymbol();
      StockTickEvent lastEvent = LAST_EVENTS.get(symbol);
      LAST_EVENTS.put(symbol, event);

      if (lastEvent != null) {
            double increasePct = (event.getPrice() - lastEvent.getPrice()) / lastEvent.getPrice();
            if (increasePct &gt; 0.10) {
                SpikeAlertEvent alert = new SpikeAlertEvent(
                        symbol, lastEvent.getPrice(), event.getPrice(), increasePct
                );
                publisher.publishEvent(alert); // 发布告警事件
            }
      }
    }
}
</code></pre>
<h2 id="4-实现事件发布器event-publishers">4. 实现事件发布器(Event Publishers)</h2>
<p>监听处理结果事件,并将其发送到下游系统。</p>
<pre><code class="language-java">@Component
public class EventPublisherService {

    // 方式1: 使用RestTemplate调用下游HTTP API
    @EventListener
    public void publishSpikeAlertViaHttp(SpikeAlertEvent alert) {
      RestTemplate restTemplate = new RestTemplate();
      restTemplate.postForEntity("http://alert-system/alerts", alert, Void.class);
    }

    // 方式2: 使用KafkaTemplate发送到Kafka
    @EventListener
    public void publishSpikeAlertViaKafka(SpikeAlertEvent alert) {
      kafkaTemplate.send("spike-alerts-topic", alert.getSymbol(), alert);
    }

    // 方式3: 通过Spring Cloud Stream绑定器输出
    // 上述Processor方案的输出绑定 already handles this automatically
    // SpikeAlertEvent 会通过spikeAlertStream-out-0通道发送到MQ
}
</code></pre>
<h2 id="补充配置与依赖">补充:配置与依赖</h2>
<p><code>pom.xml</code> 关键依赖:</p>
<pre><code class="language-xml">&lt;!-- Spring Boot Web --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
    &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;!-- Spring Cloud Stream (e.g., with Kafka binder) --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
    &lt;artifactId&gt;spring-cloud-stream&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
    &lt;artifactId&gt;spring-cloud-stream-binder-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;!-- 或使用Reactive方式 --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
    &lt;artifactId&gt;spring-cloud-stream-binder-kafka-streams&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;!-- Kafka --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;!-- Lombok --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.projectlombok&lt;/groupId&gt;
    &lt;artifactId&gt;lombok&lt;/artifactId&gt;
    &lt;optional&gt;true&lt;/optional&gt;
&lt;/dependency&gt;
</code></pre>
<h1 id="总结与建议">总结与建议</h1>
<p>WSO2 APIM 的事件处理组件提供了一套<strong>成熟、集成度高</strong>的解决方案,特别适合在 WSO2 生态中进行复杂的流处理任务。</p>
<p>在 Spring Boot 中自建类似模块,则提供了<strong>极大的灵活性和控制力</strong>,并且能更好地与现有的 Spring 生态集成。对于大多数应用场景,Spring Boot 的方案是更轻量、更熟悉的选择。</p>
<p>选择哪种方案取决于你的具体需求:</p>
<ul>
<li>如果你的项目<strong>已经深度使用 WSO2 产品线</strong>,且需要处理<strong>非常复杂的事件模式</strong>,坚持使用 WSO2 的组件是合理的。</li>
<li>如果你想要<strong>更高的灵活性</strong>、<strong>更浅的学习曲线</strong>,或者你的架构是<strong>基于Spring Cloud的微服务</strong>,那么使用 Spring Boot 及其生态组件来构建事件处理模块是一个高效且可控的选择。</li>
</ul>
<p>希望这些解释和示例能帮助你更好地理解并在你的项目中实现所需的功能。</p>


</div>
<div id="MySignature" role="contentinfo">
    <p></p>
<div class="navgood">
<p>作者:仓储大叔,张占岭,<br>
荣誉:微软MVP<br>QQ:853066980</p>

<p><strong>支付宝扫一扫,为大叔打赏!</strong>
<br><img src="https://images.cnblogs.com/cnblogs_com/lori/237884/o_IMG_7144.JPG"></p>
</div><br><br>
来源:https://www.cnblogs.com/lori/p/19093208
頁: [1]
查看完整版本: wso2~把事件处理的思想应用到spring框架