侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 100 篇文章
  • 累计创建 48 个标签
  • 累计收到 1 条评论

RabbitMQ使用以及特性

qingtian
2022-10-25 / 0 评论 / 0 点赞 / 45 阅读 / 9,478 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-10-25,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RabbitMQ

RabbitMQ 四种集群架构

  • 主备模式
  • 远程模式
  • 镜像模式
  • 多活模式

主备模式

主备方案(主节点如果挂了,从节点提供服务,和 ActiveMQ 利用 ZooKeeper 做主备一样)RabbitMQ 的主备模式是采用 HAProxy 做的

image-20220919233347260

主备模式-HaProxy配置

listen rabbitmq_cluster # 主备集群的名字
bind 0.0.0.0:5672 # 配置 tcp 模式
mode tcp 
balance roudrobin # 简单的轮询
server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2  # 主节点
server bhz77 192.168.11.77:5672 backup check inter 5000 rise 2 fall 2

远程模式

  • 远距离通信和复制,可以实现双活的一种模式,简称Shovel模式
  • 所谓 Shovel 就是我们可以把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联

Shovel模式启动步骤

  1. 启动 rabbitMQ 插件:
    • rabbitmq-plugins enbale amqp_client
    • rabbitmq-plugins enbale rabbitmq_shovel
  2. 创建 rabbitmq,conf 文件
    • touch /etc/rabbitmq/rabbitmq.config
  3. 添加配置
  4. 源与目的服务器使用相同的配置文件 (rabbitmq.config)

image-20220925101811222

镜像模式

  • 集群模式非常经典的就是 Mirror 镜像模式,保证 100% 数据不丢失

Mirror 镜像队列

image-20220925102043313

RabbitMQ集群架构图

image-20220925102121201

多活模式

  • 这种模式也是实现异地数据复制的主流模式,因为 Shovel 模式配置比较复杂,所以一般来说实现异地集群都是使用多活模型来实现的

  • 这种模型需要以来 RabbitMQ 的 federation 插件, 可以实现持续的可靠的 AMQP 数据通信,多活模式实际配置与应用非常简单。

  • RabbitMQ 部署架构采用双中心模式 (多中心),在两套(或多套)数据中心各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分消息队列消息共享

多活模式架构图

image-20220925104301825

AMQP 核心概念

  • Server : 又称 Broker,接受客户端的链接,实现 AMQP 实体服务

  • Connection:连接,应用程序与 Broker 的网络链接

  • Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道。客户端可建立多个 Channel ,每个 Channel 代表一个会话任务。

  • Message:消息,服务器和应用程序之间传递的数据,由 Properties 和 body 组成。 Properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特性; Body 则是消息的内容。

  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个 Exchange 和 Queue,同一个虚拟主机里面不能有相同名称的 Exchange 或 Queue。

  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ 常用的交换器常用类型有 direct、topic、fanout、headers 四种,后面详细介绍。

    • Exchange(交换机)属性:

      Type:交换机类型 direct、topic、fanout、headers

      Durability:是否需要持久化 ,默认为 true 表示持久化

      Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange

      Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False

      Arguments:扩展参数,用于扩展AMQP协议自制定化使用Name:交换机名称

  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。

  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个 RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个 “.” 分割的字符串,例如“com.rabbitmq”。

  • Queue:消息队列,用来保存消息,供消费者消费。

消息如何保障 100% 的投递成功

生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)的确认应答
  • 完善的消息进行补偿机制

可行的解决方案

  • 消息落库,对消息状态进行标记
  • 消息的延迟投递,做二次确认,回调检查
  1. 对消息状态进行标记

    image-20220925165430247

  2. 消息的延迟投递,做二次确认,回调检查

    image-20220925170530742

消费端-幂等性保障

在 Broker 重试投递的情况下,如何避免消息重复消费

  • 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息

主流的幂等性保证

  • 唯一ID + 指纹码 使用数据库主键去重
  • 利用 redis 的原子性去实现
  1. 唯一ID + 指纹码
    • 唯一ID + 指纹码 使用数据库主键去重
    • select count(1) from T_ORDER where ID = 唯一ID + 指纹码
    • 好处:实现简单
    • 坏处:高并发下有数据库写入的性能瓶颈
    • 解决方案:根据 ID 进行分库分表进行算法路由
  2. 利用 redis 的原子性去实现

rabbitMQ 和 Springboot 整合

生产端核心配置

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

消费端的配置

spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5

@RabbitListener 的使用

image-20220925182123360

rabbit-producer

server:
  port: 7000
  servlet:
    context-path: /

spring:
  application:
    name: rabbit-producer
  rabbitmq:
    addresses: 192.168.0.103:5672
    username: guest
    password: guest
    virtual-host: /
    # 设置 return 消息模式,注意和 mandatory 一起使用
    publisher-returns: true
    template:
      mandatory: true
    publisher-confirm-type: correlated
    connection-timeout: 15000


@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private Integer port;

    @Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    private static final Integer TTL_TIME = 1800;

    private static final String MES_LOT_HISTORY_SAVE_QUEUE = "MES_LOT_HISTORY_SAVE_QUEUE";
    private static final String MES_LOT_HISTORY_SAVE_DLX_QUEUE = "MES_LOT_HISTORY_SAVE_DLX_QUEUE";
    private static final String MES_DIRECT_EXCHANGE = "MES_DIRECT_EXCHANGE";
    private static final String MES_DIRECT_DLX_EXCHANGE = "MES_DIRECT_DLX_EXCHANGE";
    private static final String MES_LOT_HISTORY_SAVE_KEY = "MES_LOT_HISTORY_SAVE_KEY";
    private static final String MES_LOT_HISTORY_SAVE_DLX_KEY = "MES_LOT_HISTORY_SAVE_DLX_KEY";


    /**
     * 中心节点rabbitmq连接
     *
     * @return
     */
    @Bean(name = "connectionFactory")
    @Primary
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // 确认消息发送到交换器
        connectionFactory.setPublisherConfirmType(CORRELATED);
        connectionFactory.setAddresses(addresses);
        // 确认消息发送到队列
        connectionFactory.setPublisherReturns(true);
        connectionFactory.setChannelCacheSize(50);
        return connectionFactory;
    }

    /**
     * rabbitAdmin管理类
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * rabbitmq监听工厂类
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    /**
     * rabbitmq json转换器
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
    }

    /**
     * 中心节点rabbitmq操作模板类
     *
     * @param connectionFactory
     * @return
     */
    @Bean(name = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        /**
         *
         * @param correlationData correlation data 唯一标识
         * @param ack true for ack, false for nack 是否发送到交换机
         * @param cause 异样原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

        });
         /**
         * 确认消息从 broker 发送到 queue
         */
        rabbitTemplate.setReturnsCallback(returned -> log.info("[{}],[{}],[{}],[{}],[{}]",
                returned.getMessage(),returned.getReplyCode(),
                returned.getReplyText(),returned.getExchange(),returned.getRoutingKey()));
        return rabbitTemplate;
    }

    /**
     * mes交换机
     *
     * @return
     */
    @Bean
    public DirectExchange mesDirectExchange() {
        return new DirectExchange(MES_DIRECT_EXCHANGE);
    }

    /**
     * mes死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange mesDirectDlxExchange() {
        return new DirectExchange(MES_DIRECT_DLX_EXCHANGE);
    }

    /**
     * mes测试队列
     *
     * @return
     */
    @Bean
    public Queue mesLotHistorySaveQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", MES_DIRECT_DLX_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", MES_LOT_HISTORY_SAVE_DLX_KEY);
        arguments.put("x-message-ttl", TTL_TIME);
        return new Queue(MES_LOT_HISTORY_SAVE_QUEUE, true, false, false, arguments);
    }

    /**
     * mes测试队列
     *
     * @return
     */
    @Bean
    public Queue mesLotHistorySaveDlxQueue() {
        return new Queue(MES_LOT_HISTORY_SAVE_DLX_QUEUE, true, false, false);
    }

    /**
     * mes测试队列key
     *
     * @return
     */
    @Bean
    public Binding bindingMesLotHistorySaveQueue() {
        return BindingBuilder.bind(mesLotHistorySaveQueue()).to(mesDirectExchange()).with(MES_LOT_HISTORY_SAVE_KEY);
    }

    /**
     * mes测试队列key
     *
     * @return
     */
    @Bean
    public Binding bindingMesLotHistorySaveDlxQueue() {
        return BindingBuilder.bind(mesLotHistorySaveDlxQueue()).to(mesDirectDlxExchange()).with(MES_LOT_HISTORY_SAVE_DLX_KEY);
    }

}

----------------------------------------------------------------------------------
@Component
@Slf4j
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息的方法
     * @param message 消息体
     * @param properties 额外属性
     * @throws Exception
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, messageHeaders);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        MessagePostProcessor messagePostProcessor = message1 -> {
            log.info(JSON.toJSONString(message1));
            return message1;
        };
        rabbitTemplate.convertAndSend("exchange-1","springboot.rabbit",
                msg,messagePostProcessor,correlationData);
    }
}

一些 rabbit的配置实例:

https://blog.csdn.net/weixin_43606226/article/details/114372349?spm=1001.2014.3001.5506

rabbit-consumer

server:
  port: 7001
  servlet:
    context-path: /

spring:
  application:
    name: rabbit-consumer
  rabbitmq:
    addresses: 192.168.0.103:5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        concurrency: 1
        max-concurrency: 5
        acknowledge-mode: manual
        # 每个 consumer 最大未处理消息的积压数
        prefetch: 1
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 2000
    connection-timeout: 15000

@Component
@Slf4j
public class RabbitConsumer {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1",durable = "true"),
            exchange = @Exchange(value = "exchange-1",durable = "true",
                    type = "topic",ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
        )
    )
    public void onMesssage(Message message, Channel channel) throws Exception {
        log.info("message is : [{}]", JSON.toJSONString(message.getPayload()));
        // 手动进行消息签收
        // 获取 deliveryTag
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag,false);
    }

}

RabbitMQ基础组件封装

基础组件要实现的功能点

image-20221015205828275

https://github.com/guankang1314/rabbit-parent

延迟队列插件

step1:upload the ‘rabbitmq_delayed_message_exchange-0.0.1.ez’ file:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

http://www.rabbitmq.com/community-plugins.html

step2:PUT Directory:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/plugins

step3:Then run the following command:

Start the rabbitmq cluster for command ## rabbitmq-server -detached

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

img

访问地址:http://192.168.1.21:15672/#/exchanges,添加延迟队列

image-20221025080555038

0

评论区