痞子秦 發表於 2023-6-15 00:00:00

kafka的使用—系统保卫战

<h3>前言</h3>
<p>最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。</p>
<p>讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~</p>
<p>下面就使用python来模拟一下我们的方案(希望大家来吐槽 :) )</p>
<h3>软件介绍</h3>
<p>在这里我们使用 zookeeper + kafka 的方案来做。</p>
<table><tbody>
<tr>
<td width="260"><strong>软件</strong></td>
<td width="260"><strong>版本</strong></td>
<td width="260"><strong>其他</strong></td>
</tr>
<tr>
<td width="260">zookeeper</td>
<td width="260">3.4.6</td>
<td width="260"></td>
</tr>
<tr>
<td width="260">kafka</td>
<td width="260">2.10-0.9.0.0</td>
<td width="260"></td>
</tr>
<tr>
<td width="260">pykafka</td>
<td width="260">2.1.2</td>
<td width="260">python的kafka API</td>
</tr>
</tbody></table>
<h3>zookeeper + kafka 基本使用教程</h3>
<p>http://www.linuxidc.com/Linux/2014-07/104470.htm</p>
<h3>先决条件</h3>
<ol>
<li>使用zookeeper、kafka创建一个topic名为 goods-topic</li>
<li>需要安装pykafka一个python的zookeeper、kafka API</li>
<li>一个goods示例数据库</li>
</ol>
<ul>
<li><strong>使用消息队列:</strong></li>
</ul>
<p></p><pre class="brush:bash;toolbar:false"># 启动zookeeper
/usr/local/zookeeper/bin/zkServer.sh start
# 启动kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &gt; /tmp/kafka-logs/kafka.out 2&gt;&amp;1 &amp;
# 创建 goods-topic
/usr/local/kafka/bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test</pre><p></p>
<ul>
<li><strong>安装pykafka:</strong></li>
</ul>
<p></p><pre class="brush:bash;toolbar:false">pip install pykafka</pre><p>官网:http://readthedocs.org/projects/pykafka/</p>
<ul>
<li><strong>创建示例数据库:</strong></li>
</ul>
<p></p><pre class="brush:bash;toolbar:false">CREATE TABLE goods(
goods_id INT NOT NULL AUTO_INCREMENT,
goods_name VARCHAR(30) NOT NULL,
goods_price DECIMAL(13, 2) NOT NULL DEFAULT 0.00,
create_time DATETIME NOT NULL,
PRIMARY KEY(goods_id)
);</pre><p></p>
<h3>伪代码展示</h3>
<ul>
<li><strong>生产者端伪代码-python</strong></li>
</ul>
<p></p><pre class="brush:bash;toolbar:false">import time, json
from pykafka import KafkaClient

# 相关的mysql操作
mysql_op()

# 可接受多个Client这是重点
client = KafkaClient(hosts="192.168.1.233:9092, \
                            192.168.1.233:9093, \
                            192.168.1.233:9094")
# 选择一个topic
topic = client.topics['goods-topic']
# 创建一个生产者
producer = topic.get_producer()
# 模拟接收前端生成的商品信息
goods_dict = {
'option_type':'insert'
'option_obj':{
    'goods_name':'goods-1',
    'goods_price':10.00,
    'create_time':time.strftime('%Y-%m-%d %H:%M:%S')
}
}
goods_json = json.dumps(goods_dict)
# 生产消息
producer.produce(msg)</pre><p></p>
<ul>
<li>
<strong>消费者端伪代码-python(</strong><strong>作为后台进程在跑)</strong>
</li>
</ul>
<p></p><pre class="brush:bash;toolbar:false">import time, json
from pykafka import KafkaClient
# 可接受多个Client这是重点
client = KafkaClient(hosts="192.168.1.233:9092, \
                            192.168.1.233:9093, \
                            192.168.1.233:9094")
# 选择一个topic
topic = client.topics['goods-topic']
# 生成一个消费者
balanced_consumer = topic.get_balanced_consumer(
consumer_group='goods_group',
auto_commit_enable=True,
zookeeper_connect='localhost:2181'
)
# 消费信息
for message in balanced_consumer:
if message is not None:
    # 解析json为dict
    goods_dict = json.loads(message)
    # 对数据库进行操作
    if goods_dict['option_type'] == 'insert':
      mysql_insert()
    elif goods_dict['option_type'] == 'update':
      mysql_update()
    elif goods_dict['option_type'] == 'delete':
      mysql_delete()
    else:
      order_option()</pre><p></p>
<h3> 作者信息</h3>
<p>昵称:HH</p>
<p>QQ:275258836</p>
<p>感觉本文内容不错,读后有收获?</p>
<p>逛逛衣服店,鼓励作者写出更好文章。</p>
頁: [1]
查看完整版本: kafka的使用—系统保卫战