Windows版Fritzing便携包:开箱即用的Arduino电路图与PCB设计工具
2026/6/7 20:15:02
在微服务架构盛行的今天,分布式事务已经成为每个后端工程师必须掌握的核心技能。本文将从本地事务出发,循序渐进地带你深入理解分布式事务的各种解决方案,并结合实战代码帮你快速落地!
事务是数据库操作的基本单元,具有四大特性(ACID):
在单体应用中,本地事务非常简单:
@ServicepublicclassOrderService{@AutowiredprivateJdbcTemplatejdbcTemplate;@TransactionalpublicvoidcreateOrder(Orderorder){// 1. 插入订单主表jdbcTemplate.update("INSERT INTO orders (user_id, total_amount, status) VALUES (?, ?, ?)",order.getUserId(),order.getTotalAmount(),"CREATED");// 2. 插入订单明细for(OrderItemitem:order.getItems()){jdbcTemplate.update("INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (?, ?, ?, ?)",order.getId(),item.getProductId(),item.getQuantity(),item.getPrice());}// 3. 更新库存jdbcTemplate.update("UPDATE products SET stock = stock - ? WHERE id = ?",item.getQuantity(),item.getProductId());// 如果任何一步失败,整个事务回滚}}优点:
@Transactional注解即可局限:
在微服务架构中,一个业务流程可能涉及多个服务:
// 订单服务@ServicepublicclassOrderService{@AutowiredprivateStockServiceClientstockService;@AutowiredprivateAccountServiceClientaccountService;publicvoidplaceOrder(OrderRequestrequest){// 1. 创建订单(订单服务的数据库)Orderorder=createOrder(request);// 2. 扣减库存(库存服务的数据库)stockService.deductStock(request.getProductId(),request.getQuantity());// 3. 扣减账户余额(账户服务的数据库)accountService.deductBalance(request.getUserId(),order.getTotalAmount());// 问题:如果第3步失败,前两步已经提交,如何回滚?}}CAP 定理:在分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition Tolerance)三者不可兼得,最多只能同时满足两个。
刚性事务追求强一致性,保证分布式环境下的 ACID 特性。
执行流程:
阶段一:准备阶段(Prepare)
阶段二:提交阶段(Commit)
// 使用 JTA(Java Transaction API)实现 2PC@TransactionalpublicclassDistributedOrderService{@Resource(name="orderDataSource")privateDataSourceorderDS;@Resource(name="stockDataSource")privateDataSourcestockDS;publicvoidplaceOrder(OrderRequestrequest)throwsException{// 获取 UserTransactionUserTransactionuserTransaction=(UserTransaction)newInitialContext().lookup("java:comp/UserTransaction");try{// 开启全局事务userTransaction.begin();// 操作订单数据库ConnectionorderConn=orderDS.getConnection();orderConn.createStatement().execute("INSERT INTO orders (user_id, amount) VALUES ("+request.getUserId()+","+request.getAmount()+")");// 操作库存数据库ConnectionstockConn=stockDS.getConnection();stockConn.createStatement().execute("UPDATE stock SET quantity = quantity - "+request.getQuantity()+" WHERE product_id = "+request.getProductId());// 提交全局事务userTransaction.commit();}catch(Exceptione){// 回滚全局事务userTransaction.rollback();throwe;}}}优点:
缺点:
3PC 在 2PC 基础上增加了一个预提交阶段,减少阻塞时间:
阶段一:CanCommit(询问)
阶段二:PreCommit(预提交)
阶段三:DoCommit(提交)
改进点:
缺点:
柔性事务放弃强一致性,追求最终一致性,更适合互联网高并发场景。
TCC 将事务分为三个阶段:
// 库存服务 - TCC 接口实现@ServicepublicclassStockTccService{@AutowiredprivateStockMapperstockMapper;/** * Try 阶段:冻结库存 */publicbooleantryDeduct(LongproductId,Integerquantity,StringorderId){Stockstock=stockMapper.selectById(productId);// 检查库存是否充足if(stock.getAvailable()<quantity){returnfalse;}// 冻结库存:available - quantity, frozen + quantityintupdated=stockMapper.freezeStock(productId,quantity);// 记录冻结日志stockMapper.insertFreezeLog(orderId,productId,quantity,"TRY");returnupdated>0;}/** * Confirm 阶段:确认扣减 */publicbooleanconfirmDeduct(StringorderId){FreezeLoglog=stockMapper.selectFreezeLog(orderId);// 扣减冻结的库存:frozen - quantityintupdated=stockMapper.deductFrozenStock(log.getProductId(),log.getQuantity());// 更新日志状态stockMapper.updateFreezeLog(orderId,"CONFIRM");returnupdated>0;}/** * Cancel 阶段:释放库存 */publicbooleancancelDeduct(StringorderId){FreezeLoglog=stockMapper.selectFreezeLog(orderId);// 解冻库存:available + quantity, frozen - quantityintupdated=stockMapper.unfreezeStock(log.getProductId(),log.getQuantity());// 更新日志状态stockMapper.updateFreezeLog(orderId,"CANCEL");returnupdated>0;}}// 订单服务 - TCC 事务协调@ServicepublicclassOrderTccCoordinator{@AutowiredprivateStockTccServicestockTccService;@AutowiredprivateAccountTccServiceaccountTccService;publicvoidplaceOrder(OrderRequestrequest){StringorderId=UUID.randomUUID().toString();try{// Try 阶段booleanstockOk=stockTccService.tryDeduct(request.getProductId(),request.getQuantity(),orderId);booleanaccountOk=accountTccService.tryDeduct(request.getUserId(),request.getAmount(),orderId);if(stockOk&&accountOk){// Confirm 阶段stockTccService.confirmDeduct(orderId);accountTccService.confirmDeduct(orderId);}else{// Cancel 阶段if(stockOk)stockTccService.cancelDeduct(orderId);if(accountOk)accountTccService.cancelDeduct(orderId);}}catch(Exceptione){// 异常时 CancelstockTccService.cancelDeduct(orderId);accountTccService.cancelDeduct(orderId);}}}优点:
缺点:
Saga 将长事务拆分为多个本地短事务,每个事务都有对应的补偿事务。
// Saga 编排示例@ServicepublicclassOrderSagaService{@AutowiredprivateOrderServiceorderService;@AutowiredprivateStockServicestockService;@AutowiredprivateAccountServiceaccountService;publicvoidplaceOrder(OrderRequestrequest){List<Compensation>compensations=newArrayList<>();try{// 步骤 1:创建订单Orderorder=orderService.createOrder(request);compensations.add(()->orderService.cancelOrder(order.getId()));// 步骤 2:扣减库存stockService.deductStock(request.getProductId(),request.getQuantity());compensations.add(()->stockService.addStock(request.getProductId(),request.getQuantity()));// 步骤 3:扣减余额accountService.deductBalance(request.getUserId(),request.getAmount());compensations.add(()->accountService.addBalance(request.getUserId(),request.getAmount()));// 所有步骤成功orderService.confirmOrder(order.getId());}catch(Exceptione){// 逆序执行补偿操作for(inti=compensations.size()-1;i>=0;i--){try{compensations.get(i).compensate();}catch(Exceptionex){// 记录补偿失败,人工介入log.error("补偿失败: {}",ex.getMessage());}}thrownewBusinessException("下单失败");}}}@FunctionalInterfaceinterfaceCompensation{voidcompensate();}优点:
缺点:
通过消息队列实现异步事务,保证最终一致性。
// 订单服务 - 发送可靠消息@ServicepublicclassOrderMessageService{@AutowiredprivateRocketMQTemplaterocketMQTemplate;@AutowiredprivateOrderMapperorderMapper;@TransactionalpublicvoidcreateOrderWithMessage(OrderRequestrequest){// 1. 创建订单(本地事务)Orderorder=newOrder();order.setUserId(request.getUserId());order.setAmount(request.getAmount());order.setStatus("PENDING");orderMapper.insert(order);// 2. 发送事务消息(确保消息和订单在同一事务中)rocketMQTemplate.sendMessageInTransaction("order-topic",MessageBuilder.withPayload(order).build(),null);}// 事务消息监听器@RocketMQTransactionListenerclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{// 本地事务已在 createOrderWithMessage 中执行returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){returnRocketMQLocalTransactionState.ROLLBACK;}}@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){// 回查本地事务状态Orderorder=JSON.parseObject(msg.getPayload(),Order.class);OrderdbOrder=orderMapper.selectById(order.getId());if(dbOrder!=null){returnRocketMQLocalTransactionState.COMMIT;}else{returnRocketMQLocalTransactionState.ROLLBACK;}}}}// 库存服务 - 消费消息@Service@RocketMQMessageListener(topic="order-topic",consumerGroup="stock-consumer")publicclassStockMessageConsumerimplementsRocketMQListener<Order>{@AutowiredprivateStockServicestockService;@OverridepublicvoidonMessage(Orderorder){try{// 执行扣减库存(需要保证幂等性)stockService.deductStock(order.getProductId(),order.getQuantity());}catch(Exceptione){// 消费失败,消息会重试log.error("扣减库存失败: {}",e.getMessage());thrownewRuntimeException(e);}}}关键点:
优点:
缺点:
适用于对一致性要求不高的场景,通过重试机制尽最大努力通知。
@ServicepublicclassNotificationService{@AutowiredprivateRestTemplaterestTemplate;@AutowiredprivateNotificationLogMapperlogMapper;/** * 发送通知,失败后定时重试 */publicvoidsendNotification(StringtargetUrl,Objectdata){// 记录通知日志NotificationLoglog=newNotificationLog();log.setTargetUrl(targetUrl);log.setData(JSON.toJSONString(data));log.setRetryCount(0);log.setStatus("PENDING");logMapper.insert(log);// 异步发送executeNotification(log);}privatevoidexecuteNotification(NotificationLoglog){try{// 发送 HTTP 请求ResponseEntity<String>response=restTemplate.postForEntity(log.getTargetUrl(),log.getData(),String.class);if(response.getStatusCode().is2xxSuccessful()){// 成功log.setStatus("SUCCESS");logMapper.updateById(log);}else{// 失败,等待重试scheduleRetry(log);}}catch(Exceptione){// 异常,等待重试scheduleRetry(log);}}/** * 定时重试任务 */@Scheduled(fixedDelay=60000)// 每分钟执行一次publicvoidretryFailedNotifications(){List<NotificationLog>pendingLogs=logMapper.selectPendingLogs();for(NotificationLoglog:pendingLogs){if(log.getRetryCount()>=5){// 超过最大重试次数,标记为失败log.setStatus("FAILED");logMapper.updateById(log);// 发送告警,人工介入alertService.sendAlert("通知失败: "+log.getTargetUrl());}else{// 增加重试次数log.setRetryCount(log.getRetryCount()+1);logMapper.updateById(log);// 重新发送executeNotification(log);}}}}适用场景:
Seata 包含三大角色:
AT 模式是 Seata 的默认模式,对业务代码几乎无侵入。
// 1. 引入依赖(Spring Boot)/* <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> */// 2. 配置文件/* seata: enabled: true application-id: order-service tx-service-group: my_tx_group registry: type: nacos nacos: server-addr: 127.0.0.1:8848 */// 3. 业务代码 - 订单服务@ServicepublicclassOrderServiceImpl{@AutowiredprivateStockServiceClientstockService;@AutowiredprivateAccountServiceClientaccountService;@AutowiredprivateOrderMapperorderMapper;@GlobalTransactional(name="create-order",rollbackFor=Exception.class)publicvoidcreateOrder(OrderRequestrequest){// 1. 创建订单Orderorder=newOrder();order.setUserId(request.getUserId());order.setAmount(request.getAmount());orderMapper.insert(order);// 2. 远程调用库存服务stockService.deduct(request.getProductId(),request.getQuantity());// 3. 远程调用账户服务accountService.deduct(request.getUserId(),request.getAmount());// 任何一步失败,Seata 会自动回滚所有操作}}// 4. 库存服务(只需要本地事务)@ServicepublicclassStockServiceImpl{@AutowiredprivateStockMapperstockMapper;@Transactionalpublicvoiddeduct(LongproductId,Integerquantity){stockMapper.deduct(productId,quantity);}}一阶段:
二阶段:
优势:
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC/3PC | 强一致 | 低 | 低 | 金融核心系统 |
| TCC | 最终一致 | 高 | 高 | 资金交易 |
| Saga | 最终一致 | 高 | 中 | 长流程业务 |
| 可靠消息 | 最终一致 | 很高 | 中 | 高并发场景 |
| 最大努力通知 | 弱一致 | 很高 | 低 | 通知类场景 |
| Seata AT | 最终一致 | 高 | 低 | 通用场景 |