app开发者平台在数字化时代的重要性与发展趋势解析
989
2022-10-07
RocketMQ源码解析:事务消息是如何实现的?
用RocketMQ事务消息实现分布式事务
在分布式系统中为了保证数据的一致性,通常要使用分布式事务。分布式事务的解决方案有很多,比如TCC,SAGA,今天我们就来看一下如何用RocketMQ事务消息实现分布式事务?
RocketMQ实现分布式事务的流程如下
producer向mq server发送一个半消息mq server将消息持久化成功后,向发送方确认消息已经发送成功,此时消息并不会被consumer消费producer开始执行本地事务逻辑producer根据本地事务执行结果向mq server发送二次确认,mq收到commit状态,将消息标记为可投递,consumer会消费该消息。mq收到rollback则删除半消息,consumer将不会消费该消息,如果收到unknow状态,mq会对消息发起回查在断网或者应用重启等特殊情况下,步骤4提交的2次确认有可能没有到达mq server,经过固定时间后mq会对该消息发起回查producer收到回查后,需要检查本地事务的执行状态producer根据本地事务的最终状态,再次提交二次确认,mq仍按照步骤4对半消息进行操作
可以看到RocketMQ事务消息解决了Producer端事务执行和消息发送的一致性。并没有解决Producer端事务执行,消息发送和消息消费的一致性问题
当我们用RocketMQ实现分布式事务时,只需要实现TransactionListener接口即可,接口的2个方法作用如下
executeLocalTransaction,执行本地事务checkLocalTransaction,回查本地事务状态
用官方的Demo演示一下具体的用法,写一个Producer类
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
写一个TransactionListener接口的实现类
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap
针对这个例子,所有的消息都会回查,因为executeLocalTransaction返回的都是UNKNOW,回查的时候status=1的数据会被消费,status=2的数据会被删除,status=0的数据会一直回查,直到超过默认的回查次数。
看到这,可能有人会问了,我们先执行本地事务,执行成功后再发送消息,不也可以实现类似的功能吗
其实这样做还是有可能会造成数据不一致的问题。假如本地事务执行成功,发送消息,由于网络延迟,消息发送成功,但是回复超时了,抛出异常,本地事务回滚。但是消息其实投递成功并被消费了,此时就会造成数据不一致的情况。
消息发送成功,但是提交事务失败了,例如事务提交超时,然后回滚,或者还没提交JVM宕机了,也会造成数据不一致
那消息投递到mq server,consumer消费失败怎么办?
如果是消费超时,重试即可。如果是由于代码等原因真的消费失败了,此时就得人工介入,重新手动发送消息,达到最终一致性。
源码解析
我将事务消息的执行流程画了如下一个执行流程图
Producer端处理流程
// DefaultMQProducerImpl#sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 添加属性,表明消息为 prepare 消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 设置消息所属生产组,设置生产组的目的是在查询事务消息本地事务状态时,从生产组中随机选择一个生产者即可 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } // 发送者成功发送PREPARED消息后,执行本地事务方法 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { // 结束事务,提交或回滚,向rocketmq发送请求 this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}
事务消息的发送流程比较简单
首先给消息添加属性,表明这是一个半消息发送消息,如果消息发送成功,则执行本地事务根据本地事务的执行情况,向broker端发送结束事务的请求(commit rollback unknow)
Broker端处理流程
prepare消息存储
SendMessageProcessor#asyncSendMessage
当消息是一个半消息的时候,备份原来的topic和queueId到消息属性中,然后重新设置topic=RMQ_SYS_TRANS_HALF_TOPIC,queueId=0,存储到commitLog中,这样消费者就消费不了这条消息
接收结束事务请求
borker端通过EndTransactionProcessor来接收二阶段的消息,提交事务消息或者回滚事务消息
EndTransactionProcessor#processRequest
提交事务消息和回滚事务消息的逻辑差不多,先说提交事务消息的逻辑
根据消息偏移量从commitLog中找到消息(topic=RMQ_SYS_TRANS_HALF_TOPIC,queueId=0),然后将消息的topic和queueId重新设置为它原来的(存储的时候原来的topic和queueId已经备份到消息属性中了哈)将消息重新存在commitLog中,这样消息就能被消费了哈然后将消息存储在 topic 为 RMQ_SYS_TRANS_OP_HALF_TOPIC 的队列中,代表消息已经被处理(提交或回滚)
回滚事务失效的逻辑如下
根据消息偏移量从commitLog中找到消息(topic=RMQ_SYS_TRANS_HALF_TOPIC,queueId=0)然后将消息存储在 topic 为 RMQ_SYS_TRANS_OP_HALF_TOPIC 的队列中,代表消息已经被处理(提交或回滚)
区别就是提交事务消息需要把原始的消息重新放到commitLog中,而回滚事务消息就不用
事务状态回查
TransactionalMessageCheckService#onWaitEnd
事务状态回查就是通过RMQ_SYS_TRANS_HALF_TOPIC(半消息存储的topic)和RMQ_SYS_TRANS_OP_HALF_TOPIC(已经处理的半消息的topic)来确定那些需要回查的半消息,根据回查结果确实是否要投递消息
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~