EasyTransaction可靠消息机制:保证消息最终一致性的完整指南
EasyTransaction可靠消息机制保证消息最终一致性的完整指南【免费下载链接】EasyTransactionA distribute transaction solution分布式事务 unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation) reliable message, compensate and so on;项目地址: https://gitcode.com/gh_mirrors/ea/EasyTransactionEasyTransaction是一款强大的分布式事务解决方案它统一了TCC、SAGA、FMTseata/fescar自动补偿、可靠消息、补偿等多种事务模式的使用方式。本文将重点介绍如何利用EasyTransaction的可靠消息机制轻松实现分布式系统中的消息最终一致性确保业务数据的准确性和可靠性。一、什么是可靠消息机制可靠消息机制是分布式系统中保障数据一致性的关键技术之一。它通过一系列机制确保消息在生产、传输和消费过程中的可靠性即使在系统出现异常的情况下也能保证消息最终被正确处理。在EasyTransaction中可靠消息机制具有以下特点事务提交后发送消息会在本地事务成功提交后才被发送出去持久化存储消息会进行持久化存储防止系统崩溃导致消息丢失重试机制提供完善的消息重试机制确保消息最终被消费二、可靠消息与最大努力交付消息的区别EasyTransaction提供了两种消息投递模式需要根据业务场景选择合适的模式2.1 可靠消息Reliable Message可靠消息保证消息一定会被发布出去即使在发布过程中出现异常系统也会进行重试直到消息成功发布。// 可靠消息示例代码 OrderMessage orderMessage new OrderMessage(); orderMessage.setUserId(userId); orderMessage.setAmount(money); FuturePublishResult reliableMessage transaction.execute(orderMessage);可靠消息适用于对消息可靠性要求极高的场景如金融交易、订单状态更新等核心业务流程。2.2 最大努力交付消息Best Effort Message最大努力交付消息不保证消息一定被发布出去它不需要持久化因此速度更快。适用于对消息可靠性要求不高的场景。// 最大努力交付消息示例代码 NotReliableOrderMessage notReliableMessage new NotReliableOrderMessage(); notReliableMessage.setUserId(userId); notReliableMessage.setAmount(money); transaction.execute(notReliableMessage);三、如何使用EasyTransaction可靠消息机制3.1 引入依赖首先需要在项目的pom.xml中引入EasyTransaction相关依赖。具体的依赖配置可以参考项目中的示例模块如easytrans-demo/tcc-and-msg/pom.xmleasytrans-demo/log-redis/pom.xml3.2 配置消息队列EasyTransaction支持多种消息队列实现如Kafka、ONS等。你需要根据项目需求选择合适的队列实现并进行相应配置。以Kafka为例相关配置类位于easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaQueueProperties.java3.3 创建消息对象创建消息对象定义需要传递的数据字段public class OrderMessage implements ReliableMessagePublishRequest { private Long userId; private BigDecimal amount; // getter和setter方法 }3.4 发送可靠消息在业务方法中通过EasyTransaction的事务管理器发送可靠消息Transactional public void createOrder(Long userId, BigDecimal money) { // 本地业务逻辑创建订单 Order order new Order(); order.setUserId(userId); order.setAmount(money); orderMapper.insert(order); // 发送可靠消息 OrderMessage orderMessage new OrderMessage(); orderMessage.setUserId(userId); orderMessage.setAmount(money); transaction.execute(orderMessage); }3.5 消费消息创建消息消费者处理接收到的消息Component public class OrderMessageConsumer implements EasyTransMsgListenerOrderMessage { Override public void onMessage(OrderMessage message) { // 处理消息如增加用户积分、发送通知等 pointService.addPoints(message.getUserId(), message.getAmount().intValue()); } }四、可靠消息机制的实现原理EasyTransaction的可靠消息机制主要通过以下几个组件实现4.1 消息存储消息存储相关的实现类位于easytrans-log-database-starter/src/main/java/com/yiqiniu/easytrans/log/impl/database/消息会先被存储到数据库中确保在事务提交前消息不会丢失。4.2 消息发送器消息发送器的实现位于easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgPublisherImpl.java事务提交后发送器会从数据库中读取消息并发送到消息队列。4.3 消息消费者消息消费者的实现位于easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgConsumerImpl.java消费者负责从消息队列中获取消息并调用相应的处理方法。4.4 消息重试机制如果消息消费失败EasyTransaction会自动进行重试。相关的重试逻辑可以在以下类中找到easytrans-core/src/main/java/com/yiqiniu/easytrans/recovery/ConsistentGuardianDaemon.java五、实际应用场景可靠消息机制在分布式系统中有广泛的应用例如5.1 订单创建后发送通知当用户创建订单后发送可靠消息通知库存系统减少库存通知物流系统准备发货。5.2 支付完成后更新订单状态支付系统完成支付后发送可靠消息通知订单系统更新订单状态通知积分系统增加用户积分。5.3 跨系统数据同步当一个系统的数据发生变更时通过可靠消息通知其他相关系统进行数据同步。六、使用注意事项消息幂等性由于消息可能会被重试因此消息处理逻辑必须保证幂等性即多次处理同一消息不会产生副作用。消息顺序EasyTransaction的可靠消息机制不保证消息的顺序如果业务需要严格的消息顺序需要额外处理。消息积压需要监控消息队列的状态避免消息积压影响系统性能。相关的监控实现可以参考easytrans-core/src/main/java/com/yiqiniu/easytrans/monitor/事务边界确保消息发送操作在事务边界内执行以保证本地事务和消息发送的原子性。七、总结EasyTransaction的可靠消息机制为分布式系统提供了简单而强大的消息一致性解决方案。通过本文的介绍你应该已经了解了可靠消息的基本概念、使用方法和实现原理。无论是构建电商平台、支付系统还是其他分布式应用EasyTransaction都能帮助你轻松解决消息一致性问题提高系统的可靠性和稳定性。如果你想深入了解更多关于EasyTransaction的功能可以参考项目中的示例代码如easytrans-demo/tcc-and-msg/easytrans-starter/src/test/java/com/yiqiniu/easytrans/test/mockservice/order/OrderService.java开始使用EasyTransaction让分布式事务变得简单 【免费下载链接】EasyTransactionA distribute transaction solution分布式事务 unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation) reliable message, compensate and so on;项目地址: https://gitcode.com/gh_mirrors/ea/EasyTransaction创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考