洞察企业如何通过FinClip提升跨平台小程序加载效率,适应多样化市场需求
2451
2022-11-23
消息队列重要机制讲解以及MQ设计思路(kafka、rabbitmq、rocketmq)
目录《Kafka篇》简述kafka的架构设计原理(入口点)消息队列有哪些作用(简单)消息队列的优缺点,使用场景(基础)消息队列如何保证消息可靠传输死信队列是什么?延时队列是什么?(经典)简述kafka的rebalance机制(比较深入)简述kafka的副本同步机制(比较深入)kafka中zookeeper的作用kafka中的pull、push的优劣势分析kafka中高读写性能原因分析kafka高性能高吞吐的原因kafka消息丢失的场景以及解决方案(重点)kafka为什么比RocketMQ的吞吐量高kafka、ActiveMQ、RabbitMQ、RocketMQ对比《RabbitMQ篇》RabbitMQ架构设计RabbitMQ的交换器类型RabbitMQ的普通集群模式RabbitMQ的镜像队列原理RabbitMQ持久化机制RabbitMQ事务消息RabbitMQ如何保证消息的可靠性传输RabbitMQ的死信队列原理RabbitMQ是否可以直连队列《RocketMQ篇》简述RocketMQ架构设计简述RocketMQ持久化机制RocketMQ怎么实现顺序消息RocketMQ的底层实现原理RocketMQ如何保证不丢失消息《MQ总结篇》如何设计一个MQ如何进行产品选型如何保证消息的顺序
《Kafka篇》
简述kafka的架构设计原理(入口点)
无论是那种MQ都会存在三个:producer、MQ的cluster、consumer的group kafka中还多出了zookeeper,用来维护集群的。 注意分区是将一个整体分割到不同的分区上,主从则是都保留数据整体,不过是主与副本的关系。Broker:单独的机器Consumer Group:消费者组,消费者组内每个消费者负责消费不同分区的数据,提高消费能力。逻辑上的一个订阅者Topic:可以理解为一个队列,Topic将消息分类,生产者和消费者面向的是同一个Topic,它是可以分区的,存在不同的Broker中Partition:为了实现扩展性,提高并发能力。一个Topic以多个Partition的方式分布到多个Broker上,每个Partition是一个有序的队列。一个Topic的每个Partition都有若干个副本,一个Leader和若干个Follower。生产者发送数据的对象以及消费者消费的数据对象都是Leader。(这一点可以从图中看出,红色的虚线便是如此)Follower负责实时从Leader中同步数据,保持和Leader数据同步。Leader发生故障时,某个Follower会被重新选举为新的Leader。
消息队列有哪些作用(简单)
1、解耦:使用消息队列来作为两个系统直接的通讯方式,两个系统不需要相互依赖了 2、异步:系统A给消费队列发送完消息之后,就可以继续做其他事情了 3、流量削峰:如果使用消息队列的方式来调用某个系统,那么消息将在队列中排队,由消费者自己控制消费速度。将流量从高峰期引入到低谷期进行处理,起到缓冲作用
消息队列的优缺点,使用场景(基础)
优点: 1、解耦:使用消息队列来作为两个系统直接的通讯方式,两个系统不需要相互依赖了 2、异步:系统A给消费队列发送完消息之后,就可以继续做其他事情了 3、流量削峰:如果使用消息队列的方式来调用某个系统,那么消息将在队列中排队,由消费者自己控制消费速度 缺点: 1、增加了系统复杂度,加上了与MQ交互的逻辑,带入了幂等、重复消费、消息丢失等问题 2、系统可用性降低,MQ的故障会影响系统可用 3、一致性,消费端可能失败。A端将消息送入MQ后就不知道B端对消息处理是否成功。 使用场景:日志采集:日志量较大时不希望影响到正常的业务,使用MQ异步传送出去,允许小部分的重复记录、记录消失发布订阅:类似与监听,对感兴趣的消费MQ中的消息
消息队列如何保证消息可靠传输
消息可靠传输代表两层意思:不多也不少 1、为了保证消息不多,也就是消息不能重复,也就是生产者不能重复生产消息,或者消费者不能重复消费消息:
要确保消息不多发,这个不容易出现,难以控制从MQ本身来看,尽管有ack或offset的机制,在网络不好或者消费者宕机时,这些标志上传会失败。所以MQ也不能保证正确感知消息是否被消费要避免不重复消费,最保险机制就是消费者实现幂等性,保证就算是重复消费,也不会出现问题。具体来讲,就是不管是MQ push消息还是消费者pull消息都要保证。幂等的概念就是用相同的参数请求C端,处理结果不会因为次数的增加而改变。这边提供三个方案:
1、如果是写redis,就没问题,每次都是set,天然幂等性。但是键值对的超时时间会随着刷set而往后延。 2、生产者发送消息的时候带上一个全局唯一的id,消费者拿到消息后,先根据这个id去redis里查一下,之前有没有被消费过,没有消费过就处理,并且写入这个id到redis。如果消费国了,则不处理 3、基于数据库的唯一键,主键唯一的话,重复的记录就不会被插入
2、消息不能少,也就是消息不能丢失,生产者发送的消息,消费者一定要能消费到:
生产者发送消息时,要确认broker确实收到并持久化了这条消息,比如RabbitMQ的confirm机制,Kafka的ack机制都可以保证生产者能正确的将消息发送给brokerbroker要等待消费者真正确认消费到了消息时才删除掉消息,这里通常就是消费端ack机制,消费者接收到一条消息后,如果确认没问题了,就可以给broker发送一个ack,broker接收到ack后才会删除消息
死信队列是什么?延时队列是什么?(经典)
1、死信队列也是一个消费队列,用来存放那些没有成功消费的消息,(重试之后还是失败则进入死信队列),可以用来作为消息重试 2、进入到这个队列中的消息,需要等待设置的时间之后才能被消费者消费到,延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性操作的业务,如十分钟内未支付就取消订单
简述kafka的rebalance机制(比较深入)
1、consumer group 中的成员个数发生变化2、consumer 消费超时,一直没有提交offset3、group订阅的topic个数发生变化4、group订阅的topic的分区数发生变化
所以对应减少rebalance的方法有: 1、超时阈值调大 2、在业务低峰期的时候人工增加topic和partion
那么rebalance具体是什么样的操作呢,下面介绍coordinator发现 group 中的成员个数发生变化主动进行rebalance的操作过程: coordinator:通常是partion的leader节点(一个partion是有多个副本的,存在leader与follower节点)所在的broker,负责监控group中的consumer的存活,consumer维持到coordinator的心跳(消费者定时向协调者上报心跳),判断consumer是否消费超时
coordinator通过心跳返回通知consumer进行rebalance。举例,一个group中有C1 C2 C3,此时C1挂了,要进行rebalance,协调者也需要通知C1 C2不能进行消费,由于消费者与协调者之间是通过心跳通信,协调者通过回复心跳,通知消费者进入rebalance状态consumer请求coordinator加入group,coordinator会知道有哪些消费者请求它加入group,也就知道了group中有哪些消费者是存活的,coordinator就会选举产生leader consumerleader consumer从coordinator获取所有的consumer,然后将partion与所有的consumer进行分配,然后将分配结果封装成syncGroup,发送syncGroup(分配信息)到coordinatocoordinator拿到分配信息后,通过心跳机制将分配信息下发给consumer,consumer拿到分配信息后就知道它该去消费哪个partion了至此,完成rebalance
还有一种情况,就是leader consumer 监控topic or partion的变化,通知coordinator触发rebalance,之后的流程与上述一致。
rebalance存在的问题:如果C1消费消息超时(并没有提交offset),触发了rebalance,重新分配后,该消息极有可能会被其他消费者C2拿去消费,此时C1消费完成提交offset(表示该消息已经处理完了),那么C2消费完之后也会提交一个offset,导致错误 解决方案如下:coordinator每次rebalance,会标记一个Generation(表示rebalance的周期数)给到consumer,每次rebalance该Generation会+1,consumer提交offset的时候,coordinator会比对Generation,不一致则拒绝提交
简述kafka的副本同步机制(比较深入)
leader收消息后(offset肯定要移动)会更新本地的LEO,leader还会维护follower的LEO即remote_LEO。follower会发出一个fetch同步数据的请求(携带自身的LEO)给leader,leader就知道了ISR列表中所有follower的remote_LEO,然后比较得出最小的remote_LEO,然后作为分区的HW,然后进行更新,再把HW数据响应给follower ,follower拿到HW之后更新自身的HW(取响应的HW和自身LEO中的较小值),然后进行数据落盘,然后LEO+1。所以总的来说follower是异步的形式进行更新HW ISR:如果一个follower落后leader不超过某个时间阈值,那么则在ISR,否则放在OSR中。 在同步副本的时候,follower获取leader的LEO和LogStartOffset,与本地对比。如果本地的LogStartOffset超出了leader的值,则超过这个值的数据删除,再进行同步,如果本地的小于leader的,那么直接同步。 注意,同步的时候可能会导致消息丢失,leader接受到消息更新完本地后,LEO还没相应给follower的时候,leader自己就挂掉了。然后重启之后原leader就变成follower了(重新选举了),那么它再去向新leader同步的时候就会把原本本地没有同步出去的消息给删除,也代表着这个消息就丢失了。
kafka中zookeeper的作用
zookeeper负责的是集群的管理功能,后面的迭代中zk已经不再了。 看看zk在kafka中存储了哪些节点信息吧:/brokers/ids:临时节点,kafka连接到zk后创建的节点,保存所有broker节点信息,存储broker的物理地址、版本信息、启动时间等,节点名称为brokerID,broker定时发送心跳到zk,如果断开则该brokerID节点就会被删除。/brokers/topics:临时节点,节点保存broker节点下所有的topic信息,每一个topic节点下包含了一个固定的partitions节点(/brokers/topics/partitions),partitions的子节点就是topic的分区,每个分区下保存一个state节点,保存着当前leader分区和ISR(可靠的从节点列表)的brokerID,state节点由leader创建,若leader宕机该节点会被删除,直到有新的leader选举产生、重新生成state节点/consumer/[group_id]/owners/[topic]/[broker_id-partition_id]:维护消费者和分区的注册关系(哪个消费者消费哪个分区)/consumer/[group_id]/offsets/[topic]/[broker_id-partition_id]:分区消息的消息进度offset
cilent通过topic找到topic树下的state节点,获取leader的brokerID,到broker树中找到brokerID的物理地址,但是cilent不会直接连着zk,而是通过配置的broker获取到zk中的信息。
kafka中的pull、push的优劣势分析
pull模式:
根据consumer的消费能力进行数据拉取,可以控制速率可以批量拉取,也可以单条拉取可以设置不同的提交方式,实现不同的传输语义缺点:如果kafka没有数据,会导致consumer空循环,消耗资源解决:通过参数设置,consumer拉取数据为空或者没有达到一定数量时进行阻塞
push模式: 不会导致consumer循环等待。 缺点:速率固定,忽略了consumer的消费能力,可能导致拒绝服务或者网络拥塞等情况
kafka中高读写性能原因分析
原因两点:顺序写 + 零拷贝
kafka是一个文件系统,不基于内存,而是直接硬盘存储,因此消息堆积能力能强。 顺序写:利用磁盘的顺序访问速度可以接近内存,kafka的消息都是append操作,partition是有序的,节省了磁盘的寻道时间,同时通过批量操作节省了写入次数,partition(逻辑概念)物理上分为多个segment文件存储,方便删除 传统:
读取磁盘文件数据到内核缓冲区将内核缓冲区的数据copy到用户缓冲区将用户缓冲区的数据copy到socket的发送缓冲区将socket发送缓冲区中的数据发送到网卡、进行传输零拷贝:直接将内核缓冲区的数据发送到网卡传输,节省了数据在内核态与用户态直接的传递使用的是操作系统的指令支持kafka不太依赖jvm,主要是用的操作系统的pageCache(页存,之后会刷新到磁盘中),如果生产消费速率相当,则直接用pageCache交换数据,不需要经过磁盘IO
kafka高性能高吞吐的原因
1、磁盘顺序读写:保证了消息的堆积
顺序读写:磁盘会预读,即在读取的起始地址连续读取多个页面,主要时间花费在了传输时间,而这个时间两种读写可以认为是一样的随机读写,因为数据没有在一起,预读将会浪费时间,需要多次寻道和旋转延迟,而这个时间可能是传输时间的许多倍2、零拷贝:避免CPU将数据从一块存储拷贝到另外一块存储传统的数据拷贝:1、读取磁盘文件数据到内核缓冲区2、将内核缓冲区的数据copy到用户缓冲区3、将用户缓冲区的数据copy到socket的发送缓冲区4、将socket发送缓冲区的数据发送到网卡,进行传输零拷贝:磁盘文件->内核空间读取缓冲区->网卡接口->消费者进程3、分区分段 + 索引kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的,index文件。这种分区分段 + 索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度(有点类似与分段锁)4、批量压缩:存储不是直接存储原文,而是多条消息一起压缩,降低带宽。消费端收到消息后再解压5、批量读写6、直接操作的是pageCache,而不是JVM,避免GC耗时及对象创建耗时,且读写速度更高。进程重启缓存也不会丢失
kafka消息丢失的场景以及解决方案(重点)
1)消息发送时出现丢失的场景以及解决
1、ack = 0 ,不重试生产者发送消息完不管结果了,如果发送失败,消息也就丢失了2、ack = 1, leader 宕机了生产者发送消息完,只等待leader写入成功就返回了,但是leader之后宕机了,自此follower还没来得及同步,消息丢失3、unclean.leader.election.enable 配置true允许选举ISR以外的副本作为leader,也会导致数据丢失,默认为false。生产者发送异步消息之后,只等待leader写入成功就返回了,然后leader宕机了,这时ISR中没有follower,leader会从OSR中选举,因为OSR中的follower节点本身就落后与leader,就会造成消息丢失解决方案:1、配置:ack = all / -1, tries > 1, unclear.leader.election.enable : false生产者发送消息完,等待follower同步完再返回,如果异常则重试,副本的数量此时可能会影响吞吐量不允许选举ISR以外的副本作为leader2、配置:min.insync.replicas > 1,设置越大表示越可靠副本指定必须确认写操作成功的最小同步副本数量,如果不能满足这个最小值,则生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)min.insync.replicas和ack是有区别的,min.insync.replicas(同步副本数量)指的是ISR中的数量必须要大于1ack = all / -1,表示ISR中的所有节点全部要确认此间还存在一个隐性的逻辑关系,只有ack = all / -1,那么min.insync.replicas才会生效。所以这两个参数要搭配着来使用,这样就可以确保如果大多数副本没有收到写操作,则生产者将引起异常。3、失败的offset单独记录生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理
2)消费端
1、先commit offset再处理消息,如果再处理消息的时候出现异常了,但是offset已经提交了,这条消息对于该消费者来说就是丢失的,再也不会消费到了.2、先处理消息,处理完了再commit,有可能存在重复消费的情况。在处理完这条消息之后,还没来得及commit,就宕机了,重启之后还回去消费这条消息。解决方案:先做业务处理,再去commit,如果出现重复消费,就只需要保证接口的幂等性就行了
3)broker端的刷盘 从生产者发送出来的消息实际上是缓存在broker的pageCache上的,然后linux保证pageCache上的数据被刷入硬盘中。如果linux此时宕机了,那么就会有部分pageCache上的数据丢失了。 于是可以通过配置参数,减少系统刷盘间隔
kafka为什么比RocketMQ的吞吐量高
kafka的生产者采用的是异步发送消息机制,当发送一条消息时,消息并没有发送到broker节点上,而是先缓存起来,然后直接向业务返回成功,当缓存的消息积累到一定数量时再批量发送给broker。这种做法减少了网络io,从而提高了消息发送的吞吐量,但是如果消息生产者产生了宕机,会导致消息丢失,业务出错,所以理论上来说kafka利用此机制提高了性能却降低了可靠性。
kafka、ActiveMQ、RabbitMQ、RocketMQ对比
站在应用的角度来看:ActiveMQ:JMS规范,支持事务、支持XA协议,没有生产大规模支撑场景、官方维护越来越少RabbitMQ:erlang语言开发、性能好、高并发,支持多种语言,社区、文档方面有优势,erlang语言不利于java二次开发,依赖开源社区的维护和升级,需要学习AMP协议,学习成本相对较高 以上吞吐量单机都在万级kafka:高性能、高可用,生产环境有大规模使用场景,单机容量有限(超过64个分区响应明显变长)、社区更新慢 吞吐量单机百万RocketMQ:java实现,方便二次开发,设计参考了kafka,高可用、高可靠,社区活跃度一般,支持语言较少。 吞吐量单机十万
《RabbitMQ篇》
RabbitMQ架构设计
connection:与MQ交互是通过connection,需要建立一个TCP连接,一个connection里面可以开多个信道(channel),这些信道会复用这个TCP连接 。Broker:rabbitmq的服务节点Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中,生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行消费,而不是每个消费者都收到所有的消息进行消费。(注意,RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定到多个队列,由多个消费者来订阅这些队列)Exchange:交换器,生产者将消息发送到Exchange,由交换器将消息路由到一个或多个队列中。交换器与不同的队列通过绑定键绑定RoutingKey:路由Key,生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效(生产者指定的的RoutingKey会与BindingKey进行匹配,匹配规则与交换器类型有关)。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。消息发送流程: 生产者将routeKey、exchangeName、body通过信道传递到broker里面,根据exchangeName找到交换机,用该交换机的匹配规则将routeKey匹配到现有的BindingKey,如果匹配上了,将消息投放到对应的queue里面。消费流程:由Pull和Push两种方式 多个消费者消费同一个queue的话,queue里面的一条消息只会被一个消费者消费到 如果要发布订阅功能 ,生产者想要让多个消费者收到同一个消息,只需要通过交换器分发到多个queue上去即可。vhost:虚拟主机的概念。一个broker其实就是一个物理主机,vhost其实就是虚拟主机,可以在一个broker上建立多个vhost。每个vhost都包含着自己的Exchange和Queue。应用可以指定其中一个虚拟机,所以一个rabbitmq可以给多个不同的应用使用,同时也是应用隔离的
RabbitMQ的交换器类型
RabbitMQ的交换器类型决定了routeKey与BindingKey如何匹配,是精准匹配还是模糊匹配 有下面几种匹配规则:fanout:扇形交换器,不再判断routekey,直接将消息分发到所有绑定的队列
direct:判断routekey的规则是完全匹配模式,即发送消息时指定的routekey要等于绑定的routekey
topic:判断routekey的规则是模糊匹配模式
header:绑定队列与交换器的时候指定一个键值对,当交换器在分发消息的时候胡先解开消息体里面的headers数据,然后判断里面是否有所设置的键值对,如果发现匹配成功,才将消息分发到队列中。性能较差
RabbitMQ的普通集群模式
队列元数据:队列名称和它的属性交换器元数据:交换器名称、类型和属性绑定元数据:一张简单的表展示了如何将消息路由到队列vhost元数据:就是一个broker,为vhost内的队列、交换器和绑定提供命名空间和安全属性元数据每个节点都存了一份,是冗余的。消息的内容并没有每个节点都存,例如client1连节点1,那么queue1的消息内容只会存在节点1,不会同步到其他节点。所以某个节点宕机,就保证不了高可用。同步元数据,这样每个节点都可以对外服务,想去消费其他queue时可以通过路由表去转发对应的请求。为什么只同步元数据:存储空间考虑,每一个节点都保存全量数据会影响消息堆积能力性能考虑,消息的发布者需要将消息复制到每一个集群节点客户端连接的是非队列数据所在节点:则该节点会进行路由转发,包括发送和消费集群节点类型:磁盘节点:将配置信息和元信息存储在磁盘上内存节点:将配置信息和元信息存储在内存上,性能优于磁盘节点,依赖磁盘节点进行持久化RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但是不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个正常,系统就能正常工作。
RabbitMQ的镜像队列原理
RabbitMQ持久化机制
RabbitMQ持久化分为三个方面: 1、交换器持久化:exchange_declare创建交换器的时候通过参数指定 2、队列持久化:queue_declare创建队列时通过参数指定 3、消息持久化:new AMQPMessage创建消息时通过参数指定 持久化的时候是按照append的方式去写文件,会根据大小自动生成新的文件(例如一个log是16M,满了之后就会写新的log文件)。rabbitmq在启动的时候会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储(内存不够时) 消息存储时会在ets表中记录消息在文件中的映射以及相关信息(包括id、偏移量、有效数据、左边文件、右边文件),消息读取时根据该信息到文件中读取,同时更新信息。 消息删除时只从ets删除,变为垃圾数据,当垃圾数据超出比例(默认为50%),并且文件数达到3个,触发垃圾回收,锁定左右两个文件,整理左边文件有效数据,将右边文件有效数据写入左边,更新文件信息,删除右边,完成合并。当一个文件的有用数据等于0时,删除该文件。 写入文件前先写buffer缓冲区,如果buffer已经满了,则写入文件(此时知识操作系统的页存)。每隔25ms刷一次磁盘,不管buffer满没满都将buffer和页存的数据落盘。每次消息写入后,如果没有后续写入请求,则直接刷盘。
RabbitMQ事务消息
通过对channel的设置实现 1、channel.txSelect():通知服务器开启事务模式,服务端会返回Tx.Select.Ok 2、channel.basicPublish:发送消息,可以是多条可以是消费消息提交ack 3、channel.txCommit():提交事务 4、channel.txRollback():回滚事务 消费者使用事务: 1、autoAck = false,手动提交ack,以事务提交或回滚为准 2、autoAck = true,不支持事务,即使再收到消息后再回滚事务也是于事无补的,队列已经把消息移除了 如果其中任意一个环节出现问题,就会抛出IoException异常,用户可以拦截异常进行事务回滚,或决定要不要重复消息 事务消息会降低RabbitMQ的性能,因为每一条消息都意味着好几次连接
RabbitMQ如何保证消息的可靠性传输
1、使用事务消息 2、使用消息的确认机制(即ack) 发送方确认发送出去:
将channel设置为confirm模式,则从该channel上发出的每条消息都会被分配一个唯一id消息投递成功后,channel会发送ack给生产者,包含了id,回调ConfirmCallback接口(该接口是异步的)如果发生错误导致消息丢失,发送nack给生产者,回调ReturnCallback接口ack和nack只有一个触发,且只有一次,异步触发,可以继续发送消息
发送到MQ之后,做了持久化之后数据才会可靠。
接收方确认消费完了:
RabbitMQ的死信队列原理
死信队列里面放的是死信消息,下面是死信消息产生的原因: 1、消息被消费方否定确认,使用channel.basicNack或channel.basicReject,并且此时requeue属性被设置为false,表示直接丢弃(requeue为true的话会重复投递) 2、消息在队列的存活时间超过设置的TTL时间 3、消息队列的消息数量已经超过最大队列长度 如果满足上面条件,那么该消息将成为死信消息,如果配置了死信队列信息,那么该消息将会被丢入死信队列中,如果没有配置,则该消息将会被丢弃 为每个需要使用死信队列的业务队列配置一个死信交换机,同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的routeKey,死信队列只不过是绑定在死信交换机上的队列。 TTL:一条消息或者该该队列中所有消息的最大存活时间 如果一条消息设置了TTL属性或者进入设置TTL属性的队列,那么这条消息在TTL设置的时间内没有被消费,则会成为死信,如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用
RabbitMQ是否可以直连队列
《RocketMQ篇》
简述RocketMQ架构设计
该架构参考了kafka,NameServer类似于kafka中的zookeeper,queue类似于kafka中的partition。 kafka中,zk本身存在主从,主从之间也会有数据同步。NameServer则是一个去中心化的结构,每个NameServer之间互相独立,不进行互相通信。只要NameServer存在一个可用节点,那么NameServer就是可用的,它的作用主要就是为了维护路由信息,发送者是谁->发给哪个topic的哪个queue、broker是哪一个->消费者是谁。 注意这里的queue是不存在主从的,而kafka的partition是存在主从的。所以RocketMQ里面的queue是冗余的,有n个broker,就会冗余n-1个数据。这样的好处体现在负载均衡上,如果broker1宕机了,生产者queue1连不上,之前可能会去连queue2,但是此时它会直接去连接broker2的queue1,提高成功率, 每一个Broker要和每一个NameServer建立长连接,底层是由netty维护通信,broker会定期地将自己地topic信息注册到NameServer里。 生产者首先需要连接NameServer,去拉取topic所属地broker,然后直连broker,发送消息到topic的dqueue里面去。 消费者也是需要连接NameServer,去拉取topic所属地broker,然后直连broker,从topic的queue里面获取消息进行消费。
与broker的持久化相关的涉及到三个日志文件:CommitLog:存储的具体的消息内容,但是不区分topic,是顺序读写ConsumeQueue:是commitlog基于topic的索引文件,所以是先根据topic到这个文件里面找索引,然后拿着索引去CommitLog里面找具体内容,顺序存储IndexFile:通过key或时间区间来建立索引,也是commitlog的索引文件
简述RocketMQ持久化机制
commitlog:日志数据文件,被所有的queue共享,1G,写满之后重新生成,顺序写consumeQueue:逻辑queue,消息先到到commitlog,然后异步转发到consumeQueue,包含queue在commitlog种的物理位置偏移量offset,消息实体内容的大小和Message Tag的hash值。大小约为600W个字节,写满之后重新生成,顺序写indexFile:通过key或者时间区间来查找commitlog种的消息,文件名以创建的时间戳命名,固定的单个indexFile大小为400M,可以保存2000W个索引
所有队列共用一个日志数据文件,避免了kafka分区数过多、日志文件过多导致磁盘IO读写压力较大造成性能瓶颈。rocketmq的queue只存储少量数据、更加轻量化,对于磁盘的访问时串行化避免磁盘竞争,缺点在于:写入是顺序写,读是随机读,先读consumeQueue,再读commitlog会降低消息读的效率。
消息发送到broker之后,会被写入commitlog,写之前加锁,保证顺序写入,然后转发到consumeQueue。
消息消费时先从consumeQueue读取消息在Commitlog中的起始物理偏移量offset,消息大小和消息Tag的HashCode值,在从commitlog读取消息内容
同步刷盘,消息持久化到磁盘才会给生产者返回ack,可以保证消息可靠、但是回影响性能异步刷盘,消息写入pagecache就返回ack给生产者,刷盘采用异步线程,降低读写延迟提高性能和吞吐
RocketMQ怎么实现顺序消息
默认是不能保证的,需要程序保证发送和消费的是同一个queue,多线程消费也无法保证
发送顺序:发送端自己的业务逻辑保证先后,发往一个固定的queue,生产者可以在消息体上设置消息顺序
发送者实现MessageQueueSelector接口,选择一个queue进行发送,也可以使用rocketmq提供的默认实现:
SelectMessageQueueByHash:按参数的hashcode与可选队列进行求余选择SelectMessageQueueByRandom:随机选择
mq:queue本身就是顺序追加写,只需要保证一个队列同一时间只有一个consumer消费,通过加锁实现,consumer上的顺序消费有一个定时任务、每隔一定时间向broker发送请求延长锁定
消费端:
pull模式:消费者需要自己维护需要拉取的queue,一次拉取的消息都是顺序的,需要消费端自己保证顺序消费
push模式:消费实例实现自己的MQPushConsumer接口,提供注册监听的方法消费消息,registerMessageListener、重载方法。
MessageListenerConcurrently:并行消费MessageListenerOrderly:串行消费,consumer会把消息放入本地队列并加锁,定时任务保证锁的同步
RocketMQ的底层实现原理
RocketMQ由NameServer集群、Producer集群、Consumer集群、Broker集群组成,消息生产和消费的大致原理如下:
1、Broker在启动的时候向所有的NameServer注册,并保持长连接,每30s发送一次心跳
2、Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
3、Consumer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费
RocketMQ如何保证不丢失消息
生产者:
同步阻塞的方式发送消息,加上失败重试机制,可能broker存储失败,可以通过查询确认异步发送需要重写回调方法,检查发送结果ack机制,可能存储commitlog,存储consumerQueue失败,此时对消费者不可见
broker:同步刷盘、集群模式下采用同步复制、会等待slave复制完成才会返回确认
消费者:
offset手动提交,消息消费保证幂等
《MQ总结篇》
如何设计一个MQ
好的方式:
1、从整体到细节,从业务场景到技术实现
2、以现有产品为基础
实现:
1、先实现一个单机的先进先出的数据结构,对message设计封装。要高效、可扩展以及收缩
2、将单机队列扩展成为分布式队列,涉及到分布式集群管理,如zookeeper、NameServer
3、基于Topic定制路由策略(从生产者到消费者的完整链路): 发送者路由策略、消费者与队列对应关系、消费者路由策略
4、实现高效的网络通信。-> Netty、Http
5、规划日志文件,实现文件高效读写(零拷贝+顺序写)服务重启后,快速还原运行现场
6、定制高级功能,死信队列、延迟队列、事务消息等等。(需要贴合业务实际)
参考:
如何设计一个MQ
如何进行产品选型
kafka: 优点:吞吐量非常大,性能非常好,集群高可用 缺点:会丢失数据,功能单一。不具备死信队列等高级功能 使用场景:数据量大,频繁,且允许丢失数据:日志分析、大数据采集
RabbitMQ: 优点:消息可靠性高,功能全面 缺点:吞吐量比较低,并发性不高,消息积累会严重影响性能。适合在消息来了立马消费的场景使用。erlang开发,语言不好定制 使用场景:小规模场景
RocketMQ: 优点:高吞吐,高性能,高可用,功能全面的 缺点:开源版本功能不如云上商业版本。官方文档和周边生态不成熟。客户端只支持java 使用场景:几乎是全场景
如何保证消息的顺序
参考
1天刷完面试核心45问消息队列面试题(Kafka&RabbitMQ&RocketMQ)44讲
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~