侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 104 篇文章
  • 累计创建 48 个标签
  • 累计收到 1 条评论

Seata分布式事务的实际应用

qingtian
2022-06-25 / 0 评论 / 0 点赞 / 466 阅读 / 12,340 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-06-25,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Seata分布式事务的实际应用

Seata代理数据源及其配置

  1. pom.xml
 <!-- seata -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        <!-- 注册 Seata 数据源需要连接池 -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <optional>true</optional>
        </dependency>
  1. 在涉及到分布式事务的数据库中执行undo_log.sql

    CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    
    
  2. 配置事务分组

    nacos 的配置文件中增加事务分组的配置

    seataServer.properties

    # 存储模式
    store.mode=db
    
    store.db.datasource=druid
    store.db.dbType=mysql
    # 需要根据mysql的版本调整driverClassName
    # mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
    # mysql8以下版本的driver:com.mysql.jdbc.Driver
    store.db.driverClassName=com.mysql.cj.jdbc.Driver
    # 注意根据生产实际情况调整参数host和port
    store.db.url=jdbc:mysql://192.168.0.103:3306/seata-server?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
    # 数据库用户名
    store.db.user=root
    # 用户名密码
    store.db.password=123456
    #自定义事务分组
    service.vgroupMapping.imooc-ecommerce=default
    

    在 Seata-Client 中配置 bootstrap.yml 文件

    seata:
      # 配置中心
      config:
        type: nacos
        nacos:
          server-addr: 192.168.0.103:8848 # Nacos 配置中心的地址
          group : "e-commerce"  #分组必须和 配置 nacos client 的分组一致
          namespace: "f2dfb4c7-a6cf-45d5-92e2-3912daf808d0" # namespace必须和nacos client 的namespace一致
          username: "nacos"   #Nacos 配置中心的用于名
          password: "nacos"  #Nacos 配置中心的密码
      # 注册中心
      registry:
        type: nacos
        nacos:
          application: seata-server
          server-addr: 192.168.0.103:8848
          group : "DEFAULT_GROUP"  #分组必须和nacos client 注册的服务的分组一致
          namespace: "f2dfb4c7-a6cf-45d5-92e2-3912daf808d0" # namespace必须和nacos client 的namespace一致
          username: "nacos"
          password: "nacos"
      tx-service-group: imooc-ecommerce # 自定义服务群组,该值必须与 Nacos 配置中的 service.vgroupMapping.{my-service-group}=default 中的 {my-service-group}相同
      service:
        vgroup-mapping:
          imooc-ecommerce: default  # 自定义服务群组,该值必须与 Nacos 配置中的 service.vgroupMapping.{my-service-group}=default 中的 {my-service-group}相同
    
    spring:
      cloud:
        alibaba:
          seata:
            tx-service-group: imooc-ecommerce # 自定义服务群组,该值必须与 Nacos 配置中的 service.vgroupMapping.{my-service-group}=default 中的 {my-service-group}相同
    
  3. 配置 Seata 的数据源代理

    /**
     * @author Guank
     * @version 1.0
     * @description: Seata 所需要的数据源代理
     * @date 2022/6/19 14:20
     */
    @Configuration
    public class DataSourceProxyAutoConfiguration {
    
        private final DataSourceProperties dataSourceProperties;
    
        public DataSourceProxyAutoConfiguration(DataSourceProperties dataSourceProperties) {
            this.dataSourceProperties = dataSourceProperties;
        }
    
        /**
         * 配置 Seata 的数据源代理 用于全局事务的回滚
         * Seata 记录一次全局事务的 before image + after image -> undo log
         * @return
         */
        @Primary
        @Bean("dataSource")
        public DataSource dataSource() {
            HikariDataSource dataSource = new HikariDataSource();
            dataSource.setJdbcUrl(dataSourceProperties.getUrl());
            dataSource.setUsername(dataSourceProperties.getUsername());
            dataSource.setPassword(dataSourceProperties.getPassword());
            dataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
    
            return new DataSourceProxy(dataSource);
        }
    
    }
    
  4. 加载拦截器 SeataHandlerInterceptor,实现微服务之间xid的传递

    /**
     * @author qingtian
     * @version 1.0
     * @description: WebMvc配置
     * @date 2021/12/27 20:40
     */
    @Configuration
    public class ImoocWebMvcConfig extends WebMvcConfigurationSupport {
    
        /**
         * @description: 添加拦截器配置
         * @param: registry
         * @return: void
         * @date: 2021/12/27 20:41
         */
        @Override
        protected void addInterceptors(InterceptorRegistry registry) {
    
            /**
             * 保存登录的用户信息拦截器
             */
            registry.addInterceptor(new LoginUserInfoInterceptor())
                    .addPathPatterns("/**").order(0);
            // Seata 传递 xid 事务 id 给其他事务
            // 只有这样,其他服务才会写 undo_log,才能实现回滚
            registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**");
        }
    }
    
  5. 自定义通信信道 :logisticsOutput

    cloud:
        stream:
          kafka:
            binder:
              brokers: 192.168.0.103:9092
              auto-create-topics: true
          bindings:
            logisticsOutput:
              destination: e-commerce-topic   # kafka topic
              content-type: text/plain
    
    • 自定义物流消息输出信道

      /**
       * @author Guank
       * @version 1.0
       * @description: 自定义物流消息通信信道
       * @date 2022/6/19 19:38
       */
      public interface LogisticsSource {
      
          /**
           * 输出信道的名称
           */
          String OUTPUT = "logisticsOutput";
      
          /**
           *  物流 Source -> logisticsOutput
           *  通信信道的名称 logisticsOutput 对应的是 yml 文件里的配置
           * @return
           */
          @Output(LogisticsSource.OUTPUT)
          MessageChannel logisticsOutput();
      }
      
      -------------------------------------------------------------------
      @Slf4j
      @Service
      @EnableBinding(LogisticsSource.class)
      public class OrderServiceImpl implements IOrderService {
      
          /** 表的 dao 接口 */
          private final EcommerceOrderDao orderDao;
      
          /** Feign 客户端 */
          private final AddressClient addressClient;
          private final SecuredGoodsClient securedGoodsClient;
          private final NotSecuredGoodsClient notSecuredGoodsClient;
          private final NotSecuredBalanceClient notSecuredBalanceClient;
      
          /** SpringCloud Stream 的发射器 */
          private final LogisticsSource logisticsSource;
      
          public OrderServiceImpl(EcommerceOrderDao orderDao,
                                  AddressClient addressClient,
                                  SecuredGoodsClient securedGoodsClient,
                                  NotSecuredGoodsClient notSecuredGoodsClient,
                                  NotSecuredBalanceClient notSecuredBalanceClient,
                                  LogisticsSource logisticsSource) {
              this.orderDao = orderDao;
              this.addressClient = addressClient;
              this.securedGoodsClient = securedGoodsClient;
              this.notSecuredGoodsClient = notSecuredGoodsClient;
              this.notSecuredBalanceClient = notSecuredBalanceClient;
              this.logisticsSource = logisticsSource;
          }
      
          /**
           * <h2>创建订单: 这里会涉及到分布式事务</h2>
           * 创建订单会涉及到多个步骤和校验, 当不满足情况时直接抛出异常;
           * 1. 校验请求对象是否合法
           * 2. 创建订单
           * 3. 扣减商品库存
           * 4. 扣减用户余额
           * 5. 发送订单物流消息 SpringCloud Stream + Kafka
           * */
          @Override
          @GlobalTransactional(rollbackFor = Exception.class)
          public TableId createOrder(OrderInfo orderInfo) {
      
              // 获取地址信息
              AddressInfo addressInfo = addressClient.getAddressInfoByTablesId(
                      new TableId(Collections.singletonList(
                              new TableId.Id(orderInfo.getUserAddress())))).getData();
      
              // 1. 校验请求对象是否合法(商品信息不需要校验, 扣减库存会做校验)
              if (CollectionUtils.isEmpty(addressInfo.getAddressItems())) {
                  throw new RuntimeException("user address is not exist: "
                          + orderInfo.getUserAddress());
              }
      
              // 2. 创建订单
              EcommerceOrder newOrder = orderDao.save(
                      new EcommerceOrder(
                              AccessContext.getLoginUserInfo().getId(),
                              orderInfo.getUserAddress(),
                              JSON.toJSONString(orderInfo.getOrderItems())
                      )
              );
              log.info("create order success: [{}], [{}]",
                      AccessContext.getLoginUserInfo().getId(), newOrder.getId());
      
              // 3. 扣减商品库存
              if (
                      !notSecuredGoodsClient.deductGoodsInventory(
                              orderInfo.getOrderItems()
                                      .stream()
                                      .map(OrderInfo.OrderItem::toDeductGoodsInventory)
                                      .collect(Collectors.toList())
                      ).getData()
              ) {
                  throw new RuntimeException("deduct goods inventory failure");
              }
      
              // 4. 扣减用户账户余额
              // 4.1 获取商品信息, 计算总价格
              List<SimpleGoodsInfo> goodsInfos = notSecuredGoodsClient.getSimpleGoodsInfoByTableId(
                      new TableId(
                              orderInfo.getOrderItems()
                                      .stream()
                                      .map(o -> new TableId.Id(o.getGoodsId()))
                              .collect(Collectors.toList())
                      )
              ).getData();
              Map<Long, SimpleGoodsInfo> goodsId2GoodsInfo = goodsInfos.stream()
                      .collect(Collectors.toMap(SimpleGoodsInfo::getId, Function.identity()));
              long balance = 0;
              for (OrderInfo.OrderItem orderItem : orderInfo.getOrderItems()) {
                  balance += goodsId2GoodsInfo.get(orderItem.getGoodsId()).getPrice()
                          * orderItem.getCount();
              }
              assert balance > 0;
      
              // 4.2 填写总价格, 扣减账户余额
              BalanceInfo balanceInfo = notSecuredBalanceClient.deductBalance(
                      new BalanceInfo(AccessContext.getLoginUserInfo().getId(), balance)
              ).getData();
              if (null == balanceInfo) {
                  throw new RuntimeException("deduct user balance failure");
              }
              log.info("deduct user balance: [{}], [{}]", newOrder.getId(),
                      JSON.toJSONString(balanceInfo));
      
              // 5. 发送订单物流消息 SpringCloud Stream + Kafka
              LogisticsMessage logisticsMessage = new LogisticsMessage(
                      AccessContext.getLoginUserInfo().getId(),
                      newOrder.getId(),
                      orderInfo.getUserAddress(),
                      null    // 没有备注信息
              );
              if (!logisticsSource.logisticsOutput().send(
                      MessageBuilder.withPayload(JSON.toJSONString(logisticsMessage)).build()
              )) {
                  throw new RuntimeException("send logistics message failure");
              }
              log.info("send create order message to kafka with stream: [{}]",
                      JSON.toJSONString(logisticsMessage));
      
              // 返回订单 id
              return new TableId(Collections.singletonList(new TableId.Id(newOrder.getId())));
          }
      
          @Override
          public PageSimpleOrderDetail getSimpleOrderDetailByPage(int page) {
      
              if (page <= 0) {
                  page = 1;   // 默认是第一页
              }
      
              // 这里分页的规则是: 1页10条数据, 按照 id 倒序排列
              Pageable pageable = PageRequest.of(page - 1, 10,
                      Sort.by("id").descending());
              Page<EcommerceOrder> orderPage = orderDao.findAllByUserId(
                      AccessContext.getLoginUserInfo().getId(), pageable
              );
              List<EcommerceOrder> orders = orderPage.getContent();
      
              // 如果是空, 直接返回空数组
              if (CollectionUtils.isEmpty(orders)) {
                  return new PageSimpleOrderDetail(Collections.emptyList(), false);
              }
      
              // 获取当前订单中所有的 goodsId, 这个 set 不可能为空或者是 null, 否则, 代码一定有 bug
              Set<Long> goodsIdsInOrders = new HashSet<>();
              orders.forEach(o -> {
                  List<DeductGoodsInventory> goodsAndCount = JSON.parseArray(
                          o.getOrderDetail(), DeductGoodsInventory.class
                  );
                  goodsIdsInOrders.addAll(goodsAndCount.stream()
                          .map(DeductGoodsInventory::getGoodsId)
                          .collect(Collectors.toSet()));
              });
      
              assert CollectionUtils.isNotEmpty(goodsIdsInOrders);
      
              // 是否还有更多页: 总页数是否大于当前给定的页
              boolean hasMore = orderPage.getTotalPages() > page;
      
              // 获取商品信息
              List<SimpleGoodsInfo> goodsInfos = securedGoodsClient.getSimpleGoodsInfoByTableId(
                      new TableId(goodsIdsInOrders.stream()
                              .map(TableId.Id::new).collect(Collectors.toList()))
              ).getData();
      
              // 获取地址信息
              AddressInfo addressInfo = addressClient.getAddressInfoByTablesId(
                      new TableId(orders.stream()
                              .map(o -> new TableId.Id(o.getAddressId()))
                              .distinct().collect(Collectors.toList()))
              ).getData();
      
              // 组装订单中的商品, 地址信息 -> 订单信息
              return new PageSimpleOrderDetail(
                      assembleSimpleOrderDetail(orders, goodsInfos, addressInfo),
                      hasMore
              );
          }
      
          /**
           * <h2>组装订单详情</h2>
           * */
          private List<PageSimpleOrderDetail.SingleOrderItem> assembleSimpleOrderDetail(
                  List<EcommerceOrder> orders, List<SimpleGoodsInfo> goodsInfos,
                  AddressInfo addressInfo
          ) {
              // goodsId -> SimpleGoodsInfo
              Map<Long, SimpleGoodsInfo> id2GoodsInfo = goodsInfos.stream()
                      .collect(Collectors.toMap(SimpleGoodsInfo::getId, Function.identity()));
              // addressId -> AddressInfo.AddressItem
              Map<Long, AddressInfo.AddressItem> id2AddressItem = addressInfo.getAddressItems()
                      .stream().collect(
                              Collectors.toMap(AddressInfo.AddressItem::getId, Function.identity())
                      );
      
              List<PageSimpleOrderDetail.SingleOrderItem> result = new ArrayList<>(orders.size());
              orders.forEach(o -> {
      
                  PageSimpleOrderDetail.SingleOrderItem orderItem =
                          new PageSimpleOrderDetail.SingleOrderItem();
                  orderItem.setId(o.getId());
                  orderItem.setUserAddress(id2AddressItem.getOrDefault(o.getAddressId(),
                          new AddressInfo.AddressItem(-1L)).toUserAddress());
                  orderItem.setGoodsItems(buildOrderGoodsItem(o, id2GoodsInfo));
      
                  result.add(orderItem);
              });
      
              return result;
          }
      
          /**
           * <h2>构造订单中的商品信息</h2>
           * */
          private List<PageSimpleOrderDetail.SingleOrderGoodsItem> buildOrderGoodsItem(
              EcommerceOrder order, Map<Long, SimpleGoodsInfo> id2GoodsInfo
          ) {
      
              List<PageSimpleOrderDetail.SingleOrderGoodsItem> goodsItems = new ArrayList<>();
              List<DeductGoodsInventory> goodsAndCount = JSON.parseArray(
                      order.getOrderDetail(), DeductGoodsInventory.class
              );
      
              goodsAndCount.forEach(gc -> {
      
                  PageSimpleOrderDetail.SingleOrderGoodsItem goodsItem =
                          new PageSimpleOrderDetail.SingleOrderGoodsItem();
                  goodsItem.setCount(gc.getCount());
                  goodsItem.setSimpleGoodsInfo(id2GoodsInfo.getOrDefault(gc.getGoodsId(),
                          new SimpleGoodsInfo(-1L)));
      
                  goodsItems.add(goodsItem);
              });
      
              return goodsItems;
          }
      }
      
      

      使用 Seata 分布式事务 在每一步出错时 自己都要抛出异常,用来全局事务的回滚

    • 自定义物流消息接收信道

      /**
       * <h1>自定义物流信息接收器(Sink)</h1>
       * */
      public interface LogisticsSink {
      
          /** 输入信道名称 */
          String INPUT = "logisticsInput";
      
          /**
           * <h2>物流 Sink -> logisticsInput</h2>
           * */
          @Input(LogisticsSink.INPUT)
          SubscribableChannel logisticsInput();
      }
      
      -----------------------------------------------------------------------
      /**
       * <h1>物流服务实现</h1>
       * */
      @Slf4j
      @EnableBinding(LogisticsSink.class)
      public class LogisticsServiceImpl {
      
          private final EcommerceLogisticsDao logisticsDao;
      
          public LogisticsServiceImpl(EcommerceLogisticsDao logisticsDao) {
              this.logisticsDao = logisticsDao;
          }
      
          /**
           * <h2>订阅监听订单微服务发送的物流消息</h2>
           * */
          @StreamListener("logisticsInput")
          public void consumeLogisticsMessage(@Payload Object payload) {
      
              log.info("receive and consume logistics message: [{}]", payload.toString());
              LogisticsMessage logisticsMessage = JSON.parseObject(
                      payload.toString(), LogisticsMessage.class
              );
              EcommerceLogistics ecommerceLogistics = logisticsDao.save(
                      new EcommerceLogistics(
                              logisticsMessage.getUserId(),
                              logisticsMessage.getOrderId(),
                              logisticsMessage.getAddressId(),
                              logisticsMessage.getExtraInfo()
                      )
              );
              log.info("consume logistics message success: [{}]", ecommerceLogistics.getId());
          }
      }
      
      
0

评论区