查看: 33|回复: 0

Apache Kafka实战:Spring Boot消息队列完整指南

[复制链接]

1

主题

0

回帖

0

积分

热心网友

金币
0
阅读权限
220
精华
0
威望
0
贡献
0
在线时间
0 小时
注册时间
2012-4-2
发表于 2026-4-5 23:45:00 | 显示全部楼层 |阅读模式

前言

Apache Kafka 是分布式消息队列的事实标准,本文带你实战 Spring Boot 整合 Kafka,完成生产者和消费者的完整开发。

一、Kafka 核心概念

  • Producer:消息生产者
  • Consumer:消息消费者
  • Broker:Kafka 服务节点
  • Topic:消息主题分类
  • Partition:Topic 的分区,实现并行处理
  • Consumer Group:消费者组,实现负载均衡

二、Docker 安装 Kafka

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

三、Spring Boot 整合 Kafka

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

四、生产者

@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrder(String orderId) {
        String message = "{\"orderId\":\""+orderId+"\",\"time\":\""+LocalDateTime.now()+"\"}";
        kafkaTemplate.send("order-topic", orderId, message);
        System.out.println("Sent: " + message);
    }
}

五、消费者

@Component
public class OrderConsumer {

    @KafkaListener(topics = "order-topic", groupId = "my-group")
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("Received: key=" + record.key());
        System.out.println("Value: " + record.value());
        System.out.println("Partition: " + record.partition());
        System.out.println("Offset: " + record.offset());
    }
}

六、手动提交 Offset

@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consumeWithManualAck(
        ConsumerRecord<String, String> record,        Acknowledgment ack) {
    try {
        processOrder(record.value());
        ack.acknowledge();  // 手动确认
    } catch (Exception e) {
        // 处理失败,消息会被重新消费
        log.error("Failed to process: {}", record.key(), e);
    }
}

# 开启手动提交
spring.kafka.listener.ack-mode: manual

七、常用命令速查

# 创建 Topic
kafka-topics.sh --create --topic order-topic \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 控制台消费者
kafka-console-consumer.sh --topic order-topic \
  --bootstrap-server localhost:9092 --from-beginning

总结

Kafka 是微服务架构中异步通信的核心组件。核心要点:Producer/Consumer 模式实现解耦、Partition 实现并行处理、Consumer Group 实现负载均衡、手动提交 Offset 保证消息可靠性。

觉得有帮助请点赞收藏!有问题欢迎评论区交流 🚀

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

相关侵权、举报、投诉及建议等,请发 E-mail:qiongdian@foxmail.com

Powered by Discuz! X5.0 © 2001-2026 Discuz! Team.

在本版发帖返回顶部