python操作Kafka
<p></p><div class="toc"><div class="toc-container-header">目录</div><ul><li>一、python 操作 kafka<ul><li>1. python 使用 kafka 生产者</li><li>2. python 使用 kafka 消费者</li><li>3. 使用 docker 中的 kafka</li></ul></li><li>二、python操作kafka细节<ul><li>2.1 生产者demo</li><li>2.2 消费者demo</li><li>2.3 消费者(消费者组)</li><li>2.4 消费者(读取目前最早可读的消息)</li><li>2.5 消费者(手动设置偏移量)</li><li>2.6 消费者(订阅多个主题)</li><li>2.7 消费者(手动拉取消息)</li><li>2.8 消费者(消息挂起与恢复)<ul><li>2.8.1 消息挂起和恢复的实现</li><li>2.8.2 完整示例</li></ul></li><li>2.9 Python创建自定义的Kafka Topic</li></ul></li></ul></div><p></p><h1 id="一python-操作-kafka">一、python 操作 kafka</h1>
<ul>
<li>说明:关于 kafka 的启动与安装,命令行的使用,请查看前面的几篇文章。本篇文章主要描述python 如何操作 kafka</li>
<li>使用 python 操作 kafka 首先安装如下的包</li>
</ul>
<pre><code class="language-python">pip install kafka-python# 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要安装kafka-python库;弃用kafka库
</code></pre>
<h2 id="1-python-使用-kafka-生产者">1. python 使用 kafka 生产者</h2>
<ul>
<li>
<p><strong>说明:</strong>python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;</p>
</li>
<li>
<p>下面是将 <code>kafka-python</code> 库中生产者常用的方法进行封装,以便直接使用。更详细用法在下面第二节中</p>
</li>
</ul>
<pre><code class="language-python"> import json
import kafka
class Producer(object):
""" kafka 的生产者模型
"""
_coding = "utf-8"
def __init__(self,
broker='192.168.74.136:9092',
topic="add_topic",
max_request_size=104857600,
batch_size=0,# 即时发送,提高并发可以适当增加,但是会造成消息的延迟;
**kwargs):
"""初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接;
"""
self.broker = broker
self.topic = topic
self.max_request_size = max_request_size
# 实例化生产者对象
self.producer_json = kafka.KafkaProducer(
bootstrap_servers=self.broker,
max_request_size=self.max_request_size,
batch_size=batch_size,
key_serializer=lambda k: json.dumps(k).encode(self._coding),# 设置键的形式使用匿名函数进行转换
value_serializer=lambda v: json.dumps(v).encode(self._coding),# 当需要使用 json 传输地时候必须加上这两个参数
**kwargs
)
self.producer = kafka.KafkaProducer(
bootstrap_servers=broker,
max_request_size=self.max_request_size,
batch_size=batch_size,
api_version=(0, 10, 1),
**kwargs
)
def send(self, message: bytes, partition: int = 0):
"""
写入普通的消息;
Args:
message: bytes; 字节流数据;将字符串编码成 utf-8的格式;
partition: int; kafka 的分区,将消息发送到指定的分区之中;
Returns:
None
"""
future = self.producer.send(self.topic, message, partition=partition)
record_metadata = future.get(timeout=30)
if future.failed():# 发送失败,记录异常到日志;
raise Exception("send message failed:%s)" % future.exception)
def send_json(self, key: str, value: dict, partition: int = 0):
"""
发送 json 形式的数据;
Args:
key: str; kafka 中键的值
value: dict; 发送的具体消息
partition: int; 分区的信息
Returns:
None
"""
future = self.producer_json.send(self.topic, key=key, value=value, partition=partition)
record_metadata = future.get(timeout=30)
if future.failed():# 发送失败记录异常;
raise Exception("send json message failed:%s)" % future.exception)
def close(self):
"""
关闭kafka的连接。
Returns:
None
"""
self.producer_json.close()
self.producer.close()
if __name__ == '__main__':
'''脚本调用执行;'''
kafka_obj = Producer()
print(kafka_obj.broker)
kafka_obj.send("自动生成".encode())
</code></pre>
<ul>
<li><strong>发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接kafka</strong></li>
</ul>
<h2 id="2-python-使用-kafka-消费者">2. python 使用 kafka 消费者</h2>
<ul>
<li>
<p><strong>由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;</strong></p>
</li>
<li>
<p>操作 kafka 的消费者依旧只需要安装上述的第三方依赖包 <code>kafka-python</code>;</p>
</li>
<li>
<p>下面是将 <code>kafka-python</code> 库中消费者常用的方法进行封装,以便直接使用。更详细用法在下面第二节中</p>
</li>
</ul>
<pre><code class="language-python"> import json
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import TopicPartition
class KConsumer(object):
"""kafka 消费者; 动态传参,非配置文件传入;
kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
"""
_encode = "UTF-8"
def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs):
""" 初始化kafka的消费者;
1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
3. 手动设置偏移量
Args:
topics: str; kafka 的消费主题;
bootstrap_server: list; kafka 的消费者地址;
group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区;
**kwargs: dict; 其他原生kafka消费者参数的;
"""
if bootstrap_server is None:
bootstrap_server = ["192.168.74.136:9092"]# kafka集群的话就写多个
self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server)
exist = self.exist_topics(topics)
if not exist:# 需要的主题不存在;
# 创建一条
self.create_topics(topics)
if partitions is not None:
self.consumer = KafkaConsumer(
bootstrap_servers=bootstrap_server,
group_id=group_id,
# 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展;
**kwargs
)
# print("指定分区信息:", partitions, topics, type(partitions))
self.topic_set = TopicPartition(topics, int(partitions))
self.consumer.assign()
else:
# 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的;
self.consumer = KafkaConsumer(
topics,
bootstrap_servers=bootstrap_server,
group_id=group_id,
**kwargs
)
def exist_topics(self, topics):
"""
检查 kafka 中的主题是否存在;
Args:
topics: 主题名称;
Returns:
bool: True/False ; True,表示存在,False 表示不存在;
"""
topics_set = set(self.consumer.topics())
if topics not in topics_set:
return False
return True
@staticmethod
def create_topics(topics):
"""
创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server;
Args:
topics: str; 主题的名字;
Returns:
None
"""
producer = KafkaProducer(
bootstrap_servers='192.168.74.136:9092',
key_serializer=lambda k: json.dumps(k).encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
producer.send(topics, key="start", value={"msg": "aaaa"})
producer.close()
def recv(self):
"""
接收消费中的数据
Returns:
使用生成器进行返回;
"""
for message in self.consumer:
# 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移
# print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (
# message.topic, message.partition, message.offset, message.key, message.value))
yield {"topic": message.topic, "partition": message.partition, "key": message.key,
"value": message.value.decode(self._encode)}
def recv_seek(self, offset):
"""
接收消费者中的数据,按照 offset 的指定消费位置;
Args:
offset: int; kafka 消费者中指定的消费位置;
Returns:
generator; 消费者消息的生成器;
"""
self.consumer.seek(self.topic_set, offset)
for message in self.consumer:
# print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (
# message.topic, message.partition, message.offset, message.key, message.value))
yield {"topic": message.topic, "partition": message.partition, "key": message.key,
"value": message.value.decode(self._encode)}
if __name__ == '__main__':
""" 测试使用;
"""
obj = KConsumer("exist_topic", bootstrap_server=['192.168.74.136:9092'])
for i in obj.recv():
print(i)
</code></pre>
<ul>
<li>该消费者封装时多增加了一个需求,消费的主题不存在的时候会默认创建(创建成功的前提是kafka服务端的设置<code>auto.create.topics.enable=true</code>),下次就可以继续消费</li>
</ul>
<h2 id="3-使用-docker-中的-kafka">3. 使用 docker 中的 kafka</h2>
<ul>
<li>
<p>以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;</p>
</li>
<li>
<p>docker 中使用 kafka 的时候与前面的配置稍有不同,当使用<code>docker-compose</code>部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,因此只需要将相关的地址配好,即可;代码信息无需修改;</p>
</li>
<li>
<p>一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将<strong>host</strong>的网络地址解析,与docker 中 kafka 的名称对应;</p>
</li>
</ul>
<pre><code class="language-undefined"> host 文件
127.0.0.1 kafka
</code></pre>
<ul>
<li>当需要远程连接的时候,将地址改成该计算机在内网中的地址即可</li>
</ul>
<h1 id="二python操作kafka细节">二、python操作kafka细节</h1>
<pre><code class="language-python"># 需要安装的库如下
pip install kafka-python
pip install msgpack# msgpack库将数据序列化可以将数据转换为二进制格式,便于在网络中传输
# msgpack替换为python内置的bytes方法更简便 ,如: log = bytes(str(log), encoding="utf-8")
# msgpack的序列化
data = 'test_message'
msgpack.packb(data)
# msgpack的反序列化
packed_data = b'\x83\xa3age\x14\xa3name\xa3Tom\xa3sex\xa1M'
data = msgpack.unpackb(packed_data)
</code></pre>
<h2 id="21-生产者demo">2.1 生产者demo</h2>
<pre><code class="language-python">from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')
# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
producer.send('my-topic', b'msg')
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
</code></pre>
<h2 id="22-消费者demo">2.2 消费者demo</h2>
<pre><code class="language-python">from kafka import KafkaConsumer
consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])#参数为接收主题和kafka服务器地址
# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都有偏移
for message in consumer:# consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待直到新消息过来
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
</code></pre>
<h2 id="23-消费者消费者组">2.3 消费者(消费者组)</h2>
<pre><code class="language-python">from kafka import KafkaConsumer
# 为定义的消费者指定一个group_id,对于订阅同一个topic来的group来说,能够使得该group中的所有消费者共同消费其中的消息(group中每个消费者消费的是不同的消息),横向扩展处理能力
# 对于订阅同一个topic来的不同group来说,每个消费组都会获取这个topic中的所有消息进行消费
# 如果不指定group ID,consumer会使用一个自动生成的group id。这个group id的格式通常是 __consumer_offsets-<generated_uuid>,其中<generated_uuid>是一个随机生成的唯一通用标识符。那么相当于一个消费者就单独属于一个消费组,尽管它们订阅了同一个topic,也只能一个人单独消费该topic的全部消息
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
</code></pre>
<h2 id="24-消费者读取目前最早可读的消息">2.4 消费者(读取目前最早可读的消息)</h2>
<pre><code class="language-python">from kafka import KafkaConsumer
consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
</code></pre>
<ul>
<li><code>auto_offset_reset</code>:重置偏移量,<code>earliest</code> 移到最早的可用消息,<code>latest</code> 最新的消息,默认为 <code>latest</code><br>
源码定义: <code>{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}</code></li>
</ul>
<h2 id="25-消费者手动设置偏移量">2.5 消费者(手动设置偏移量)</h2>
<pre><code class="language-python"># ==========读取指定位置消息===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])
print(consumer.partitions_for_topic("test"))#获取test主题的分区信息
print(consumer.topics())#获取主题列表
print(consumer.subscription())#获取当前消费者订阅的主题
topic_partition_set = consumer.assignment()
print(topic_partition_set)#获取当前消费者topic、分区信息
for topic_partition in topic_partition_set:
print(consumer.beginning_offsets(topic_partition))# 获取当前消费者可消费的偏移量
offset = consumer.position(topic_partition)
consumer.seek(TopicPartition(topic='test', partition=0), 5)#重置偏移量,从第5个偏移量消费
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
</code></pre>
<h2 id="26-消费者订阅多个主题">2.6 消费者(订阅多个主题)</h2>
<pre><code class="language-python"># =======订阅多个消费者==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))#订阅要消费的主题
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0))) #获取当前主题的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
</code></pre>
<h2 id="27-消费者手动拉取消息">2.7 消费者(手动拉取消息)</h2>
<pre><code class="language-python">from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #从kafka获取消息
print(msg)
time.sleep(2)
</code></pre>
<h2 id="28-消费者消息挂起与恢复">2.8 消费者(消息挂起与恢复)</h2>
<h3 id="281-消息挂起和恢复的实现">2.8.1 消息挂起和恢复的实现</h3>
<pre><code class="language-python"># ==============消息恢复和挂起===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))# pause执行后,consumer不能读取,直到调用resume后恢复。
num = 0
while True:
print(num)
print(consumer.paused()) #获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print(msg)
time.sleep(2)
num = num + 1
if num == 10:
print("resume...")
consumer.resume(TopicPartition(topic='test', partition=0))
print("resume......")
</code></pre>
<ul>
<li><code>pause</code> 执行后,consumer不能读取,直到调用 <code>resume</code> 后恢复</li>
</ul>
<h3 id="282-完整示例">2.8.2 完整示例</h3>
<pre><code class="language-python">import json
import msgpack
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
</code></pre>
<h2 id="29-python创建自定义的kafka-topic">2.9 Python创建自定义的Kafka Topic</h2>
<pre><code class="language-python">client = KafkaClient(bootstrap_servers=brokers)
if topic not in client.cluster.topics(exclude_internal_topics=True):# Topic不存在
request = admin.CreateTopicsRequest_v0(
create_topic_requests=[(
topic,
num_partitions,
-1,# replication unset.
[],# Partition assignment.
[(key, value) for key, value in configs.items()],# Configs
)],
timeout=timeout_ms
)
future = client.send(2, request)# 2是Controller,发送给其他Node都创建失败。
client.poll(timeout_ms=timeout_ms, future=future, sleep=False)# 这里
result = future.value
# error_code = result.topic_error_codes
print("CREATE TOPIC RESPONSE: ", result)# 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
client.close()
else:# Topic已经存在
print("Topic already exists!")
return
</code></pre><br><br>
来源:https://www.cnblogs.com/Mcoming/p/18087704
頁:
[1]