木十文 發表於 2025-11-4 15:03:00

从零开始学Flink:事件驱动

<p>在实时计算领域,很多业务逻辑天然适合“事件驱动”模式:当事件到达时触发处理、在某个时间点触发补偿或汇总、根据状态变化发出告警等。Apache Flink 为此提供了强大的 ProcessFunction 家族(KeyedProcessFunction、CoProcessFunction、BroadcastProcessFunction 等),它们在算子层面同时具备“事件处理 + 定时器 + 状态”的能力,是构建复杂流式应用的核心基石。</p>
<p>本文基于 Flink 1.20 的语义,带你从零理解事件驱动的编程模型,并一步步实现一个“伪窗口 PseudoWindow”示例,体会 ProcessFunction 如何代替窗口完成时间分桶、累加和触发输出。</p>
<h2 id="一为什么选择事件驱动">一、为什么选择事件驱动</h2>
<p>对于如下需求,事件驱动往往比简单窗口更灵活:</p>
<ul>
<li>自定义触发逻辑(不仅仅是固定窗口边界)。</li>
<li>精细的迟到事件处理策略(事件时间/处理时间混用、不同类型事件分别处理)。</li>
<li>需要在算子级别维护复杂状态(如每个 key 多个并发“子窗口”或会话)。</li>
<li>需要与外部系统交互或对齐(例如到达某个业务时间点后批量写出)。</li>
</ul>
<p>ProcessFunction 能满足上述场景,因为它同时提供:</p>
<ul>
<li>事件回调:processElement,用于逐条事件处理。</li>
<li>定时器:事件时间或处理时间两种类型,支持在指定时刻触发 onTimer 回调。</li>
<li>管理状态:借助 RichFunction 的上下文,访问 keyed state(如 ValueState、MapState、ListState 等)。</li>
</ul>
<h2 id="二核心概念速览">二、核心概念速览</h2>
<ul>
<li>KeyedProcessFunction:在 keyBy 之后对每个 key 独立处理事件、注册和触发定时器、读写 keyed state。</li>
<li>TimerService:通过 ctx.timerService() 注册事件时间或处理时间定时器;在 onTimer 中被调用。</li>
<li>Watermark:推进事件时间的“时钟”,只有当 Watermark 超过某个时间点时,对应的事件时间定时器才会触发。</li>
<li>RichFunction:ProcessFunction 属于 RichFunction,因而拥有 open/getRuntimeContext 等生命周期方法,可初始化状态描述符等。</li>
</ul>
<h2 id="三示例用-keyedprocessfunction-实现小时级伪窗口">三、示例:用 KeyedProcessFunction 实现“小时级伪窗口”</h2>
<p>目标:按司机 driverId,每小时汇总 tip(小费)之和。我们先给出窗口版本,再给出伪窗口版本以对比两者的思路差异。</p>
<h3 id="1-窗口实现参考思路">1. 窗口实现(参考思路)</h3>
<pre><code class="language-java">// 每小时、每个司机的提示费求和(传统事件时间翻转窗口)
DataStream&lt;Tuple3&lt;Long, Long, Float&gt;&gt; hourlyTips = fares
      .keyBy((TaxiFare fare) -&gt; fare.driverId)
      .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
      .process(new AggregateTipsProcess());
</code></pre>
<p>窗口版本直观,但触发逻辑受窗口边界约束。如果我们希望完全掌控“何时触发”和“如何管理多窗口并发”,可以使用 KeyedProcessFunction:</p>
<h3 id="2-事件驱动实现pseudowindow">2. 事件驱动实现(PseudoWindow)</h3>
<pre><code class="language-java">// 使用事件驱动的 KeyedProcessFunction 替代窗口
DataStream&lt;Tuple3&lt;Long, Long, Float&gt;&gt; hourlyTips = fares
      .keyBy((TaxiFare fare) -&gt; fare.driverId)
      .process(new PseudoWindow(Duration.ofSeconds(5)));

// 伪窗口:按事件时间把每条数据归入其所在小时段,注册窗口结束时间的定时器,定时器触发时输出该小时汇总
public static class PseudoWindow extends KeyedProcessFunction&lt;Long, TaxiFare, Tuple3&lt;Long, Long, Float&gt;&gt; {

    private final long durationMsec;
    // MapState&lt;窗口结束时间, 累计 tips&gt;
    private transient MapState&lt;Long, Float&gt; sumOfTips;

    public PseudoWindow(Duration duration) {
      this.durationMsec = duration.toMillis();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      MapStateDescriptor&lt;Long, Float&gt; sumDesc =
                new MapStateDescriptor&lt;&gt;("sumOfTips", Long.class, Float.class);
      sumOfTips = getRuntimeContext().getMapState(sumDesc);
    }

    @Override
    public void processElement(
            TaxiFare fare,
            Context ctx,
            Collector&lt;Tuple3&lt;Long, Long, Float&gt;&gt; out) throws Exception {

      long eventTime = fare.getEventTime();
      TimerService timerService = ctx.timerService();

      // 若事件时间早于当前 Watermark,说明窗口已触发,该事件为迟到事件(按需决定丢弃或补偿)
      if (eventTime &lt;= timerService.currentWatermark()) {
            // 迟到事件处理策略:可以记录指标、写侧输出、或进行补偿
            return;
      }

      // 计算该事件所属小时窗口的“窗口结束时间”戳
      long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1;

      // 注册事件时间定时器:当 Watermark 超过 endOfWindow 时触发 onTimer
      timerService.registerEventTimeTimer(endOfWindow);

      // 累加该窗口的 tips
      Float sum = sumOfTips.get(endOfWindow);
      if (sum == null) {
            sum = 0.0F;
      }
      sum += fare.tip;
      sumOfTips.put(endOfWindow, sum);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector&lt;Tuple3&lt;Long, Long, Float&gt;&gt; out) throws Exception {

      // 定时器时间戳即窗口结束时间,输出 (driverId, windowEnd, sum)
      Float sum = sumOfTips.get(timestamp);
      if (sum != null) {
            Long driverId = ctx.getCurrentKey();
            out.collect(Tuple3.of(driverId, timestamp, sum));
            // 输出后清理该窗口的状态,避免泄漏
            sumOfTips.remove(timestamp);
      }
    }
}
</code></pre>
<p>从这个实现可以观察到:</p>
<ul>
<li>我们手动决定“窗口”形态与触发时机:不依赖 Window API,而是依赖事件时间定时器和 Watermark。</li>
<li>MapState 使一个 key 能同时维护多个并发窗口(不同结束时间戳)。</li>
<li>迟到事件处理策略高度可定制:可丢弃、可侧输出、也可做补偿累加再延迟触发。</li>
</ul>
<h2 id="四生命周期与关键回调">四、生命周期与关键回调</h2>
<ul>
<li>open:初始化状态(如 MapState、ValueState),常用于设置描述符和外部资源连接。</li>
<li>processElement:每到一条事件都会调用。典型逻辑包括:计算归属时间段、注册定时器、修改状态、按需提前输出。</li>
<li>onTimer:当定时器触发时调用。常见动作:基于状态汇总并输出、清理过期状态、注册下一次定时器等。</li>
</ul>
<h2 id="五事件时间-vs-处理时间定时器">五、事件时间 vs 处理时间定时器</h2>
<ul>
<li>事件时间(Event Time):以事件携带的时间戳为准,Watermark 推进时触发。适合有乱序、需要时间一致性的业务场景。</li>
<li>处理时间(Processing Time):以算子所在 TaskManager 的系统时间为准,时间一到立即触发。适合周期性心跳、定时轮询等逻辑。</li>
</ul>
<p>建议:涉及业务时间逻辑时优先使用事件时间,并合理设置 Watermark 与乱序容忍度;同时可以结合处理时间定时器做后台清理或补偿任务。</p>
<h2 id="六watermark-与迟到事件">六、Watermark 与迟到事件</h2>
<ul>
<li>Watermark 是事件时间“时钟”。当 Watermark 超过某个窗口的结束时间,说明该窗口已“完成”,对应事件时间定时器会被触发。</li>
<li>迟到事件:其事件时间落在已完成窗口内。在窗口 API 中可配置允许迟到与侧输出;在 ProcessFunction 中则由你自定义策略(记录日志、侧输出、修正状态等)。</li>
</ul>
<p>在批处理场景(有界数据)中,通常可以使用单调递增或默认 Watermark 策略;在流处理场景(无界数据)中,常用“有界乱序”策略。</p>
<h2 id="七与窗口-api-的对比">七、与窗口 API 的对比</h2>
<ul>
<li>窗口 API:更易用、约束更明显,适合绝大多数时间分桶与聚合场景。</li>
<li>ProcessFunction:更低层、可完全自定义触发与状态管理,适合复杂业务流程编排、会话识别、跨窗口补偿、规则引擎等。</li>
</ul>
<p>经验法则:能用窗口优雅解决的就用窗口;当窗口表达力不够时,考虑 ProcessFunction。</p>
<h2 id="八常见事件驱动模式">八、常见事件驱动模式</h2>
<ul>
<li>会话化(Sessionization):用 ValueState 记录最近活动时间,注册处理时间或事件时间定时器判定会话结束。</li>
<li>去重(Deduplication):维护最近看到的事件 ID 集合(BloomFilter/MapState),设置过期清理定时器。</li>
<li>告警与监控:根据状态阈值注册近未来定时器并在 onTimer 中发出告警。</li>
<li>复杂汇总:如本文示例的伪窗口;或跨窗口滚动汇总、迟到补偿输出等。</li>
</ul>
<h2 id="九最佳实践">九、最佳实践</h2>
<ul>
<li>状态清理与 TTL:定时清理过期状态,或使用 State TTL,避免内存泄漏。</li>
<li>触发器设计:避免过密的定时器注册,减少 onTimer 风暴,可合并多个时间点或批量触发。</li>
<li>乱序容忍:根据业务乱序程度设置 Watermark 策略,既保证准确性又避免过度延迟。</li>
<li>侧输出:对迟到或异常事件使用 Side Output,既不影响主流计算又便于单独监控。</li>
<li>可观察性:对迟到率、定时器触发延迟、状态大小等打点,便于定位瓶颈与异常。</li>
</ul>
<h2 id="十完整示例骨架整合-source-与-watermark">十、完整示例骨架(整合 source 与 Watermark)</h2>
<pre><code class="language-java">StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000);

// 示例:Kafka Source + Bounded Out-Of-Orderness Watermark
KafkaSource&lt;TaxiFare&gt; source = KafkaSource.&lt;TaxiFare&gt;builder()
      .setBootstrapServers("localhost:9092")
      .setTopics("fares")
      .setGroupId("flink-fare-group")
      .setValueOnlyDeserializer(new TaxiFareDeserializer())
      .build();

DataStream&lt;TaxiFare&gt; fares = env.fromSource(
      source,
      WatermarkStrategy
                .&lt;TaxiFare&gt;forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((fare, ts) -&gt; fare.getEventTime()),
      "Kafka Fares");

DataStream&lt;Tuple3&lt;Long, Long, Float&gt;&gt; hourlyTips = fares
      .keyBy(f -&gt; f.driverId)
      .process(new PseudoWindow(Duration.ofSeconds(5)));

hourlyTips.print();
env.execute("Event-driven Hourly Tips");
</code></pre>
<h2 id="十一创建-topic-和发送测试数据">十一、创建 Topic 和发送测试数据</h2>
<ol>
<li>创建 Topic fares<br>
./bin/kafka-topics.sh --create --topic fares --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1</li>
<li>打开 Console Producer(交互式)<br>
./bin/kafka-console-producer.sh --topic fares --bootstrap-server localhost:9092</li>
<li>在 Producer 里输入 CSV 测试消息(示例)<br>
42,1710003600000,3.5<br>
42,1710007100000,2.1<br>
77,1710003800000,1.0<br>
如果希望使用当前毫秒时间戳,可以在另一个终端获取:<br>
date +%s%3N<br>
然后输入例如:<br>
42,1699999999999,3.5</li>
<li>可选:使用 Console Consumer 验证消息进出<br>
./bin/kafka-console-consumer.sh --topic fares --bootstrap-server localhost:9092 --from-beginning</li>
</ol>
<h2 id="十二总结">十二、总结</h2>
<p>事件驱动让你在算子层面掌控“事件处理 + 定时器 + 状态”,从而能表达超越窗口 API 的复杂业务逻辑。在 Flink 中,KeyedProcessFunction 是实现事件驱动应用的核心武器:用它来注册事件或处理时间定时器、维护键控状态、为迟到与补偿设计精细策略。恰当地选择 Watermark 策略和状态清理机制,可以在保证准确性的同时兼顾性能与资源使用。</p>
<hr>
<p>原文来自:http://blog.daimajiangxin.com.cn</p>
<p>源码地址:https://gitee.com/daimajiangxin/flink-learning</p><br><br>
来源:https://www.cnblogs.com/daimajiangxin/p/19190368
頁: [1]
查看完整版本: 从零开始学Flink:事件驱动