PythonNote023---python与kafka交互

网友投稿 758 2022-10-08

PythonNote023---python与kafka交互

PythonNote023---python与kafka交互

Intro

Kafka的原理不是很清楚,通过python去订阅local环境的kafka,也只是完成数据的自测。以后有机会再去了解kafka的原理吧~

需要额外安装​​pip install kafka-python​​,相关环境和package信息:

import kafkaimport pandas as pdimport sysimport datetimeprint("Python版本:",sys.version)print("pandas版本:",pd.__version__)print("kafka版本:",kafka.__version__)

Python版本: 3.7.0 (default, Jun 28 2018, 08:04:48) [MSC v.1912 64 bit (AMD64)]pandas版本: 0.23.4kafka版本: 2.0.1

创建生产者

创建生产者的代码如下,kafka似乎不关注你写了什么,想写啥写啥,消费方关注写得内容是否是他需要的,符合既定格式的

import jsonimport timeimport datetimefrom kafka import KafkaProducerkafka_topic = "test_topic"kafka_server = "255.255.255.255"producer = KafkaProducer(bootstrap_servers=kafka_server, value_serializer=lambda m: json.dumps(m).encode())for i in range(10): print(i) producer.send(kafka_topic, {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) time.sleep(1)

创建消费者

根据指定topic进行消费。

from kafka import KafkaConsumerkafka_topic = "test_topic"kafka_server = "255.255.255.255"consumer = Kafkaonsumer(kafka_topic, bootstrap_servers=, group_id='test', auto_offset_reset='earliest')for msg in consumer: print(msg.value)

kafka_topic topic信息bootstrap_servers 用于指定 Kafka 服务器连接地址group_id 似乎和kafka的设置有关系,如果配置,需严格匹配group_id,也可以不带,似乎会影响消费的数据量auto_offset_rest 这个参数比较复杂,放在后面说

下面是关于auto_offset_reset和group_id的一些说明,不知道对不对,做了测试了下基本符合,主要参考与 latest 在我们创建消费者对象的时候,有一个参数叫做auto_offset_reset=‘earliest’。 auto_offset_reset这个参数,只有在一个group第一次运行的时候才有作用,从第二次运行开始,这个参数就失效了。 假设现在你的 Topic 里面有100个数据,你设置了一个全新的 group_id 为test2。auto_offset_reset设置为 earliest。那么当你的消费者运行的时候,Kafka 会先把你的 offset 设置为0,然后让你从头开始消费的。//[此处存疑,group_id似乎只能指定或者不写,代码才能执行成功]

假设现在你的 Topic 里面有100个数据,你设置了一个全新的 group_id 为test3。auto_offset_reset设置为 latest。那么当你的消费者运行的时候,Kafka 不会给你返回任何数据,消费者看起来就像卡住了一样,但是 Kafka 会直接强制把前100条数据的状态设置为已经被你消费的状态。所以当前你的 offset 就直接是99了。直到生产者插入了一条新的数据,此时消费者才能读取到。这条新的数据对应的 offset 就变成了100。//[测试发现,如果group_id还是指定的,消费者读到的数据不全,可能和partition有关,如果不设置group_id则不影响数据量]

假设现在你的 Topic 里面有100个数据,你设置了一个全新的 group_id 为test4。auto_offset_reset设置为 earliest。那么当你的消费者运行的时候,Kafka 会先把你的 offset 设置为0,然后让你从头开始消费的。等消费到第50条数据时,你把消费者程序关了,把auto_offset_reset设置为latest,再重新运行。此时消费者依然会接着从第51条数据开始读取。不会跳过剩下的50条数据。//[这个测试失败,不可以新建group_id,可能和kafka的配置有关系]

所以,auto_offset_reset的作用,是在你的 group 第一次运行,还没有 offset 的时候,给你设定初始的 offset。而一旦你这个 group 已经有 offset 了,那么auto_offset_reset这个参数就不会再起作用了。//[这里的意思应该是说group运行之后,参数再搞成earliest,也不会读取历史数据]

做个小总结:

如果指定groupid,第一次执行时,auto_offset_reset=“earliest”,那么会从最早的数据开始消费。后续同一个groupid再执行,则不再读取数据老数据,但是如果生产者再消费者程序执行过后写数据,是可以被消费的。有个问题,消费的数据可能不全,不知道是不是partition机制的原因如果不指定groupid,auto_offset_reset=“earliest”,每次都从最早的数据开始消费。auto_offset_reset=“latest”,则后续写入时再被消费,并且应该没有数据缺失的情况。

partition 是如何分配

可以通过​​consumer.partitions_for_topic(kafka_topic)​​​查看 对于同一个 Topic 的同一个 Group: 假设你的 Topic 有10个 Partition,一开始你只启动了1个消费者。那么这个消费者会轮换着从这10个Partition 中读取数据。 当你启动第二个消费者时,Kafka 会从第一个消费者手上抢走5个Partition,分给第二个消费者。于是两个消费者各自读5个 Partition。互不影响。 当第三个消费者又出现时,Kafka 从第一个消费者手上再抢走1个 Partition,从第二个消费者手上抢走2个 Partition 给第三个消费者。于是,消费者1有4个 Partition,消费者2有3个 Partition,消费者3有3个 Partiton,互不影响。 当你有10个消费者一起消费时,每个消费者读取一个 Partition,互不影响。 当第11个消费者出现时,它由于分配不到 Partition,所以它什么都读不到。 在同一个 Topic,同一个 Group 中,你有多少个 Partiton,就能起多少个进程同时消费。

Ref

​​[1] 于南京市江宁区九龙湖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:详解微信小程序开发实现定位到当前城市代码(微信小程序定位是基于微信定位吗)
下一篇:PythonNote025---conda创建python虚拟环境
相关文章

 发表评论

暂时没有评论,来抢沙发吧~