RocketMq事务消息发送代码流程详解

网友投稿 880 2023-05-12

RocketMq事务消息发送代码流程详解

RocketMq事务消息发送代码流程详解

一、RocketMq事务消息流程:

1、首先会向broker发送一个预请求消息,消费者不可见

2、回调执行本地事务(比如操作数据库)

3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。

二、RocketMq事务消息实例

1、引入rocketMq相关的依赖:

org.apache.rocketmq

rocketmq-client

4.4.0

2、创建一个TransactionProducer类:

public class TransactionProducer {

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {

//创建生产者并制定组名

TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");

//2.指定Nameserver地址

producer.setNamesrvAddr("192.168.***.***:9876");

//3、指定消息监听对象用于执行本地事务和消息回查

TransactionListener listener = new TransactionListenerImol();

producer.setTransactionListener(listener);

//4、线程池

ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread thread = newThread(r);

thread.setName("client-tanscation-msg-check-thread");

return thread;

}

});

producer.setExecutorService(executorService);

//5、启动producer

producer.start();

//6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body

Message message = new Message("Topic_transaction_demo", //主题

"Tags", //主要用于消息过滤

"Key_1", //消息唯一值

("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

//7、发送事务消息

TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

producer.shutdown();

}

}

3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:

public class TransactionListenerImol implements TransactionListener {

//存储事务状态信息 key:事务id value:当前事务执行的状态

private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();

//执行本地事务

@Override

public LocalTransactionState executeLocalTransaction(Message message, Object o) {

//事务id

String transactionId = message.getTransactionId();

//0:执行中,状态未知 1:执行成功 2:执行失败

localTrans.put(transactionId, 0);

//业务执行,本地事务,service

System.out.println("hello-demo-transaction");

try {

System.out.println("正在执行本地事务---");

Thread.sleep(60000*2);

System.out.println("本地事务执行成功---");

localTrans.put(transactionId, 1);

} catch (InterruptedException e) {

e.printStackTrace();

localTrans.put(transactionId, 2);

return LocalTransactionState.ROLLBACK_MESSAGE;

}

return LocalTransactionState.COMMIT_MESSAGE;

}

//消息回查

@Override

public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

//获取对应事务的状态信息

String transactionId = messageExt.getTransactionId();

//获取对应事务id执行状态

Integer status = localTrans.get(transactionId);

//消息回查

System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);

switch (status) {

case 0:

return LocalTransactionState.UNKNOW;

case 1:

return LocalTransactionState.COMMIT_MESSAGE;

case 2:

return LocalTransactionState.ROLLBACK_MESSAGE;

}

return LmMbCoPiXocalTransactionState.UNKNOW;

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IDEA中使用Docker Compose容器编排的实现
下一篇:Spring Bean常用依赖注入方式详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~