实时警报通知:微信告警通知的重要性解析
664
2023-02-12
kafka的使用—系统保卫战
前言
最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。
讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~
软件介绍
在这里我们使用 zookeeper + kafka 的方案来做。
软件 | 版本 | 其他 |
zookeeper | 3.4.6 | |
kafka | 2.10-0.9.0.0 | |
pykafka | 2.1.2 | python的kafka API |
zookeeper + kafka 基本使用教程
先决条件
使用zookeeper、kafka创建一个topic名为 goods-topic需要安装pykafka一个python的zookeeper、kafka API一个goods示例数据库
使用消息队列:
1 2 3 4 5 6 7 8 9 10 11 | # 启动zookeeper /usr/local/zookeeper/bin/zkServer.sh start # 启动kafka /usr/local/kafka/bin/kafka-server-start.sh/usr/local/kafka/config/server.properties>/tmp/kafka-logs/kafka.out2>&1& # 创建 goods-topic /usr/local/kafka/bin/kafka-topics.sh\ --create\ --zookeeper localhost:2181\ --replication-factor1\ --partitions1\ --topic test |
安装pykafka:
1 | pip install pykafka |
创建示例数据库:
1 2 3 4 5 6 7 | CREATE TABLE goods( goods_id INTNOTNULLAUTO_INCREMENT, goods_name VARCHAR(30)NOTNULL, goods_price DECIMAL(13,2)NOTNULLDEFAULT0.00, create_time DATETIME NOTNULL, PRIMARY KEY(goods_id) ); |
伪代码展示
生产者端伪代码-python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | import time,json from pykafka import KafkaClient # 相关的mysql操作 mysql_op() # 可接受多个Client这是重点 client=KafkaClient(hosts="192.168.1.233:9092, \ 192.168.1.233:9093, \ 192.168.1.233:9094") # 选择一个topic topic=client.topics['goods-topic'] # 创建一个生产者 producer=topic.get_producer() # 模拟接收前端生成的商品信息 goods_dict={ 'option_type':'insert' 'option_obj':{ 'goods_name':'goods-1', 'goods_price':10.00, 'create_time':time.strftime('%Y-%m-%d %H:%M:%S') } } goods_json=json.dumps(goods_dict) # 生产消息 producer.produce(msg) |
消费者端伪代码-python(作为后台进程在跑)
作者信息
昵称:HH
QQ:275258836
发表评论
评论列表