Flink Real-Time Computing Mental Model: How Streams, Windows, Watermarks, State, and Checkpoints Work Together
<div data-page-id="Ar5CfFD4HddcCPcgShfccBjnnuh" data-lark-html-role="root" data-docx-has-block-data="false"><h1 class="ace-line ace-line old-record-id-Ar5CfFD4HddcCPcgShfccBjnnuh">Flink Real-Time Computing Mental Model: How Streams, Windows, Watermarks, State, and Checkpoints Work Together</h1>
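<div class="ace-line ace-line old-record-id-E3AdfW1FcdGhn5c9ytwcO9C6n9f">In real-time computing, Flink has become one of the most widely used frameworks thanks to its strong stream-processing capabilities, low latency, and high reliability. Yet many beginners, and even experienced developers, find that Flink's core concepts (streams, windows, watermarks, state, and checkpoints) are easy to understand one at a time but confusing once combined. These five components do not exist in isolation. Together they form a tightly integrated mental model: the stream carries the data, the window slices the stream, the watermark measures time, state is the computation's memory, and the checkpoint guarantees reliability. Only by understanding how they cooperate can you master real-time computing with Flink, avoid common pitfalls, and write efficient, stable real-time jobs.</div>
<div class="ace-line ace-line old-record-id-Qvw0fOtEkdiaaLcGyEKcwfbZnMg">This article breaks the model down along four dimensions: what each component is, how they cooperate, practical scenarios, and common problems. It uses plain language and concrete examples, so that in your own work you know not only how Flink behaves but also why.</div>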
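<h2 class="heading-2 ace-line old-record-id-X1JVfNhxCdrs5RcdnsCcwh8On1h">I. First, Understand What the Five Core Components Are (Foundations That Prevent Confusion)</h2>
<div class="ace-line ace-line old-record-id-PE7nfinZ0dbYFvcDNZGcSgQPnzg">Before explaining how the components cooperate, we look at each one on its own to pin down its role and responsibilities. Much of the confusion comes from not understanding what each component fundamentally is, and from mixing up what a component does with what it is for.</div>
<h3 class="heading-3 ace-line old-record-id-BBrjf84fSdRHg9c6ZVjcLy85nqg">1. Stream: The Carrier of Real-Time Data and the Starting Point of All Computation</h3>
<div class="ace-line ace-line old-record-id-TFL6fWLWvddzxycHFDycqlfVnic">The stream is Flink's most fundamental concept. It is essentially an <strong>unbounded, continuous sequence of data items</strong> that are produced and transmitted over time and have no fixed end. This distinguishes it from the bounded data sets of batch processing. User click logs, device monitoring metrics, and order payment records are all produced continuously, and each can be viewed as a stream.</div>
<div class="ace-line ace-line old-record-id-No6FfHgNTdNnXhcIRjlcDadxnVe">Flink can interpret the time of a stream in two ways, called time semantics. This distinction is key to everything that follows:</div>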
<ul class="list-bullet1">
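<li class="ace-line ace-line old-record-id-BEUzfPvbadNNvec4eC6cVHKGnoh" data-list="bullet">Event time: the timestamp carried by the data itself, recording when the event happened (for example, when a user clicked a button or when an order was created). This timestamp is a fixed property of the event and does not change with transmission speed or processing delay. Event time is the most common choice in real business workloads and one of Flink's core strengths: computations are based on when things actually happened, so system delays do not distort the results.</li>
<li class="ace-line ace-line old-record-id-ExRVfqBfBdHKXXcYCAvc1t23n6c" data-list="bullet">Processing time: the time at which the data reaches a Flink operator (such as a source or a downstream operator), i.e., when the data is processed. It depends on the machine's system clock and is easily affected by network delay and node load. It therefore suits scenarios with loose timing requirements, such as simple real-time monitoring and alerting.</li>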
</ul>
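<div class="ace-line ace-line">The following minimal plain-Java sketch makes the difference concrete. It does not use the Flink API. The same delayed click is assigned to a 10-minute bucket twice: once by its event timestamp and once by its arrival (processing) timestamp. The <code>Click</code> and <code>TimeSemanticsSketch</code> classes, the millisecond-of-day timestamps, and the sample values are hypothetical and exist only for illustration.</div>

```java
// Minimal plain-Java sketch (not the Flink API): bucket one delayed event by event time vs processing time.
final class Click {
    final String userId;
    final long eventTimeMs;   // when the click happened (carried in the event itself)
    final long arrivalTimeMs; // when it reached the operator (processing time)

    Click(String userId, long eventTimeMs, long arrivalTimeMs) {
        this.userId = userId;
        this.eventTimeMs = eventTimeMs;
        this.arrivalTimeMs = arrivalTimeMs;
    }
}

final class TimeSemanticsSketch {
    static final long BUCKET_MS = 10 * 60_000L; // 10-minute buckets

    // Start of the 10-minute bucket containing the timestamp (milliseconds of the day in this sketch)
    static long bucketStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, BUCKET_MS);
    }

    static long eventTimeBucket(Click c) {
        return bucketStart(c.eventTimeMs);
    }

    static long processingTimeBucket(Click c) {
        return bucketStart(c.arrivalTimeMs);
    }

    // Format milliseconds of the day as HH:mm
    static String hhmm(long ms) {
        long minutes = ms / 60_000L;
        return String.format("%02d:%02d", minutes / 60, minutes % 60);
    }

    public static void main(String[] args) {
        long happenedAt = ((9 * 60 + 59) * 60 + 30) * 1000L; // 09:59:30
        long arrivedAt = (10 * 60 + 5) * 60_000L;             // 10:05:00, delayed by the network
        Click delayed = new Click("u1", happenedAt, arrivedAt);
        System.out.println("event-time bucket starts at " + hhmm(eventTimeBucket(delayed)));
        System.out.println("processing-time bucket starts at " + hhmm(processingTimeBucket(delayed)));
    }
}
```

<div class="ace-line ace-line">Running <code>main</code> reports a 09:50 bucket for event time and a 10:00 bucket for processing time. The same click lands in a different window depending only on which notion of time is used.</div>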
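<div class="ace-line ace-line old-record-id-N2qPfPocbdLAMpcwAUXcds9nnId">Key point: a stream is a sequence of timestamped events, and event time is the core reference for Flink's real-time computations. The windows and watermarks discussed next are both built around it. Without streams there is nothing to compute, and without event time Flink cannot produce results that reflect when events actually happened.</div>
<h4 class="heading-4 ace-line old-record-id-LtxGfyZ9IdhurzcFSBjcxB1GnQd">Code Example 1: Creating an Event-Time Stream (Using a Kafka Source)</h4>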
<div class="cnblogs_code">
<pre>import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventTimeStreamDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        // Event time is the default since Flink 1.12, so the deprecated
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) call is no longer needed.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Configure the Kafka source that reads the order stream
        KafkaSource&lt;String&gt; kafkaSource = KafkaSource.&lt;String&gt;builder()
                .setBootstrapServers("localhost:9092")              // Kafka cluster address
                .setTopics("order_topic")                           // order topic to subscribe to
                .setGroupId("flink_order_group")                    // consumer group
                // Start from the latest offsets (use OffsetsInitializer.earliest() if the job must read history)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema()) // deserialize record values as strings
                .build();

        // 3. Read from Kafka and extract the event time (record format assumed: orderId,eventTime,amount)
        DataStream&lt;Order&gt; orderStream = env
                .fromSource(
                        kafkaSource,
                        // Watermark strategy: tolerate up to 3 s of out-of-orderness (see the watermark section)
                        WatermarkStrategy.&lt;String&gt;forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((line, recordTimestamp) -> {
                                    // Parse the record and return its event timestamp in ms (field 1)
                                    String[] fields = line.split(",");
                                    return Long.parseLong(fields[1]);
                                }),
                        "Kafka Order Source")
                // Convert each line into an Order object
                .map(line -> {
                    String[] fields = line.split(",");
                    return new Order(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
                })
                .returns(Order.class); // declare the lambda's result type explicitly

        // Windowing, aggregation, etc. can be applied to orderStream from here
        orderStream.print("Event Time Order Stream");
        // Run the job
        env.execute("Flink Event Time Stream Demo");
    }

    // Order entity (getters/setters omitted for brevity)
    static class Order {
        private String orderId;
        private Long eventTime; // event timestamp in milliseconds
        private Double amount;

        public Order(String orderId, Long eventTime, Double amount) {
            this.orderId = orderId;
            this.eventTime = eventTime;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{orderId='" + orderId + "', eventTime=" + eventTime + ", amount=" + amount + "}";
        }
    }
}</pre>
</div>
<p> </p>
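<div class="ace-line ace-line old-record-id-BAtUfXl5wdSz0OcTrz8cZeQ2nqd">Note: this example creates an event-time stream from Kafka. The key part is the <code>WatermarkStrategy</code>, which extracts the event-time field through <code>withTimestampAssigner</code> and tolerates up to 3 seconds of out-of-orderness. This lays the groundwork for the watermark and window computations that follow. The Kafka source also keeps the offsets it has read in operator state. Once checkpointing is enabled (for example with <code>env.enableCheckpointing(...)</code>, which this demo does not do), those offsets are stored in every checkpoint. After a failure, the job then resumes from the offsets of the last completed checkpoint rather than from an arbitrary position in the topic.</div>
<h3 class="heading-3 ace-line old-record-id-F5v2fP2UadOU4JcdpKlcW9gCnSh">2. Window: The Tool That Slices the Stream, Turning an Unbounded Stream into Finite Units of Computation</h3>
<div class="ace-line ace-line old-record-id-EXU6f6FQcdfNlTc4XIHcD4qfn9Z">A stream is unbounded, so we cannot aggregate over all of its data directly (for example, to count orders per hour). We need a tool that slices the unbounded stream into finite, computable chunks, and that tool is the window.</div>
<div class="ace-line ace-line old-record-id-JD0Lf2Vt1d5TqgcuPDQcKvZKnsd">The core purpose of a window is to <strong>turn an unbounded stream into finite units of computation</strong>, so that aggregations such as sum, count, and average can actually produce results. For example, to count user clicks every 10 minutes, a window slices the continuous click stream into 10-minute chunks, and each chunk is then counted.</div>
<div class="ace-line ace-line old-record-id-NGOnfWii0dRaCkciZPPcMHwNnGe">The two most commonly used window types in Flink differ in how they lay out their time ranges:</div>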
<ul class="list-bullet1">
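<li class="ace-line ace-line old-record-id-L3qWfge2yd6jT4cgwltcIz2mn1f" data-list="bullet">Tumbling window: fixed size, no overlap. With a 10-minute window, the time ranges are 0-10 min, 10-20 min, 20-30 min, and so on, and they never overlap. Each window is half-open, [start, end), so an event at exactly minute 10 belongs to the 10-20 window. Tumbling windows suit fixed-period statistics such as hourly order totals.</li>
<li class="ace-line ace-line old-record-id-J9bCfQcN8djMjMcXoZdcKVPUndf" data-list="bullet">Sliding window: fixed size, but windows overlap. With a size of 10 minutes and a slide of 5 minutes, the windows are 0-10 min, 5-15 min, 10-20 min, and so on, so each event belongs to size / slide = 2 windows. Sliding windows suit rolling statistics, such as computing user activity over the past 10 minutes every 5 minutes.</li>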
</ul>
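<div class="ace-line ace-line">The following plain-Java sketch shows how an element is mapped to tumbling and sliding windows. The start-time arithmetic is modeled on Flink's <code>TimeWindow.getWindowStartWithOffset</code>, and the loop is modeled on how <code>SlidingEventTimeWindows</code> assigns windows. The class and method names are hypothetical and this is not Flink code. Windows are half-open, [start, end), and aligned to the epoch (offset 0).</div>

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of event-time window assignment (not Flink code).
final class WindowAssignmentSketch {

    // Start of the window of length `size`, aligned to `offset`, that contains `timestamp`
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % can be negative for negative operands, so normalize it
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    // Tumbling window: exactly one window [start, start + size)
    static long[] tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, 0, size);
        return new long[] {start, start + size};
    }

    // Sliding window: every window [start, start + size) that contains the timestamp, newest first
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    static String minutes(long[] w) {
        return "[" + w[0] / 60_000 + ", " + w[1] / 60_000 + ") min";
    }

    public static void main(String[] args) {
        long tenMin = 10 * 60_000L;
        long fiveMin = 5 * 60_000L;
        long ts = 12 * 60_000L; // an event at minute 12
        System.out.println("tumbling: " + minutes(tumbling(ts, tenMin)));
        for (long[] w : sliding(ts, tenMin, fiveMin)) {
            System.out.println("sliding:  " + minutes(w));
        }
    }
}
```

<div class="ace-line ace-line">For an event at minute 12, the sketch prints one tumbling window, [10, 20) min, and two sliding windows, [10, 20) min and [5, 15) min. This matches size / slide = 2.</div>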
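<div class="ace-line ace-line old-record-id-VhL2fSiUXdzOtFcdCiwcyj4QnHb">Key point: a window is essentially a division of time into ranges, but it cannot tell by itself whether all the data for a range has arrived. Watermarks provide that signal. The window's intermediate results also have to be kept somewhere until it fires, and that is what state is for.</div>
<h4 class="heading-4 ace-line old-record-id-Djp5fqJ5wdX2OZclUQccLfJznXg">Code Example 2: Tumbling and Sliding Windows (Event Time)</h4>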
<div class="cnblogs_code">
<pre>import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowDemo {
    public static void main(String[] args) throws Exception {
        // Event time is the default since Flink 1.12; no setStreamTimeCharacteristic(...) call is needed
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Read the order stream (simplified here; in practice reuse the Kafka source from Code Example 1)
        DataStream&lt;Order&gt; orderStream = getOrderStream(env);

        // 2. Tumbling window: order count and total amount every 10 minutes (event time)
        DataStream&lt;OrderStats&gt; tumblingWindowResult = orderStream
                // windowAll: a non-keyed window over the whole stream; this operator always runs with parallelism 1
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(10)))
                // Incremental aggregation: order count and total amount
                .aggregate(new OrderAggregateFunction());

        // 3. Sliding window: every 5 minutes, aggregate the orders of the past 10 minutes (event time)
        DataStream&lt;OrderStats&gt; slidingWindowResult = orderStream
                .windowAll(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
                .aggregate(new OrderAggregateFunction());

        // Print the results
        tumblingWindowResult.print("Tumbling window (10 min)");
        slidingWindowResult.print("Sliding window (10 min size, 5 min slide)");
        env.execute("Flink Window Demo");
    }

    // Aggregate function: order count and total amount per window
    static class OrderAggregateFunction implements AggregateFunction&lt;Order, OrderStats, OrderStats&gt; {
        // Initial accumulator: 0 orders, total amount 0
        @Override
        public OrderStats createAccumulator() {
            return new OrderStats(0L, 0.0);
        }

        // Fold each incoming order into the accumulator
        @Override
        public OrderStats add(Order order, OrderStats accumulator) {
            return new OrderStats(
                    accumulator.getOrderCount() + 1,
                    accumulator.getTotalAmount() + order.getAmount());
        }

        // Called when the window fires: produce the result from the accumulator
        @Override
        public OrderStats getResult(OrderStats accumulator) {
            return accumulator;
        }

        // Merge two accumulators; only invoked by merging window assigners such as session windows
        @Override
        public OrderStats merge(OrderStats a, OrderStats b) {
            return new OrderStats(
                    a.getOrderCount() + b.getOrderCount(),
                    a.getTotalAmount() + b.getTotalAmount());
        }
    }

    // Aggregation result: order count and total amount
    static class OrderStats {
        private Long orderCount;    // number of orders
        private Double totalAmount; // total order amount

        public OrderStats(Long orderCount, Double totalAmount) {
            this.orderCount = orderCount;
            this.totalAmount = totalAmount;
        }

        public Long getOrderCount() { return orderCount; }
        public Double getTotalAmount() { return totalAmount; }

        @Override
        public String toString() {
            return "OrderStats{orderCount=" + orderCount + ", totalAmount=" + totalAmount + "}";
        }
    }

    // Simplified order source with hard-coded sample data (replace with the Kafka source in practice)
    private static DataStream&lt;Order&gt; getOrderStream(StreamExecutionEnvironment env) {
        return env.fromElements(
                        new Order("1001", 1683000625000L, 99.0),  // 2023-05-02 04:10:25 UTC
                        new Order("1002", 1683001225000L, 199.0), // 2023-05-02 04:20:25 UTC
                        new Order("1003", 1683001825000L, 299.0)) // 2023-05-02 04:30:25 UTC
                // Assign timestamps and watermarks (explained in the watermark section)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.&lt;Order&gt;forBoundedOutOfOrderness(Duration.ofMinutes(5))
                                .withTimestampAssigner((order, recordTimestamp) -> order.getEventTime()));
    }

    // Order entity (same fields as Code Example 1)
    static class Order {
        private String orderId;
        private Long eventTime; // event timestamp in milliseconds
        private Double amount;

        public Order(String orderId, Long eventTime, Double amount) {
            this.orderId = orderId;
            this.eventTime = eventTime;
            this.amount = amount;
        }

        public Long getEventTime() { return eventTime; }
        public Double getAmount() { return amount; }
    }
}</pre>
</div>
<p> </p>
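<div class="ace-line ace-line old-record-id-EIfFfNWI0d4yQnc7YvFcVzvMnWe">Note: this example implements tumbling and sliding windows and computes the order count and total amount with an <code>AggregateFunction</code>. The watermark, covered next, controls when each window fires. During aggregation, Flink keeps only the current accumulator of each window in window state (a form of keyed state scoped to each window) and manages it automatically, so no manual state handling is needed.</div>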
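<div class="ace-line ace-line">The following plain-Java sketch shows why incremental aggregation keeps state small. Each open window holds a single accumulator (playing the role of <code>OrderStats</code>), not the list of its orders, and that accumulator is emitted and discarded once the watermark passes the window end. This is a simplified model, not Flink's <code>WindowOperator</code>. It assumes a single 10-minute tumbling window assigner and an allowed lateness of zero, and the class and method names are hypothetical.</div>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (not Flink's WindowOperator): 10-minute tumbling windows, allowed lateness 0.
// State holds ONE accumulator per open window, never the window's individual orders.
final class IncrementalWindowSketch {
    static final long SIZE = 10 * 60_000L;

    // Accumulator with the same role as OrderStats: order count and total amount
    static final class Acc {
        long count;
        double total;
    }

    // "Window state": window start -> accumulator, kept in time order
    final TreeMap<Long, Acc> state = new TreeMap<>();

    // Like AggregateFunction.add: fold one order into the accumulator of its window
    void add(long eventTime, double amount) {
        long start = eventTime - Math.floorMod(eventTime, SIZE);
        Acc acc = state.computeIfAbsent(start, s -> new Acc());
        acc.count++;
        acc.total += amount;
    }

    // On a watermark: fire every window whose last millisecond (end - 1) is <= watermark, then drop its state
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!state.isEmpty() && state.firstKey() + SIZE - 1 <= watermark) {
            Map.Entry<Long, Acc> e = state.pollFirstEntry(); // emit and clear in one step
            long startMin = e.getKey() / 60_000L;
            fired.add("[" + startMin + ", " + (startMin + SIZE / 60_000L) + ") min: count="
                    + e.getValue().count + ", total=" + e.getValue().total);
        }
        return fired;
    }

    public static void main(String[] args) {
        IncrementalWindowSketch op = new IncrementalWindowSketch();
        op.add(3 * 60_000L, 99.0);
        op.add(7 * 60_000L, 199.0);
        op.add(12 * 60_000L, 299.0);
        System.out.println("open windows in state: " + op.state.size());
        System.out.println(op.onWatermark(10 * 60_000L - 1));
        System.out.println("open windows in state: " + op.state.size());
    }
}
```

<div class="ace-line ace-line">With orders at minutes 3, 7, and 12, state holds two accumulators, not three orders. A watermark of 10 min - 1 ms fires and removes the [0, 10) window (count 2, total 298.0), which leaves only the [10, 20) accumulator.</div>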
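<h3 class="heading-3 ace-line old-record-id-Cxfqf0mxFdhJ7NccncIc5hksnof">3. Watermark: The Yardstick of Time That Solves the Late-Data Problem for Windows</h3>
<div class="ace-line ace-line old-record-id-GVKTf0OU6d9lv3cTgtTcISianqc">In an event-time stream, data travels asynchronously and can arrive out of order. For example, an event that happened at 09:59 may reach the Flink operator only at 10:05 because of network delay. Should it still be counted in the window that ends at 10:00 (covering 09:50 to 10:00)? And if so, how does Flink decide when a window can stop waiting for late data?</div>
<div class="ace-line ace-line old-record-id-B3fGfbnh4dsNdqcPsVMcSXHPnqG">The watermark is the core mechanism for solving this problem. It is essentially <strong>a special timestamped record that flows through the stream along with the data</strong>. A watermark with timestamp X tells Flink: "event time has reached X. All events with a timestamp ≤ X have arrived (or very likely have), and any event with a timestamp ≤ X that shows up afterwards is late data."</div>
<div class="ace-line ace-line old-record-id-F8JbfwCCrdZqkwcnPXccoM6Enze">Core rules of watermarks (worth memorizing):</div>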
<ul class="list-bullet1">
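<li class="ace-line ace-line old-record-id-YXXAfrXVZduxhpc3B5uc2vbin6d" data-list="bullet">Watermark timestamps must increase monotonically. If time moved backwards, windows could be triggered again. Flink's built-in watermark handling only ever advances the watermark and ignores any watermark that is not greater than the current one.</li>
<li class="ace-line ace-line old-record-id-JToFfaOIYdwjaiceCWNco71On0b" data-list="bullet">Watermark = maximum event time seen so far - maximum out-of-orderness (the bound passed to <code>forBoundedOutOfOrderness</code>; Flink's built-in implementation subtracts a further 1 ms). For example, with a 5-minute bound and a maximum event time of 10:05, the watermark is 09:59:59.999, so the window ending at 10:00 fires. This bound is not the same thing as the window's allowed lateness (<code>allowedLateness</code>). Allowed lateness does not delay the first firing. Instead, it keeps the window's state for an extra period after the watermark has passed the window end, so late events arriving during that period re-fire the window with an updated result. With an allowed lateness of 5 minutes, the window ending at 10:00 is cleaned up only once the watermark reaches about 10:05.</li>
<li class="ace-line ace-line old-record-id-IYi2fBlhPdNeFwcl0yhc9dVAnUf" data-list="bullet">The JobManager does not synchronize watermarks globally. Each parallel source instance (or watermark generator) emits its own watermarks, which flow downstream with the data. An operator with several input channels uses the minimum of its inputs' latest watermarks as its own event-time clock, so the slowest input determines how far event time has advanced.</li>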
</ul>
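<div class="ace-line ace-line">The generation and propagation rules above can be modeled in a few lines of plain Java. The first class is a generator that tracks the maximum event time and produces max - bound - 1 ms. The second is an operator that combines several inputs by taking the minimum of their latest watermarks. The class names are hypothetical. The extra 1 ms and the underflow-safe initial value mirror what Flink's built-in <code>BoundedOutOfOrdernessWatermarks</code> does, but this sketch is not Flink code.</div>

```java
import java.util.Arrays;

// Rule 1 (plain-Java model, not a Flink class): bounded out-of-orderness.
// watermark = max event time - bound - 1 ms, and it never decreases.
final class BoundedOutOfOrdernessSketch {
    private final long boundMs;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long boundMs) {
        this.boundMs = boundMs;
        // Chosen so that currentWatermark() starts at Long.MIN_VALUE without underflowing
        this.maxTimestamp = Long.MIN_VALUE + boundMs + 1;
    }

    void onEvent(long eventTimestamp) {
        // An older (out-of-order) event never lowers the maximum, so the watermark cannot move back
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - boundMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(5 * 60_000L);
        gen.onEvent(605 * 60_000L); // event at 10:05 (milliseconds of the day)
        gen.onEvent(602 * 60_000L); // an older event at 10:02 arrives afterwards
        System.out.println(gen.currentWatermark()); // 35999999 ms = 09:59:59.999

        MinWatermarkSketch op = new MinWatermarkSketch(2);
        System.out.println(op.onWatermark(0, 1_000L)); // input 1 has sent nothing yet: Long.MIN_VALUE
        System.out.println(op.onWatermark(1, 500L));   // 500
    }
}

// Rule 2 (plain-Java model): an operator's watermark is the minimum of its inputs' latest watermarks.
final class MinWatermarkSketch {
    private final long[] inputWatermarks;

    MinWatermarkSketch(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark); // per-input monotonicity
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

<div class="ace-line ace-line">With a 5-minute bound, an event at 10:05 yields a watermark of 09:59:59.999, which is the last millisecond of the window ending at 10:00, so that window can fire. In the two-input case, the combined watermark stays at the slower input's value until both inputs have advanced.</div>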
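<div class="ace-line ace-line old-record-id-F6oUfIRfgda63IcKeYecfwG0nZf">Key point: a watermark is not real wall-clock time. It is Flink's estimate of how completely the data has arrived. It does two things: it triggers window computations, and it defines which data counts as late. Without watermarks, a window could not decide when to stop waiting. It would either close too early and miss data, or wait forever so that the computation never advances.</div>
<h4 class="heading-4 ace-line old-record-id-T12UfYaCiddYRUck7HUcL3pFnR3">Code Example 3: Watermark Generation and Late-Data Handling</h4>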
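<div class="ace-line ace-line">The interplay between the watermark and <code>allowedLateness</code> can be summarized as a small classification, sketched here in plain Java for a 10-minute tumbling window. It follows the rules described above: a window fires when the watermark reaches its last millisecond, and its state is dropped once the watermark reaches that millisecond plus the allowed lateness. The names are hypothetical and this is not Flink code. In Flink, a record in the third category goes to a side output only if <code>sideOutputLateData</code> is configured; otherwise it is dropped.</div>

```java
// Plain-Java sketch (not Flink code): how a tumbling event-time window treats an arriving record,
// given the current watermark and the window's allowed lateness.
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_REFIRES_WINDOW, TOO_LATE_SIDE_OUTPUT }

    static Outcome classify(long eventTime, long watermark, long windowSize, long allowedLateness) {
        long start = eventTime - Math.floorMod(eventTime, windowSize);
        long maxTimestamp = start + windowSize - 1; // last millisecond covered by the window
        if (maxTimestamp + allowedLateness <= watermark) {
            return Outcome.TOO_LATE_SIDE_OUTPUT;     // window state has already been cleaned up
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_REFIRES_WINDOW;      // window already fired; it fires again with the update
        }
        return Outcome.ON_TIME;                      // window is still waiting for the watermark
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long size = 10 * min;
        long lateness = 5 * min;
        long watermark = 11 * min - 1; // watermark just before minute 11
        System.out.println(classify(15 * min, watermark, size, lateness)); // ON_TIME (window [10, 20))
        System.out.println(classify(8 * min, watermark, size, lateness));  // LATE_REFIRES_WINDOW (window [0, 10))
        System.out.println(classify(8 * min, 16 * min, size, lateness));   // TOO_LATE_SIDE_OUTPUT
    }
}
```

<div class="ace-line ace-line">Setting the allowed lateness to zero removes the middle category. Any record whose window has already fired is then treated as too late.</div>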
<div class="cnblogs_code">
<pre>import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WatermarkDemo {
    public static void main(String[] args) throws Exception {
        // Event time is the default since Flink 1.12; no setStreamTimeCharacteristic(...) call is needed
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps this demo deterministic; size it to the cluster in production
        env.setParallelism(1);

        // 1. Side-output tag for records that arrive after their window's state has been cleaned up
        final OutputTag&lt;Order&gt; lateDataTag = new OutputTag&lt;Order&gt;("late_order_data") {};

        // 2. Read the order stream and generate watermarks
        //    (sample times start at 2023-05-02 04:00:00 UTC; windows are [04:00, 04:10), [04:10, 04:20), ...)
        DataStream&lt;Order&gt; orderStream = env.fromElements(
                        new Order("1001", 1683000000000L, 99.0),  // 04:00:00, window [04:00, 04:10)
                        new Order("1002", 1683000599000L, 199.0), // 04:09:59, same window
                        new Order("1003", 1683000960000L, 299.0), // 04:16:00, watermark 04:10:59.999 fires [04:00, 04:10)
                        new Order("1004", 1683000480000L, 399.0), // 04:08:00, late but within allowed lateness: fires again
                        new Order("1005", 1683001260000L, 499.0), // 04:21:00, watermark 04:15:59.999 cleans up [04:00, 04:10)
                        new Order("1006", 1683000300000L, 599.0)) // 04:05:00, beyond allowed lateness: side output
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy&lt;Order&gt;() {
                            @Override
                            public WatermarkGenerator&lt;Order&gt; createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator&lt;Order&gt;() {
                                    // Maximum out-of-orderness: 5 minutes in milliseconds
                                    private final long maxOutOfOrderness = 5 * 60 * 1000L;
                                    // Highest event time seen so far (offset so the subtraction below cannot underflow)
                                    private long maxEventTime = Long.MIN_VALUE + maxOutOfOrderness + 1;

                                    @Override
                                    public void onEvent(Order order, long eventTimestamp, WatermarkOutput output) {
                                        // Update the maximum event time for every record
                                        maxEventTime = Math.max(maxEventTime, eventTimestamp);
                                        // Emit on every record as well, so this small demo behaves deterministically
                                        output.emitWatermark(currentWatermark());
                                    }

                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        // Called periodically (pipeline.auto-watermark-interval, 200 ms by default)
                                        output.emitWatermark(currentWatermark());
                                    }

                                    private Watermark currentWatermark() {
                                        // Watermark = max event time - max out-of-orderness - 1 ms
                                        return new Watermark(maxEventTime - maxOutOfOrderness - 1);
                                    }
                                };
                            }
                        }
                        // Use Order.eventTime as the event timestamp
                        .withTimestampAssigner((order, recordTimestamp) -> order.getEventTime()));

        // 3. 10-minute tumbling window with late-data handling
        SingleOutputStreamOperator&lt;OrderStats&gt; windowResult = orderStream
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(10)))
                // Keep window state for 5 more minutes after the watermark passes the window end;
                // late records in that period re-fire the window with an updated result
                .allowedLateness(Time.minutes(5))
                // Records arriving after that go to the side output instead of being dropped silently
                .sideOutputLateData(lateDataTag)
                // Aggregation
                .aggregate(new OrderAggregateFunction());

        // 4. Print the window results and the late records
        windowResult.print("Window result");
        // Late records from the side output (can be used for later correction)
        windowResult.getSideOutput(lateDataTag).print("Late data (beyond allowed lateness)");
        env.execute("Flink Watermark &amp; Late Data Demo");
    }

    // Aggregate function and result class, same as Code Example 2
    static class OrderAggregateFunction implements AggregateFunction&lt;Order, OrderStats, OrderStats&gt; {
        @Override
        public OrderStats createAccumulator() { return new OrderStats(0L, 0.0); }

        @Override
        public OrderStats add(Order order, OrderStats accumulator) {
            return new OrderStats(accumulator.getOrderCount() + 1, accumulator.getTotalAmount() + order.getAmount());
        }

        @Override
        public OrderStats getResult(OrderStats accumulator) { return accumulator; }

        @Override
        public OrderStats merge(OrderStats a, OrderStats b) {
            return new OrderStats(a.getOrderCount() + b.getOrderCount(), a.getTotalAmount() + b.getTotalAmount());
        }
    }

    static class OrderStats {
        private Long orderCount;
        private Double totalAmount;

        public OrderStats(Long orderCount, Double totalAmount) {
            this.orderCount = orderCount;
            this.totalAmount = totalAmount;
        }

        @Override
        public String toString() {
            return "OrderStats{orderCount=" + orderCount + ", totalAmount=" + totalAmount + "}";
        }

        public Long getOrderCount() { return orderCount; }
        <span style="color: rgba(0, 0, 255, 1)">public</span> Double getTotalAmount() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> totalAmount; }
}
</span><span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> Order {
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> String orderId;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Long eventTime;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Double amount;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Order(String orderId, Long eventTime, Double amount) {
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.orderId =<span style="color: rgba(0, 0, 0, 1)"> orderId;
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.eventTime =<span style="color: rgba(0, 0, 0, 1)"> eventTime;
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.amount =<span style="color: rgba(0, 0, 0, 1)"> amount;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> Long getEventTime() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> eventTime; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> Double getAmount() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> amount; }
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> String toString() {
</span><span style="color: rgba(0, 0, 255, 1)">return</span> "Order{orderId='" + orderId + "', eventTime=" + eventTime + ", amount=" + amount + "}"<span style="color: rgba(0, 0, 0, 1)">;
}
}
}</span></pre>
</div>
<p> </p>
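<div class="ace-line ace-line old-record-id-Cjj5fA9oOdSPVCcRgUPcZZxYnbf">Note: this example implements a custom watermark generator and makes its core rule explicit: watermark = current maximum event time - tolerated delay. It then uses <code>allowedLateness</code> to keep each window accepting late records for a while after it has fired, and a side output to collect records that arrive even after that period, which addresses the central problem of late data instead of silently dropping it.</div>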
<h3 class="heading-3 ace-line old-record-id-JzR0fZ8t6dQprNcaV2QcC3EInTb">4. State: the "memory" of computation, holding intermediate window results and context</h3>
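<div class="ace-line ace-line old-record-id-XGtKfS0EvdoSL8cSomBchXGSnhb">Many real-time computations need to "remember" earlier intermediate results. Counting orders per 10-minute window means continuously accumulating the count for each window; counting a user's consecutive clicks means remembering when that user last clicked. State is what provides this memory.</div>
<div class="ace-line ace-line old-record-id-ExEOfBaKXdXupzcJuoHc8Nszn5c">In essence, state is <strong>intermediate computation results that Flink keeps in memory (or on disk)</strong>. It is bound to a specific operator (such as Map, Reduce, or a Window operator) and is what makes stateful computation possible.</div>
<div class="ace-line ace-line old-record-id-NMyefHaDNdV43Nc496ccV2MCnrf">Flink has two core kinds of state:</div>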
<ul class="list-bullet1">
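<li class="ace-line ace-line old-record-id-AUvsf2WYddm8g8ccKVkcvFFbnEd" data-list="bullet">Keyed State: state bound to a key, with one independent state instance per key. For example, when counting clicks per user after grouping by user ID, each user ID has its own "click counter", and that counter is Keyed State. It is the most commonly used kind and comes in several forms, such as single values, lists, maps, and reducing or aggregating state (ValueState, ListState, MapState, ReducingState, AggregatingState).</li>
<li class="ace-line ace-line old-record-id-NZTofvtxAdisCgc3bfdcHFllnke" data-list="bullet">Operator State: state bound to a parallel instance of an operator, with one state instance per parallel instance, independent of any key. A typical example is a source's offset state, which records how far each instance has read so that a restarted job resumes from that position instead of reading everything again.</li>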
</ul>
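<div class="ace-line ace-line">To make the difference in scope concrete, here is a minimal plain-Java mental model with no Flink dependency. It is a sketch under stated assumptions, not Flink's API: the class names are hypothetical, a <code>HashMap</code> stands in for per-key state, and a single field stands in for the operator state of one parallel instance. Real Flink distributes keyed state across parallel instances by key group and manages both kinds through a state backend.</div>

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java mental model of Keyed State vs Operator State (hypothetical, not Flink's API).
class StateScopeSketch {
    // Keyed State: one independent value per key, like a ValueState<Long> after keyBy(userId).
    static final class KeyedClickCounter {
        private final Map<String, Long> clicksPerUser = new HashMap<>();

        long onClick(String userId) {
            long updated = clicksPerUser.getOrDefault(userId, 0L) + 1;
            clicksPerUser.put(userId, updated);
            return updated;
        }
    }

    // Operator State: one value per parallel instance, unrelated to keys,
    // like a source instance remembering how far it has read.
    static final class SourceInstance {
        private long nextOffset = 0;

        long readNext() {
            return nextOffset++; // returns the offset just read, then advances
        }

        long currentOffset() {
            return nextOffset;
        }
    }

    public static void main(String[] args) {
        KeyedClickCounter counter = new KeyedClickCounter();
        counter.onClick("user1");
        counter.onClick("user2");
        System.out.println(counter.onClick("user1")); // 2: user1 has its own counter

        SourceInstance instance0 = new SourceInstance();
        instance0.readNext();
        instance0.readNext();
        System.out.println(instance0.currentOffset()); // 2
    }
}
```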
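<div class="ace-line ace-line old-record-id-QZhxfSfT7de4WzcXlyNcvV0Hnmd">Key point: state is the foundation of stateful computation; without it, Flink could not perform complex aggregations or joins. State also consumes resources, so choose where it lives deliberately (on the JVM heap, or in RocksDB on local disk for large state) to avoid running out of memory. Finally, state consistency depends on checkpoints: without them, a node failure loses the state and the results become wrong.</div>
<h4 class="heading-4 ace-line old-record-id-BR5ZfkVjhdu7NFca4mTcxG5Anzb">Code Example 4: Keyed State with a state TTL (total order amount per user)</h4>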
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.api.common.eventtime.WatermarkStrategy;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.api.common.state.ValueState;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.api.common.state.ValueStateDescriptor;
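</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.api.common.state.StateTtlConfig;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.api.common.time.Time;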
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.configuration.Configuration;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.streaming.api.datastream.DataStream;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.streaming.api.functions.KeyedProcessFunction;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.apache.flink.util.Collector;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.time.Duration;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> KeyedStateDemo {
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> main(String[] args) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
StreamExecutionEnvironment env </span>=<span style="color: rgba(0, 0, 0, 1)"> StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(</span>1<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 1. Read the order stream (grouped by user ID below to total each user's orders)</span>
DataStream<Order> orderStream =<span style="color: rgba(0, 0, 0, 1)"> env.fromElements(
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Order("user1", "1001", 1683000625000L, 99.0<span style="color: rgba(0, 0, 0, 1)">),
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Order("user1", "1002", 1683001225000L, 199.0<span style="color: rgba(0, 0, 0, 1)">),
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Order("user2", "1003", 1683001825000L, 299.0<span style="color: rgba(0, 0, 0, 1)">),
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Order("user1", "1004", 1683002425000L, 399.0<span style="color: rgba(0, 0, 0, 1)">)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.</span><Order>forBoundedOutOfOrderness(Duration.ofSeconds(3<span style="color: rgba(0, 0, 0, 1)">))
.withTimestampAssigner((order, timestamp) </span>-><span style="color: rgba(0, 0, 0, 1)"> order.getEventTime())
);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 2. Group by user ID and keep each user's order total in Keyed State</span>
DataStream<UserOrderTotal> userTotalStream =<span style="color: rgba(0, 0, 0, 1)"> orderStream
.keyBy(Order::getUserId) </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> group by user ID: each user gets an independent state instance</span>
.process(<span style="color: rgba(0, 0, 255, 1)">new</span> KeyedProcessFunction<String, Order, UserOrderTotal><span style="color: rgba(0, 0, 0, 1)">() {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Keyed State holding the current user's order total (ValueState is the most common Keyed State type)</span>
<span style="color: rgba(0, 0, 255, 1)">private</span> ValueState<Double><span style="color: rgba(0, 0, 0, 1)"> userTotalAmountState;
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> open(Configuration parameters) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
</span><span style="color: rgba(0, 0, 255, 1)">super</span><span style="color: rgba(0, 0, 0, 1)">.open(parameters);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Initialize the state with a TTL: a value not written for 1 hour expires (TTL is measured in processing time)</span>
ValueStateDescriptor<Double> stateDescriptor = <span style="color: rgba(0, 0, 255, 1)">new</span> ValueStateDescriptor<><span style="color: rgba(0, 0, 0, 1)">(
</span>"user_total_amount", <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> state name</span>
Double.<span style="color: rgba(0, 0, 255, 1)">class</span> <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> state type</span>
<span style="color: rgba(0, 0, 0, 1)"> );
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Configure the state TTL</span>
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1<span style="color: rgba(0, 0, 0, 1)">))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> creating or writing the value resets the TTL; reads do not</span>
<span style="color: rgba(0, 0, 0, 1)"> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> an expired value is never returned, even before it is cleaned up</span><span style="color: rgba(0, 0, 0, 1)">
.build();
stateDescriptor.enableTimeToLive(ttlConfig);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Obtain the state handle</span>
userTotalAmountState =<span style="color: rgba(0, 0, 0, 1)"> getRuntimeContext().getState(stateDescriptor);
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> processElement(Order order, Context ctx, Collector<UserOrderTotal> out) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Read the current total from state (null if this user has no state yet)</span>
Double currentTotal =<span style="color: rgba(0, 0, 0, 1)"> userTotalAmountState.value();
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (currentTotal == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
currentTotal </span>= 0.0<span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Update the state: add the current order amount</span>
currentTotal +=<span style="color: rgba(0, 0, 0, 1)"> order.getAmount();
userTotalAmountState.update(currentTotal);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Emit the current user's order total</span>
out.collect(<span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> UserOrderTotal(order.getUserId(), currentTotal));
}
});
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Print the results</span>
userTotalStream.print("Per-user order total"<span style="color: rgba(0, 0, 0, 1)">);
env.execute(</span>"Flink Keyed State Demo"<span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Order entity (with an added userId field)</span>
<span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> Order {
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> String userId;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> String orderId;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Long eventTime;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Double amount;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Order(String userId, String orderId, Long eventTime, Double amount) {
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.userId =<span style="color: rgba(0, 0, 0, 1)"> userId;
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.orderId =<span style="color: rgba(0, 0, 0, 1)"> orderId;
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.eventTime =<span style="color: rgba(0, 0, 0, 1)"> eventTime;
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.amount =<span style="color: rgba(0, 0, 0, 1)"> amount;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> String getUserId() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> userId; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> Long getEventTime() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> eventTime; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> Double getAmount() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> amount; }
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Per-user order total entity</span>
<span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> UserOrderTotal {
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> String userId;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Double totalAmount;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> UserOrderTotal(String userId, Double totalAmount) {
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.userId =<span style="color: rgba(0, 0, 0, 1)"> userId;
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.totalAmount =<span style="color: rgba(0, 0, 0, 1)"> totalAmount;
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> String toString() {
</span><span style="color: rgba(0, 0, 255, 1)">return</span> "UserOrderTotal{userId='" + userId + "', totalAmount=" + totalAmount + "}"<span style="color: rgba(0, 0, 0, 1)">;
}
}
}</span></pre>
</div>
<p> </p>
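<div class="ace-line ace-line old-record-id-Ym7uffIMCd62azcuYiUcfCl6nic">Note: this example uses Keyed State (a ValueState) to keep each user's order total. The state is created from a <code>ValueStateDescriptor</code> and given a 1-hour TTL so that state for inactive users does not accumulate indefinitely (state TTL is measured in processing time, not event time). Because <code>keyBy</code> partitions the stream by user ID, every user ID gets its own state instance, which yields the required per-key totals.</div>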
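<div class="ace-line ace-line">The TTL behaviour can be illustrated with a small plain-Java model. This is a hypothetical sketch, not Flink's implementation: time is passed in explicitly as milliseconds, and it assumes <code>UpdateType.OnCreateAndWrite</code> (only writes reset the expiry) together with <code>StateVisibility.NeverReturnExpired</code> (an expired value reads as absent). With <code>ReturnExpiredIfNotCleanedUp</code> instead, an expired value could still be returned until it is actually cleaned up.</div>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-key values with a TTL (not Flink's implementation).
// Mirrors UpdateType.OnCreateAndWrite + StateVisibility.NeverReturnExpired.
class TtlValueSketch {
    private static final class Entry {
        final double value;
        final long lastWriteMillis;

        Entry(double value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> state = new HashMap<>();

    TtlValueSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Read: an expired entry is treated as absent; reading does not refresh the TTL.
    Double value(String key, long nowMillis) {
        Entry e = state.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            state.remove(key); // lazy cleanup on access
            return null;
        }
        return e.value;
    }

    // Write: creating or updating the value restarts the TTL clock.
    void update(String key, double value, long nowMillis) {
        state.put(key, new Entry(value, nowMillis));
    }

    // Same accumulate logic as processElement in Code Example 4.
    double addOrder(String userId, double amount, long nowMillis) {
        Double current = value(userId, nowMillis);
        double total = (current == null ? 0.0 : current) + amount;
        update(userId, total, nowMillis);
        return total;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        TtlValueSketch totals = new TtlValueSketch(hour);
        System.out.println(totals.addOrder("user1", 99.0, 0));                // 99.0
        System.out.println(totals.addOrder("user1", 199.0, hour / 2));        // 298.0
        System.out.println(totals.addOrder("user1", 399.0, hour / 2 + hour)); // 399.0 (old total expired)
    }
}
```

<div class="ace-line ace-line">The third call in <code>main</code> happens more than an hour after the last write, so the old total has expired and the user's total starts again from 399.0.</div>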
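<h3 class="heading-3 ace-line old-record-id-DUZ5fTrwLdeN2rcMwZhcRh8nnrg">5. Checkpoint: the reliability guarantee, persisting state for fault recovery</h3>
<div class="ace-line ace-line old-record-id-QtGWfqllZd7ze6cDvvVcc7IOn5b">Real-time jobs run 24/7, and in a distributed environment node failures (machine crashes, network interruptions) are unavoidable. If state has not been persisted when a failure happens, all previous results are lost: the job has to recompute after restarting, which wastes resources and can leave the data inconsistent.</div>
<div class="ace-line ace-line old-record-id-F3FUfYZkydnN9kcYGGec3yvInod">In essence, a checkpoint is <strong>a consistent snapshot of state</strong>: Flink periodically persists the state of all operators, including the sources' read positions, to durable storage (such as HDFS or S3). When a failed job restarts, Flink restores all state from the latest completed checkpoint and continues from exactly the positions recorded there, which gives exactly-once semantics for the state. End-to-end exactly-once output additionally requires a replayable source and a transactional or idempotent sink.</div>
<div class="ace-line ace-line old-record-id-LwscfIC3Cdn3w9chqOPcRY6pnse">The checkpoint procedure (simplified):</div>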
<ol class="list-number1" start="1">
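<li class="ace-line ace-line old-record-id-DknPfpGsvd0CjMcAycHcjCxOn7f" data-list="number">The checkpoint coordinator in the JobManager triggers a checkpoint by sending a trigger to all source tasks.</li>
<li class="ace-line ace-line old-record-id-RLPufz7q2dxu3DcXTd5cdM6snoj" data-list="number">Each source task snapshots its current read position (for example, Kafka offsets) and then injects a <strong>checkpoint barrier</strong> into its output stream. The snapshot is written to durable storage asynchronously, so processing continues meanwhile.</li>
<li class="ace-line ace-line old-record-id-J7VwftQQHdIF3SclX7ccBidSni5" data-list="number">When a downstream operator has received the barrier from all of its inputs (barrier alignment, in the default aligned mode), it snapshots its own state and forwards the barrier further downstream. Records before the barrier are reflected in the snapshot; records after it are not.</li>
<li class="ace-line ace-line old-record-id-KXFCfTJvDdsKmicHFYzch6E4naf" data-list="number">Every task acknowledges its snapshot to the JobManager. Once all tasks have acknowledged, the checkpoint is marked complete and its location is recorded for fault recovery.</li>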
</ol>
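<div class="ace-line ace-line">The steps above can be simulated with a deliberately simplified, single-pipeline model in plain Java. Everything here is a hypothetical stand-in: a <code>List</code> plays the Kafka partition, a running sum plays the window state, and a field plays durable checkpoint storage, and a checkpoint "completes" immediately instead of waiting for acknowledgements. Because the source offset and the operator state in a snapshot belong to the same barrier, restoring both and replaying from the saved offset gives the same state as a run with no failure.</div>

```java
import java.util.List;

// Hypothetical, single-pipeline model of checkpointing and recovery (not Flink's implementation).
class CheckpointSketch {
    // A completed checkpoint: source offset and operator state captured at the same barrier.
    record Snapshot(long id, int sourceOffset, double operatorSum) {}

    private Snapshot latest = new Snapshot(0, 0, 0.0); // stands in for durable storage such as HDFS
    private int attempts = 0;                           // processing attempts across runs, to place the crash

    // Restores from the latest completed checkpoint and runs; throws to simulate a node failure.
    double run(List<Double> log, int checkpointEvery, int crashAtAttempt) {
        int offset = latest.sourceOffset(); // restore the source's offset (Operator State)
        double sum = latest.operatorSum();  // restore the aggregate (window state)
        long nextId = latest.id() + 1;
        while (offset < log.size()) {
            if (attempts++ == crashAtAttempt) {
                throw new IllegalStateException("simulated node failure");
            }
            sum += log.get(offset++); // source emits a record, operator aggregates it
            if (offset % checkpointEvery == 0) {
                // Source snapshots its offset and emits a barrier; the operator snapshots its sum
                // when the barrier reaches it. Both describe the same cut of the stream.
                latest = new Snapshot(nextId++, offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Double> log = List.of(10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0);
        CheckpointSketch job = new CheckpointSketch();
        try {
            job.run(log, 3, 5); // fails before the 6th record; checkpoint 1 covers records 1-3
        } catch (IllegalStateException e) {
            System.out.println("failed, restoring from checkpoint " + job.latest.id());
        }
        System.out.println(job.run(log, 3, -1)); // 280.0, same as a failure-free run
    }
}
```

<div class="ace-line ace-line">In <code>main</code>, the first run fails after processing five records. Checkpoint 1 covers only the first three, so records 4 and 5 are replayed after recovery, and the final sum of 280.0 matches a failure-free run without counting anything twice.</div>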
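<div class="ace-line ace-line old-record-id-Mn4pfeH5zd2SnFc6X6OcGRBXnHg">Key point: checkpoints guarantee that state is consistent and recoverable. State and checkpoints complement each other: state is what a checkpoint saves, and the checkpoint is what keeps state safe. Without checkpoints, state is never persisted and a real-time job cannot run reliably.</div>
<h4 class="heading-4 ace-line old-record-id-XunmfI94udLPWFcyu5Rcz6Eon6b">Code Example 5: Checkpoint configuration and fault recovery (with persisted state)</h4>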
<div class="cnblogs_code">
<pre>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class CheckpointDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Event time is the default since Flink 1.12, so setStreamTimeCharacteristic is not needed
env.setParallelism(1);
// 1. Configure checkpointing (core: persist state so the job can recover from failures)
// 1.1 Enable checkpointing every minute (60 * 1000 ms)
env.enableCheckpointing(60000);
// 1.2 Checkpoint storage: HDFS (recommended for production); for local tests use file:///tmp/flink-checkpoint
env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink/checkpoints"));
// 1.3 Checkpoint parameters
env.getCheckpointConfig().setCheckpointTimeout(30000); // a checkpoint times out after 30 s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // at least 30 s between two checkpoints
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // tolerate up to 3 consecutive checkpoint failures
// 1.4 Restart strategy: restart automatically after a failure, at most 3 times, 5 s apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // maximum number of restarts
// fully qualified: the windowing Time class imported above is a different type
org.apache.flink.api.common.time.Time.of(5, TimeUnit.SECONDS) // delay between restarts
));
// 2. Kafka source: its offsets are Operator State and are included in every checkpoint
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("order_topic")
.setGroupId("flink_order_checkpoint_group")
// Applies only when the job starts without a checkpoint; on recovery,
// KafkaSource resumes from the offsets stored in the checkpoint
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 3. Read the order stream and generate watermarks
// Assumed record format (CSV): userId,orderId,eventTimeMillis,amount
DataStream<Order> orderStream = env.fromSource(
kafkaSource,
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((line, recordTimestamp) -> Long.parseLong(line.split(",")[2])), // 3rd column: event timestamp
"Kafka Source With Checkpoint"
)
.map(line -> {
String[] fields = line.split(",");
return new Order(fields[0], fields[1], Long.parseLong(fields[2]), Double.parseDouble(fields[3]));
});
// 4. Tumbling window; its aggregate state is persisted by checkpoints
DataStream<OrderStats> windowResult = orderStream
.windowAll(TumblingEventTimeWindows.of(Time.minutes(10)))
.allowedLateness(Time.minutes(5))
.aggregate(new OrderAggregateFunction());
// Print the results
windowResult.print("Checkpoint Demo window result");
env.execute("Flink Checkpoint & Fault Recovery Demo");
}
// Aggregate function and entity classes, reused from the earlier examples
static class OrderAggregateFunction implements AggregateFunction<Order, OrderStats, OrderStats> {
@Override
public OrderStats createAccumulator() { return new OrderStats(0L, 0.0); }
@Override
public OrderStats add(Order order, OrderStats accumulator) {
return new OrderStats(accumulator.getOrderCount() + 1, accumulator.getTotalAmount() + order.getAmount());
}
@Override
public OrderStats getResult(OrderStats accumulator) { return accumulator; }
@Override
public OrderStats merge(OrderStats a, OrderStats b) {
return new OrderStats(a.getOrderCount() + b.getOrderCount(), a.getTotalAmount() + b.getTotalAmount());
}
}
static class OrderStats {
private Long orderCount;
private Double totalAmount;
public OrderStats(Long orderCount, Double totalAmount) {
this.orderCount = orderCount;
this.totalAmount = totalAmount;
}
@Override
public String toString() {
return "OrderStats{orderCount=" + orderCount + ", totalAmount=" + totalAmount + "}";
}
public Long getOrderCount() { return orderCount; }
public Double getTotalAmount() { return totalAmount; }
}
static class Order {
private String userId;
private String orderId;
private Long eventTime;
private Double amount;
public Order(String userId, String orderId, Long eventTime, Double amount) {
this.userId = userId;
this.orderId = orderId;
this.eventTime = eventTime;
this.amount = amount;
}
public Long getEventTime() { return eventTime; }
public Double getAmount() { return amount; }
}
}</pre>
</div>
<p> </p>
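<div class="ace-line ace-line old-record-id-RE6JfSpx6dtq05c8T3XcBxg1nsY">Note: this example configures checkpointing end to end: the storage location (HDFS), the interval, the timeout, the tolerated failures, and the restart strategy. The Kafka source's offsets are part of its checkpointed state: when the job restores from a checkpoint, KafkaSource resumes from those offsets, and <code>setStartingOffsets</code> only applies when the job starts without one. The window's aggregate state is captured in the same snapshot, so after a failure the job restores offsets and aggregates together from the latest checkpoint and replays the records read since then, which gives exactly-once state. The <code>print</code> sink used here is not transactional, however, so results emitted between the last checkpoint and the failure may be printed again.</div>
<h2 class="heading-2 ace-line old-record-id-K3g6fPFwFdED8ycsxCucrtkFny8">II. Core collaboration: how do the five components work together? (the most important part)</h2>
<div class="ace-line ace-line old-record-id-VwkBfa8zudpAUEcotqvcTxs0n3f">With each component's role clear, we now turn to how they cooperate, which is the heart of the Flink mental model. Using a concrete scenario, computing order statistics for every 10 minutes in real time, we walk through the whole flow step by step so you can see the five components working together.</div>
<div class="ace-line ace-line old-record-id-Q6fxflgLLdujCFczouHcKZMTnMf">Scenario: an e-commerce platform's order stream (an event-time stream), in which each order carries an order ID, an order time (the event timestamp), and an amount. The job must compute the total amount and number of orders for every 10-minute window in real time, tolerate data that arrives up to 5 minutes late, and run reliably 24/7.</div>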
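<h3 class="heading-3 ace-line old-record-id-YtNbfsw0ldaCumcoCu3c4HJGngf">Step 1: The stream is the entry point, continuously delivering order data</h3>
<div class="ace-line ace-line old-record-id-XheafzUywdqFQXchNmNc6JStnH8">The order system keeps producing orders, which enter Flink through a source operator (such as a Kafka source) and form an <strong>event-time stream</strong>. Each order is an "event" in the stream and carries its own event timestamp (for example, 2024-05-01 10:03:25).</div>
<div class="ace-line ace-line old-record-id-Ec6kfkqU1dc386c0OSwclRH8nib">At this stage the stream's job is to deliver data: it carries the unbounded sequence of orders to downstream operators and is where the whole computation starts. At the same time, the source operator maintains an offset state (Operator State) that records which Kafka offsets it has read, so that after a failure it can resume from a known position instead of re-reading everything. This is the first place state comes into play.</div>
<h3 class="heading-3 ace-line old-record-id-Z7Xwf7stEdQEw3c2on1cqSvGnDc">Step 2: Watermarks are generated continuously and mark the current event-time baseline</h3>
<div class="ace-line ace-line old-record-id-ITWqfM7Z6d7ZYiclVxhcfZ0znRg">While reading orders, the source generates watermarks from their event timestamps. Given the scenario (data may be up to 5 minutes late), the watermark is computed as <strong>maximum order event time seen so far - 5 minutes</strong>.</div>
<div class="ace-line ace-line old-record-id-RJuJfV1WIdsVgkc8PwLclFi6nVf">For example:</div>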
<ul class="list-bullet1">
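<li class="ace-line ace-line old-record-id-WPUIfLTP2dppcBcnJ39cFCaQnbf" data-list="bullet">When the source reads the first order (event time 10:03:25), the maximum event time is 10:03:25, so the watermark is 10:03:25 - 5 minutes = 09:58:25. That is still below the end time (10:00) of the 09:50-10:00 window, so the window does not fire.</li>
<li class="ace-line ace-line old-record-id-BGJNfcMi2dNcEncjc7XcWjzkneh" data-list="bullet">As more orders arrive, an order with event time 10:05:10 raises the maximum event time to 10:05:10, and the watermark becomes 10:05:10 - 5 minutes = 10:00:10. The watermark has now passed the end time (10:00) of the 09:50-10:00 window, which means "orders that occurred before 10:00 have most likely all arrived", so the window can fire.</li>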
</ul>
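<div class="ace-line ace-line">To make the arithmetic above concrete, here is a minimal plain-Java simulation (no Flink dependency) of how a bounded-out-of-orderness watermark advances and when the 09:50-10:00 window would fire. The class and method names (<code>WatermarkSimulation</code>, <code>onEvent</code>, <code>windowFires</code>) are illustrative assumptions, not Flink APIs. The extra <code>- 1</code> ms mirrors Flink's built-in <code>forBoundedOutOfOrderness</code> strategy; real Flink emits the watermark periodically (every 200 ms by default) rather than after every record, which this sketch ignores.</div>

```java
/**
 * Plain-Java simulation (no Flink dependency) of a bounded-out-of-orderness watermark
 * and the firing check of one event-time tumbling window. Names are illustrative only.
 */
final class WatermarkSimulation {
    static final long MINUTE = 60_000L;

    private final long boundMillis;             // tolerated out-of-orderness
    private long maxTimestamp = Long.MIN_VALUE; // largest event time seen so far

    WatermarkSimulation(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Called for every record: only a larger timestamp can move the watermark forward. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Same formula as Flink's built-in strategy: max event time - bound - 1 ms. */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet: no event-time progress
        }
        return maxTimestamp - boundMillis - 1;
    }

    /** A window [start, end) fires once the watermark reaches its last millisecond (end - 1). */
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    /** Helper: a time of day (h:m:s) as milliseconds since midnight. */
    static long at(int h, int m, int s) {
        return ((h * 60L + m) * 60 + s) * 1000;
    }
}
```

<div class="ace-line ace-line">Feeding in the two orders from the list reproduces the walkthrough: after 10:03:25 the window stays open, after 10:05:10 it may fire, and an older record arriving afterwards does not move the watermark backwards.</div>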
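<div class="ace-line ace-line old-record-id-RuE0fVe6WdvNzNcCNCyc4LeInKc">Note that no central component computes a single "global" watermark, and the JobManager is not involved. If the job has several parallel Source subtasks, each generates its own watermark; every downstream operator subtask remembers the latest watermark received on each of its input channels and uses the minimum of them as its current event time. For example, if one Source subtask's watermark is 10:00:10 and another's is 09:59:30, the downstream window operator's event time is 09:59:30, and it only advances beyond 10:00 after the watermarks of all its inputs have passed 10:00. This keeps a fast input from making windows fire before slower inputs have caught up.</div>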
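<div class="ace-line ace-line">The "minimum across inputs" rule can be sketched in a few lines of plain Java. This is a simplified model of one downstream subtask, not Flink's internal class; <code>InputWatermarkTracker</code> is a hypothetical name, and details such as idle inputs are left out.</div>

```java
import java.util.Arrays;

/**
 * Simplified model of one downstream operator subtask: it remembers the latest watermark
 * received on each input channel and uses the minimum as its current event time.
 */
final class InputWatermarkTracker {
    private final long[] channelWatermarks;

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark received yet
    }

    /** Records a watermark from one channel; a channel's watermark only moves forward. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's current event time: the slowest input decides. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

<div class="ace-line ace-line">With the two watermarks from the example (10:00:10 and 09:59:30), the combined value is 09:59:30; it only moves to 10:00:10 once the slower channel reports something at least that late.</div>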
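<h3 class="heading-3 ace-line old-record-id-NlwRf3EDxdUirPcSlZDc7Ij0nQd">Step 3: Windows Fire Based on Watermarks, While State Holds Intermediate Results</h3>
<div class="ace-line ace-line old-record-id-Wc2cfdKsWdrYOlcK0VAcHK3rnid">When the watermark seen by the window operator passes a window's end time, the window fires and runs its aggregation. In this scenario we use a "tumbling window" of 10 minutes, so the windows are 09:50-10:00, 10:00-10:10, 10:10-10:20, and so on. Each window includes its start time and excludes its end time, so an order stamped exactly 10:00:00 belongs to 10:00-10:10.</div>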
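<div class="ace-line ace-line">Window assignment for a tumbling event-time window is plain arithmetic: round the timestamp down to a multiple of the window size. The sketch below assumes a window offset of 0, which is Flink's default; <code>TumblingWindowAssigner</code> and its helpers are hypothetical names, not Flink classes.</div>

```java
/**
 * Sketch of tumbling event-time window assignment: each timestamp belongs to exactly
 * one window [start, start + size), where start is the timestamp rounded down to a
 * multiple of the window size (window offset assumed to be 0).
 */
final class TumblingWindowAssigner {
    static final long MINUTE = 60_000L;

    /** Returns {start, end} of the window that contains the given event timestamp. */
    static long[] assign(long eventTimestamp, long sizeMillis) {
        long start = eventTimestamp - Math.floorMod(eventTimestamp, sizeMillis);
        return new long[] {start, start + sizeMillis};
    }

    /** Helper: a time of day (h:m:s) as milliseconds since midnight. */
    static long at(int h, int m, int s) {
        return ((h * 60L + m) * 60 + s) * 1000;
    }

    /** Helper: formats milliseconds since midnight as HH:mm. */
    static String format(long millis) {
        long totalMinutes = millis / MINUTE;
        return String.format("%02d:%02d", totalMinutes / 60, totalMinutes % 60);
    }
}
```

<div class="ace-line ace-line">For a 10-minute size, 09:52:10 maps to 09:50-10:00, while both 10:00:00 and 10:03:00 map to 10:00-10:10, which is why the end of a window is exclusive.</div>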
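<div class="ace-line ace-line old-record-id-KfNWfsWNadBgFgcDuFYcIt2en6d">Until a window fires, what it has collected is kept in state. Flink stores window contents as keyed state scoped to each (key, window) pair, so every window has its own state instance; with <code>windowAll</code> there is effectively one key and all data flows to a single subtask. Because this example aggregates incrementally with an <code>AggregateFunction</code>, the state holds only each window's running order count and total amount, not every individual order.</div>
<div class="ace-line ace-line old-record-id-Vcu7f97nldgi1McVnXKc1wcHn8b">For example, for the 09:50-10:00 window:</div>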
<ul class="list-bullet1">
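<li class="ace-line ace-line old-record-id-KDJffNGUcdrlJdcrjuHcuECQnbd" data-list="bullet">When an order with event time 09:52:10 arrives, it is assigned to the 09:50-10:00 window; its amount is added to the window's "total amount" state and the order count is incremented.</li>
<li class="ace-line ace-line old-record-id-LDxfflye6dKIiFcm8rwcUHkBnbd" data-list="bullet">When an order with event time 09:58:00 arrives out of order, after orders stamped around 10:01 have already been seen (about 3 minutes of disorder, within the 5-minute tolerance), it is still assigned to the 09:50-10:00 window because its event time is before 10:00, and the count and amount in state are updated. (An order with event time 10:03:00, by contrast, belongs to the 10:00-10:10 window.)</li>
<li class="ace-line ace-line old-record-id-INEBfdYTLd05lucuor2cd3WynKH" data-list="bullet">When the watermark passes 10:00 (that is, once an order stamped around 10:05 has arrived), the window fires for the first time: it reads the count and amount from state and emits a result (for example, 09:50-10:00: 120 orders, total amount 58,600 yuan). Because the example also sets an allowed lateness of 5 minutes, the state is not discarded yet: until the watermark reaches 10:05 (10:10:00 - 5 minutes, i.e. after an order stamped around 10:10 arrives), every further order belonging to this window triggers an updated result. After that the window is closed for good, its state is cleared, and any later order for it is treated as late data.</li>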
</ul>
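<div class="ace-line ace-line old-record-id-GR7TfU1Qid2r5FclX5wcFMY7nEb">The key collaboration point: watermarks decide when a window fires, and the window's computation relies on the intermediate results held in state. Without watermarks, an event-time window cannot tell when to fire; without state, the window could not accumulate results incrementally and would have to recompute everything whenever a new record arrived, which is extremely inefficient.</div>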
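<div class="ace-line ace-line">The lifecycle described above (first firing, updated results during the allowed lateness, cleanup, side output) can be simulated in plain Java. This is a sketch under stated assumptions, not Flink code: the class name is hypothetical, the window offset is 0, amounts are whole yuan, and watermarks are passed in directly as a generator such as <code>forBoundedOutOfOrderness</code> would produce them. The rules it encodes (fire when the watermark reaches window end - 1 ms; keep state until the watermark reaches window end - 1 ms + allowed lateness; send anything later to the side output) reflect our understanding of Flink's default event-time trigger and window operator.</div>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simulated tumbling event-time window operator with count/sum accumulators. */
final class WindowLifecycleSimulation {
    static final long MINUTE = 60_000L;

    private final long size;
    private final long allowedLateness;
    private long watermark = Long.MIN_VALUE;
    // window start -> {order count, total amount}: plays the role of the window state
    private final TreeMap<Long, long[]> state = new TreeMap<>();
    final List<String> results = new ArrayList<>();  // emitted (and re-emitted) results
    final List<Long> sideOutput = new ArrayList<>(); // timestamps of records that came too late

    WindowLifecycleSimulation(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    void onElement(long ts, long amount) {
        long start = ts - Math.floorMod(ts, size);
        long lastMs = start + size - 1;              // the window's last millisecond
        if (lastMs + allowedLateness <= watermark) { // state already cleared: too late
            sideOutput.add(ts);
            return;
        }
        long[] acc = state.computeIfAbsent(start, k -> new long[2]);
        acc[0] += 1;                                 // only the accumulator is stored
        acc[1] += amount;
        if (lastMs <= watermark) {                   // already fired: late but allowed
            emit(start, acc);                        // emit an updated result
        }
    }

    void onWatermark(long newWatermark) {
        long previous = watermark;
        watermark = Math.max(watermark, newWatermark);
        for (Long start : new ArrayList<>(state.keySet())) {
            long lastMs = start + size - 1;
            if (lastMs > previous && lastMs <= watermark) {
                emit(start, state.get(start));       // first, on-time firing
            }
            if (lastMs + allowedLateness <= watermark) {
                state.remove(start);                 // cleanup after the allowed lateness
            }
        }
    }

    int openWindows() {
        return state.size();
    }

    private void emit(long start, long[] acc) {
        results.add(hhmm(start) + "-" + hhmm(start + size) + " count=" + acc[0] + " amount=" + acc[1]);
    }

    static String hhmm(long millis) {
        long minutes = millis / MINUTE;
        return String.format("%02d:%02d", minutes / 60, minutes % 60);
    }

    static long at(int h, int m, int s) {
        return ((h * 60L + m) * 60 + s) * 1000;
    }
}
```

<div class="ace-line ace-line">Driving it with a 10-minute size and 5 minutes of allowed lateness, a watermark of 10:00:10 emits the first result for 09:50-10:00, a 09:59 record arriving afterwards emits an updated result, a watermark of 10:05:00 clears the window, and a record for that window arriving after that ends up only in the side output.</div>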
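<h3 class="heading-3 ace-line old-record-id-JiJSfeKG5dWaKOcbIelcpjNSnff">Step 4: Checkpoints Run Periodically, Persisting State to Guarantee Reliability</h3>
<div class="ace-line ace-line old-record-id-I2QKfVkR8d1BURc3okzcyzmNnQg">Throughout the computation, Checkpoints run periodically (for example once a minute) and persist the state of all operators, including the Source's offset state and the Window operator's aggregation state, to durable storage such as HDFS.</div>
<div class="ace-line ace-line old-record-id-Asnnff0BtdRLrucBvlVcGl61nKg">Suppose a Flink node fails at 10:03:00 and the most recent completed Checkpoint was taken at 10:02:00 (for simplicity, assume event time roughly tracks wall-clock time here). The snapshot contains the Source's offsets (all orders up to 10:02:00 have been read) and the Window operator's state (09:50-10:00 window: 110 orders, 52,300 yuan; 10:00-10:10 window: 30 orders, 12,800 yuan).</div>
<div class="ace-line ace-line old-record-id-Vo6Jfitg9du1ioc03EPcyW1jnnf">When the job restarts, Flink restores all state from the 10:02:00 Checkpoint:</div>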
<ul class="list-bullet1">
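<li class="ace-line ace-line old-record-id-YREGf3NADdcsgkclED8cjNzinog" data-list="bullet">The Source restores its offsets and resumes reading right after the 10:02:00 position. Orders read between 10:02:00 and the failure are read again, but because the window state was rolled back to the same point, each of them is still counted exactly once, and nothing is skipped.</li>
<li class="ace-line ace-line old-record-id-L9fIfdkZPdIR1bc6ksMcVUptnGg" data-list="bullet">The Window operator restores its aggregation state and continues accumulating orders from 10:02:00 onwards, so the results stay continuous.</li>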
</ul>
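<div class="ace-line ace-line old-record-id-Eplbfugh4dEUy9cG2FSc0CB7nbd">This way, even after a failure the job recovers quickly and no results are lost, giving "exactly-once" semantics for Flink's internal state. (Results already written to an external system between the last Checkpoint and the failure may be written again unless the sink is transactional or idempotent.) This is the core role of Checkpoint: it underpins the reliability of the whole real-time job and closes the loop with state, streams, windows, and watermarks.</div>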
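<div class="ace-line ace-line">Why replaying records after a restore does not double-count them can be seen in a small plain-Java simulation. It is a deliberately simplified model, not Flink's checkpointing algorithm: one task, a snapshot every <code>interval</code> records, and a single crash; <code>CheckpointReplaySimulation</code> and its fields are hypothetical names. The essential point it shows is that the read offset and the aggregate are snapshotted together.</div>

```java
import java.util.List;

/** Simplified model of checkpoint-based recovery for a "source offset + running totals" job. */
final class CheckpointReplaySimulation {
    /** The job's state: the next offset to read and the running totals. */
    static final class State {
        long offset;
        long count;
        long amount;

        State copy() {
            State s = new State();
            s.offset = offset;
            s.count = count;
            s.amount = amount;
            return s;
        }
    }

    /**
     * Processes the log, taking a checkpoint every {@code interval} records and crashing once
     * right after reading {@code crashAt} records; returns the final state after recovery.
     */
    static State run(List<Long> log, int interval, int crashAt) {
        State state = new State();
        State checkpoint = state.copy();   // last completed snapshot (durable storage)
        boolean crashed = false;
        while (state.offset < log.size()) {
            long amount = log.get((int) state.offset);
            state.offset++;
            state.count++;
            state.amount += amount;
            if (!crashed && state.offset == crashAt) {
                crashed = true;
                state = checkpoint.copy(); // failure: in-memory state lost, restore snapshot
                continue;                  // resume reading from the restored offset
            }
            if (state.offset % interval == 0) {
                checkpoint = state.copy(); // offset and totals snapshotted together
            }
        }
        return state;
    }
}
```

<div class="ace-line ace-line">With seven records of 10 to 70, a snapshot every 3 records, and a crash after the 5th record, the 4th and 5th records are read twice, yet the final state is 7 orders and a total of 280, exactly as without the failure.</div>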
<h4 class="heading-4 ace-line old-record-id-HEsWf6tu5d0YCVcxdcYc9mIZnNg">Code Example 6: Complete Collaboration of the Five Components (Counting Orders Every 10 Minutes)</h4>
<div class="cnblogs_code">
<pre>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;

/**
 * Complete collaboration of the five components: stream (Kafka) + watermark + window + state + Checkpoint
 * Function: count orders and sum their amounts every 10 minutes, tolerate 5 minutes of lateness, recover from failures
 * Assumed input line format: userId,orderId,eventTimeMillis,amount
 */
public class FlinkFullCooperationDemo {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the execution environment
        //    (event time is the default since Flink 1.12, so setStreamTimeCharacteristic is no longer needed)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // simulate a distributed environment with multiple parallel subtasks

        // 2. Configure Checkpoint (keeps state reliable)
        env.enableCheckpointing(60000); // one Checkpoint per minute (EXACTLY_ONCE mode by default)
        env.setStateBackend(new HashMapStateBackend()); // working state on the JVM heap (replaces the deprecated FsStateBackend)
        env.getCheckpointConfig().setCheckpointStorage("hdfs://localhost:9000/flink/full-cooperation-checkpoints");
        env.getCheckpointConfig().setCheckpointTimeout(30000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // up to 3 restarts, 5 s apart

        // 3. Configure the Kafka Source (stream: data entry point)
        //    Offsets are stored in Checkpoints automatically; the initializer below only applies
        //    when the job starts without any state to restore
        KafkaSource&lt;String&gt; kafkaSource = KafkaSource.&lt;String&gt;builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("order_topic")
                .setGroupId("flink_full_cooperation_group")
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 4. Read the stream and generate watermarks (the time ruler)
        OutputTag&lt;Order&gt; lateDataTag = new OutputTag&lt;Order&gt;("late_order") {};
        DataStream&lt;Order&gt; orderStream = env.fromSource(
                        kafkaSource,
                        // Watermark strategy: tolerate 5 minutes of out-of-orderness
                        WatermarkStrategy.&lt;String&gt;forBoundedOutOfOrderness(Duration.ofMinutes(5))
                                // the third column (index 2) is the event timestamp in milliseconds
                                .withTimestampAssigner((line, recordTimestamp) -&gt; Long.parseLong(line.split(",")[2])),
                        "Kafka Order Source")
                .map(line -&gt; {
                    String[] fields = line.split(",");
                    return new Order(
                            fields[0],                    // userId
                            fields[1],                    // orderId
                            Long.parseLong(fields[2]),    // eventTime
                            Double.parseDouble(fields[3]) // amount
                    );
                });

        // 5. Window (slices data) + state (stores intermediate results) + aggregation
        SingleOutputStreamOperator&lt;OrderWindowStats&gt; windowResult = orderStream
                // windowAll is non-keyed, so this window operator always runs with parallelism 1;
                // use keyBy(...).window(...) to aggregate in parallel per key
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(10)))
                .allowedLateness(Time.minutes(5)) // keep window state 5 more minutes for late updates
                .sideOutputLateData(lateDataTag)  // collect records that arrive after that
                .aggregate(new OrderWindowAggregate());

        // 6. Output the results
        windowResult.print("10-minute order stats");
        windowResult.getSideOutput(lateDataTag).print("Too-late orders (for backfill)");

        // 7. Execute the job
        env.execute("Flink five-component collaboration demo");
    }

    // Window aggregate function: its accumulator is what Flink keeps in window state
    static class OrderWindowAggregate implements AggregateFunction&lt;Order, OrderWindowStats, OrderWindowStats&gt; {
        // Initial accumulator (0 orders, amount 0)
        @Override
        public OrderWindowStats createAccumulator() {
            return new OrderWindowStats(0L, 0.0);
        }

        // Add one order to the accumulator (updates the window state)
        @Override
        public OrderWindowStats add(Order order, OrderWindowStats accumulator) {
            return new OrderWindowStats(
                    accumulator.getOrderCount() + 1,
                    accumulator.getTotalAmount() + order.getAmount()
            );
        }

        // Called when the window fires (the watermark reaches the window end): produce the result
        @Override
        public OrderWindowStats getResult(OrderWindowStats accumulator) {
            return accumulator;
        }

        // Merge two accumulators; only used by merging windows such as session windows
        @Override
        public OrderWindowStats merge(OrderWindowStats a, OrderWindowStats b) {
            return new OrderWindowStats(
                    a.getOrderCount() + b.getOrderCount(),
                    a.getTotalAmount() + b.getTotalAmount()
            );
        }
    }

    // Window statistics result
    static class OrderWindowStats {
        private Long orderCount;
        private Double totalAmount;

        public OrderWindowStats(Long orderCount, Double totalAmount) {
            this.orderCount = orderCount;
            this.totalAmount = totalAmount;
        }

        @Override
        public String toString() {
            return "OrderWindowStats{orderCount=" + orderCount + ", totalAmount=" + totalAmount + "}";
        }

        public Long getOrderCount() { return orderCount; }
        public Double getTotalAmount() { return totalAmount; }
    }

    // Order entity
    static class Order {
        private String userId;
        private String orderId;
        private Long eventTime;
        private Double amount;

        public Order(String userId, String orderId, Long eventTime, Double amount) {
            this.userId = userId;
            this.orderId = orderId;
            this.eventTime = eventTime;
            this.amount = amount;
        }

        public Long getEventTime() { return eventTime; }
        public Double getAmount() { return amount; }
    }
}</pre>
</div>
<p> </p>
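<div class="ace-line ace-line old-record-id-EhM7fefyTdhg0Zc8rcSck3Sqnnc">Notes: this example implements the full collaboration of the five components, covering the whole flow "Kafka stream (data entry) → watermark (time ruler) → tumbling window (slicing data) → window state (intermediate results) → Checkpoint (persisted state)", and maps directly onto the "count orders every 10 minutes" scenario above. It also handles late data and configures a restart strategy. It can serve as a starting point for a production job, but you would replace the <code>print</code> sinks with a real (ideally transactional or idempotent) sink and, for high-volume streams, use <code>keyBy(...).window(...)</code> instead of <code>windowAll</code>, which runs on a single subtask.</div>
<h3 class="heading-3 ace-line old-record-id-HThhfFCCmd1h0Ocy5UVcOcqmnvS">Summary: The Collaboration Loop (Must Remember)</h3>
<div class="ace-line ace-line old-record-id-QRzofccRSdQS9DcOYdEc0J1bnvc">Stream (data carrier) → watermark (time ruler) → window (slices data) → state (stores intermediate results) → Checkpoint (persists state, enables recovery) → stream (new data keeps flowing in, and the cycle repeats).</div>
<div class="ace-line ace-line old-record-id-Tja2fpIARd1GjYchrBWc3DUpnCc">The five components are tightly interlocked and each is indispensable: without streams there is no data; without watermarks, event-time windows cannot fire; without windows, an unbounded stream cannot be aggregated into bounded results; without state, stateful computations are impossible; without Checkpoint, state cannot be persisted and the job cannot run reliably.</div>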
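<h2 class="heading-2 ace-line old-record-id-Rv21fSOWkd1T4McOJxZcLx77nBe">III. Practical Scenarios: Using the Collaboration Logic to Avoid Common Pitfalls</h2>
<div class="ace-line ace-line old-record-id-SsonfOeW8dloFzcZuBmcobKmnof">With the collaboration logic in place, let's look at common situations in real development and see how to apply this mental model to avoid frequent pitfalls. Many problems developers hit in Flink jobs, such as data loss, skewed results, or inconsistent results after a restart, come down to not understanding how the five components work together.</div>
<h3 class="heading-3 ace-line old-record-id-SBV3fOCExdDdfbc8r5icBCmInqf">Scenario 1: Window Results Missing Data (Unreasonable Watermark Settings)</h3>
<div class="ace-line ace-line old-record-id-ExVMfBggndgZk9c1kvOcZ3G0nLc">Symptom: when counting orders every 10 minutes, some orders are not counted in their windows, so the results are too low.</div>
<div class="ace-line ace-line old-record-id-CbxSftoZMdUefJcmbHKc2DKlnfd">Cause: the tolerated lateness (the watermark's out-of-orderness bound and/or the window's allowed lateness) is too short, so some delayed records (for example those with long network delays) arrive after their window has closed, are classified as "late data", and are left out of the window computation. Alternatively, the watermark generation logic is flawed and does not reflect the actual maximum event time (for example, the Source does not update the maximum event time promptly).</div>
<div class="ace-line ace-line old-record-id-W7ERftgxJdYPlRcJt6ncQi3anMe">Solutions:</div>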
<ul class="list-bullet1">
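<li class="ace-line ace-line old-record-id-V9YufU5lfdMmJgcIKE2cbyVGnfd" data-list="bullet">Set the lateness tolerance according to the business: for example, if network delays for order data rarely exceed 5 minutes, a 5-minute tolerance ensures that most delayed records are still counted in their windows.</li>
<li class="ace-line ace-line old-record-id-PztGf28VSdy1TfcCy0jcGFk6nLc" data-list="bullet">Improve watermark generation: make sure the Source updates the maximum event time after every record so the watermark increases monotonically; with parallel Sources, make sure every parallel instance keeps producing watermarks, since downstream operators take the minimum across all their inputs.</li>
<li class="ace-line ace-line old-record-id-A6BdfcDCCdwHhNczUt5cBNi2nuf" data-list="bullet">For records that still miss the lateness tolerance, collect them through a "side output" and backfill them later, instead of silently dropping them.</li>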
</ul>
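<div class="ace-line ace-line">One way to ground the lateness tolerance in data rather than guesswork is to measure, on a sample of records in arrival order, how far behind the running maximum event time they actually arrive. The helper below is a hypothetical sketch, not a Flink API: <code>maxObservedDisorder</code> returns a bound that would have left every sampled record ahead of the watermark, and <code>recordsBehindWatermark</code> counts records that a smaller bound would have put at risk of being late (assuming, as a simplification, that the watermark is updated after every record).</div>

```java
/** Hypothetical helper for choosing an out-of-orderness bound from sampled event times. */
final class DisorderProfiler {
    /** Largest gap between the maximum timestamp seen so far and a later-arriving record. */
    static long maxObservedDisorder(long[] timestampsInArrivalOrder) {
        long max = Long.MIN_VALUE;
        long worst = 0;
        for (long ts : timestampsInArrivalOrder) {
            if (max != Long.MIN_VALUE) {
                worst = Math.max(worst, max - ts);
            }
            max = Math.max(max, ts);
        }
        return worst;
    }

    /** Records at or behind the watermark (max - bound - 1 ms) when they arrive. */
    static int recordsBehindWatermark(long[] timestampsInArrivalOrder, long boundMillis) {
        long max = Long.MIN_VALUE;
        int behind = 0;
        for (long ts : timestampsInArrivalOrder) {
            if (max != Long.MIN_VALUE && ts <= max - boundMillis - 1) {
                behind++; // its window may already have fired
            }
            max = Math.max(max, ts);
        }
        return behind;
    }
}
```

<div class="ace-line ace-line">Whether a record behind the watermark is actually dropped also depends on window boundaries and allowed lateness, so treat the measured value as a starting point and add some headroom for delays the sample did not capture.</div>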
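<h3 class="heading-3 ace-line old-record-id-MgsZfcwygdxCXvctoC3ctvTlnSe">Scenario 2: Duplicated or Missing Results After a Restart (Improper Checkpoint Configuration)</h3>
<div class="ace-line ace-line old-record-id-K3aEf4jQgd98grcnvi9cTL1ZnPf">Symptom: after a Flink job restarts from a failure, some data is counted twice (results too high) or lost (results too low).</div>
<div class="ace-line ace-line old-record-id-Grp7fE7WDdhXAXcGRHqcYZaXnde">Cause: improper Checkpoint configuration. A long Checkpoint interval means more data has to be replayed after a failure, and if the sink is not transactional or idempotent, everything emitted since the last Checkpoint is written again. Checkpoints kept on unreliable media (such as a local disk) may be lost together with the node. Or the Source's offsets are managed outside Flink's Checkpoint (for example, recovery relies on offsets auto-committed by the Kafka consumer), so offsets and state no longer match after a restore.</div>
<div class="ace-line ace-line old-record-id-Lzcvf4P4nduufhcVjpnc7VA7neh">Solutions:</div>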
<ul class="list-bullet1">
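<li class="ace-line ace-line old-record-id-HArQf2Hy5d0XTtc0RkWcit9cn6d" data-list="bullet">Choose a sensible Checkpoint interval based on latency requirements and data volume (typically 1-5 minutes): too short adds overhead, too long means more data to replay (and, with non-transactional sinks, more duplicated output) after a failure.</li>
<li class="ace-line ace-line old-record-id-UoMufZ7fidYUu3cFE4McP046nkN" data-list="bullet">Use reliable Checkpoint storage (such as HDFS or S3) rather than local disk, since local snapshots are lost when the node fails.</li>
<li class="ace-line ace-line old-record-id-H4xiflc48d2MwUc8c0xcSegTnag" data-list="bullet">Make sure the Source's offsets are persisted through Checkpoint. For example, set "enable.auto.commit" to false for the Kafka Source and let Flink's Checkpoint mechanism manage offsets, so offset commits never drift out of sync with Checkpoints. (The <code>KafkaSource</code> connector always restores from the offsets in its Checkpoint state; the offsets it commits back to Kafka when a Checkpoint completes are only for monitoring.)</li>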
</ul>
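<div class="ace-line ace-line">The cost of resuming from an offset that does not match the restored state can be expressed as a tiny model. This is a deliberately simplified, hypothetical model (one counter, each record contributes 1), not Flink code: the aggregate is restored from a snapshot taken at <code>checkpointOffset</code>, and reading resumes at <code>resumeOffset</code>.</div>

```java
/**
 * Simplified model: after a failure the aggregate is restored from a snapshot taken at
 * checkpointOffset, and the source resumes reading at resumeOffset. Each record adds 1.
 */
final class OffsetRecoveryModel {
    /** Number of records reflected in the result once all totalRecords have been read. */
    static long countAfterRecovery(long totalRecords, long checkpointOffset, long resumeOffset) {
        long restored = checkpointOffset;             // snapshot already counted [0, checkpointOffset)
        long readAgain = totalRecords - resumeOffset; // records read after the restart
        return restored + readAgain;
    }

    /** Positive: records counted twice; negative: records lost; zero: exactly once. */
    static long error(long totalRecords, long checkpointOffset, long resumeOffset) {
        return countAfterRecovery(totalRecords, checkpointOffset, resumeOffset) - totalRecords;
    }
}
```

<div class="ace-line ace-line">With 1,000 records and a snapshot at offset 600, resuming at 600 gives exactly 1,000; resuming from an auto-committed offset that had already advanced to 750 loses 150 records, and one that lagged at 500 counts 100 records twice.</div>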
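<h3 class="heading-3 ace-line old-record-id-NbzafmLdedfYnEckAd2csPNFnNc">Scenario 3: Out of Memory After Running for a While (Poor State Management)</h3>
<div class="ace-line ace-line old-record-id-QICYfJCVpdWp7scgmvScJCusnqf">Symptom: after running for some time, the Flink job's nodes run out of memory and the job crashes.</div>
<div class="ace-line ace-line old-record-id-WPG1fsHN0d2wKzcFIoecGWBHnVx">Cause: state grows too large because expired state is never cleaned up, or the state backend does not fit the workload (for example, a large amount of state is kept on the JVM heap instead of in RocksDB on local disk). A typical case is keyed state in a custom function that gets a new entry for every key but is never cleared, so it keeps accumulating. Window state can also pile up when the allowed lateness is very long, because Flink only clears a window once the watermark passes its end plus the allowed lateness.</div>
<div class="ace-line ace-line old-record-id-Zmh3fnb17d2NEJccTBncmfgGnEe">Solutions:</div>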
<ul class="list-bullet1">
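<li class="ace-line ace-line old-record-id-TdhifYXKLdW5gqcaxtJcLq4On8e" data-list="bullet">Clean up expired state promptly: for event-time windows, Flink clears a window's state automatically once the watermark passes the window end plus the allowed lateness, so keep the allowed lateness no longer than necessary; for other Keyed State, configure a "state TTL (Time-To-Live)" so entries expire and are cleaned up automatically.</li>
<li class="ace-line ace-line old-record-id-H1iHfpDySdIElecYSB5coHCUnsc" data-list="bullet">Choose a suitable state backend: for very large state (for example hundreds of millions of keys), use RocksDB as the state backend, which keeps working state on local disk instead of the JVM heap (durability still comes from Checkpoints).</li>
<li class="ace-line ace-line old-record-id-PCoEfKQOjdrfeHcgn0ocDqpKnag" data-list="bullet">Tune parallelism: set a reasonable parallelism and distribute keys evenly so that no single parallel instance carries too much state (for example, avoid key skew concentrating state on one node).</li>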
</ul>
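<div class="ace-line ace-line">The idea behind state TTL can be illustrated with a small plain-Java simulation: each write refreshes an entry's timestamp, expired entries are never returned, and they are removed either lazily on access or by a cleanup pass. Flink's real mechanism is configured through <code>StateTtlConfig</code> and <code>StateDescriptor.enableTimeToLive(...)</code>; the class below is a hypothetical illustration of the semantics, not Flink's implementation, and uses explicit millisecond timestamps instead of a clock.</div>

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative keyed state with a time-to-live (TTL); not Flink's implementation. */
final class TtlStateSimulation {
    private static final class Entry {
        final long value;
        final long lastWrite;

        Entry(long value, long lastWrite) {
            this.value = value;
            this.lastWrite = lastWrite;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> entries = new HashMap<>();

    TtlStateSimulation(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Writing a key (re)starts its TTL. */
    void put(String key, long value, long now) {
        entries.put(key, new Entry(value, now));
    }

    /** Returns the value, or null if absent or expired (expired entries are removed lazily). */
    Long get(String key, long now) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (now - e.lastWrite >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    /** Full cleanup pass, removing every expired entry at once. */
    void cleanup(long now) {
        entries.values().removeIf(e -> now - e.lastWrite >= ttlMillis);
    }

    int size() {
        return entries.size();
    }
}
```

<div class="ace-line ace-line">With a TTL of 1,000 ms, a key written at time 0 is still readable at 999 and gone at 1,000; a cleanup pass at 1,600 removes a key last written at 500 but keeps one written at 1,200.</div>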
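<h3 class="heading-3 ace-line old-record-id-UGeUfy9i5dEHaPck6OLcdFlcnLf">Scenario 4: Out-of-Order Event Times Skewing Window Results (Watermarks and Windows Poorly Coordinated)</h3>
<div class="ace-line ace-line old-record-id-P0DmfYmYtd0ITxcfV3fcV9tBnhb">Symptom: because event times arrive out of order (for example, an order that occurred at 10:05 arrives before one that occurred at 10:03), window results are skewed.</div>
<div class="ace-line ace-line old-record-id-CBWnfhKj8dvufNcIHQac7eGBnKb">Cause: watermark generation does not account for how disordered the data is, so the watermark advances too fast, windows fire early, and out-of-order records that arrive afterwards are treated as late data and left out of the window.</div>
<div class="ace-line ace-line old-record-id-CUiEfFWJrdh0arcMunQcdfIZn2e">Solutions:</div>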
<ul class="list-bullet1">
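<li class="ace-line ace-line old-record-id-V1H7fmgSKdpBbGcLiibcM2ZSn2b" data-list="bullet">Set a reasonable "out-of-orderness tolerance": reserve time for out-of-order records when generating watermarks, for example by setting the bound of <code>forBoundedOutOfOrderness</code> to the largest delay observed in the business, so the watermark advances more conservatively. (This bound delays when windows fire; <code>allowedLateness</code> is a separate window setting that keeps state around afterwards for updated results.)</li>
<li class="ace-line ace-line old-record-id-DOWcf02UqdigItcx7gvcJ3OTndY" data-list="bullet">Generate watermarks at the Source: Flink already combines parallel inputs by taking the minimum watermark, so a single fast input cannot make windows fire early by itself. Passing the <code>WatermarkStrategy</code> to <code>fromSource</code> lets the Kafka Source track watermarks per partition, rather than mixing partitions whose timestamps interleave.</li>
<li class="ace-line ace-line old-record-id-Dd7lf3h9Odsd0TcfdgAc2rFanze" data-list="bullet">If the business semantics are session-based (for example, grouping a user's activity separated by idle periods), consider a "Session Window" instead of tumbling or sliding windows. Note that event-time session windows are delimited by gaps in event time, not by arrival time, and they depend on watermarks just like other event-time windows, so they do not remove the need for a proper out-of-orderness bound.</li>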
</ul>
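<div class="ace-line ace-line">To see that session windows are driven by gaps in event time rather than by arrival order, here is a plain-Java sketch of event-time session assignment: each record opens a window <code>[ts, ts + gap)</code>, and windows that overlap or touch are merged. It is a simplified illustration with a hypothetical class name, and it assumes all records arrive before the watermark closes their sessions; in Flink, records behind the watermark would still be late.</div>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of event-time session windows: grouping depends on event-time gaps only. */
final class SessionWindowSketch {
    /** Returns merged sessions as {start, end} pairs, regardless of arrival order. */
    static List<long[]> sessions(long[] timestampsInArrivalOrder, long gap) {
        long[] sorted = timestampsInArrivalOrder.clone();
        Arrays.sort(sorted); // the result depends only on event time
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], ts + gap); // overlapping or touching: extend
            } else {
                result.add(new long[] {ts, ts + gap}); // gap exceeded: a new session starts
            }
        }
        return result;
    }
}
```

<div class="ace-line ace-line">With a 5-minute gap, records stamped at minutes 12, 0, 2, 30 and 14 (in that arrival order) form three sessions: 0-7, 12-19 and 30-35 minutes, the same as if they had arrived in order.</div>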
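<h2 class="heading-2 ace-line old-record-id-KbwHfzk4MdRotDchVDLcjuNNnlc">IV. Summary: Keys to Building the Flink Real-Time Computing Mental Model</h2>
<div class="ace-line ace-line old-record-id-ARmMftooAd6oUOcZ6U0ckywCn9d">Flink's streams, windows, watermarks, state, and Checkpoint are not five isolated components but one complete collaboration system of "data → time → computation → memory → guarantee". Building this mental model comes down to three key points:</div>
<div class="ace-line ace-line old-record-id-Bh97fIZmEdOTNQceSJqcI4XNn4f">1. Time is the core baseline: all collaboration revolves around event time. Watermarks mark time, windows slice data by time, state records intermediate results within time ranges, and Checkpoint keeps state consistent along the time dimension.</div>
<div class="ace-line ace-line old-record-id-Af0rfmkVMd9AUKcOnZZc29iynle">2. State is the core of computation: without state there is no complex real-time computation, and how state is managed (stored, cleaned up, recovered) directly determines a job's performance and reliability.</div>
<div class="ace-line ace-line old-record-id-LzuFfxGCTdAYl7cuxqQcU0ZfnUd">3. The loop is the core of reliability: the loop formed by streams, windows, watermarks, state, and Checkpoint lets real-time jobs compute continuously, accurately, and reliably, and it is a key reason Flink can support large-scale real-time workloads.</div>
<div class="ace-line ace-line old-record-id-AVBlfZH3TdGgv8clMtJcxm1mnub">For developers, mastering this mental model not only helps you grasp Flink's core principles quickly, but also lets you locate problems, optimize performance, and keep jobs stable in practice. Whether you are building a simple real-time count or complex real-time joins and risk control, this model is the underlying logic for solving problems.</div>
<div class="ace-line ace-line old-record-id-Op9VfozpIdi40vcDflScAsDRnK8">Finally, practice hands-on: try adjusting the out-of-orderness bound and allowed lateness, the window size, and the Checkpoint interval, observe how the interplay between the components changes, and get a feel for each one's role. That is how you truly internalize this mental model and turn it into your own development skill.</div>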
</div><br><br>
Source: https://www.cnblogs.com/cnoneblog/p/19655903