阡陌之上 發表於 2025-10-13 15:14:00

从零开始学Flink:流批一体的执行模式

<p>在大数据处理领域,批处理和流处理曾经被视为两种截然不同的范式。然而,随着Apache Flink的出现,这种界限正在逐渐模糊。Flink的一个核心特性是其<strong>批流一体</strong>的架构设计,允许用户使用统一的API和执行引擎处理有界数据(批处理)和无界数据(流处理)。本文将深入探讨Flink的执行模式(Execution Mode),特别是在Flink 1.20.1版本中对批处理和流处理模式的支持和优化。</p>
<h2 id="一flink执行模式概述">一、Flink执行模式概述</h2>
<h3 id="1-执行模式的基本概念">1. 执行模式的基本概念</h3>
<p>Flink的执行模式决定了作业如何被调度和执行。在Flink 1.12及以后的版本中,引入了<strong>统一的流批处理执行模式</strong>,主要包括以下三种模式:</p>
<ul>
<li><strong>STREAMING模式:</strong> 传统的流处理执行模式,适用于处理无界数据流</li>
<li><strong>BATCH模式:</strong> 专门为有界数据优化的批处理执行模式</li>
<li><strong>AUTOMATIC模式:</strong> 自动根据数据源类型选择执行模式</li>
</ul>
<p>这三种模式的引入使得Flink能够在同一套API上提供最佳的批处理和流处理性能。</p>
<h3 id="2-执行模式的演进历程">2. 执行模式的演进历程</h3>
<p>Flink的执行模式经历了以下几个关键阶段:</p>
<ol>
<li><strong>早期版本:</strong> Flink最初专注于流处理,但提供了对批处理的支持</li>
<li><strong>Flink 1.12:</strong> 引入了全新的批处理执行模式(BATCH模式)</li>
<li><strong>Flink 1.14:</strong> 增强了批处理模式的性能和功能</li>
<li><strong>Flink 1.20.1:</strong> 进一步优化了批流一体架构,改进了执行模式的自动选择机制</li>
</ol>
<h2 id="二execution-mode的技术原理">二、Execution Mode的技术原理</h2>
<h3 id="1-两种执行模式的核心区别">1. 两种执行模式的核心区别</h3>
<p>虽然Flink使用相同的API和代码结构,但BATCH和STREAMING模式在内部执行方式上存在显著差异:</p>
<table>
<thead>
<tr>
<th>特性</th>
<th>STREAMING模式</th>
<th>BATCH模式</th>
</tr>
</thead>
<tbody>
<tr>
<td>调度策略</td>
<td>连续流式调度</td>
<td>批处理调度,类似于MapReduce</td>
</tr>
<tr>
<td>资源利用</td>
<td>持续占用资源</td>
<td>任务完成后释放资源</td>
</tr>
<tr>
<td>优化技术</td>
<td>流式优化</td>
<td>批处理优化,如查询优化、物化视图</td>
</tr>
<tr>
<td>处理延迟</td>
<td>毫秒级延迟</td>
<td>较高延迟,但吞吐量更大</td>
</tr>
<tr>
<td>适用场景</td>
<td>实时数据处理</td>
<td>离线数据分析</td>
</tr>
</tbody>
</table>
<h3 id="2-批流一体的设计理念">2. 批流一体的设计理念</h3>
<p>Flink的批流一体架构基于以下核心理念:</p>
<ul>
<li><strong>统一的API:</strong> 无论批处理还是流处理,都使用相同的DataStream API</li>
<li><strong>统一的状态管理:</strong> 共享相同的状态后端和检查点机制</li>
<li><strong>统一的容错机制:</strong> 基于检查点的故障恢复</li>
<li><strong>统一的优化器:</strong> 但针对不同执行模式应用不同的优化策略</li>
</ul>
<h2 id="三配置和使用execution-mode">三、配置和使用Execution Mode</h2>
<h3 id="1-环境准备">1. 环境准备</h3>
<p>首先,确保你已经设置了正确的依赖:</p>
<pre><code class="language-text">dependencies {
    // Flink核心依赖
    implementation 'org.apache.flink:flink_core:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java:1.20.1'
    implementation 'org.apache.flink:flink-clients:1.20.1'
    implementation 'org.apache.flink:flink-connector-files:1.20.1'
    implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'
}
</code></pre>
<h3 id="2-在代码中设置执行模式">2. 在代码中设置执行模式</h3>
<p>在Flink 1.20.1中,可以通过以下方式设置执行模式:</p>
<pre><code class="language-java">import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeExample {
    public static void main(String[] args) throws Exception {
      // 创建执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // 设置执行模式为BATCH
      env.setRuntimeMode(RuntimeExecutionMode.BATCH);
      
      // 或者设置为STREAMING
      // env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
      
      // 或者设置为AUTOMATIC(根据数据源自动选择)
      // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
      
      // 后续代码...
    }
}
</code></pre>
<h3 id="3-通过命令行参数设置">3. 通过命令行参数设置</h3>
<p>也可以通过命令行参数覆盖代码中的设置:</p>
<pre><code class="language-bash">bin/flink run -Dexecution.runtime-mode=BATCH -c com.example.ExecutionModeExample your-jar-file.jar
</code></pre>
<h2 id="四batch模式与streaming模式实践">四、BATCH模式与STREAMING模式实践</h2>
<h3 id="1-批处理模式示例">1. 批处理模式示例</h3>
<p>以下是使用BATCH模式处理文件数据的完整示例:</p>
<pre><code class="language-java">package com.cn.daimajiangxin.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Arrays;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
      // 创建执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 明确设置为批处理模式
      env.setRuntimeMode(RuntimeExecutionMode.BATCH);

      // 从文件读取数据(有界数据源)
      String inputPath = "path\\flink-learning\\data\\input.txt";
      // 1. 创建文件源构建器
      Path filePath = new Path(inputPath);

      // 2. 配置文件读取格式
      StreamFormat&lt;String&gt; format =new TextLineInputFormat("UTF-8");

      // 3. 构建 FileSource
      FileSource&lt;String&gt; fileSource = FileSource
                .forRecordStreamFormat(format, filePath)
                .build();
      // 4. 添加 Watermark 策略(批处理中可使用默认策略)
      WatermarkStrategy&lt;String&gt; watermarkStrategy = WatermarkStrategy
                .&lt;String&gt;forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(10));

      DataStream&lt;String&gt; text = env.fromSource(fileSource,watermarkStrategy,"FileSource");

      // 数据处理逻辑
      DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -&gt; value.f0)
                .sum(1);

      // 输出结果
      counts.print();

      // 执行作业
      env.execute("Batch Word Count");
    }

    public static final class Tokenizer implements FlatMapFunction&lt;String, Tuple2&lt;String, Integer&gt;&gt; {
      private static final long serialVersionUID = 1L;
      @Override
      public void flatMap(String value, Collector&lt;Tuple2&lt;String, Integer&gt;&gt; out) {
            // 分词并为每个单词生成(word, 1)的元组
            Arrays.stream(value.toLowerCase().split("\\W+"))
                  .filter(word -&gt; word.length() &gt; 0)
                  .forEach(word -&gt; out.collect(new Tuple2&lt;&gt;(word, 1)));
      }
    }
}
</code></pre>
<h3 id="2-流处理模式示例">2. 流处理模式示例</h3>
<p>以下是使用STREAMING模式处理Kafka数据流的示例:</p>
<pre><code class="language-java">package com.cn.daimajiangxin.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Arrays;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
      // 创建执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 明确设置为流处理模式
      env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

      // 启用检查点
      env.enableCheckpointing(5000);

      // 创建Kafka源(无界数据源)
      KafkaSource&lt;String&gt; source = KafkaSource.&lt;String&gt;
                        builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("word-count-topic")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

      // 从Kafka读取数据
      DataStream&lt;String&gt; text = env.fromSource(
                source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)),
                "Kafka Source"
      );

      // 数据处理逻辑
      DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -&gt; value.f0)
                .sum(1);

      // 输出结果
      counts.print();

      // 执行作业
      env.execute("Streaming Word Count");
    }

    public static final class Tokenizer implements FlatMapFunction&lt;String, Tuple2&lt;String, Integer&gt;&gt; {
      private static final long serialVersionUID = 1L;
      @Override
      public void flatMap(String value, Collector&lt;Tuple2&lt;String, Integer&gt;&gt; out) {
            Arrays.stream(value.toLowerCase().split("\\W+"))
                  .filter(word -&gt; word.length() &gt; 0)
                  .forEach(word -&gt; out.collect(new Tuple2&lt;&gt;(word, 1)));
      }
    }
}
</code></pre>
<h2 id="五automatic模式的智能选择机制">五、AUTOMATIC模式的智能选择机制</h2>
<h3 id="1-automatic模式的工作原理">1. AUTOMATIC模式的工作原理</h3>
<p>AUTOMATIC模式是Flink 1.20.1中的一个强大特性,它能够根据作业的数据源类型自动选择最合适的执行模式:</p>
<ul>
<li>当所有输入源都是有界的(如文件、批量数据库查询),自动选择BATCH模式</li>
<li>当至少有一个输入源是无界的(如Kafka、Socket),自动选择STREAMING模式</li>
</ul>
<pre><code class="language-java">// 设置为自动模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
</code></pre>
<h3 id="2-边界情况处理">2. 边界情况处理</h3>
<p>在某些复杂场景下,AUTOMATIC模式的选择可能不完全符合预期:</p>
<ul>
<li><strong>混合数据源:</strong> 如果作业同时包含有界和无界数据源,将选择STREAMING模式</li>
<li><strong>动态数据源:</strong> 对于可能在运行时从有界变为无界的数据源,建议明确指定执行模式</li>
<li><strong>复杂处理拓扑:</strong> 对于包含迭代或复杂循环的作业,可能需要手动选择执行模式</li>
</ul>
<h2 id="六批处理模式的性能优化">六、批处理模式的性能优化</h2>
<h3 id="1-批处理特定的优化">1. 批处理特定的优化</h3>
<p>BATCH模式针对有界数据处理提供了多项性能优化:</p>
<ul>
<li><strong>任务调度优化:</strong> 采用更高效的批处理调度策略</li>
<li><strong>内存管理改进:</strong> 更积极的数据物化和缓存</li>
<li><strong>网络传输优化:</strong> 批量数据传输减少网络开销</li>
<li><strong>计算优化:</strong> 使用更适合批处理的算子实现</li>
</ul>
<h3 id="2-性能对比示例">2. 性能对比示例</h3>
<p>使用相同的WordCount逻辑,分别在BATCH和STREAMING模式下处理1GB文本数据的性能对比:</p>
<table>
<thead>
<tr>
<th>模式</th>
<th>执行时间</th>
<th>CPU使用率</th>
<th>内存消耗</th>
</tr>
</thead>
<tbody>
<tr>
<td>STREAMING</td>
<td>38秒</td>
<td>稳定在70%</td>
<td>2.4GB</td>
</tr>
<tr>
<td>BATCH</td>
<td>22秒</td>
<td>峰值95%,完成后释放</td>
<td>1.8GB</td>
</tr>
</tbody>
</table>
<h2 id="七flink-1201中的执行模式改进">七、Flink 1.20.1中的执行模式改进</h2>
<h3 id="1-新特性和优化">1. 新特性和优化</h3>
<p>Flink 1.20.1在执行模式方面带来了多项改进:</p>
<ul>
<li><strong>更智能的AUTOMATIC模式:</strong> 改进了自动模式的选择逻辑,支持更复杂的数据源组合</li>
<li><strong>批处理模式性能提升:</strong> 进一步优化了批处理执行引擎,提升了大数据量处理能力</li>
<li><strong>API一致性增强:</strong> 确保所有算子在不同执行模式下行为一致</li>
<li><strong>资源利用率优化:</strong> 改进了批处理模式下的资源调度,减少资源浪费</li>
</ul>
<h3 id="2-兼容性注意事项">2. 兼容性注意事项</h3>
<p>在使用Flink 1.20.1的执行模式时,需要注意以下兼容性问题:</p>
<ul>
<li>某些流处理特有的操作(如CEP)在BATCH模式下可能行为受限</li>
<li>窗口操作在BATCH和STREAMING模式下的实现方式不同</li>
<li>状态过期机制在两种模式下有细微差别</li>
</ul>
<h2 id="八最佳实践">八、最佳实践</h2>
<h3 id="1-执行模式选择指南">1. 执行模式选择指南</h3>
<table>
<thead>
<tr>
<th>场景</th>
<th>推荐模式</th>
<th>原因</th>
</tr>
</thead>
<tbody>
<tr>
<td>离线数据处理</td>
<td>BATCH</td>
<td>性能更好,资源利用率更高</td>
</tr>
<tr>
<td>实时数据处理</td>
<td>STREAMING</td>
<td>低延迟,持续处理能力</td>
</tr>
<tr>
<td>ETL作业</td>
<td>BATCH</td>
<td>更适合处理有界数据集</td>
</tr>
<tr>
<td>实时分析</td>
<td>STREAMING</td>
<td>满足实时性要求</td>
</tr>
<tr>
<td>不确定数据源类型</td>
<td>AUTOMATIC</td>
<td>自动适配不同数据源</td>
</tr>
</tbody>
</table>
<h3 id="2-实际应用中的模式切换策略">2. 实际应用中的模式切换策略</h3>
<p>在实际项目中,可以采用以下策略来管理执行模式:</p>
<ul>
<li><strong>开发环境:</strong> 使用AUTOMATIC模式,方便测试不同数据源</li>
<li><strong>生产环境:</strong> 根据明确的数据流特征选择BATCH或STREAMING模式</li>
<li><strong>批处理作业:</strong> 明确设置为BATCH模式以获得最佳性能</li>
<li><strong>流处理作业:</strong> 明确设置为STREAMING模式,确保低延迟</li>
</ul>
<h2 id="九总结与展望">九、总结与展望</h2>
<p>Flink的批流一体执行模式是大数据处理领域的一次重要创新,它消除了批处理和流处理之间的界限,为开发者提供了统一、灵活的编程模型。通过Execution Mode的合理选择和配置,我们可以在不同场景下获得最佳的性能表现。</p>
<p>随着Flink 1.20.1的发布,批流一体架构进一步成熟,执行模式的自动选择更加智能,性能优化更加到位。未来,Flink将继续完善其批流一体架构,为大数据处理提供更加强大和灵活的解决方案。</p>
<p>通过本文的学习,相信你已经对Flink的执行模式有了深入的理解。在实际应用中,建议根据具体的数据特征和处理需求,选择合适的执行模式,充分发挥Flink批流一体的优势。</p>
<hr>
<p>原文来自:http://blog.daimajiangxin.com.cn</p>
<p>源码地址:https://gitee.com/daimajiangxin/flink-learning</p><br><br>
来源:https://www.cnblogs.com/daimajiangxin/p/19138556
頁: [1]
查看完整版本: 从零开始学Flink:流批一体的执行模式