Kafka

网友投稿 861 2022-10-28

Kafka

Kafka

一、kafka基本介绍

1.Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该 具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确 并 没有完全遵循JMS规范。

首先,让我们来看一下基础的消息(Message)相关术语:

名称

解释

Broker

消息中间件处理节点,⼀个Kafka服务器就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群

Topic

Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic

Producer

消息⽣产者,向Broker发送消息的客户端

Consumer

消息消费者,从Broker读取消息的客户端

ConsumerGroup(消费组)

⼀条消息可以被多个不同的消费组消费。同组中的多个消费者属于竞争关系,一条消息只能被同组中的某一个消费者消费,其他消费者不能消费。

Partition

物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的

Replica(副本)

为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

1.2使用场景

1.1 Kafka消费模式

1)点对点(一对一)

消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。

2)发布/订阅模式(一对多)

消息生产者将消息发布到Topic中,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。

二、分区、集群

1、分区(partition)

一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。比如一个topic创建了 3 个分区。那么topic中的消息就会分别存放在这三个分区中。

​​分区的作用:​​

可以分布式存储可以并行写

2、Kafka集群

集群,由多个Kafka服务器(Broker)构成。

每一个分区都由集群中的某个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化。

3、副本的概念

副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。比如:创建 1个主题, 2 个分区、 3 个副本。

关键数据:

replicas:当前副本存在的broker节点leader:副本里的概念

每个partition都有一个broker作为leader。消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上面。副本里的leader专⻔用来接收消息。接收到消息,其他follower通过poll的方式来同步数据。

follower:leader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果leader所在的broker挂掉,那么就会进行新leader的选举,至于怎么选,在之后的controller的概念中介绍。

4、集群中broker、分区、副本的关系

1)一个broker中存放一个topic的不同partition

2)集群中由leader和follower构成

3)每一个分区都存在一个leader,每个分区的leader可以不相同。即一个broker是一个partition的leader,也是其他partition的副本。

三、生产者

1、发送消息

1)partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。

2)如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。

3)如果既没指定partition,又没有设置key,则会轮询选出一个partition。

2、生产者push模式

producer采用push模式:是将数据发布到broker,每条消息追加到分区中,顺序写入磁盘。

需要注意:同一分区内的数据是有序的。消息写入leader后,follower是主动的去leader进行同步的!

3、ACK应答机制

在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。

0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。all(或-1)代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

4、保存数据

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)

四、消费者

1、消费消息

1)指定分区消费

2)消息回溯消费

3)指定offset消费

4)设置消费组,消费指定topic

@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")public void listenGroup(ConsumerRecord record,Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge();}

5)设置消费组、多topic、指定分区、指定偏移量消费及设置消费者个数。

@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))},concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数public void listenGroup(ConsumerRecord record,Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge();}

2、消费者pull模式

pull 模式:消费者向broker 要消息。

消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。

3、offset偏移量

Consumer消费数据时的可靠性很容易保证,因为数据在Kafka中是持久化的,不用担心数据丢失问题。但由于Consumer在消费过程中可能遭遇断电或者宕机等故障,Consumer恢复之后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费的offset位置,以便故障恢复后可以继续消费。

1)自动提交offset

消费者pull到消息后默认情况下,会自动向broker的topic提交当前主题-分区消费的偏移量offset。

自动提交会丢消息: 因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此 时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。

2)手动提交offset

手动提交offset的方法主要有两种:

commitSync:同步提交

commitAsync:异步提交

相同点:两种方式的提交都会将本次pull拉取的一批数据的最高的偏移量提交。

不同点:同步提交阻塞当前线程,持续到提交成功,失败会自动重试(由于不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,有可能提交失败。

手动同步提交

if (records.count() > 0 ) {// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了consumer.commitSync();}Copy to clipboardErrorCopied

手动异步提交

if (records.count() > 0 ) {// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Mapoffsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " +exception.getStackTrace()); } } });}

五、Springboot中使用Kafka

1.引入依赖

org.springframework.kafka spring-kafkaCopy to clipboardErrorCopied

2.配置文件

server: port: 8080spring: kafka: bootstrap-servers: 172.16.253.21: 9093 producer: # 生产者 retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每一条记录被消费者-(ListenerConsumer)处理之后提交 # RECORD # 当每一批poll()的数据被消费者-(ListenerConsumer)处理之后提交 # BATCH # 当每一批poll()的数据被消费者-(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 # TIME # 当每一批poll()的数据被消费者-(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 # COUNT # TIME | COUNT 有一个条件满足时提交 # COUNT_TIME # 当每一批poll()的数据被消费者-(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 # MANUAL # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE redis: host: 172.16.253.21Copy to clipboardErrorCopied

3.消息生产者

发送消息到指定topic

@RestControllerpublic class KafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate kafkaTemplate; @RequestMapping("/send") public void send() { kafkaTemplate.send(TOPIC_NAME, 0 , "key", "this is a msg"); }}Copy to clipboardErrorCopied

4.消息消费者

设置消费组,消费指定topic

@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")public void listenGroup(ConsumerRecord record,Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge();}Copy to clipboardErrorCopied

设置消费组、多topic、指定分区、指定偏移量消费及设置消费者个数。

@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))},concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数public void listenGroup(ConsumerRecord record,Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge();}

六、Kafka集群Controller、Rebalance和HW

1.Controller

Kafka集群中的broker在zk中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的controller,负责管理整个集群中的所有分区和副本的状态:

当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。

2.Rebalance(重平衡)机制

前提是:消费者没有指明分区消费。当消费组里消费者和分区的关系发生变化,那么就会触发rebalance机制。

这个机制会重新调整消费者消费哪个分区。

在触发rebalance机制之前,消费者消费哪个分区有三种策略:

range:通过公示来计算某个消费者消费哪个分区轮询:大家轮着消费sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整。

3.HW和LEO

HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。

七、Kafka线上问题优化

1.如何防止消息丢失

发送方(ACK机制): ack是 1 或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack设成all,把min.insync.replicas配置成分区备份数消费方(offset机制):把自动提交改为手动提交。

2.如何防止消息的重复消费

一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防治消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。

幂等性如何保证:

mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条使用redis或zk的分布式锁(主流的方案)

3.如何做到顺序消费RocketMQ

发送方:在发送时将ack不能设置 0 ,关闭重试,使用同步发送,等到发送成功再发送下一条。确保消息是顺序发送的。接收方:一个分区,一个消费者。消息是发送到一个分区中,只能有一个消费组的消费者来接收消息。因此,kafka的顺序消费会牺牲掉性能。

4.解决消息​积压​问题

消息积压会导致很多问题,比如磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,​就需要有相应的手段:

方案一:多线程。在一个消费者中启动多个线程,让多个线程同时消费。——提升一个消费者的消费能力(增加分区增加消费者)。方案二:多消费者。如果方案一还不够的话,这个时候可以启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同一服务器上也可以提高消费能力——充分利用服务器的cpu资源。

八、Kafka-eagle监控平台

官网-压缩包:

​​http://kafka-eagle.org/​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:简单了解流
下一篇:release.sh 一个shell脚本用于在GitHub上构建和发布Go程序
相关文章

 发表评论

暂时没有评论,来抢沙发吧~