分布式系统架构:分布式事务与最终一致性的工程实践
分布式系统架构分布式事务与最终一致性的工程实践一、分布式事务的现实困境一致性不是免费的在单体架构中事务由数据库的 ACID 机制保证开发者几乎不需要关心一致性问题。但进入微服务架构后一个业务操作往往跨越多个服务每个服务拥有独立的数据库。经典的例子是下单扣库存订单服务创建订单库存服务扣减库存支付服务冻结金额。如果库存扣减成功但订单创建失败就会出现库存少了但订单不存在的数据不一致。两阶段提交2PC是理论上最直接的解决方案但在微服务架构中几乎不被采用。原因有三第一2PC 要求所有参与者同步阻塞等待任意一个参与者的延迟都会拖慢整个事务第二协调者单点故障会导致所有参与者永久阻塞第三跨服务的 2PC 违反了服务自治原则——库存服务不应该因为订单服务的决策而阻塞自己的资源。最终一致性是微服务架构中的务实选择允许短暂的数据不一致但保证经过一段时间后所有副本的数据最终达到一致状态。核心问题从如何保证强一致转变为如何设计补偿机制让不一致状态可恢复。sequenceDiagram participant O as 订单服务 participant I as 库存服务 participant P as 支付服务 participant E as 事件总线 O-O: 1. 创建订单状态PENDING O-E: 2. 发布 OrderCreatedEvent E-I: 3. 库存预扣减 I-I: 4. 扣减成功 I-E: 5. 发布 StockReservedEvent E-P: 6. 冻结支付金额 P-P: 7. 冻结成功 P-E: 8. 发布 PaymentFrozenEvent E-O: 9. 确认订单状态CONFIRMED Note over O,P: 异常路径库存不足 I-I: 4a. 扣减失败 I-E: 5a. 发布 StockReserveFailedEvent E-O: 6a. 取消订单状态CANCELLED E-P: 7a. 释放冻结金额二、Saga 模式编排式与协调式的工程选择Saga 模式是分布式事务的主流实现方案分为两种编排方式。**编排式 SagaChoreography**通过事件驱动实现服务间的协作。每个服务完成本地事务后发布事件其他服务监听事件并执行相应操作。编排式的优势在于服务间完全解耦没有中心化的协调器劣势在于业务流程分散在各服务的事件处理器中难以全局理解。**协调式 SagaOrchestrator**引入一个中心化的 Saga 协调器负责按顺序调用各服务并在失败时触发补偿操作。协调式的优势在于流程逻辑集中易于理解和监控劣势在于协调器可能成为单点且服务间存在一定的耦合。生产环境中对于简单流程2-3 个步骤编排式更轻量对于复杂流程5 个以上步骤、有条件分支协调式更可控。两种方式可以混合使用主流程用协调式子流程用编排式。三、Saga 协调器的代码实现以下实现展示了基于状态机的 Saga 协调器支持正向执行、补偿回滚和超时处理。from dataclasses import dataclass, field from enum import Enum from typing import Optional, Callable, Any from datetime import datetime, timedelta import asyncio class SagaStepStatus(Enum): PENDING pending EXECUTING executing COMPLETED completed COMPENSATING compensating COMPENSATED compensated FAILED failed class SagaStatus(Enum): RUNNING running COMPLETED completed COMPENSATING compensating COMPENSATED compensated FAILED failed dataclass class SagaStep: Saga 步骤定义 name: str action: Callable # 正向操作 compensation: Callable # 补偿操作 timeout: timedelta timedelta(seconds30) status: SagaStepStatus SagaStepStatus.PENDING result: Optional[Any] None error: Optional[str] None dataclass class SagaDefinition: Saga 定义有序步骤列表 saga_id: str saga_type: str steps: list[SagaStep] field(default_factorylist) status: SagaStatus SagaStatus.RUNNING current_step_index: int 0 created_at: datetime field(default_factorydatetime.now) updated_at: datetime field(default_factorydatetime.now) class SagaOrchestrator: Saga 协调器管理分布式事务的执行与补偿 def __init__(self, saga_store, event_bus): self.store saga_store # 持久化存储用于故障恢复 self.event_bus event_bus # 事件总线用于通知外部系统 async def execute(self, saga: SagaDefinition) - SagaDefinition: 执行 Saga按顺序执行正向操作失败时触发补偿 try: # 正向执行依次执行每个步骤 for i in range(saga.current_step_index, len(saga.steps)): step saga.steps[i] saga.current_step_index i step.status SagaStepStatus.EXECUTING self._save(saga) try: # 带超时执行正向操作 step.result await asyncio.wait_for( step.action(), timeoutstep.timeout.total_seconds(), ) step.status SagaStepStatus.COMPLETED except asyncio.TimeoutError: step.status SagaStepStatus.FAILED step.error fStep {step.name} timed out after {step.timeout} raise except Exception as e: step.status SagaStepStatus.FAILED step.error str(e) raise self._save(saga) # 所有步骤执行成功 saga.status SagaStatus.COMPLETED self._save(saga) await self.event_bus.publish( fsaga.{saga.saga_type}.completed, {saga_id: saga.saga_id}, ) return saga except Exception: # 正向执行失败开始补偿 saga.status SagaStatus.COMPENSATING self._save(saga) return await self._compensate(saga) async def _compensate(self, saga: SagaDefinition) - SagaDefinition: 补偿回滚逆序执行已完成步骤的补偿操作 # 从当前步骤向前回溯对已完成的步骤执行补偿 for i in range(saga.current_step_index, -1, -1): step saga.steps[i] if step.status ! SagaStepStatus.COMPLETED: continue # 跳过未完成的步骤 step.status SagaStepStatus.COMPENSATING self._save(saga) try: # 补偿操作也需要超时保护 await asyncio.wait_for( step.compensation(), timeoutstep.timeout.total_seconds(), ) step.status SagaStepStatus.COMPENSATED except Exception as e: # 补偿失败记录错误但不中断继续尝试补偿其他步骤 step.status SagaStepStatus.FAILED step.error fCompensation failed: {e} self._save(saga) # 检查是否所有补偿都成功 all_compensated all( s.status in (SagaStepStatus.COMPENSATED, SagaStepStatus.PENDING) for s in saga.steps ) if all_compensated: saga.status SagaStatus.COMPENSATED else: saga.status SagaStatus.FAILED # 补偿失败的 Saga 需要人工介入 await self.event_bus.publish( fsaga.{saga.saga_type}.compensation_failed, {saga_id: saga.saga_id, failed_steps: [ s.name for s in saga.steps if s.status SagaStepStatus.FAILED ]}, ) self._save(saga) return saga def _save(self, saga: SagaDefinition): 持久化 Saga 状态确保故障后可恢复 saga.updated_at datetime.now() self.store.save(saga) async def recover(self, saga_id: str) - Optional[SagaDefinition]: 故障恢复从持久化存储加载 Saga 并继续执行 saga self.store.load(saga_id) if not saga: return None if saga.status SagaStatus.RUNNING: # 正向执行中断继续执行 return await self.execute(saga) elif saga.status SagaStatus.COMPENSATING: # 补偿中断继续补偿 return await self._compensate(saga) return saga四、分布式事务的边界与权衡补偿操作的幂等性。补偿操作可能被执行多次网络超时后重试、故障恢复后重新执行。如果补偿操作不是幂等的重复执行会导致数据错误。例如退款操作如果执行两次就会退两笔钱。所有补偿操作必须设计为幂等的——通过唯一业务 ID 去重或使用状态标记防止重复执行。补偿的补偿。当补偿操作本身失败时系统进入一个尴尬的状态正向操作已执行补偿也失败了。此时没有自动恢复的手段只能依赖人工介入或定时重试。这意味着系统必须提供补偿失败的告警和运维工具。最终一致性的时间窗口。最终是多久在业务层面需要定义不一致状态的最大容忍时间。例如订单创建后 5 分钟内库存必须完成扣减或释放预扣。超过这个时间窗口系统应自动触发补偿或告警。隔离性的缺失。Saga 模式不提供隔离性——在 Saga 执行过程中中间状态对其他事务可见。例如库存预扣减后、订单确认前其他事务看到的是库存已减少但订单可能最终被取消。脏读问题需要通过业务层面的设计来缓解如使用预扣减而非实际扣减。设计维度Saga 模式2PC一致性最终一致强一致可用性高无阻塞低同步阻塞性能高异步低同步等待复杂度补偿逻辑复杂协调器单点风险隔离性无隔离完全隔离五、总结分布式事务的核心权衡在于一致性与可用性的取舍。Saga 模式选择最终一致性通过补偿机制保证数据最终可恢复代价是中间状态可见和补偿逻辑的复杂度。协调式 Saga 适合复杂流程编排式 Saga 适合简单场景两者可以混合使用。落地路线建议第一所有补偿操作必须幂等通过唯一业务 ID 去重第二定义不一致状态的时间窗口超时自动触发补偿第三建立补偿失败的告警机制和运维工具确保异常状态可被人工介入恢复。