万剑合一 發表於 2025-9-18 15:54:00

从零开始学Flink:数据源

<p>在实时数据处理场景中,数据源(Source)是整个数据处理流程的起点。Flink作为流批一体的计算框架,提供了丰富的Source接口支持,其中通过Kafka获取实时数据是最常见的场景之一。本文将以Flink DataStream API为核心,带你从0到1实现“从Kafka消费数据并输出到日志”的完整流程,掌握Flink Source的核心用法。</p>
<h2 id="一为什么选择kafka作为flink的数据源">一、为什么选择Kafka作为Flink的数据源?</h2>
<p>Kafka作为分布式流处理平台,具备高吞吐量、低延迟、持久化存储等特性,是实时数据管道的首选。Flink与Kafka的集成方案经过多年优化,支持:</p>
<ul>
<li>高吞吐量:单集群可处理数十万条/秒的消息,满足大规模实时数据处理需求;</li>
<li>持久化存储:数据按时间顺序写入磁盘并保留一定周期,支持离线重放和故障恢复;</li>
<li>精确一次(Exactly-Once)消费语义:通过Kafka偏移量(Offset)管理和Flink检查点(Checkpoint)机制保证数据一致性;</li>
<li>动态分区发现:自动感知Kafka主题的分区变化(如新增分区),无需重启任务;</li>
<li>灵活的消费模式:支持从指定偏移量、时间戳或最新位置开始消费。</li>
</ul>
<h2 id="二环境准备与依赖配置">二、环境准备与依赖配置</h2>
<h3 id="1-版本说明">1. 版本说明</h3>
<p>本文基于以下版本实现(需保持版本兼容):</p>
<ul>
<li>Flink:1.20.1(最新稳定版)</li>
<li>Kafka:3.4.0(Flink Kafka Connector兼容Kafka 2.8+)</li>
<li>JDK:17+</li>
<li>gradle 8.3+</li>
</ul>
<h3 id="2-gradle依赖">2. gradle依赖</h3>
<p>在gradle添加Flink核心依赖及Kafka Connector依赖,build.gradle配置可以是如下:</p>
<pre><code class="language-text">plugins {
    id 'java' // Java项目插件
    id 'application' // 支持main方法运行
    }

    // 设置主类(可选,用于application插件)
    application {
    mainClass.set('com.cn.daimajiangxin.flink.source.KafkaSourceDemo') // 替换为你的主类全限定名
    }

    // 依赖仓库(Maven中央仓库)
    repositories {
    mavenCentral()
    }

    // 依赖配置
    dependencies {
    // Flink核心依赖(生产环境通常标记为provided,由Flink运行时提供)
    implementation 'org.apache.flink:flink-java:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1'

    // Flink Kafka Connector(新版API,兼容Kafka 2.8+)
    implementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1'

    // SLF4J日志门面 + Log4j实现(避免日志警告)
    implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
    implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
    }

    // 编译配置(可选,根据需要调整)
    tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8' // 指定编码
    sourceCompatibility = JavaVersion.VERSION_17 // 兼容Java 8
    targetCompatibility = JavaVersion.VERSION_17
    }
</code></pre>
<h2 id="三核心概念flink-kafka-source的工作原理">三、核心概念:Flink Kafka Source的工作原理</h2>
<p>在深入代码前,需理解Flink Kafka Source的核心组件:</p>
<ul>
<li>KafkaSource:Flink提供的Kafka数据源连接器,负责与Kafka Broker建立连接、拉取消息;</li>
<li>反序列化器(Deserializer):将Kafka消息的字节数组(byte[])转换为Flink可处理的数据类型(如String、POJO、Row等);</li>
<li>偏移量管理:记录已消费的Kafka消息位置(Offset),确保故障恢复时能从断点继续消费;</li>
<li>检查点(Checkpoint):Flink的容错机制,定期将状态(包括偏移量)持久化到存储系统(如HDFS),保证Exactly-Once语义。</li>
</ul>
<h2 id="四核心代码实现从kafka读取数据并输出到日志">四、核心代码实现:从Kafka读取数据并输出到日志</h2>
<h3 id="1-流程概述">1. 流程概述</h3>
<p>整个流程分为5步:</p>
<ol>
<li>配置Kafka连接参数(如Broker地址、主题、消费者组);</li>
<li>创建Flink流执行环境(StreamExecutionEnvironment);</li>
<li>定义Kafka Source(使用新版KafkaSource);</li>
<li>将Source添加到执行环境,并处理数据(如打印到日志);</li>
<li>触发任务执行。</li>
</ol>
<h3 id="2代码详解">2.代码详解</h3>
<p>以下是完整的示例代码,包含详细注释:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class KafkaSourceDemo {
      private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceDemo.class);

public static void main(String[] args) throws Exception {
            // 1. 创建Flink流执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 可选:启用检查点(生产环境必选,保证Exactly-Once语义)
            env.enableCheckpointing(5000); // 每5秒做一次检查点
            // 启用检查点
            env.enableCheckpointing(5000); // 每5秒做一次检查点

// 设置检查点超时时间
            env.getCheckpointConfig().setCheckpointTimeout(60000);

// 2. 配置Kafka参数
            String kafkaBootstrapServers = "172.30.244.152:9092"; // Kafka Broker地址
            String topic = "test_topic"; // 目标主题
            String consumerGroup = "flink-consumer-group"; // 消费者组ID

LOG.info("Connecting to Kafka at " + kafkaBootstrapServers);
            LOG.info("Consuming topic: " + topic);
            LOG.info("Consumer group: " + consumerGroup);

// 3. 定义Kafka Source(新版API)
            KafkaSource`&lt;String&gt;` kafkaSourceDemo = KafkaSource.`&lt;String&gt;`builder()
                  .setBootstrapServers(kafkaBootstrapServers) // Kafka Broker地址
                  .setTopics(topic) // 订阅的主题
                  .setGroupId(consumerGroup) // 消费者组
                  .setProperty("enable.auto.commit", "true")
                  .setProperty("auto.commit.interval.ms", "1000")
                  .setProperty("session.timeout.ms", "30000")
                  .setProperty("retry.backoff.ms", "1000")
                  .setProperty("reconnect.backoff.max.ms", "10000")
                  .setDeserializer(new KafkaRecordDeserializationSchema `&lt;String&gt;`() {
                        @Override
                        public void deserialize(ConsumerRecord&lt;byte[], byte[]&gt; record, Collector `&lt;String&gt;` out) throws IOException {
                            // 从ConsumerRecord中提取值(字节数组),并转为字符串
                            String value = new String(record.value(), StandardCharsets.UTF_8);
                            LOG.info("Received message: " + value);
                            out.collect(value); // 将反序列化后的数据收集到Flink流中
                        }

@Override
                        public TypeInformation`&lt;String&gt;` getProducedType() {
                            return TypeInformation.of(String.class);
                        }
                  })
                  // 从最早偏移量开始消费(这样即使没有新消息也会读取历史数据)
                  .setStartingOffsets(OffsetsInitializer.earliest())
                  .build();

// 4. 将Kafka Source添加到Flink流环境,并处理数据
            DataStream`&lt;String&gt;` kafkaStream = env.fromSource(
                  kafkaSourceDemo,
                  WatermarkStrategy.noWatermarks(), // 无水印(适用于无序数据场景)
                  "Kafka Source" // Source名称(用于监控)
            );

LOG.info("Kafka source created successfully");

// 5. 处理数据:将每条数据打印到日志(实际生产中可替换为写入数据库、消息队列等)
            kafkaStream.print("KafkaData");
            LOG.info("Flink Kafka Source Demo started.");
            // 6. 触发任务执行
            env.execute("Flink Kafka Source Demo");

}
    }
</code></pre>
<h3 id="3-关键配置说明">3. 关键配置说明</h3>
<ul>
<li>KafkaSource.Builder:新版Kafka Source的核心构建器,支持灵活配置;</li>
<li>setDeserializer:指定反序列化方式,deserialize 接收Kafka的ConsumerRecord(包含键、值、偏移量等信息),提取值(record.value())并反序列化为字符,getProducedType声明输出数据的类型(此处为String);</li>
<li>setStartingOffsets:控制消费起始位置(latest()从最新数据开始,earliest()从最早数据开始,生产环境常用OffsetsInitializer.committedOffsets()从上次提交的偏移量继续);</li>
<li>WatermarkStrategy:用于事件时间(Event Time)处理,示例中无时间窗口需求,故使用noWatermarks();</li>
<li>PrintSinkFunction:Flink内置的日志打印Sink(true表示打印完整上下文,包含Subtask信息)。</li>
</ul>
<h2 id="五运行与测试">五、运行与测试</h2>
<p>在WSL2的Ubuntu 环境中安装Kafka。</p>
<h3 id="1-安装kafka服务">1. 安装Kafka服务</h3>
<ul>
<li>
<p>下载Kafka二进制包<br>
访问Apache Kafka官网,选择最新稳定版(如3.9.0),使用wget下载:</p>
<pre><code class="language-text">wget https://mirrors.aliyun.com/apache/kafka/3.9.0/kafka_2.12-3.9.0.tgz
</code></pre>
</li>
<li>
<p>解压并配置环境变量</p>
<pre><code class="language-text"># 解压到/opt/kafka(全局可访问)
sudo mkdir -p /opt/kafka
tar -zxvf kafka_2.12-3.9.0.tgz -C /opt/kafka --strip-components=1

# 永久生效(编辑~/.bashrc)
echo 'export KAFKA_HOME=/opt/kafka' &gt;&gt; /etc/profile
echo 'export PATH=$KAFKA_HOME/bin:$PATH' &gt;&gt; /etc/profile
source /etc/profile
</code></pre>
</li>
</ul>
<h3 id="2-配置kafka">2. 配置Kafka</h3>
<p>Kafka的核心配置文件位于$KAFKA_HOME/config目录,需修改以下两个文件:</p>
<p>配置Kafka Broker(server.properties)</p>
<p>修改以下关键参数以适配WSL2环境:</p>
<pre><code class="language-text"># ==================== 核心角色与ID配置 ====================
    # 启用KRaft模式(默认已启用)
    # 单节点同时担任Broker和控制器
    process.roles=broker,controller
    # 节点唯一ID(单节点必须设为0)
    node.id=0
    # 控制器ID(与node.id一致,单节点唯一)
    controller.id=0

    # ==================== 监听端口配置 ====================
    # 全局监听端口(客户端读写请求)和控制器监听端口
    # 多个监听器使用逗号分隔,每个监听器都需要指定安全协议
    listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

    # 对外暴露的地址(Windows主机通过localhost访问)
    # 多个公布的监听器使用逗号分隔
    advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093

    # 指定CONTROLLER监听器的安全协议
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

    # 定义控制器监听器的名称(KRaft模式必需)
    controller.listener.names=CONTROLLER

    # ==================== ZooKeeper兼容配置(可选) ====================
    # 若需兼容旧客户端,可保留ZooKeeper配置(但KRaft模式无需ZooKeeper)
    # zookeeper.connect=localhost:2181

    # ==================== 日志与分区配置 ====================
    # 数据存储目录配置(Kafka的核心配置参数)
    # Kafka将主题数据、索引文件等存储在该目录下
    log.dirs=/opt/kafka/data
    num.partitions=1
    # 副本数(单节点设为1)
    default.replication.factor=1
    # 最小同步副本数(单节点设为1)
    min.insync.replicas=1
    # ==================== 日志存储高级配置 ====================
    # 日志保留时间(默认7天,生产环境根据存储容量和需求调整)
    # log.retention.hours=168
    # 或按大小限制保留(单位:字节)
    # log.retention.bytes=107374182400# 100GB

    # 单个分区日志段大小(默认1GB,可根据实际需求调整)
    # log.segment.bytes=1073741824

    # 日志段检查和清理的时间间隔(默认300000ms=5分钟)
    # log.retention.check.interval.ms=300000

    # 控制是否自动创建主题(生产环境建议禁用,改为手动创建)
    # auto.create.topics.enable=false

    # ==================== 控制器引导配置 ====================
    # 控制器引导服务器(单节点指向自己,格式:host:port)
    # 与控制器监听端口一致
    controller.quorum.bootstrap.servers=localhost:9093

    # 控制器投票者配置(单节点设为0@localhost:9093)
    controller.quorum.voters=0@localhost:9093
</code></pre>
<h3 id="3启动kafka服务">3.启动Kafka服务</h3>
<h4 id="31初始化kraft存储目录首次启动必需">3.1初始化KRaft存储目录(首次启动必需)</h4>
<p>在KRaft模式下,需要先初始化元数据存储:</p>
<pre><code class="language-text">
# 生成集群ID并保存到变量
CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
echo "生成的集群ID: $CLUSTER_ID"

# 使用生成的集群ID格式化存储目录$KAFKA_HOME/bin/kafka-storage.sh format -t $CLUSTER_ID -c $KAFKA_HOME/config/server.properties
</code></pre>
<p><strong>注意:</strong> 如果手动运行命令,请确保先执行生成集群ID的命令,然后使用实际生成的ID替换"$CLUSTER_ID"。</p>
<h4 id="32启动kafka-broker">3.2启动Kafka Broker</h4>
<pre><code class="language-text"># 启动Broker(日志输出到$KAFKA_HOME/logs/server.log)   $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
</code></pre>
<h4 id="33验证服务状态">3.3验证服务状态</h4>
<pre><code class="language-text">检查Kafka Broker进程:
    ps -ef | grep kafka# 应看到Kafka进程
</code></pre>
<h4 id="34创建测试主题">3.4创建测试主题</h4>
<p>确保Kafka服务已启动,并创建测试主题 <code>test_topic</code>:</p>
<pre><code class="language-text">$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic
</code></pre>
<h4 id="35发送测试数据">3.5发送测试数据</h4>
<p>使用Kafka内置的生产者工具发送测试消息到 <code>test_topic</code>:</p>
<pre><code class="language-text"># 启动Kafka生产者控制台
    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

    # 输入几条测试消息(每行一条)
    &gt; hello flink
    &gt; flink kafka integration
    &gt; real-time data processing
</code></pre>
<h4 id="36运行flink程序">3.6运行Flink程序</h4>
<p>在IDE中直接运行 <code>KafkaSourceDemo</code>类的 <code>main</code>方法,或通过Gradle构建并运行:</p>
<pre><code class="language-text"># 构建项目
    ./gradlew clean build

    # 运行Flink作业
    ./gradlew run
</code></pre>
<h4 id="37验证结果">3.7验证结果</h4>
<p>成功运行后,你应该能在控制台看到类似如下输出:</p>
<p><img alt="20250918103316" loading="lazy" src="https://pic-1258258471.cos.ap-nanjing.myqcloud.com/img/sad/20250918103316.png" class="lazyload"></p>
<h2 id="六进阶配置与优化">六、进阶配置与优化</h2>
<h3 id="1-消费语义保证">1. 消费语义保证</h3>
<p>在生产环境中,为了确保数据一致性,需要配置Flink的检查点机制和Kafka偏移量提交策略:</p>
<pre><code class="language-text">// 1. 启用检查点
    env.enableCheckpointing(5000); // 每5秒做一次检查点

    // 2. 获取检查点配置对象(Flink 1.20.1及以上版本推荐方式)
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();

    // 3. 配置检查点模式为EXACTLY_ONCE(精确一次语义)
    checkpointConfig.setMode(CheckpointingMode.EXACTLY_ONCE);

    // 4. 设置检查点超时时间
    checkpointConfig.setCheckpointTimeout(Duration.ofSeconds(60));

    // 4. 配置从上次提交的偏移量继续消费(生产环境推荐)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
</code></pre>
<h3 id="2-并行度与资源配置">2. 并行度与资源配置</h3>
<p>合理设置并行度可充分利用集群资源并提高吞吐量:</p>
<pre><code class="language-text">// 设置Flink作业的全局并行度
    env.setParallelism(3); // 与Kafka主题分区数匹配

    // 或单独设置Source的并行度
    KafkaSource`&lt;String&gt;` kafkaSource = KafkaSource.`&lt;String&gt;`builder()
      // ... 其他配置 ...
      .build();

    DataStream`&lt;String&gt;` stream = env.fromSource(
      kafkaSource,
      WatermarkStrategy.noWatermarks(),
      "Kafka Source")
      .setParallelism(3); // Source并行度
</code></pre>
<h3 id="3-高级反序列化">3. 高级反序列化</h3>
<p>除了基础的字符串反序列化,还可以使用更灵活的反序列化方式:</p>
<h4 id="31-使用预定义反序列化器">3.1 使用预定义反序列化器</h4>
<pre><code class="language-text">// 使用Flink提供的String反序列化器   .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
</code></pre>
<h4 id="32-自定义pojo反序列化">3.2 自定义POJO反序列化</h4>
<p>如果Kafka消息是JSON格式,可以使用Jackson等库将其反序列化为POJO对象:</p>
<pre><code class="language-text">public class User {
      private String id;
      private String name;
      private int age;
      // getters, setters, constructors...
    }

    // 自定义POJO反序列化器
    .setDeserializer(new KafkaRecordDeserializationSchema`&lt;User&gt;`() {
      private final ObjectMapper mapper = new ObjectMapper();

    @Override
      public void deserialize(ConsumerRecord&lt;byte[], byte[]&gt; record, Collector`&lt;User&gt;` out) throws IOException {
            User user = mapper.readValue(record.value(), User.class);
            out.collect(user);
      }

    @Override
      public TypeInformation`&lt;User&gt;` getProducedType() {
            return TypeInformation.of(User.class);
      }
    })
</code></pre>
<h2 id="七常见问题与解决方案">七、常见问题与解决方案</h2>
<h3 id="1-连接超时问题">1. 连接超时问题</h3>
<p><strong>问题现象</strong>:程序启动后报 <code>org.apache.kafka.common.errors.TimeoutException</code></p>
<p><strong>解决方案</strong>:</p>
<ul>
<li>检查Kafka服务是否正常运行:<code>ps -ef | grep kafka</code></li>
<li>确认 <code>bootstrap.servers</code>配置正确,特别是在WSL2环境中确保端口映射正确</li>
<li>检查防火墙设置,确保9092端口开放</li>
</ul>
<h3 id="2-数据消费不完整">2. 数据消费不完整</h3>
<p><strong>问题现象</strong>:部分Kafka消息未被Flink消费</p>
<p><strong>解决方案</strong>:</p>
<ul>
<li>检查Kafka主题的分区数与Flink Source并行度是否匹配</li>
<li>确认 <code>setStartingOffsets</code>配置正确,生产环境建议使用 <code>OffsetsInitializer.committedOffsets()</code></li>
<li>检查检查点机制是否正常启用,确保偏移量正确提交</li>
</ul>
<h3 id="3-性能优化">3. 性能优化</h3>
<p>对于高吞吐量场景,可以通过以下方式优化性能:</p>
<ul>
<li>增加Kafka主题分区数(与Flink并行度匹配)</li>
<li>调大 <code>fetch.max.bytes</code>和 <code>max.partition.fetch.bytes</code>参数,增加单次拉取的数据量</li>
<li>启用增量检查点,减少检查点开销</li>
<li>使用 <code>setUnboundedUsePreviousEventTimeWatermark()</code>优化水印生成</li>
</ul>
<h2 id="八总结与扩展">八、总结与扩展</h2>
<p>本文详细介绍了如何使用Flink从Kafka读取数据,包括环境准备、代码实现、运行测试以及进阶配置。通过本文的学习,你应该能够掌握Flink数据源的核心用法,为构建企业级实时数据处理应用打下坚实基础。</p>
<p>在实际应用中,Flink还支持多种其他数据源,如:</p>
<ul>
<li>文件系统(HDFS、本地文件)</li>
<li>数据库(MySQL、PostgreSQL、MongoDB等)</li>
<li>消息队列(RabbitMQ、Pulsar等)</li>
<li>自定义数据源(通过实现 <code>SourceFunction</code>接口)</li>
</ul>
<p>后续文章将继续深入探讨Flink的数据转换、窗口计算、状态管理等核心概念,敬请关注!</p>
<hr>
<p>源文来自:http://blog.daimajiangxin.com.cn</p>
<p>源码地址:https://gitee.com/daimajiangxin/flink-learning</p><br><br>
来源:https://www.cnblogs.com/daimajiangxin/p/19098967
頁: [1]
查看完整版本: 从零开始学Flink:数据源