Apache Kafka实战:Spring Boot消息队列完整指南
<h2>前言</h2><p>Apache Kafka 是分布式消息队列的事实标准,本文带你实战 Spring Boot 整合 Kafka,完成生产者和消费者的完整开发。</p>
<h2>一、Kafka 核心概念</h2>
<ul>
<li><strong>Producer</strong>:消息生产者</li>
<li><strong>Consumer</strong>:消息消费者</li>
<li><strong>Broker</strong>:Kafka 服务节点</li>
<li><strong>Topic</strong>:消息主题分类</li>
<li><strong>Partition</strong>:Topic 的分区,实现并行处理</li>
<li><strong>Consumer Group</strong>:消费者组,实现负载均衡</li>
</ul>
<h2>二、Docker 安装 Kafka</h2>
<pre><code># docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"</code></pre>
<h2>三、Spring Boot 整合 Kafka</h2>
<pre><code><!-- pom.xml -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer</code></pre>
<h2>四、生产者</h2>
<pre><code>@Service
public class OrderProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendOrder(String orderId) {
String message = "{\"orderId\":\""+orderId+"\",\"time\":\""+LocalDateTime.now()+"\"}";
kafkaTemplate.send("order-topic", orderId, message);
System.out.println("Sent: " + message);
}
}</code></pre>
<h2>五、消费者</h2>
<pre><code>@Component
public class OrderConsumer {
@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consume(ConsumerRecord<String, String> record) {
System.out.println("Received: key=" + record.key());
System.out.println("Value: " + record.value());
System.out.println("Partition: " + record.partition());
System.out.println("Offset: " + record.offset());
}
}</code></pre>
<h2>六、手动提交 Offset</h2>
<pre><code>@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consumeWithManualAck(
ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processOrder(record.value());
ack.acknowledge();// 手动确认
} catch (Exception e) {
// 处理失败,消息会被重新消费
log.error("Failed to process: {}", record.key(), e);
}
}
# 开启手动提交
spring.kafka.listener.ack-mode: manual</code></pre>
<h2>七、常用命令速查</h2>
<pre><code># 创建 Topic
kafka-topics.sh --create --topic order-topic \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 控制台消费者
kafka-console-consumer.sh --topic order-topic \
--bootstrap-server localhost:9092 --from-beginning</code></pre>
<h2>总结</h2>
<p>Kafka 是微服务架构中异步通信的核心组件。核心要点:Producer/Consumer 模式实现解耦、Partition 实现并行处理、Consumer Group 实现负载均衡、手动提交 Offset 保证消息可靠性。</p>
<p>觉得有帮助请点赞收藏!有问题欢迎评论区交流 🚀</p>
</div>
<div id="MySignature" role="contentinfo">
---
📌 **如果觉得文章对你有帮助,欢迎点赞👍收藏⭐!**
💬 有问题或建议?欢迎在评论区留言讨论~
🔗 更多技术干货请关注作者:弥烟袅绕
📚 本文地址:https://www.cnblogs.com/czlws/p/19824559/apache-kafka-spring-boot-message-queue<br><br>
来源:https://www.cnblogs.com/czlws/p/19824559/apache-kafka-spring-boot-message-queue
頁:
[1]