Building ZooKeeper and Kafka clusters with Docker and testing them from Java: a detailed walkthrough
<div id="navCategory"><h5 class="catalogue">Contents</h5><ul class="first_class_ul"><li><a href="#_label0">Prerequisites</a></li><li><a href="#_label1">1. Pull the required images</a></li><li><a href="#_label2">2. Create the container network</a></li><li><a href="#_label3">3. Set up the ZooKeeper cluster</a></li><li><a href="#_label4">4. Set up the Kafka cluster and kafka-manager</a></li><li><a href="#_label5">5. Start the clusters</a></li><li><a href="#_label6">6. Check that the clusters started</a></li><ul class="second_class_ul"><li><a href="#_lab2_6_0">6.1 Testing with kafka-manager</a></li><li><a href="#_lab2_6_1">6.2 Testing in a shell</a></li></ul><li><a href="#_label7">7. Connecting to the cluster from Java</a></li><ul class="second_class_ul"><li><a href="#_lab2_7_2">!!!! Key point</a></li></ul><li><a href="#_label8">8. Key takeaways</a></li><ul class="second_class_ul"></ul></ul></div><p>I am learning Kafka myself, and my only environment is a single Linux virtual machine on my laptop. For building a Kafka cluster in a setup like that, containers are probably the best option, so I spent two days building ZooKeeper and Kafka clusters with Docker.</p><p class="maodian"><a name="_label0"></a></p><h2>Prerequisites</h2>
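<ol><li>Install the JDK in the VM</li><li>Install Docker and docker-compose in the VM</li><li>A cluster setup uses quite a few ports; while learning, I suggest simply turning off the VM's firewall to avoid the hassle</li></ol>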
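<p>Because so many ports are involved, it can save time to check from the Windows host which ones are actually reachable. Below is a minimal JDK-only sketch; the class name PortCheck is hypothetical, the default IP 192.168.209.130 is the VM IP used later in this article, and the port list comes from the compose files in sections 3 and 4 (2181-2183 for ZooKeeper, 9091-9093 for Kafka, 9000 for kafka-manager). An open port only shows that something is listening, not that the service behind it is healthy.</p>
<div class="jb51code"><pre class="brush:java;">import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether TCP ports on the VM accept connections.
public class PortCheck {

    // Host ports mapped by the compose files in this article (ZooKeeper, Kafka, kafka-manager).
    static final int[] PORTS = {2181, 2182, 2183, 9091, 9092, 9093, 9000};

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the VM IP can be passed as the first argument; defaults to the IP used in this article.
        String host = args.length > 0 ? args[0] : "192.168.209.130";
        for (int port : PORTS) {
            System.out.printf("%s:%d -> %s%n", host, port,
                    isReachable(host, port, 1000) ? "open" : "unreachable");
        }
    }
}
</pre></div>
<p>Run it after the clusters are started in section 5, passing your own VM IP as the first argument if it differs.</p>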
<p class="maodian"><a name="_label1"></a></p><h2>1. Pull the required images</h2>
<p style="text-align:center">My images (the compose files below use zookeeper:3.4, wurstmeister/kafka and sheepkiller/kafka-manager):<img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170761.png" /></p>
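<p class="maodian"><a name="_label2"></a></p><h2>2. Create the container network</h2>
<p>The network is named zookeeper_kafka; giving it an explicit subnet lets the compose files assign each container a fixed IP:</p>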
<div class="jb51code"><pre class="brush:bash;">docker network create --subnet 172.19.0.0/24 --gateway 172.19.0.1 zookeeper_kafka
</pre></div>
<p>Once it is created, you can check it:</p>
<div class="jb51code"><pre class="brush:bash;">docker network ls</pre></div>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170759.png" /></p>
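<p>The explicit subnet matters because the compose files below pin every container to a fixed ipv4_address, and those addresses have to fall inside 172.19.0.0/24 (172.19.0.1 is already taken by the gateway). As a sanity check of that arithmetic, here is a hypothetical JDK-only helper that tests whether an IPv4 address lies inside a CIDR block; the address list is copied from the compose files.</p>
<div class="jb51code"><pre class="brush:java;">// Hypothetical helper: checks whether an IPv4 address lies inside a CIDR block such as 172.19.0.0/24.
public class SubnetCheck {

    // Converts a dotted IPv4 literal like "172.19.0.21" to a 32-bit int.
    static int toInt(String ipv4) {
        String[] parts = ipv4.split("\\.");
        if (parts.length != 4) throw new IllegalArgumentException("not an IPv4 address: " + ipv4);
        int value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) throw new IllegalArgumentException("bad octet in " + ipv4);
            value = (value << 8) | octet;
        }
        return value;
    }

    // True if ip is inside cidr, e.g. inSubnet("172.19.0.21", "172.19.0.0/24").
    static boolean inSubnet(String ip, String cidr) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        // Network mask with `prefix` leading one bits; /0 matches every address.
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(ip) & mask) == (toInt(parts[0]) & mask);
    }

    public static void main(String[] args) {
        String subnet = "172.19.0.0/24";
        // Static addresses assigned in the ZooKeeper and Kafka compose files.
        String[] addresses = {"172.19.0.10", "172.19.0.11", "172.19.0.12", "172.19.0.13",
                              "172.19.0.21", "172.19.0.22", "172.19.0.23"};
        for (String ip : addresses) {
            System.out.println(ip + " in " + subnet + ": " + inSubnet(ip, subnet));
        }
    }
}
</pre></div>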
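<p class="maodian"><a name="_label3"></a></p><h2>3. Set up the ZooKeeper cluster</h2>
<p>I used docker-compose with three ZooKeeper nodes. Pay attention to the network name and the port mappings (each node's client port 2181 is mapped to host ports 2181, 2182 and 2183 respectively). My yml file:</p>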
<div class="jb51code"><pre class="brush:yaml;">version: '2'
services:
  zk_node1:
    image: zookeeper:3.4
    restart: always
    hostname: zk_node1
    container_name: zk_node1
    ports:
      - 2181:2181
    volumes:
      - ./zk_node1/data:/data
      - ./zk_node1/datalog:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk_node2:2888:3888 server.3=zk_node3:2888:3888
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.11
  zk_node2:
    image: zookeeper:3.4
    restart: always
    hostname: zk_node2
    container_name: zk_node2
    ports:
      - 2182:2181
    volumes:
      - ./zk_node2/data:/data
      - ./zk_node2/datalog:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk_node1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk_node3:2888:3888
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.12
  zk_node3:
    image: zookeeper:3.4
    restart: always
    hostname: zk_node3
    container_name: zk_node3
    ports:
      - 2183:2181
    volumes:
      - ./zk_node3/data:/data
      - ./zk_node3/datalog:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk_node1:2888:3888 server.2=zk_node2:2888:3888 server.3=0.0.0.0:2888:3888
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.13
networks:
  zookeeper_kafka:
    external:
      name: zookeeper_kafka
</pre></div>
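<p>The three ZOO_SERVERS values differ in only one entry: each node lists itself as 0.0.0.0 and its peers by hostname. This is a commonly used workaround with the 3.4 image so that each node binds its quorum port (2888) and leader-election port (3888) on all interfaces of its own container. The hypothetical helper below simply regenerates the three values to make the pattern explicit; the images themselves do not need it.</p>
<div class="jb51code"><pre class="brush:java;">// Hypothetical helper that reproduces the ZOO_SERVERS values used in the ZooKeeper compose file.
public class ZooServers {

    // Builds ZOO_SERVERS for node `myId` in a cluster of `size` nodes named zk_node1..zk_nodeN.
    static String forNode(int myId, int size) {
        StringBuilder sb = new StringBuilder();
        for (int id = 1; id <= size; id++) {
            if (sb.length() > 0) sb.append(' ');
            // The node refers to itself as 0.0.0.0 and to its peers by hostname.
            String host = (id == myId) ? "0.0.0.0" : "zk_node" + id;
            // 2888: follower-to-leader port, 3888: leader-election port.
            sb.append("server.").append(id).append('=').append(host).append(":2888:3888");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (int id = 1; id <= 3; id++) {
            System.out.println("zk_node" + id + " -> ZOO_MY_ID: " + id + ", ZOO_SERVERS: " + forNode(id, 3));
        }
    }
}
</pre></div>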
<p class="maodian"><a name="_label4"></a></p><h2>4. Set up the Kafka cluster and kafka-manager</h2>
<p>Again three nodes, plus kafka-manager:</p>
<div class="jb51code"><pre class="brush:yaml;">version: '2'
services:
  broker1:
    image: wurstmeister/kafka
    restart: always
    hostname: broker1
    container_name: broker1
    privileged: true
    ports:
      - 9091:9091
    environment:
      #KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://broker1:9091
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1:9091
      KAFKA_ADVERTISED_HOST_NAME: broker1
      KAFKA_ADVERTISED_PORT: 9091
      KAFKA_ZOOKEEPER_CONNECT: zk_node1:2181,zk_node2:2181,zk_node3:2181
      #JMX_PORT: 9988
    volumes:
      - ./broker1/docker.sock:/var/run/docker.sock
      - ./broker1/logs:/kafka/kafka-logs-broker1
    external_links:
      - zk_node1
      - zk_node2
      - zk_node3
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.21
  broker2:
    image: wurstmeister/kafka
    restart: always
    hostname: broker2
    container_name: broker2
    privileged: true
    ports:
      - 9092:9092
    environment:
      #KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://broker2:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:9092
      KAFKA_ADVERTISED_HOST_NAME: broker2
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zk_node1:2181,zk_node2:2181,zk_node3:2181
      #JMX_PORT: 9988
    volumes:
      - ./broker2/docker.sock:/var/run/docker.sock
      - ./broker2/logs:/kafka/kafka-logs-broker2
    external_links: # link to containers outside this compose file
      - zk_node1
      - zk_node2
      - zk_node3
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.22
  broker3:
    image: wurstmeister/kafka
    restart: always
    hostname: broker3
    container_name: broker3
    privileged: true
    ports:
      - 9093:9093
    environment:
      #KAFKA_BROKER_ID: 3
      KAFKA_LISTENERS: PLAINTEXT://broker3:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker3:9093
      KAFKA_ADVERTISED_HOST_NAME: broker3
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: zk_node1:2181,zk_node2:2181,zk_node3:2181
      #JMX_PORT: 9988
    volumes:
      - ./broker3/docker.sock:/var/run/docker.sock
      - ./broker3/logs:/kafka/kafka-logs-broker3
    external_links: # link to containers outside this compose file
      - zk_node1
      - zk_node2
      - zk_node3
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.23
  kafka-manager:
    image: sheepkiller/kafka-manager
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9000:9000"
    links: # link to containers created by this compose file
      - broker1
      - broker2
      - broker3
    external_links: # link to containers outside this compose file
      - zk_node1
      - zk_node2
      - zk_node3
    environment:
      ZK_HOSTS: zk_node1:2181,zk_node2:2181,zk_node3:2181
      KAFKA_BROKERS: broker1:9091,broker2:9092,broker3:9093
      APPLICATION_SECRET: letmein
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      zookeeper_kafka:
        ipv4_address: 172.19.0.10
networks:
  zookeeper_kafka:
    external: # use the network created in step 2
      name: zookeeper_kafka
</pre></div>
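<p>Note: the ZooKeeper and Kafka node names in this file must match your own setup; if you named your ZooKeeper containers or the network differently, change them here accordingly.</p>
<p class="maodian"><a name="_label5"></a></p><h2>5. Start the clusters</h2>
<p>Starting everything with docker-compose is convenient.</p>
<p>Start the ZooKeeper cluster first:</p>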
<div class="jb51code"><pre class="brush:bash;">docker-compose -f /home/docker/zookeeper/docker-compose.yml up -d
</pre></div>
<p>The -f option lets you point to the yml file explicitly, so the file can have any name and live anywhere.</p>
<p>To stop and remove the containers:</p>
<div class="jb51code"><pre class="brush:bash;">docker-compose -f /home/docker/zookeeper/docker-compose.yml down
</pre></div>
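<p>Before starting Kafka, you can check that the ZooKeeper ensemble actually formed a quorum by sending the four-letter command srvr to each node's client port; a healthy three-node ensemble should report one leader and two followers. The sketch below is a hypothetical JDK-only checker that assumes the 2181/2182/2183 host port mappings from the compose file and the VM IP used later in this article. srvr is enabled by default in ZooKeeper 3.4 (later versions restrict four-letter commands through 4lw.commands.whitelist, whose default still allows srvr).</p>
<div class="jb51code"><pre class="brush:java;">import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical checker: asks each ZooKeeper node for its role via the "srvr" four-letter command.
public class ZkModeCheck {

    // Sends "srvr" to host:port and returns the raw text reply.
    static String srvr(String host, int port) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 2000);
            socket.setSoTimeout(2000);
            OutputStream out = socket.getOutputStream();
            out.write("srvr".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // ZooKeeper writes the reply and then closes the connection, so read until EOF.
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Extracts the value of the "Mode:" line (leader, follower or standalone), or null if absent.
    static String parseMode(String reply) {
        for (String line : reply.split("\\r?\\n")) {
            if (line.startsWith("Mode:")) {
                return line.substring("Mode:".length()).trim();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "192.168.209.130"; // assumed VM IP
        for (int port : new int[] {2181, 2182, 2183}) {
            try {
                System.out.println(port + " -> " + parseMode(srvr(host, port)));
            } catch (IOException e) {
                System.out.println(port + " -> error: " + e.getMessage());
            }
        }
    }
}
</pre></div>
<p>A node that has not joined a quorum answers with a "not currently serving requests" message instead of a Mode line, in which case the sketch prints null for that port.</p>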
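<p>Then start the Kafka cluster, this time pointing -f at the Kafka compose file (the path below is only an example; use wherever you saved it):</p>
<div class="jb51code"><pre class="brush:bash;">docker-compose -f /home/docker/kafka/docker-compose.yml up -d
</pre></div>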
<p>If everything started without problems, check with docker ps:</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170732.png" /></p>
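<p>You can see that the three ZooKeeper nodes, the three Kafka brokers and kafka-manager are all up. It looks simple, but it took me a whole day...</p>
<p class="maodian"><a name="_label6"></a></p><h2>6. Check that the clusters started</h2>
<p class="maodian"><a name="_lab2_6_0"></a></p><h3>6.1 Testing with kafka-manager</h3>
<p>There are quite a few Kafka GUI tools, including kafka-manager, kafka-eagle, kafka-tool and the Kafka plugin for IDEA, and I tried them all. I had first tested with a standalone ZooKeeper and a standalone Kafka (installed directly from the tarballs), and every one of these tools could connect. After switching to the Docker cluster on a single machine, however, kafka-tool could no longer connect. The more frustrating case was kafka-eagle: <strong>kafka-eagle is actually the coolest of them and a pleasure to use, but it uses so much memory that my VM simply could not run it</strong>. It took me a long time to figure that out.</p>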
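<p>Back to the main topic: once the previous step is done, open http://VM_IP:9000 from the Win10 host to reach kafka-manager. It starts out empty, so click create cluster, give it any name, and for the ZooKeeper hosts enter the ZooKeeper cluster that kafka-manager is configured with in the Kafka yml file (ZK_HOSTS: zk_node1:2181,zk_node2:2181,zk_node3:2181), then save.</p>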
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170797.jpg" /></p>
<p>If the cluster shows 3 brokers, the setup worked.</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170712.png" /></p>
<p>Next I created a topic named test with partitions = 3 and replication factor = 3.</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170750.png" /></p>
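<p>The same topic can also be created from code. Here is a minimal sketch using the Admin API from the kafka-clients 2.8.1 dependency listed in section 7; the class name and default bootstrap addresses are assumptions, and from outside the VM it only works once the hosts mappings described in section 7 are in place, because the brokers advertise themselves as broker1 to broker3.</p>
<div class="jb51code"><pre class="brush:java;">import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical example: creates the "test" topic with 3 partitions and replication factor 3.
public class CreateTestTopic {

    // Topic definition matching the one created in kafka-manager above.
    static NewTopic testTopic() {
        return new NewTopic("test", 3, (short) 3);
    }

    static Properties adminProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Assumed VM IP and host ports from the Kafka compose file.
        String bootstrap = "192.168.209.130:9091,192.168.209.130:9092,192.168.209.130:9093";
        try (Admin admin = Admin.create(adminProps(bootstrap))) {
            // Blocks until the brokers respond; throws an ExecutionException
            // (wrapping TopicExistsException) if "test" already exists.
            admin.createTopics(Collections.singleton(testTopic())).all().get();
            System.out.println("Topics now: " + admin.listTopics().names().get());
        }
    }
}
</pre></div>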
<p class="maodian"><a name="_lab2_6_1"></a></p><h3>6.2 Testing in a shell</h3>
<p>First enter one of the broker containers, then start a producer with kafka-console-producer.sh.</p>
<p>Commands:</p>
<div class="jb51code"><pre class="brush:bash;">docker ps -a
docker exec -it 910 bash # note: 910 is the prefix of broker2's container ID; docker exec -it broker2 bash also works
cd /opt/kafka/bin/
# optionally list the topics first to check that the brokers are reachable
kafka-topics.sh --bootstrap-server broker1:9091,broker2:9092,broker3:9093 --list
kafka-console-producer.sh --broker-list broker1:9091,broker2:9092,broker3:9093 --topic test
</pre></div>
<p>Screenshot:</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170741.png" /></p>
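<p>You are now in producer mode; hello and kafka are the messages I typed.</p>
<p>Now open a second shell session, enter a broker container in the same way, and start a consumer.</p>
<p>Commands:</p>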
<div class="jb51code"><pre class="brush:bash;">docker exec -it 910 bash
cd /opt/kafka/bin/
kafka-console-consumer.sh --bootstrap-server broker1:9091,broker2:9092,broker3:9093 --topic test --from-beginning
</pre></div>
<p>The consumer receives the hello and kafka I just typed, which means the test succeeded.</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170714.png" /></p>
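<p class="maodian"><a name="_label7"></a></p><h2>7. Connecting to the cluster from Java</h2>
<p>First add the kafka-clients and JUnit dependencies. My Kafka version is, I believe, 2.8.1, so I used the matching client version:</p>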
<div class="jb51code"><pre class="brush:xml;"> <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
</pre></div>
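<p>My producer code (a JUnit 4 test):</p>
<div class="jb51code"><pre class="brush:java;">import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @date 2021/12/19 12:36
 */
public class TestKafkaProducer {
    private KafkaProducer<String, String> kafkaProducer;

    @Before
    public void before() {
        Properties props = new Properties();
        // Bootstrap via the VM IP and the host ports mapped in the Kafka compose file
        props.put("bootstrap.servers", "192.168.209.130:9091,192.168.209.130:9092,192.168.209.130:9093");
        props.put("acks", "1");   // wait for the partition leader's acknowledgement only
        props.put("retries", 0);  // do not retry failed sends
        props.put("batch.size", 16384);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducer = new KafkaProducer<>(props);
    }

    @Test
    public void testProduce() throws ExecutionException, InterruptedException {
        // Send key1/value1 ... key100/value100 to the "test" topic
        for (int i = 1; i <= 100; i++) {
            Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<>("test", "key" + i, "value" + i));
            future.get(); // block until the broker acknowledges, so send errors surface immediately
        }
    }

    @After
    public void close() {
        kafkaProducer.close();
    }
}
</pre></div>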
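<p class="maodian"><a name="_lab2_7_2"></a></p><h3>!!!! Key point</h3>
<p>At this point you will find that the code cannot connect to the Kafka cluster no matter what you try. Then, in a Lagou Education tutorial, I saw that you need to add hosts-file mappings on the Win10 machine (C:\Windows\System32\drivers\etc\hosts), as below!!! This is not an advertisement; I am genuinely grateful to them.</p>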
<div class="jb51code"><pre class="brush:bash;">192.168.209.130 broker1
192.168.209.130 broker2
192.168.209.130 broker3
</pre></div>
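<p>192.168.209.130 is my VM's IP, and broker1, broker2 and broker3 are the Kafka nodes created by the yml file. The mapping is needed because the client only uses bootstrap.servers for its first connection: the cluster metadata it gets back contains the advertised listeners (broker1:9091, broker2:9092, broker3:9093), and all further connections go to those hostnames, which Windows cannot resolve on its own. With the hosts mappings added, the connection works!!!!</p>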
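<p>To confirm that the hosts entries took effect before running the producer, a hypothetical JDK-only check like the following resolves the three advertised hostnames and compares them with the VM IP (the default below is the IP from this article; pass yours as the first argument).</p>
<div class="jb51code"><pre class="brush:java;">import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical check: do the advertised broker hostnames resolve to the VM's IP on this machine?
public class BrokerHostsCheck {

    // Returns the resolved IP for host, or null if it cannot be resolved.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String expectedIp = args.length > 0 ? args[0] : "192.168.209.130"; // assumed VM IP
        for (String broker : new String[] {"broker1", "broker2", "broker3"}) {
            String ip = resolve(broker);
            String status = ip == null ? "NOT resolvable, add it to the hosts file"
                    : ip.equals(expectedIp) ? "ok" : "resolves to " + ip + ", expected " + expectedIp;
            System.out.println(broker + ": " + status);
        }
    }
}
</pre></div>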
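<p>I then ran the producer code. The shell consumer I had opened earlier was still running, and it received exactly the 100 records written by the code above.</p>
<p>So I will not post the Java consumer code here; once you can connect to the cluster, the rest is ordinary development.</p>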
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026010710170740.png" /></p>
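<p>The consumer side is not part of the original write-up; for completeness, here is a minimal sketch written against the same kafka-clients dependency, mirroring the console consumer from section 6.2. Setting auto.offset.reset to earliest plays the role of --from-beginning for a consumer group that has no committed offsets yet; the class name, the group id test-group and the 10-second polling window are assumptions, and like the producer it needs the hosts mappings above.</p>
<div class="jb51code"><pre class="brush:java;">import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical consumer for the "test" topic.
public class TestKafkaConsumer {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Start from the earliest offset when the group has no committed offset (like --from-beginning).
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {
        String bootstrap = "192.168.209.130:9091,192.168.209.130:9092,192.168.209.130:9093"; // assumed VM IP
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProps(bootstrap, "test-group"))) {
            consumer.subscribe(Collections.singletonList("test"));
            long deadline = System.currentTimeMillis() + 10_000; // poll for about 10 seconds
            while (System.currentTimeMillis() < deadline) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
</pre></div>
<p>Offsets are auto-committed by default, so a second run with the same group id continues after the last committed offset; use a new group id to read the topic from the start again.</p>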
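<p class="maodian"><a name="_label8"></a></p><h2>8. Key takeaways</h2>
<ul><li>kafka-eagle uses a lot of memory; use it with caution if your machine is short on RAM</li><li>To connect from Java to the Kafka cluster running in Docker, add hosts mappings for the broker hostnames on the Win10 machine</li></ul>
<p>The above is my personal experience; I hope it serves as a useful reference.</p>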