延迟队列

延迟队列

Scroll Down

延迟队列与轮询

✉解决消息队列的不确定性

分布式事务的异步通信问题

  • 使用分布式事务异步通信的结构,一个很大的问题就是不确定性。一个消息发送过去了,不管结果如何发送端都不会原地等待接收端。直到接收端再推送回来回执消息,发送端才直到结果。但是也有可能发送端消息发送后,石沉大海,杳无音信。这时候就需要一种机制能够对这种不确定性进行补充

  • 比如你给有很多笔友,平时写信一去一回,但是有时候会遇到迟迟没有回信的情况。那么针对这种偶尔出现的情况,你可以选择两种策略。一种方案是你发信的时候用定个闹钟,设定1天以后去问一下对方收没收到信。另一种方案就是每天夜里定个时间查看一下所有发过信但是已经一天没收到回复的信。然后挨个打个电话问一下。

  • 第一种策略就是实现起来就是延迟队列,第二种策略就是定时轮询扫描。

    二者的区别是延迟队列更加精准,但是如果周期太长,任务留在延迟队列中时间的就会非常长,会把队列变得冗长。

    那么如果遇到这种长周期的事件,而且并不需要精确到分秒级的事件,可以利用定时扫描来实现,尤其是比较消耗性能的大范围扫描,可以安排到夜间执行。

应用场景

  • 当用户选择支付后,通常来说用户都会在支付宝正常支付,支付宝转账成功后,通过后台异步发送成功的请求到电商支付模块。

    但是如果用户点击支付后,支付模块可能会长时间没有收到支付宝的支付成功通知。这种情况会‘有两种可能性,一种是用户在弹出支付宝付款界面时没有继续支付,另一种就是用户支付成功了,但是因为网络等各种问题,支付模块没有收到通知

    如果是上述第二种可能性,对于用户来说体验是非常糟糕的,甚至会怀疑平台的诚信。

    所以为了尽可能避免第二种情况,在用户点击支付后一段时间后,不管用户是否付款,都要去主动询问支付宝,该笔单据是否付款。

sadasdas

业务使用

PaymentService接口

/**
     * 根据out-trde-no查询交易记录
     * @param paymentInfoQuery
     * @return
     */
    boolean checkPayment(PaymentInfo paymentInfoQuery);

PaymentServiceImpl

public boolean checkPayment(PaymentInfo paymentInfoQuery) {

        //AlipayClient alipayClient = new DefaultAlipayClient("https://openapi.alipay.com/gateway.do","app_id","your private_key","json","GBK","alipay_public_key","RSA2");
        AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();

        if (paymentInfoQuery.getPaymentStatus().equals(ProcessStatus.CLOSED) || paymentInfoQuery.getPaymentStatus().equals(PaymentStatus.PAID)) {
            return true;
        }

        Map<String,Object> map = new HashMap<>();

        //在map中放入数据
        map.put("out_trade_no",paymentInfoQuery.getOutTradeNo());

        request.setBizContent(JSON.toJSONString(map));
//        request.setBizContent("{" +
//                "\"out_trade_no\":\"20150320010101001\"," +
//                "\"trade_no\":\"2014112611001004680 073956707\"," +
//                "\"org_pid\":\"2088101117952222\"," +
//                "      \"query_options\":[" +
//                "        \"TRADE_SETTLE_INFO\"" +
//                "      ]" +
//                "  }");
        AlipayTradeQueryResponse response = null;
        try {
            response = alipayClient.execute(request);
        } catch (AlipayApiException e) {
            e.printStackTrace();
        }
        if(response.isSuccess()){

            //判断交易是否成功
            if ("TRADE_SUCCESS".equals(response.getTradeStatus())||"TRADE_FINISHED".equals(response.getTradeStatus())) {

                //支付成功
                //更新状态
                PaymentInfo paymentInfo = new PaymentInfo();
                paymentInfo.setPaymentStatus(PaymentStatus.PAID);

                updatePaymentInfo(paymentInfoQuery.getOutTradeNo(),paymentInfo);
                //发送消息队列
                sendPaymentResult(paymentInfoQuery,"success");
                return true;

            }
            System.out.println("调用成功");
        } else {
            System.out.println("调用失败");
        }
        return false;
    }

controller测试

@RequestMapping("queryPaymentResult")
    @ResponseBody
    public String queryPaymentResult(String orderId) {

        //根据orderId查询paymentInfo
        PaymentInfo paymentInfo = new PaymentInfo();
        paymentInfo.setOrderId(orderId);
        PaymentInfo queryPaymentInfo1 = paymentService.getPaymentInfo(paymentInfo);
        boolean flag = paymentService.checkPayment(queryPaymentInfo1);
        return ""+flag;


    }

利用延迟队列反复调用查询接口。

  • 首先在消息队列中打开延迟队列配置:在activemq的conf目录下activemq.xml中

    vim /opt/activemq/conf/activemq.xml
    
    
  • 2020-07-30_161612

  • 重启activemq

    service activemq restart
    
  • 接口

    /**
         * 发送延迟队列
         * @param outTradeNo
         * @param delaySec
         * @param checkCount
         */
        void sendDelayPaymentResult(String outTradeNo,int delaySec ,int checkCount);
    
    
  • 实现类

     @Override
        public void sendDelayPaymentResult(String outTradeNo, int delaySec, int checkCount) {
    
            //创建工厂,获取连接
            Connection connection = activeMQUtil.getConnection();
    
            try {
    
                //开启链接
                connection.start();
    
                //创建session
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    
                //创建队列
                Queue payment_result_check_queue = session.createQueue("PAYMENT_RESULT_CHECK_QUEUE");
                //创建producer
                MessageProducer producer = session.createProducer(payment_result_check_queue);
    
                //创建消息对象
                ActiveMQMapMessage activeMQMapMessage = new ActiveMQMapMessage();
                activeMQMapMessage.setString("outTradeNo",outTradeNo);
                activeMQMapMessage.setInt("delaySec",delaySec);
                activeMQMapMessage.setInt("checkCount",checkCount);
    
                //设置延迟时间
                activeMQMapMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delaySec*1000);
    
                producer.send(activeMQMapMessage);
    
                //提交
                session.commit();
                //关闭
                closeAll(connection,session,producer);
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
  • 创建延迟队列消费者PaymentConsumer

    @Component
    public class PaymentConsumer {
    
    
        @Reference
        private PaymentService paymentService;
    
        @JmsListener(destination = "PAYMENT_RESULT_CHECK_QUEUE",containerFactory = "jmsQueueListener")
        public void consumeSkuDeduct(MapMessage mapMessage) throws JMSException {
    
            //获得数据
            String outTradeNo = mapMessage.getString("outTradeNo");
            int delaySec = mapMessage.getInt("delaySec");
            int checkCount = mapMessage.getInt("checkCount");
    
            //判断是否支付成功
            PaymentInfo paymentInfo = new PaymentInfo();
            paymentInfo.setOutTradeNo(outTradeNo);
            PaymentInfo paymentInfoQuery = paymentService.getPaymentInfo(paymentInfo);
            boolean checkPayment = paymentService.checkPayment(paymentInfoQuery);
    
            System.err.println(checkPayment);
    
            //循环检查消息队列
            if (!checkPayment && checkCount != 0) {
    
                //检查次数
                System.err.println(checkCount);
                paymentService.sendDelayPaymentResult(outTradeNo,delaySec,checkCount-1);
            }
        }
    }
    
    
  • 在生成支付宝二维码方法中添加

    @RequestMapping("alipay/submit")
        @ResponseBody
        public String alipaysubmit(HttpServletRequest request, HttpServletResponse httpResponse) {
    
            .......
            .......
            //A调用延迟队列
            paymentService.sendDelayPaymentResult(paymentInfo.getOutTradeNo(),15,3);
        }
    

轮询扫描

应用场景 :

  • 长期没有付款的订单,要定期关闭掉。

  • 如果时限比较小,比如30分钟未付款的订单就关闭(一般是锁了库存的订单),也可以用延时队列解决。

  • 如果时限比较长比如1-2天,可以选择用轮询扫描。

实现方式 spring task(定时任务)

测试

创建OrderTask类
@EnableScheduling
@Component
public class OrderTask {


    @Reference
    private OrderService orderService;

    //每分钟的第5秒执行该方法
    @Scheduled(cron = "5 * * * * ?")
    public void test01() {
        System.err.println(Thread.currentThread().getName()+"**************************01");
    }

    //每隔5秒执行一次
    @Scheduled(cron = "0/5 * * * * ?")
    public void test02() {
        System.err.println(Thread.currentThread().getName()+"********************************02");
    }
}
有关@Scheduled
0-59
0-59
小时0-23
日期1-31
月份1-12
星期1-7
年(可选)1970-2099

实际应用

  • 在OrderTask加入定时器关闭过期订单

        //每20秒执行一次
        @Scheduled(cron = "0/20 * * * * ?")
        public void checkOrder() {
    
            //关闭过期订单
            List<OrderInfo> orderInfoList = orderService.getExpiredOrderList();
    
            for (OrderInfo orderInfo : orderInfoList) {
    
                //处理过期订单
                orderService.execExpiredOrder(orderInfo);
            }
        }
    
  • 接口

     /**
         * 处理过期订单
         * @param orderInfo
         */
        void execExpiredOrder(OrderInfo orderInfo);
    
  • 实现类

    @Override
        @Async
        public void execExpiredOrder(OrderInfo orderInfo) {
    
            //更新订单状态为已关闭
            updateOrderStatus(orderInfo.getId(),ProcessStatus.CLOSED);
    
            //调用paymentService处理过期订单
            paymentService.closePayment(orderInfo.getId());
        }
    
  • paymentService接口中

     /**
         * 处理过期订单
         * @param id
         */
        void closePayment(String id);
    
  • 实现类

    @Override
        public void closePayment(String id) {
    
            Example example = new Example(PaymentInfo.class);
            example.createCriteria().andEqualTo("orderId",id);
            PaymentInfo paymentInfo = new PaymentInfo();
            paymentInfo.setPaymentStatus(PaymentStatus.ClOSED);
    
            paymentInfoMapper.updateByExampleSelective(paymentInfo,example);
        }
    

延迟队列和轮询该选择谁?

看精准度,看时间间隔,时间间隔短的用延迟队列,周期性比较长,用轮询!