人鬼勤喂鸟 發表於 2026-4-5 23:45:00

Apache Kafka实战:Spring Boot消息队列完整指南

<h2>前言</h2>
<p>Apache Kafka 是分布式消息队列的事实标准,本文带你实战 Spring Boot 整合 Kafka,完成生产者和消费者的完整开发。</p>
<h2>一、Kafka 核心概念</h2>
<ul>
<li><strong>Producer</strong>:消息生产者</li>
<li><strong>Consumer</strong>:消息消费者</li>
<li><strong>Broker</strong>:Kafka 服务节点</li>
<li><strong>Topic</strong>:消息主题分类</li>
<li><strong>Partition</strong>:Topic 的分区,实现并行处理</li>
<li><strong>Consumer Group</strong>:消费者组,实现负载均衡</li>
</ul>
<h2>二、Docker 安装 Kafka</h2>
<pre><code># docker-compose.yml
services:
zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"</code></pre>
<h2>三、Spring Boot 整合 Kafka</h2>
<pre><code>&lt;!-- pom.xml --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;

# application.yml
spring:
kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer</code></pre>
<h2>四、生产者</h2>
<pre><code>@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate&lt;String, String&gt; kafkaTemplate;

    public void sendOrder(String orderId) {
      String message = "{\"orderId\":\""+orderId+"\",\"time\":\""+LocalDateTime.now()+"\"}";
      kafkaTemplate.send("order-topic", orderId, message);
      System.out.println("Sent: " + message);
    }
}</code></pre>
<h2>五、消费者</h2>
<pre><code>@Component
public class OrderConsumer {

    @KafkaListener(topics = "order-topic", groupId = "my-group")
    public void consume(ConsumerRecord&lt;String, String&gt; record) {
      System.out.println("Received: key=" + record.key());
      System.out.println("Value: " + record.value());
      System.out.println("Partition: " + record.partition());
      System.out.println("Offset: " + record.offset());
    }
}</code></pre>
<h2>六、手动提交 Offset</h2>
<pre><code>@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consumeWithManualAck(
      ConsumerRecord&lt;String, String&gt; record,      Acknowledgment ack) {
    try {
      processOrder(record.value());
      ack.acknowledge();// 手动确认
    } catch (Exception e) {
      // 处理失败,消息会被重新消费
      log.error("Failed to process: {}", record.key(), e);
    }
}

# 开启手动提交
spring.kafka.listener.ack-mode: manual</code></pre>
<h2>七、常用命令速查</h2>
<pre><code># 创建 Topic
kafka-topics.sh --create --topic order-topic \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 控制台消费者
kafka-console-consumer.sh --topic order-topic \
--bootstrap-server localhost:9092 --from-beginning</code></pre>
<h2>总结</h2>
<p>Kafka 是微服务架构中异步通信的核心组件。核心要点:Producer/Consumer 模式实现解耦、Partition 实现并行处理、Consumer Group 实现负载均衡、手动提交 Offset 保证消息可靠性。</p>
<p>觉得有帮助请点赞收藏!有问题欢迎评论区交流 🚀</p>

</div>
<div id="MySignature" role="contentinfo">
   

---

📌 **如果觉得文章对你有帮助,欢迎点赞👍收藏⭐!**

💬 有问题或建议?欢迎在评论区留言讨论~

🔗 更多技术干货请关注作者:弥烟袅绕

📚 本文地址:https://www.cnblogs.com/czlws/p/19824559/apache-kafka-spring-boot-message-queue<br><br>
来源:https://www.cnblogs.com/czlws/p/19824559/apache-kafka-spring-boot-message-queue
頁: [1]
查看完整版本: Apache Kafka实战:Spring Boot消息队列完整指南