Kafka_2.10-0.10.0.0集群安装与配置

网友投稿 814 2022-09-28

Kafka_2.10-0.10.0.0集群安装与配置

Kafka_2.10-0.10.0.0集群安装与配置

kafka_2.10-0.10 集群安装

上文已经讲过如何安装Zookeeper集群,因为Kafka集群需要依赖Zookeeper服务,虽然Kafka有内置Zookeeper,但是还是建议独立安装Zookeeper集群服务,此处不再赘述

kafka集群还是安装在192.168.20.178  、 192.168.20.179  、192.168.20.174三台机器上面

zookeeper集群

192.168.20.178 kafka1192.168.20.179 kafka2192.168.20.174 kafka3192.168.20.37 zookeeper1192.168.20.38 zookeeper2192.168.20.39 zookeeper3

1、kafka官网-kafka_2.10-0.10.0.0.tgz压缩包,解压缩

2、修改/appcom/kafka_2.10-0.10.0.0/config/server.properties文件:

192.168.20.178机器:

broker.id=1

listeners=PLAINTEXT://kafka1:9092

advertised.listeners=PLAINTEXT://kafka1:9092

log.dirs=/data/kafka/logs

需要配置较大 分片影响读写速度

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

其他配置暂时采取默认

192.168.20.179机器:

broker.id=2

listeners=PLAINTEXT://kafka2:9092

advertised.listeners=PLAINTEXT://kafka2:9092

log.dirs=/data/kafka/logs

需要配置较大 分片影响读写速度

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

其他配置暂时采取默认

192.168.20.39机器:

broker.id=3

listeners=PLAINTEXT://kafka3:9092

advertised.listeners=PLAINTEXT://kafka3:9092

log.dirs=/data/kafka/logs

需要配置较大 分片影响读写速度

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

其他配置暂时采取默认

3、在每台服务器分别启动kafka服务:

./kafka-server-start.sh  /data/kafka/config/server.properties & (以这种方式启动kafka应用后,kafka总会莫名其妙的退出。后来改为这种命令启动:

./kafka-server-start.sh -daemon  /data/kafka/config/server.properties  解决了问题)

4、任意一台机器上面,测试:       在kafka中创建名为“cmy_nbd_topic1”的topic,该topic切分为4份,每一份备份数为3

./kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 3 --partitions 4 --topic  cinyi_topic1

5、列出所有topic :

./kafka-topics.sh --list --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

(1)建立一个主题

/data/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 3 --partitions 1 --topic summer

#注意:factor大小不能超过broker数

(2) 增加partition数量

/data/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --alter -topic summer --partitions 4

能增加,不能减少partitions, --partitions 4 是增加后的值

(3)删除一个主题

/data/kafka/bin/kafka-topics.sh --delete --zookeeper zookeeper1:2181 --topic  summer

...1. 在kafka配置文件中  /data/kafka/config/server.properties  添加一行  delete-ic.enable = true

...2. 连接zookeeper

删除zookeeper下/brokers/topics/test-topic节点

删除zookeeper下/config/topics/test-topic节点

删除zookeeper下/admin/delete_topics/test-topic节点

(4)查看有哪些主题已经创建

[root@kafka1 ~]# /data/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1:2181 #列出集群中所有的topic summer #已经创建成功

(5)查看summer这个主题的详情

[root@kafka1 ~]# /data/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper1:2181--topic summer

Topic:summerPartitionCount:1 ReplicationFactor:3 Configs: Topic: summerPartition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3 #主题名称:summer #Partition:只有一个,从0开始 #leader :id为2的broker #Replicas 副本存在于broker id为2,3,4的上面 #Isr:活跃状态的broker

(6)发送消息,这里使用的是生产者角色

[root@kafka1 ~]# /data/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic summerThis is a messageswelcometo kafka

(7)接收消息,这里使用的是消费者角色

[root@kafka2 ~]# /data/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181 --topic summer --from-beginning

# This is a messages welcometo kafka(8)kafka集群的leader平衡机制创建一个abctest3的topic,

1 Partition: 分区2 Leader : 负责读写指定分区的节点3 Replicas : 复制该分区log的节点列表4 Isr : "in-sync"

4.启动broker2 节点进程后, 3个节点不均衡。

5. 通过kafka-preferred-replica-election.sh 命令,是leader平衡

6.动态leader平衡,在配置文件中加入 auto.leader.rebalance.enable=true

8. 集群分区日志迁移,(1) broker1 上的topic abc_test ,迁移到 broker3上, (2) 把broker1上的topic abc_test的 分区,迁移到broker3上

1.编写json的文件

cat topics-to-move.json

{"topics":{"topic":abc_test},

"version":1

}

2. 生成迁移计划,并没有迁移

/data/kafka/bin/kafka-reassign-partitions.sh   --zookeeper zookeeper1:2181 --topics-to-move-json-file /root/topics-tomove.json --broker-list "3" --generate

Current partition replica assignment

{"version":1,"partitions":[]}     ####备份回滚使用Proposed partition reassignment configuration

{"version":1,"partitions":[]}  #####迁移的json文件

把 Proposed partition reassignment configuration 生成的json保存到文件中

cat /root/expand-cluster-reassignment.json

{"version":1,"partitions":[]}

3. 执行迁移

/data/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file /root/expand-cluster-reassignment.json --execute

4.验证:

/data/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file /root/expand-cluster-reassignment.json --verify

5.迁移某个topic的某些特定的partition数据到其他broker,步骤与上面一样,但是json文件如下面所示:  cat custom-reassignment.json  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}  可以指定到topic的分区编号

(9) kafka集群的监控  kafka offset monitor 监控一个集群,  kafka manager 可以监控多个集群

kafka offset Monitor 介绍

1. kafka集群当前存活broker集合

2. kafka集群当前活动topic集合

3. 消费者列表

4.kafka集群当前consumer按组消费的offset lag数(即当前topic当前分区目前有多少消息积压而没有及时消费)

kafka offset monitor 部署

主页wget -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 192.168.20.153:2181,192.168.20.154:2181,192.168.20.155:2181 --port 8086 --refresh 10.seconds --retain 7.days 1>stdout.log 2>stderr.log &​​

topic:创建时topic名称partition:分区编号offset:表示该parition已经消费了多少条messagelogSize:表示该partition已经写了多少条messageLag:表示有多少条message没有被消费。Owner:表示消费者Created:该partition创建时间Last Seen:消费状态刷新最新时间。

######################broker.id = 0 #当前集群中的唯一标识,和zookeeper myid功能相同port = 9092host.name= 192.168.20.178 本台机器的IP地址num-work.threads = 2 broker网络处理的线程数num.io.threads = 8 broker io处理的线程数,与 log.dirs 有关系socket.send.buffer.bytes=1048576 socker发送buffer大小,先发送到缓存区,提高性能socker.receive.buffer.bytes=1048576 接受缓存区,当大于数字时,序列化到磁盘socker.request.max.bytes=10483421 向kafka 发送消息的请求数,不能超过java堆栈大小

log.dirs = /data/kafka/logs 状态消息持久化的地方num.partitions = 2 默认的分区数,一个topic默认2个分区数log.retention.hours = 168 发送到kafka的消息过期时间,7天

在 log.retention.hours=168下添加message.max.byte = 5048576 kafka可以接收消息的最大大小是5Mdefault.replication.factor = 2 每个topic的每个patitions的消息默认只有一个副本,修改成保存2个副本数replica.fetch.max.bytes = 5048576 取消息的最大字节数5Mlog.segment.bytes = 452345245 日志文件大于数字,新启一个新的文件。log.retention.check.interval.ms=646452 每隔646452毫秒,查看一下log.retention.hours是否失效log.cleaner.enable = false 是否启用log压缩

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181auto.leader.rebalance.enable=true leader自动平衡

消费者 consumer : 从消息队列中请求消息的客户端应用程序生产者 producer: 向broker发布消息的客户端应用程序AMPQ服务器端broker: 用来接收生产者发送的消息并将这些消息路由到服务器的队列。主题(topic): 类似新闻的 体育,娱乐,教育标题分区(partition)一个topic中的消息数据按照多个分区组织,分区是消息队列组织的最小单位,一个分区可以看作是一个FIFO的队列。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springboot项目启动后执行方法的三种方式
下一篇:「PostgreSQL 」如何在CentOS 7 / CentOS 8上安装PostgreSQL 12
相关文章

 发表评论

暂时没有评论,来抢沙发吧~