kafka 部署

网友投稿 664 2022-10-11

kafka 部署

kafka 部署

推荐该文章,亲测可用:

​​yml:

version: '3'services: zookeeper: image: wurstmeister/zookeeper ports: - "2182:2181" networks: - hbl_net kafka: image: wurstmeister/kafka depends_on: - zookeeper links: - zookeeper networks: - hbl_net ports: - "9095:9092" environment: KAFKA_BROKER_ID: 5 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx.xx.xx:9095 #宿主机监听端口 volumes: - /var/run/docker.sock:/var/run/docker.socknetworks: hbl_net: driver: bridge # 生成一个桥接网络,用于容器内部通信,注意实际生成的网络名称会带有docker-compose.yml文件所在文件夹的前缀,比如我的.yml文件放在了hbl文件夹下,所以执行后生成的网络名为hbl_hbl_net # external: true 如果外部已有网络就用这个配置

update 2020-12-04

另外一个成功的案例:使用bitnami的镜像搭建的,感觉会正规点

version: '3'services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' - '29092:29092' environment: - KAFKA_BROKER_ID=1 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092,PLAINTEXT_HOST://IP:29092 - ALLOW_PLAINTEXT_LISTENER=yes

附上Python相关的调用代码:

# 生产者:import jsonfrom kafka import KafkaProducerfrom decimal import Decimalimport decimaldef producer(): host_port = '0.0.0.0:29092' topic = 'handle__price_cost' producer = KafkaProducer(bootstrap_servers=host_port, value_serializer=lambda v: json.dumps(v).encode('utf-8'), # security_protocol='SASL_PLAINTEXT', # sasl_plain_username="user", # sasl_plain_password="bitnami", # sasl_mechanism="PLAIN", max_block_ms=2000, ) msg_dict = { 'location_id': 193, 'product_id': 842892} msg = json.dumps(msg_dict, cls=DecimalEncoder) print(msg) producer.send(topic, msg) # record_metadata = future.get(timeout=10) print('send success') producer.close() print('end')

# 消费者:from kafka import KafkaConsumerimport jsonfrom pykafka import KafkaClientdef customer2(): # '192.168.1.217:9092', '192.168.1.234:9092', '192.168.1.188:9092' # hosts = '39.108.194.209:29092' hosts = '0.0.0.0:29092' client = KafkaClient(hosts=hosts) topic = client-ics['handle_price_cost'] # topic = client-ics['1111'] consumer = topic.get_simple_consumer(consumer_group=b'sinfl00d51044-200', auto_commit_interval_ms=1, auto_commit_enable=True) with open('customer.txt','wb') as f: for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg-ics, msg.partition, msg.offset, msg.key, msg.value.decode('utf-8')) a = json.loads(msg.value,encoding='utf-8') f.write((str(a)+'\n').encode('utf-8')) print(a)

懂得,原来世界如此简单!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring boot2.0 日志集成方法分享(1)
下一篇:drf serializer获取viewset 中的上下文context,自定义上下文
相关文章

 发表评论

暂时没有评论,来抢沙发吧~