Seata分布式事务的实际应用
Seata代理数据源及其配置
pom.xml
<!-- Seata starter for Spring Cloud Alibaba -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!-- Connection pool needed to register the Seata data source -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<optional>true</optional>
</dependency>
-
在涉及到分布式事务的数据库中执行
undo_log.sql
-- Seata undo-log table: run this in every database that takes part in a distributed transaction.
-- The unique key ux_undo_log allows at most one row per (xid, branch_id) pair.
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-
配置事务分组
在
nacos
的配置文件中增加事务分组的配置seataServer.properties
# 存储模式
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
# 需要根据mysql的版本调整driverClassName
# mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
# mysql8以下版本的driver:com.mysql.jdbc.Driver
store.db.driverClassName=com.mysql.cj.jdbc.Driver
# 注意根据生产实际情况调整参数host和port
store.db.url=jdbc:mysql://192.168.0.103:3306/seata-server?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
# 数据库用户名
store.db.user=root
# 数据库密码
store.db.password=123456
# 自定义事务分组
service.vgroupMapping.imooc-ecommerce=default
在 Seata-Client 中配置
bootstrap.yml
文件
seata:
  # 配置中心
  config:
    type: nacos
    nacos:
      server-addr: 192.168.0.103:8848 # Nacos 配置中心的地址
      group: "e-commerce" # 分组必须和 nacos client 配置的分组一致
      namespace: "f2dfb4c7-a6cf-45d5-92e2-3912daf808d0" # namespace 必须和 nacos client 的 namespace 一致
      username: "nacos" # Nacos 配置中心的用户名
      password: "nacos" # Nacos 配置中心的密码
  # 注册中心
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 192.168.0.103:8848
      group: "DEFAULT_GROUP" # 分组必须和 nacos client 注册的服务的分组一致
      namespace: "f2dfb4c7-a6cf-45d5-92e2-3912daf808d0" # namespace 必须和 nacos client 的 namespace 一致
      username: "nacos"
      password: "nacos"
  tx-service-group: imooc-ecommerce # 自定义服务群组,该值必须与 Nacos 配置中的 service.vgroupMapping.{my-service-group}=default 中的 {my-service-group} 相同
  service:
    vgroup-mapping:
      imooc-ecommerce: default # 自定义服务群组,该值必须与 Nacos 配置中的 service.vgroupMapping.{my-service-group}=default 中的 {my-service-group} 相同
spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: imooc-ecommerce # 自定义服务群组,该值必须与 Nacos 配置中的 service.vgroupMapping.{my-service-group}=default 中的 {my-service-group} 相同
-
配置
Seata
的数据源代理
/**
 * Registers the data source proxy that Seata requires.
 *
 * @author Guank
 * @version 1.0
 */
@Configuration
public class DataSourceProxyAutoConfiguration {

    /** Connection settings (url, username, password, driver class) of the service's data source. */
    private final DataSourceProperties jdbcProperties;

    public DataSourceProxyAutoConfiguration(DataSourceProperties dataSourceProperties) {
        this.jdbcProperties = dataSourceProperties;
    }

    /**
     * Exposes the primary {@code dataSource} bean as a Seata {@link DataSourceProxy}.
     *
     * <p>Per the original author's note, the proxy is what lets Seata record the
     * before image and after image of a global transaction as an undo log, which is
     * used to roll the global transaction back.
     *
     * @return the Hikari pool wrapped in a Seata data source proxy
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource() {
        return new DataSourceProxy(buildPool());
    }

    /** Builds the Hikari pool from the configured connection settings. */
    private HikariDataSource buildPool() {
        HikariDataSource pool = new HikariDataSource();
        pool.setJdbcUrl(jdbcProperties.getUrl());
        pool.setUsername(jdbcProperties.getUsername());
        pool.setPassword(jdbcProperties.getPassword());
        pool.setDriverClassName(jdbcProperties.getDriverClassName());
        return pool;
    }
}
-
加载拦截器
SeataHandlerInterceptor
,实现微服务之间xid
的传递/** * @author qingtian * @version 1.0 * @description: WebMvc配置 * @date 2021/12/27 20:40 */ @Configuration public class ImoocWebMvcConfig extends WebMvcConfigurationSupport { /** * @description: 添加拦截器配置 * @param: registry * @return: void * @date: 2021/12/27 20:41 */ @Override protected void addInterceptors(InterceptorRegistry registry) { /** * 保存登录的用户信息拦截器 */ registry.addInterceptor(new LoginUserInfoInterceptor()) .addPathPatterns("/**").order(0); // Seata 传递 xid 事务 id 给其他事务 // 只有这样,其他服务才会写 undo_log,才能实现回滚 registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**"); } }
-
自定义通信信道 :
logisticsOutput
cloud:
  stream:
    kafka:
      binder:
        brokers: 192.168.0.103:9092
        auto-create-topics: true
    bindings:
      logisticsOutput:
        destination: e-commerce-topic # kafka topic
        content-type: text/plain
-
自定义物流消息输出信道
/** * @author Guank * @version 1.0 * @description: 自定义物流消息通信信道 * @date 2022/6/19 19:38 */ public interface LogisticsSource { /** * 输出信道的名称 */ String OUTPUT = "logisticsOutput"; /** * 物流 Source -> logisticsOutput * 通信信道的名称 logisticsOutput 对应的是 yml 文件里的配置 * @return */ @Output(LogisticsSource.OUTPUT) MessageChannel logisticsOutput(); } ------------------------------------------------------------------- @Slf4j @Service @EnableBinding(LogisticsSource.class) public class OrderServiceImpl implements IOrderService { /** 表的 dao 接口 */ private final EcommerceOrderDao orderDao; /** Feign 客户端 */ private final AddressClient addressClient; private final SecuredGoodsClient securedGoodsClient; private final NotSecuredGoodsClient notSecuredGoodsClient; private final NotSecuredBalanceClient notSecuredBalanceClient; /** SpringCloud Stream 的发射器 */ private final LogisticsSource logisticsSource; public OrderServiceImpl(EcommerceOrderDao orderDao, AddressClient addressClient, SecuredGoodsClient securedGoodsClient, NotSecuredGoodsClient notSecuredGoodsClient, NotSecuredBalanceClient notSecuredBalanceClient, LogisticsSource logisticsSource) { this.orderDao = orderDao; this.addressClient = addressClient; this.securedGoodsClient = securedGoodsClient; this.notSecuredGoodsClient = notSecuredGoodsClient; this.notSecuredBalanceClient = notSecuredBalanceClient; this.logisticsSource = logisticsSource; } /** * <h2>创建订单: 这里会涉及到分布式事务</h2> * 创建订单会涉及到多个步骤和校验, 当不满足情况时直接抛出异常; * 1. 校验请求对象是否合法 * 2. 创建订单 * 3. 扣减商品库存 * 4. 扣减用户余额 * 5. 发送订单物流消息 SpringCloud Stream + Kafka * */ @Override @GlobalTransactional(rollbackFor = Exception.class) public TableId createOrder(OrderInfo orderInfo) { // 获取地址信息 AddressInfo addressInfo = addressClient.getAddressInfoByTablesId( new TableId(Collections.singletonList( new TableId.Id(orderInfo.getUserAddress())))).getData(); // 1. 
校验请求对象是否合法(商品信息不需要校验, 扣减库存会做校验) if (CollectionUtils.isEmpty(addressInfo.getAddressItems())) { throw new RuntimeException("user address is not exist: " + orderInfo.getUserAddress()); } // 2. 创建订单 EcommerceOrder newOrder = orderDao.save( new EcommerceOrder( AccessContext.getLoginUserInfo().getId(), orderInfo.getUserAddress(), JSON.toJSONString(orderInfo.getOrderItems()) ) ); log.info("create order success: [{}], [{}]", AccessContext.getLoginUserInfo().getId(), newOrder.getId()); // 3. 扣减商品库存 if ( !notSecuredGoodsClient.deductGoodsInventory( orderInfo.getOrderItems() .stream() .map(OrderInfo.OrderItem::toDeductGoodsInventory) .collect(Collectors.toList()) ).getData() ) { throw new RuntimeException("deduct goods inventory failure"); } // 4. 扣减用户账户余额 // 4.1 获取商品信息, 计算总价格 List<SimpleGoodsInfo> goodsInfos = notSecuredGoodsClient.getSimpleGoodsInfoByTableId( new TableId( orderInfo.getOrderItems() .stream() .map(o -> new TableId.Id(o.getGoodsId())) .collect(Collectors.toList()) ) ).getData(); Map<Long, SimpleGoodsInfo> goodsId2GoodsInfo = goodsInfos.stream() .collect(Collectors.toMap(SimpleGoodsInfo::getId, Function.identity())); long balance = 0; for (OrderInfo.OrderItem orderItem : orderInfo.getOrderItems()) { balance += goodsId2GoodsInfo.get(orderItem.getGoodsId()).getPrice() * orderItem.getCount(); } assert balance > 0; // 4.2 填写总价格, 扣减账户余额 BalanceInfo balanceInfo = notSecuredBalanceClient.deductBalance( new BalanceInfo(AccessContext.getLoginUserInfo().getId(), balance) ).getData(); if (null == balanceInfo) { throw new RuntimeException("deduct user balance failure"); } log.info("deduct user balance: [{}], [{}]", newOrder.getId(), JSON.toJSONString(balanceInfo)); // 5. 
发送订单物流消息 SpringCloud Stream + Kafka LogisticsMessage logisticsMessage = new LogisticsMessage( AccessContext.getLoginUserInfo().getId(), newOrder.getId(), orderInfo.getUserAddress(), null // 没有备注信息 ); if (!logisticsSource.logisticsOutput().send( MessageBuilder.withPayload(JSON.toJSONString(logisticsMessage)).build() )) { throw new RuntimeException("send logistics message failure"); } log.info("send create order message to kafka with stream: [{}]", JSON.toJSONString(logisticsMessage)); // 返回订单 id return new TableId(Collections.singletonList(new TableId.Id(newOrder.getId()))); } @Override public PageSimpleOrderDetail getSimpleOrderDetailByPage(int page) { if (page <= 0) { page = 1; // 默认是第一页 } // 这里分页的规则是: 1页10条数据, 按照 id 倒序排列 Pageable pageable = PageRequest.of(page - 1, 10, Sort.by("id").descending()); Page<EcommerceOrder> orderPage = orderDao.findAllByUserId( AccessContext.getLoginUserInfo().getId(), pageable ); List<EcommerceOrder> orders = orderPage.getContent(); // 如果是空, 直接返回空数组 if (CollectionUtils.isEmpty(orders)) { return new PageSimpleOrderDetail(Collections.emptyList(), false); } // 获取当前订单中所有的 goodsId, 这个 set 不可能为空或者是 null, 否则, 代码一定有 bug Set<Long> goodsIdsInOrders = new HashSet<>(); orders.forEach(o -> { List<DeductGoodsInventory> goodsAndCount = JSON.parseArray( o.getOrderDetail(), DeductGoodsInventory.class ); goodsIdsInOrders.addAll(goodsAndCount.stream() .map(DeductGoodsInventory::getGoodsId) .collect(Collectors.toSet())); }); assert CollectionUtils.isNotEmpty(goodsIdsInOrders); // 是否还有更多页: 总页数是否大于当前给定的页 boolean hasMore = orderPage.getTotalPages() > page; // 获取商品信息 List<SimpleGoodsInfo> goodsInfos = securedGoodsClient.getSimpleGoodsInfoByTableId( new TableId(goodsIdsInOrders.stream() .map(TableId.Id::new).collect(Collectors.toList())) ).getData(); // 获取地址信息 AddressInfo addressInfo = addressClient.getAddressInfoByTablesId( new TableId(orders.stream() .map(o -> new TableId.Id(o.getAddressId())) .distinct().collect(Collectors.toList())) ).getData(); // 
组装订单中的商品, 地址信息 -> 订单信息 return new PageSimpleOrderDetail( assembleSimpleOrderDetail(orders, goodsInfos, addressInfo), hasMore ); } /** * <h2>组装订单详情</h2> * */ private List<PageSimpleOrderDetail.SingleOrderItem> assembleSimpleOrderDetail( List<EcommerceOrder> orders, List<SimpleGoodsInfo> goodsInfos, AddressInfo addressInfo ) { // goodsId -> SimpleGoodsInfo Map<Long, SimpleGoodsInfo> id2GoodsInfo = goodsInfos.stream() .collect(Collectors.toMap(SimpleGoodsInfo::getId, Function.identity())); // addressId -> AddressInfo.AddressItem Map<Long, AddressInfo.AddressItem> id2AddressItem = addressInfo.getAddressItems() .stream().collect( Collectors.toMap(AddressInfo.AddressItem::getId, Function.identity()) ); List<PageSimpleOrderDetail.SingleOrderItem> result = new ArrayList<>(orders.size()); orders.forEach(o -> { PageSimpleOrderDetail.SingleOrderItem orderItem = new PageSimpleOrderDetail.SingleOrderItem(); orderItem.setId(o.getId()); orderItem.setUserAddress(id2AddressItem.getOrDefault(o.getAddressId(), new AddressInfo.AddressItem(-1L)).toUserAddress()); orderItem.setGoodsItems(buildOrderGoodsItem(o, id2GoodsInfo)); result.add(orderItem); }); return result; } /** * <h2>构造订单中的商品信息</h2> * */ private List<PageSimpleOrderDetail.SingleOrderGoodsItem> buildOrderGoodsItem( EcommerceOrder order, Map<Long, SimpleGoodsInfo> id2GoodsInfo ) { List<PageSimpleOrderDetail.SingleOrderGoodsItem> goodsItems = new ArrayList<>(); List<DeductGoodsInventory> goodsAndCount = JSON.parseArray( order.getOrderDetail(), DeductGoodsInventory.class ); goodsAndCount.forEach(gc -> { PageSimpleOrderDetail.SingleOrderGoodsItem goodsItem = new PageSimpleOrderDetail.SingleOrderGoodsItem(); goodsItem.setCount(gc.getCount()); goodsItem.setSimpleGoodsInfo(id2GoodsInfo.getOrDefault(gc.getGoodsId(), new SimpleGoodsInfo(-1L))); goodsItems.add(goodsItem); }); return goodsItems; } }
使用 Seata 分布式事务时,任何一步出错都必须主动抛出异常,这样才能触发全局事务的回滚。
-
自定义物流消息接收信道
/**
 * Custom Spring Cloud Stream input binding (sink) for logistics messages.
 */
public interface LogisticsSink {

    /** Name of the input channel. */
    String INPUT = "logisticsInput";

    /**
     * Logistics sink -> {@code logisticsInput}.
     *
     * @return the subscribable input channel
     */
    @Input(LogisticsSink.INPUT)
    SubscribableChannel logisticsInput();
}
-----------------------------------------------------------------------
/**
 * Logistics service: consumes the logistics messages sent by the order
 * microservice and stores them as logistics records.
 */
@Slf4j
@EnableBinding(LogisticsSink.class)
public class LogisticsServiceImpl {

    private final EcommerceLogisticsDao logisticsDao;

    public LogisticsServiceImpl(EcommerceLogisticsDao logisticsDao) {
        this.logisticsDao = logisticsDao;
    }

    /**
     * Listens for logistics messages sent by the order microservice and saves
     * each one as an {@code EcommerceLogistics} record.
     *
     * @param payload the message payload; its {@code toString()} is parsed as the
     *                JSON form of a {@code LogisticsMessage}
     */
    // Bound through the declared constant rather than a repeated string literal,
    // so the listener cannot drift from the channel name.
    @StreamListener(LogisticsSink.INPUT)
    public void consumeLogisticsMessage(@Payload Object payload) {

        String rawMessage = payload.toString();
        log.info("receive and consume logistics message: [{}]", rawMessage);

        LogisticsMessage logisticsMessage = JSON.parseObject(
                rawMessage, LogisticsMessage.class
        );
        EcommerceLogistics ecommerceLogistics = logisticsDao.save(
                new EcommerceLogistics(
                        logisticsMessage.getUserId(),
                        logisticsMessage.getOrderId(),
                        logisticsMessage.getAddressId(),
                        logisticsMessage.getExtraInfo()
                )
        );
        log.info("consume logistics message success: [{}]", ecommerceLogistics.getId());
    }
}
-
评论区