RocketMQ源码解析:主从同步和读写分离实现

网友投稿 873 2022-09-02

RocketMQ源码解析:主从同步和读写分离实现

RocketMQ源码解析:主从同步和读写分离实现

启动HAService

图片来自RocketMQ的官微,对整个主从同步实现的过程概括的比较清晰,对着图说一下具体的流程

master和slave端都会启动HAService,slave端的HAClient会和broker端的HAService建立连接master端的AcceptSocketService用来处理slave端的连接,并将连接封装成HAConnection,一个连接封装成一个HAConnection每个HAConnection会启动2个线程WriteSocketService和ReadSocketServiceWriteSocketService,读取salve已经同步的offset并保存。ReadSocketService,发送commitLog数据

源码解析

主从同步

之前的文章我们已经分析了往内存中存储消息,以及刷盘的过程,我们接着看最后一步,主从同步。

在RocketMQ中主从同步有两种方式

同步,感觉和mysql中的半同步差不多,只要消息写入一个slave即可异步,用户不用等到消息发送到slave即可收到ack

从代码中可以看出来,输盘和主从同步是同时进行(并不是刷盘完毕才进行主从同步) 写个例子演示一下

public CompletableFuture getResult() { CompletableFuture result1 = new CompletableFuture(); CompletableFuture result2 = new CompletableFuture(); new Thread(() -> { sleepRandom(); result1.complete(1); }).start(); new Thread(() -> { sleepRandom(); result2.complete(2); }).start(); return result1.thenCombine(result2, (num1, num2) -> { return num1 + num2; });}@Testpublic void demo() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); CompletableFuture future = getResult(); // 线程阻塞在这里等待结果,直到等到结果3 System.out.println(future.get()); // 2145 System.out.println(System.currentTimeMillis() - start);}

例子中的2个线程就相当于一个处理刷盘,一个处理主从同步

处理同步的套路和刷盘的套路差不多,将同步请求放到阻塞队列中,然后GroupTransferService不断处理这些请求,请求处理完毕则唤醒对应的线程

CommitLog#submitReplicaRequest

当slave和master相差太多的时候,会返回SLAVE_NOT_AVAILABLEGroupTransferService#doWaitTransfer

doWaitTransfer会不断的将push2SlaveMaxOffset(slave同步的最大偏移量,多个slave同时更新这一个值)和req.getNextOffset(当前消息存储完的偏移量)进行比较,如果大于说明至少有一个salve同步完成了,唤醒阻塞的线程即可

slave端的HAClient会不断上传同步偏移量,并读取master传送过来的commitlog

HAService.HAClient#run

master端的AcceptSocketService会将每个slave端的连接封装成HAConnectionHAService.AcceptSocketService#run

ReadSocketService会不断读取slave同步过来的offset并保存下来HAConnection.ReadSocketService#run

WriteSocketService则不断同步commitlog数据给slave。

本文只是梳理了一下整体流程,各种读写的流程涉及到大量的nio的api,想说明白得花费不少时间,单开一篇文章把

读写分离

RocketMQ的读写分离和其他中间件不太一样,因为在消费消息的过程中,RocketMQ有时会从master节点读取,有时会从slave节点读取。

那么读取的节点是如何确定的呢?刚开始的时候消费者从master节点读取,当要拉取的偏移量和现在的最大的偏移量相差过大时,就改为从slave拉取

为什么要这么实现呢?

当拉取的偏移量相差不大时,消息很大概率还在pagecache中,读取效率很高。当拉取的偏移量比较大时,消息很大概率被刷回磁盘了,此时拉取的话就会发生磁盘io

DefaultMessageStore#getMessage

当偏移量的差大于物理内存的40%时,就改为从slave拉取,返回的时候设置下次拉取的brokerIdPullMessageProcessor#processRequest

当Consumer端收到消息后,会回调PullCallback实现类(这部分内容我们在后面会详细解释的),接着调用PullAPIWrapper#processPullResult方法DefaultMQPushConsumerImpl.PullCallback

PullAPIWrapper#processPullResult方法又会调用PullAPIWrapper#updatePullFromWhichNode

,这个方法会将broker端返回的下次要拉取消息的brokerId缓存下来PullAPIWrapper#updatePullFromWhichNode

当再次执行消息拉取的时候,用的就是缓存下来的brokerId对应的地址,至此实现读写分离

参考博客

读写分离 [1]https://objcoding.com/2019/09/22/rocketmq-read-write-separation/

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:RocketMQ源码解析:高性能存储策略
下一篇:Linux下 php7安装redis的方法(linux下的dns功能是通过什么实现的)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~