RocketMQ 分布式事务一致性详解(含 Java 完整代码)

张开发
2026/5/11 18:17:51 15 分钟阅读

分享文章

RocketMQ 分布式事务一致性详解(含 Java 完整代码)
目录一、核心原理RocketMQ 事务消息机制1. 执行流程4 步必懂2. 一致性保证二、环境准备三、Java 完整代码实现1. 常量配置2. 事务生产者核心分布式事务发起方1自定义事务监听器核心逻辑2事务生产者启动类3. 事务消费者积分服务四、运行测试步骤五、运行日志清晰看到事务流程生产者日志消费者日志异常场景事务回查六、关键知识点总结1. 为什么能保证分布式事务一致性2. 生产环境必须注意3. 适用场景总结RocketMQ 是业界主流的分布式事务解决方案基于可靠消息 事务消息实现最终一致性分布式事务首选方案性能远优于 2PC/3PC。一、核心原理RocketMQ 事务消息机制RocketMQ 分布式事务 半消息 事务状态回查 确认 / 回滚它解决的核心问题本地事务执行成功 → 消息必须投递成功本地事务失败 → 消息绝不投递保证分布式系统数据一致。1. 执行流程4 步必懂生产者发送 半消息Half Message消息先发送到 MQ但对消费者不可见MQ 只确认消息已存储。生产者执行本地事务比如扣减余额、创建订单。生产者向 MQ 提交事务状态本地事务成功 → 发送Commit半消息变为可消费消息本地事务失败 → 发送RollbackMQ 删除半消息MQ 事务状态回查兜底机制如果第 3 步网络中断MQ 会主动回调生产者查询本地事务最终状态保证数据不丢不乱。2. 一致性保证最终一致性本地事务成功 → 消息一定被消费无数据丢失回查机制解决网络超时 / 宕机问题无重复消费消费者做幂等控制即可二、环境准备启动 RocketMQ 4.9 / 5.x 服务Maven 依赖SpringBoot RocketMQ!-- RocketMQ 客户端 -- dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-client/artifactId version4.9.7/version /dependency三、Java 完整代码实现我们模拟订单创建 积分增加的分布式事务场景订单服务事务生产者创建订单本地事务积分服务消费者收到消息后增加积分1. 常量配置import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.TransactionMQProducer; public class RocketMQConstant { // NameServer 地址 public static final String NAME_SERVER 127.0.0.1:9876; // 事务消息 Topic public static final String TRANSACTION_TOPIC Topic_Transaction_Order; // 生产者组事务消息必须指定组 public static final String PRODUCER_GROUP Tx_Producer_Group; // 消费者组 public static final String CONSUMER_GROUP Tx_Consumer_Group; }2. 事务生产者核心分布式事务发起方必须使用TransactionMQProducer并实现事务监听器。1自定义事务监听器核心逻辑import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.ConcurrentHashMap; /** * 事务监听器实现本地事务执行 事务回查 */ public class OrderTransactionListener implements TransactionListener { // 模拟数据库存储事务状态生产环境用DB private static final ConcurrentHashMapString, Boolean TRANSACTION_DB new ConcurrentHashMap(); /** * 第一步执行本地事务创建订单 */ Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId msg.getKeys(); try { System.out.println( 开始执行本地事务创建订单订单ID orderId); // 模拟本地事务创建订单、扣减库存等 createOrder(orderId); // 本地事务执行成功 TRANSACTION_DB.put(orderId, true); System.out.println(本地事务执行成功订单ID orderId); // 返回 COMMIT消息对消费者可见 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { System.out.println(本地事务执行失败订单ID orderId); TRANSACTION_DB.put(orderId, false); // 返回 ROLLBACK删除半消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } /** * 第二步MQ 回调事务回查兜底机制 * 当executeLocalTransaction未返回结果时RocketMQ主动调用查询 */ Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId msg.getKeys(); System.out.println( RocketMQ 主动回查事务状态订单ID orderId); // 查询本地事务状态 Boolean isSuccess TRANSACTION_DB.get(orderId); if (isSuccess ! null isSuccess) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.ROLLBACK_MESSAGE; } } // 模拟创建订单 private void createOrder(String orderId) { // 真实环境执行数据库操作 System.out.println(数据库插入订单记录订单ID orderId); } }2事务生产者启动类import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import java.nio.charset.StandardCharsets; import java.util.concurrent.*; /** * 事务消息生产者订单服务 */ public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { // 1. 创建事务生产者 TransactionMQProducer producer new TransactionMQProducer(RocketMQConstant.PRODUCER_GROUP); producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER); // 2. 创建线程池处理事务回调 ExecutorService executorService new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); // 3. 设置事务监听器 线程池 producer.setTransactionListener(new OrderTransactionListener()); producer.setExecutorService(executorService); // 4. 启动生产者 producer.start(); System.out.println( 事务生产者启动成功 ); try { // 5. 构建事务消息必须设置唯一KEY用于事务回查 String orderId ORDER_ System.currentTimeMillis(); Message message new Message( RocketMQConstant.TRANSACTION_TOPIC, Tag_Order, orderId, // 唯一KEY非常重要 (订单数据 orderId).getBytes(StandardCharsets.UTF_8) ); // 6. 发送事务消息半消息 producer.sendMessageInTransaction(message, null); System.out.println( 半消息发送成功 ); } catch (Exception e) { e.printStackTrace(); } // 保持程序运行 Thread.sleep(100000); producer.shutdown(); } }3. 事务消费者积分服务import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 事务消息消费者积分服务 * 功能收到订单消息 → 增加用户积分 */ public class TransactionConsumer { public static void main(String[] args) throws MQClientException { // 1. 创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(RocketMQConstant.CONSUMER_GROUP); consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER); // 2. 订阅Topic consumer.subscribe(RocketMQConstant.TRANSACTION_TOPIC, *); // 3. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { String orderId msg.getKeys(); String body new String(msg.getBody()); System.out.println( 积分服务消费消息 ); System.out.println(订单ID orderId); System.out.println(消息内容 body); // 模拟增加积分 addUserPoints(orderId); System.out.println(积分增加成功\n); } catch (Exception e) { e.printStackTrace(); // 消费失败返回RECONSUME_LATERMQ会重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 4. 启动消费者 consumer.start(); System.out.println( 事务消费者启动成功等待消息 ); } // 模拟增加积分 private static void addUserPoints(String orderId) { System.out.println(数据库用户积分 100关联订单 orderId); } }四、运行测试步骤启动 RocketMQ# 启动NameServer mqnamesrv.cmd # 启动Broker mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnabletrue启动消费者积分服务启动生产者订单服务五、运行日志清晰看到事务流程生产者日志 事务生产者启动成功 半消息发送成功 开始执行本地事务创建订单订单IDORDER_1735640000000 数据库插入订单记录订单IDORDER_1735640000000 本地事务执行成功订单IDORDER_1735640000000消费者日志 事务消费者启动成功等待消息 积分服务消费消息 订单IDORDER_1735640000000 消息内容订单数据ORDER_1735640000000 数据库用户积分 100关联订单ORDER_1735640000000 积分增加成功异常场景事务回查如果生产者发送半消息后宕机 / 断网RocketMQ 会自动触发事务回查 RocketMQ 主动回查事务状态订单IDORDER_1735640000000六、关键知识点总结1. 为什么能保证分布式事务一致性半消息机制先存消息再执行事务避免 “事务成功消息丢失”事务回查解决网络超时、服务宕机导致的状态未知问题最终一致性只要本地事务成功消息一定会被投递消费2. 生产环境必须注意消息必须带唯一 KEY订单 ID / 业务 ID用于事务回查本地事务状态必须持久化数据库不能用内存消费者做幂等控制避免重复增加积分消费失败返回 RECONSUME_LATER让 MQ 重试3. 适用场景订单 积分订单 物流支付 通知所有不需要强一致性、追求高性能的分布式业务总结RocketMQ 分布式事务基于事务消息实现最终一致性核心流程发半消息 → 执行本地事务 → 提交 / 回滚 → 事务回查Java 代码核心TransactionMQProducer 自定义TransactionListener生产环境可直接套用这套代码稳定可靠

更多文章