RocketMQ源码解析:事务消息是如何实现的?

网友投稿 929 2022-10-07

RocketMQ源码解析:事务消息是如何实现的?

RocketMQ源码解析:事务消息是如何实现的?

用RocketMQ事务消息实现分布式事务

在分布式系统中为了保证数据的一致性,通常要使用分布式事务。分布式事务的解决方案有很多,比如TCC,SAGA,今天我们就来看一下如何用RocketMQ事务消息实现分布式事务?

RocketMQ实现分布式事务的流程如下

producer向mq server发送一个半消息mq server将消息持久化成功后,向发送方确认消息已经发送成功,此时消息并不会被consumer消费producer开始执行本地事务逻辑producer根据本地事务执行结果向mq server发送二次确认,mq收到commit状态,将消息标记为可投递,consumer会消费该消息。mq收到rollback则删除半消息,consumer将不会消费该消息,如果收到unknow状态,mq会对消息发起回查在断网或者应用重启等特殊情况下,步骤4提交的2次确认有可能没有到达mq server,经过固定时间后mq会对该消息发起回查producer收到回查后,需要检查本地事务的执行状态producer根据本地事务的最终状态,再次提交二次确认,mq仍按照步骤4对半消息进行操作

可以看到RocketMQ事务消息解决了Producer端事务执行和消息发送的一致性。并没有解决Producer端事务执行,消息发送和消息消费的一致性问题

当我们用RocketMQ实现分布式事务时,只需要实现TransactionListener接口即可,接口的2个方法作用如下

executeLocalTransaction,执行本地事务checkLocalTransaction,回查本地事务状态

用官方的Demo演示一下具体的用法,写一个Producer类

public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); }}

写一个TransactionListener接口的实现类

public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); System.out.println("status " + status); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; }}

针对这个例子,所有的消息都会回查,因为executeLocalTransaction返回的都是UNKNOW,回查的时候status=1的数据会被消费,status=2的数据会被删除,status=0的数据会一直回查,直到超过默认的回查次数。

看到这,可能有人会问了,我们先执行本地事务,执行成功后再发送消息,不也可以实现类似的功能吗

其实这样做还是有可能会造成数据不一致的问题。假如本地事务执行成功,发送消息,由于网络延迟,消息发送成功,但是回复超时了,抛出异常,本地事务回滚。但是消息其实投递成功并被消费了,此时就会造成数据不一致的情况。

消息发送成功,但是提交事务失败了,例如事务提交超时,然后回滚,或者还没提交JVM宕机了,也会造成数据不一致

那消息投递到mq server,consumer消费失败怎么办?

如果是消费超时,重试即可。如果是由于代码等原因真的消费失败了,此时就得人工介入,重新手动发送消息,达到最终一致性。

源码解析

我将事务消息的执行流程画了如下一个执行流程图

Producer端处理流程

// DefaultMQProducerImpl#sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 添加属性,表明消息为 prepare 消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 设置消息所属生产组,设置生产组的目的是在查询事务消息本地事务状态时,从生产组中随机选择一个生产者即可 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } // 发送者成功发送PREPARED消息后,执行本地事务方法 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { // 结束事务,提交或回滚,向rocketmq发送请求 this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}

事务消息的发送流程比较简单

首先给消息添加属性,表明这是一个半消息发送消息,如果消息发送成功,则执行本地事务根据本地事务的执行情况,向broker端发送结束事务的请求(commit rollback unknow)

Broker端处理流程

prepare消息存储

SendMessageProcessor#asyncSendMessage

当消息是一个半消息的时候,备份原来的topic和queueId到消息属性中,然后重新设置topic=RMQ_SYS_TRANS_HALF_TOPIC,queueId=0,存储到commitLog中,这样消费者就消费不了这条消息

接收结束事务请求

borker端通过EndTransactionProcessor来接收二阶段的消息,提交事务消息或者回滚事务消息

EndTransactionProcessor#processRequest

提交事务消息和回滚事务消息的逻辑差不多,先说提交事务消息的逻辑

根据消息偏移量从commitLog中找到消息(topic=RMQ_SYS_TRANS_HALF_TOPIC,queueId=0),然后将消息的topic和queueId重新设置为它原来的(存储的时候原来的topic和queueId已经备份到消息属性中了哈)将消息重新存在commitLog中,这样消息就能被消费了哈然后将消息存储在 topic 为 RMQ_SYS_TRANS_OP_HALF_TOPIC 的队列中,代表消息已经被处理(提交或回滚)

回滚事务失效的逻辑如下

根据消息偏移量从commitLog中找到消息(topic=RMQ_SYS_TRANS_HALF_TOPIC,queueId=0)然后将消息存储在 topic 为 RMQ_SYS_TRANS_OP_HALF_TOPIC 的队列中,代表消息已经被处理(提交或回滚)

区别就是提交事务消息需要把原始的消息重新放到commitLog中,而回滚事务消息就不用

事务状态回查

TransactionalMessageCheckService#onWaitEnd

事务状态回查就是通过RMQ_SYS_TRANS_HALF_TOPIC(半消息存储的topic)和RMQ_SYS_TRANS_OP_HALF_TOPIC(已经处理的半消息的topic)来确定那些需要回查的半消息,根据回查结果确实是否要投递消息

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IDEA中的maven没有dependencies解决方案
下一篇:最简单的微信小程序Demo(最简单的微信小程序怎么做)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~