kafka的使用—系统保卫战

网友投稿 664 2023-02-12

本站部分文章、图片属于网络上可搜索到的公开信息,均用于学习和交流用途,不能代表睿象云的观点、立场或意见。我们接受网民的监督,如发现任何违法内容或侵犯了您的权益,请第一时间联系小编邮箱jiasou666@gmail.com 处理。

kafka的使用—系统保卫战

前言

最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。

讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~

软件介绍

在这里我们使用 zookeeper + kafka 的方案来做。

软件版本其他
zookeeper3.4.6
kafka2.10-0.9.0.0
pykafka2.1.2python的kafka API

zookeeper + kafka 基本使用教程

先决条件

使用zookeeper、kafka创建一个topic名为 goods-topic需要安装pykafka一个python的zookeeper、kafka API一个goods示例数据库

使用消息队列:

1
2
3
4
5
6
7
8
9
10
11
# 启动zookeeper
/usr/local/zookeeper/bin/zkServer.sh start
# 启动kafka
/usr/local/kafka/bin/kafka-server-start.sh/usr/local/kafka/config/server.properties>/tmp/kafka-logs/kafka.out2>&1&
# 创建 goods-topic
/usr/local/kafka/bin/kafka-topics.sh\
--create\
--zookeeper localhost:2181\
--replication-factor1\
--partitions1\
--topic test

安装pykafka:

1
pip install pykafka

创建示例数据库:

1
2
3
4
5
6
7
CREATE TABLE goods(
goods_id INTNOTNULLAUTO_INCREMENT,
goods_name VARCHAR(30)NOTNULL,
goods_price DECIMAL(13,2)NOTNULLDEFAULT0.00,
create_time DATETIME NOTNULL,
PRIMARY KEY(goods_id)
);

伪代码展示

生产者端伪代码-python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time,json
from pykafka import KafkaClient
# 相关的mysql操作
mysql_op()
# 可接受多个Client这是重点
client=KafkaClient(hosts="192.168.1.233:9092, \
                            192.168.1.233:9093, \
                            192.168.1.233:9094")
# 选择一个topic
topic=client.topics['goods-topic']
# 创建一个生产者
producer=topic.get_producer()
# 模拟接收前端生成的商品信息
goods_dict={
'option_type':'insert'
'option_obj':{
'goods_name':'goods-1',
'goods_price':10.00,
'create_time':time.strftime('%Y-%m-%d %H:%M:%S')
}
}
goods_json=json.dumps(goods_dict)
# 生产消息
producer.produce(msg)

消费者端伪代码-python(作为后台进程在跑)

作者信息

昵称:HH

QQ:275258836

上一篇:运维事件单三线处理目标(三线运维工作内容)
下一篇:Linux之ssh连接保持与重用
相关文章

 发表评论

评论列表