站在桥上看风景 發表於 2025-10-6 12:42:00

从零开始学Flink:数据输出的终极指南

<p>在实时数据处理的完整链路中,数据输出(Sink)是最后一个关键环节,它负责将处理后的结果传递到外部系统供后续使用。Flink提供了丰富的数据输出连接器,支持将数据写入Kafka、Elasticsearch、文件系统、数据库等各种目标系统。本文将深入探讨Flink数据输出的核心概念、配置方法和最佳实践,并基于Flink 1.20.1构建一个完整的数据输出案例。</p>
<h2 id="一flink-sink概述">一、Flink Sink概述</h2>
<h3 id="1-什么是sink">1. 什么是Sink</h3>
<p>Sink(接收器)是Flink数据处理流水线的末端,负责将计算结果输出到外部存储系统或下游处理系统。在Flink的编程模型中,Sink是DataStream API中的一个转换操作,它接收DataStream并将数据写入指定的外部系统。</p>
<h3 id="2-sink的分类">2. Sink的分类</h3>
<p>Flink的Sink连接器可以分为以下几类:</p>
<ul>
<li><strong>内置Sink</strong>:如print()、printToErr()等用于调试的内置输出</li>
<li><strong>文件系统Sink</strong>:支持写入本地文件系统、HDFS等</li>
<li><strong>消息队列Sink</strong>:如Kafka、RabbitMQ等</li>
<li><strong>数据库Sink</strong>:如JDBC、Elasticsearch等</li>
<li><strong>自定义Sink</strong>:通过实现SinkFunction接口自定义输出逻辑</li>
</ul>
<h3 id="3-输出语义保证">3. 输出语义保证</h3>
<p>Flink为Sink提供了三种输出语义保证:</p>
<ul>
<li><strong>最多一次(At-most-once)</strong>:数据可能丢失,但不会重复</li>
<li><strong>至少一次(At-least-once)</strong>:数据不会丢失,但可能重复</li>
<li><strong>精确一次(Exactly-once)</strong>:数据既不会丢失,也不会重复</li>
</ul>
<p>这些语义保证与Flink的检查点(Checkpoint)机制密切相关,我们将在后面详细讨论。</p>
<h2 id="二环境准备与依赖配置">二、环境准备与依赖配置</h2>
<h3 id="1-版本说明">1. 版本说明</h3>
<ul>
<li>Flink:1.20.1</li>
<li>JDK:17+</li>
<li>Gradle:8.3+</li>
<li>外部系统:Kafka 3.4.0、Elasticsearch 7.17.0、MySQL 8.0</li>
</ul>
<h3 id="2-核心依赖">2. 核心依赖</h3>
<pre><code class="language-text">dependencies {
    // Flink核心依赖
    implementation 'org.apache.flink:flink_core:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java:1.20.1'
    implementation 'org.apache.flink:flink-clients:1.20.1'
   
    // Kafka Connector
    implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'
   
    // Elasticsearch Connector
    implementation 'org.apache.flink:flink-connector-elasticsearch7:3.1.0-1.20'
   
    // JDBC Connector
    implementation 'org.apache.flink:flink-connector-jdbc:3.3.0-1.20'
    implementation 'mysql:mysql-connector-java:8.0.33'
   
    // FileSystem Connector
    implementation 'org.apache.flink:flink-connector-files:1.20.1'

}
</code></pre>
<h2 id="三基础sink操作">三、基础Sink操作</h2>
<h3 id="1-内置调试sink">1. 内置调试Sink</h3>
<p>Flink提供了一些内置的Sink用于开发和调试阶段:</p>
<pre><code class="language-text">import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BasicSinkDemo {
    public static void main(String[] args) throws Exception {
      // 创建执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // 创建数据源
      DataStream&lt;String&gt; stream = env.fromElements("Hello", "Flink", "Sink");
      
      // 打印到标准输出
      stream.print("StandardOutput");
      
      // 打印到标准错误输出
      stream.printToErr("ErrorOutput");
      
      // 执行作业
      env.execute("Basic Sink Demo");
    }
}
</code></pre>
<h3 id="2-文件系统sink">2. 文件系统Sink</h3>
<p>Flink支持将数据写入本地文件系统、HDFS等。下面是一个写入本地文件系统的示例:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class FileSystemSinkDemo {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream&lt;Object&gt; stream = env.fromData("Hello", "Flink", "FileSystem", "Sink");
      RollingPolicy&lt;Object, String&gt; rollingPolicy = DefaultRollingPolicy.&lt;Object, String&gt;builder()
                .withRolloverInterval(Duration.ofMinutes(15))
                .withInactivityInterval(Duration.ofMinutes(5))
                .withMaxPartSize(MemorySize.ofMebiBytes(64))
                .build();

      // 创建文件系统Sink
      FileSink&lt;Object&gt; sink = FileSink
                .forRowFormat(new Path("file:///tmp/flink-output"), new SimpleStringEncoder&lt;&gt;())
                .withRollingPolicy(rollingPolicy)
                .build();
      // 添加Sink
      stream.sinkTo(sink);
      env.execute("File System Sink Demo");
    }
}
</code></pre>
<h2 id="四高级sink连接器">四、高级Sink连接器</h2>
<h3 id="1-kafka-sink">1. Kafka Sink</h3>
<p>Kafka是实时数据处理中常用的消息队列,Flink提供了强大的Kafka Sink支持:</p>
<pre><code class="language-text">import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // 开启检查点以支持Exactly-Once语义
      env.enableCheckpointing(5000);
      
      DataStream&lt;String&gt; stream = env.fromElements("Hello Kafka", "Flink to Kafka", "Data Pipeline");
      
      // Kafka配置
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");
      
      // 创建Kafka Sink
      KafkaSink&lt;String&gt; sink = KafkaSink.&lt;String&gt;
                builder()
                .setKafkaProducerConfig(props)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
      
      // 添加Sink
      stream.sinkTo(sink);
      
      env.execute("Kafka Sink Demo");
    }
}
</code></pre>
<p>kafka消息队列消息:<br>
<img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926402-289144018.png"></p>
<h3 id="2-elasticsearch-sink">2. Elasticsearch Sink</h3>
<p>Elasticsearch是一个实时的分布式搜索和分析引擎,非常适合存储和查询Flink处理的实时数据:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Map;

public class ElasticsearchSinkDemo {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(5000);


      DataStream&lt;String&gt; stream = env.fromData(
                "{\"id\":\"1\",\"name\":\"Flink\",\"category\":\"framework\"}",
                "{\"id\":\"2\",\"name\":\"Elasticsearch\",\"category\":\"database\"}");

      // 配置Elasticsearch节点
      HttpHost httpHost=new HttpHost("localhost", 9200, "http");

      // 创建Elasticsearch Sink
      ElasticsearchSink&lt;String&gt; sink=new Elasticsearch7SinkBuilder&lt;String&gt;()
                .setBulkFlushMaxActions(10)      // 批量操作数量
                .setBulkFlushInterval(5000)          // 批量刷新间隔(毫秒)
                .setHosts(httpHost)
                .setConnectionRequestTimeout(60000)// 连接请求超时时间
                .setConnectionTimeout(60000)         // 连接超时时间
                .setSocketTimeout(60000)             // Socket 超时时间
                .setEmitter((element, context, indexer) -&gt; {
                  try {
                        Map&lt;String, Object&gt; json = objectMapper.readValue(element, Map.class);
                        IndexRequest request = Requests.indexRequest()
                              .index("flink_documents")
                              .id((String) json.get("id"))
                              .source(json);
                        indexer.add(request);
                  } catch (Exception e) {
                        // 处理解析异常
                        System.err.println("Failed to parse JSON: " + element);
                  }
                })
                .build();

      // 添加Sink
      stream.sinkTo(sink);

      env.execute("Elasticsearch Sink Demo");
    }
}
</code></pre>
<p>使用post工具查看数据<br>
<img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926426-1756984061.png"></p>
<h3 id="3-jdbc-sink">3. JDBC Sink</h3>
<p>使用JDBC Sink可以将数据写入各种关系型数据库:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class JdbcSinkDemo {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(5000);
      List&lt;User&gt; userList = Arrays.asList(   new User(1, "Alice", 25,"alice"),
                new User(2, "Bob", 30,"bob"),
                new User(3, "Charlie", 35,"charlie"));
      // 模拟用户数据
      DataStream&lt;User&gt; userStream = env.fromData(userList);

      JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();
      JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("username")
                .withPassword("password")
                .build();
      String insertSql = "INSERT INTO user (id, name, age, user_name) VALUES (?, ?, ?, ?)";
      JdbcStatementBuilder&lt;User&gt; statementBuilder = (statement, user) -&gt; {
            statement.setInt(1, user.getId());
            statement.setString(2, user.getName());
            statement.setInt(3, user.getAge());
            statement.setString(4, user.getUserName());
      };
      // 创建JDBC Sink

      JdbcSink&lt;User&gt; jdbcSink = new Jdbc().&lt;User&gt;sinkBuilder()
                .withQueryStatement( new SimpleJdbcQueryStatement&lt;User&gt;(insertSql,statementBuilder))
                .withExecutionOptions(jdbcExecutionOptions)
                .buildAtLeastOnce(connectionOptions);
      // 添加Sink
      userStream.sinkTo(jdbcSink);
      env.execute("JDBC Sink Demo");
    }

    // 用户实体类
    public static class User {
      private int id;
      private String name;
      private String userName;
      private int age;

      public User(int id, String name, int age,String userName) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.userName=userName;
      }

      public int getId() {
            return id;
      }

      public String getName() {
            return name;
      }

      public int getAge() {
            return age;
      }

      public String getUserName() {
            return userName;
      }
    }
}
</code></pre>
<p>登录mysql客户端查看数据<br>
<img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926424-2126133098.png"></p>
<h2 id="五sink的可靠性保证机制">五、Sink的可靠性保证机制</h2>
<h3 id="1-检查点与保存点">1. 检查点与保存点</h3>
<p>Flink的检查点(Checkpoint)机制是实现精确一次语义的基础。当开启检查点后,Flink会定期将作业的状态保存到持久化存储中。如果作业失败,Flink可以从最近的检查点恢复,确保数据不会丢失。</p>
<pre><code class="language-text">// 配置检查点
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用检查点,间隔5000ms
env.enableCheckpointing(5000);

// 配置检查点模式为EXACTLY_ONCE(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置检查点超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 设置最大并行检查点数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 开启外部化检查点,作业失败时保留检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
</code></pre>
<h3 id="2-事务与二阶段提交">2. 事务与二阶段提交</h3>
<p>对于支持事务的外部系统,Flink使用二阶段提交(Two-Phase Commit)协议来实现精确一次语义:</p>
<ul>
<li><strong>第一阶段(预提交)</strong>:Flink将数据写入外部系统的预提交区域,但不提交</li>
<li><strong>第二阶段(提交)</strong>:所有算子完成预提交后,Flink通知外部系统提交数据</li>
</ul>
<p>这种机制确保了即使在作业失败或恢复的情况下,数据也不会被重复写入或丢失。</p>
<h3 id="3-不同sink的语义保证级别">3. 不同Sink的语义保证级别</h3>
<p>不同的Sink连接器支持不同级别的语义保证:</p>
<ul>
<li><strong>支持精确一次(Exactly-once)</strong>:Kafka、Elasticsearch(版本支持)、文件系统(预写日志模式)</li>
<li><strong>支持至少一次(At-least-once)</strong>:JDBC、Redis、RabbitMQ</li>
<li><strong>最多一次(At-most-once)</strong>:简单的无状态输出</li>
</ul>
<h2 id="六自定义sink实现">六、自定义Sink实现</h2>
<p>当Flink内置的Sink连接器不能满足需求时,我们可以通过实现SinkFunction接口来自定义Sink:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.IOException;

public class CustomSinkDemo {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      DataStream&lt;String&gt; stream = env.fromElements("Custom", "Sink", "Example");

      // 使用自定义Sink
      stream.sinkTo(new CustomSink());

      env.execute("Custom Sink Demo");
    }

    // 自定义Sink实现 - 使用新API
    public static class CustomSink implements Sink&lt;String&gt; {

      @Override
      public SinkWriter&lt;String&gt; createWriter(InitContext context) {
            return new CustomSinkWriter();
      }

      // SinkWriter负责实际的数据写入逻辑
      private static class CustomSinkWriter implements SinkWriter&lt;String&gt; {

            // 初始化资源
            public CustomSinkWriter() {
                // 初始化连接、客户端等资源
                System.out.println("CustomSink initialized");
            }

            // 处理每个元素
            @Override
            public void write(String value, Context context)throws IOException, InterruptedException {
                // 实际的写入逻辑
                System.out.println("Writing to custom sink: " + value);
            }

            // 刷新缓冲区
            @Override
            public void flush(boolean endOfInput) {
                // 刷新逻辑(如果需要)
            }

            // 清理资源
            @Override
            public void close() throws Exception {
                // 关闭连接、客户端等资源
                System.out.println("CustomSink closed");
            }
      }
    }

}
</code></pre>
<p><img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926427-2133356159.png"></p>
<h2 id="七实战案例实时数据处理流水线">七、实战案例:实时数据处理流水线</h2>
<p>下面我们将构建一个完整的实时数据处理流水线,从Kafka读取数据,进行转换处理,然后输出到多个目标系统:</p>
<h3 id="1-系统架构">1. 系统架构</h3>
<pre><code class="language-text">Kafka Source -&gt; Flink Processing -&gt; Multiple Sinks
                               |-&gt; Kafka Sink
                               |-&gt; Elasticsearch Sink
                               |-&gt; JDBC Sink
</code></pre>
<h3 id="2-数据模型">2. 数据模型</h3>
<p>我们将使用日志数据模型,定义一个LogEntry类来表示日志条目:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

public class LogEntry {
    private String timestamp;
    private String logLevel;
    private String source;
    private String message;

    public String getTimestamp() {
      return timestamp;
    }

    public void setTimestamp(String timestamp) {
      this.timestamp = timestamp;
    }

    public String getLogLevel() {
      return logLevel;
    }

    public void setLogLevel(String logLevel) {
      this.logLevel = logLevel;
    }

    public String getSource() {
      return source;
    }

    public void setSource(String source) {
      this.source = source;
    }

    public String getMessage() {
      return message;
    }

    public void setMessage(String message) {
      this.message = message;
    }

    @Override
    public String toString() {
      return String.format("LogEntry{timestamp='%s', logLevel='%s', source='%s', message='%s'}",
                timestamp, logLevel, source, message);
    }
}
</code></pre>
<p>定义一个日志统计实体类LogStats,用于表示每个源的日志统计信息:</p>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

public class LogStats {
    private String source;
    private long count;

    public LogStats() {
    }

    public LogStats(String source, long count) {
      this.source = source;
      this.count = count;
    }

    public String getSource() {
      return source;
    }

    public void setSource(String source) {
      this.source = source;
    }

    public long getCount() {
      return count;
    }

    public void setCount(long count) {
      this.count = count;
    }

    @Override
    public String toString() {
      return String.format("LogStats{source='%s', count=%d}", source, count);
    }
}
</code></pre>
<h3 id="3-完整实现代码">3. 完整实现代码</h3>
<pre><code class="language-text">package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.sql.PreparedStatement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MultiSinkPipeline {
    public static void main(String[] args) throws Exception {
      // 1. 创建执行环境并配置检查点
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(5000);

      // 2. 创建Kafka Source
      KafkaSource&lt;String&gt; source = KafkaSource.&lt;String&gt;
                        builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("logs-input-topic")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

      // 3. 读取数据并解析
      DataStream&lt;String&gt; kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

      // 解析日志数据
      DataStream&lt;LogEntry&gt; logStream = kafkaStream
                .map(line -&gt; {
                  String[] parts = line.split("\\|");
                  return new LogEntry(parts, parts, parts, parts);
                })
                .name("Log Parser");

      // 4. 过滤错误日志
      DataStream&lt;LogEntry&gt; errorLogStream = logStream
                .filter(log -&gt; "ERROR".equals(log.getLogLevel()))
                .name("Error Log Filter");

      // 5. 配置并添加Kafka Sink - 输出错误日志
      // Kafka配置
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");

      // 创建Kafka Sink
      KafkaSink&lt;LogEntry&gt; kafkaSink = KafkaSink.&lt;LogEntry&gt;builder()
                .setKafkaProducerConfig(props)
                .setRecordSerializer(KafkaRecordSerializationSchema.&lt;LogEntry&gt;builder()
                        .setTopic("error-logs-topic")
                        .setValueSerializationSchema(element -&gt; element.toString().getBytes())
                        .build())
                .build();

      errorLogStream.sinkTo(kafkaSink).name("Error Logs Kafka Sink");

      // 6. 配置并添加Elasticsearch Sink - 存储所有日志
      // 配置Elasticsearch节点
      HttpHost httpHost=new HttpHost("localhost", 9200, "http");

      ElasticsearchSink&lt;LogEntry&gt; esSink = new Elasticsearch7SinkBuilder&lt;LogEntry&gt;()
                .setBulkFlushMaxActions(10)      // 批量操作数量
                .setBulkFlushInterval(5000)          // 批量刷新间隔(毫秒)
                .setHosts(httpHost)
                .setConnectionRequestTimeout(60000)// 连接请求超时时间
                .setConnectionTimeout(60000)         // 连接超时时间
                .setSocketTimeout(60000)             // Socket 超时时间
                .setEmitter((element, context, indexer) -&gt; {
                  Map&lt;String, Object&gt; json = new HashMap&lt;&gt;();
                  json.put("timestamp", element.getTimestamp());
                  json.put("logLevel", element.getLogLevel());
                  json.put("source", element.getSource());
                  json.put("message", element.getMessage());
                  IndexRequest request = Requests.indexRequest()
                            .index("logs_index")
                            .source(json);
                  indexer.add(request);
                })
                .build();

      logStream.sinkTo(esSink).name("Elasticsearch Sink");

      // 7. 配置并添加JDBC Sink - 存储错误日志统计
      // 先进行统计
      DataStream&lt;LogStats&gt; statsStream = errorLogStream
                .map(log -&gt; new LogStats(log.getSource(), 1))
                .keyBy(LogStats::getSource)
                .sum("count")
                .name("Error Log Stats");
      JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();
      JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("mysql用户名")
                .withPassword("mysql密码")
                .build();
      String insertSql = "INSERT INTO error_log_stats (source, count, last_updated) VALUES (?, ?, ?) " +
               "ON DUPLICATE KEY UPDATE count = count + VALUES(count), last_updated = VALUES(last_updated)";
      JdbcStatementBuilder&lt;LogStats&gt; statementBuilder = (statement, stats) -&gt; {
            statement.setString(1, stats.getSource());
            statement.setLong(2, stats.getCount());
            statement.setTimestamp(3,java.sql.Timestamp.valueOf(LocalDateTime.now()));
      };
      // 创建JDBC Sink
      JdbcSink&lt;LogStats&gt; jdbcSink = new Jdbc().&lt;LogStats&gt;sinkBuilder()
                .withQueryStatement( new SimpleJdbcQueryStatement&lt;LogStats&gt;(insertSql,statementBuilder))
                .withExecutionOptions(jdbcExecutionOptions)
                .buildAtLeastOnce(connectionOptions);
      statsStream.sinkTo(jdbcSink).name("JDBC Sink");
      // 8. 执行作业
      env.execute("Multi-Sink Data Pipeline");
    }

}
</code></pre>
<h3 id="4-测试与验证">4. 测试与验证</h3>
<p>要测试这个完整的流水线,我们需要:</p>
<ol>
<li>
<p>启动Kafka并创建必要的主题:</p>
<pre><code class="language-bash"># 创建输入主题
kafka-topics.sh --create --topic logs-input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# 创建错误日志输出主题
kafka-topics.sh --create --topic error-logs-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
</code></pre>
</li>
<li>
<p>启动Elasticsearch并确保服务正常运行</p>
</li>
<li>
<p>在MySQL中创建必要的表:</p>
<pre><code class="language-sql">CREATE DATABASE test;
USE test;

CREATE TABLE error_log_stats (
source VARCHAR(100) PRIMARY KEY,
count BIGINT NOT NULL,
last_updated TIMESTAMP NOT NULL
);
</code></pre>
</li>
<li>
<p>向Kafka发送测试数据:</p>
<pre><code class="language-bash">kafka-console-producer.sh --topic logs-input-topic --bootstrap-server localhost:9092

# 输入以下测试数据
2025-09-29 12:00:00|INFO|application-service|Application started successfully
2025-09-29 12:01:30|ERROR|database-service|Failed to connect to database
2025-09-29 12:02:15|WARN|cache-service|Cache eviction threshold reached
2025-09-29 12:03:00|ERROR|authentication-service|Invalid credentials detected
</code></pre>
</li>
<li>
<p>运行Flink作业并观察数据流向各个目标系统</p>
</li>
</ol>
<p>查看Kafka Sink中的数据:<br>
<img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926483-1963124969.png"></p>
<p>查看MySQL中的数据:<br>
<img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926395-314730142.png"></p>
<p>查看Elasticsearch中的数据:<br>
<img src="https://img2024.cnblogs.com/blog/3365149/202510/3365149-20251006123926442-294120356.png"></p>
<h2 id="八性能优化与最佳实践">八、性能优化与最佳实践</h2>
<h3 id="1-并行度配置">1. 并行度配置</h3>
<p>合理设置Sink的并行度可以显著提高吞吐量:</p>
<pre><code class="language-text">// 为特定Sink设置并行度
stream.addSink(sink).setParallelism(4);

// 或为整个作业设置默认并行度
env.setParallelism(4);
</code></pre>
<h3 id="2-批处理配置">2. 批处理配置</h3>
<p>对于支持批处理的Sink,合理配置批处理参数可以减少网络开销:</p>
<pre><code class="language-text">// JDBC批处理示例
JdbcExecutionOptions.builder()
    .withBatchSize(1000)// 每批次处理的记录数
    .withBatchIntervalMs(200)// 批处理间隔
    .withMaxRetries(3)// 最大重试次数
    .build();
</code></pre>
<h3 id="3-背压处理">3. 背压处理</h3>
<p>当Sink无法处理上游数据时,会产生背压。Flink提供了背压监控和处理机制:</p>
<ul>
<li>使用Flink Web UI监控背压情况</li>
<li>考虑使用缓冲机制或调整并行度</li>
<li>对于关键路径,实现自定义的背压处理逻辑</li>
</ul>
<h3 id="4-资源管理">4. 资源管理</h3>
<p>合理管理连接和资源是保证Sink稳定运行的关键:</p>
<ul>
<li>使用连接池管理数据库连接</li>
<li>在RichSinkFunction的open()方法中初始化资源</li>
<li>在close()方法中正确释放资源</li>
</ul>
<h3 id="5-错误处理策略">5. 错误处理策略</h3>
<p>为Sink配置适当的错误处理策略:</p>
<pre><code class="language-text">// 重试策略配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,// 最大重试次数
    Time.of(10, TimeUnit.SECONDS)// 重试间隔
));
</code></pre>
<h2 id="九总结与展望">九、总结与展望</h2>
<p>本文深入探讨了Flink数据输出(Sink)的核心概念、各种连接器的使用方法以及可靠性保证机制。我们学习了如何配置和使用内置Sink、文件系统Sink、Kafka Sink、Elasticsearch Sink和JDBC Sink,并通过自定义Sink扩展了Flink的输出能力。最后,我们构建了一个完整的实时数据处理流水线,将处理后的数据输出到多个目标系统。</p>
<p>在Flink的数据处理生态中,Sink是连接计算结果与外部世界的桥梁。通过选择合适的Sink连接器并配置正确的参数,我们可以构建高效、可靠的数据处理系统。</p>
<hr>
<p>源文来自:http://blog.daimajiangxin.com.cn</p>
<p>源码地址:https://gitee.com/daimajiangxin/flink-learning</p><br><br>
来源:https://www.cnblogs.com/daimajiangxin/p/19127498
頁: [1]
查看完整版本: 从零开始学Flink:数据输出的终极指南