RabbitMQ
RabbitMQ 四种集群架构
- 主备模式
- 远程模式
- 镜像模式
- 多活模式
主备模式
主备方案(主节点如果挂了,从节点提供服务,和 ActiveMQ 利用 ZooKeeper 做主备一样)RabbitMQ 的主备模式是采用 HAProxy 做的
主备模式-HaProxy配置
listen rabbitmq_cluster # 主备集群的名字
bind 0.0.0.0:5672 # 配置 tcp 模式
mode tcp
balance roudrobin # 简单的轮询
server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2 # 主节点
server bhz77 192.168.11.77:5672 backup check inter 5000 rise 2 fall 2
远程模式
- 远距离通信和复制,可以实现双活的一种模式,简称Shovel模式
- 所谓 Shovel 就是我们可以把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联
Shovel模式启动步骤
- 启动 rabbitMQ 插件:
- rabbitmq-plugins enbale amqp_client
- rabbitmq-plugins enbale rabbitmq_shovel
- 创建 rabbitmq,conf 文件
- touch /etc/rabbitmq/rabbitmq.config
- 添加配置
- 源与目的服务器使用相同的配置文件 (rabbitmq.config)
镜像模式
- 集群模式非常经典的就是 Mirror 镜像模式,保证 100% 数据不丢失
Mirror 镜像队列
RabbitMQ集群架构图
多活模式
-
这种模式也是实现异地数据复制的主流模式,因为 Shovel 模式配置比较复杂,所以一般来说实现异地集群都是使用多活模型来实现的
-
这种模型需要以来 RabbitMQ 的 federation 插件, 可以实现持续的可靠的 AMQP 数据通信,多活模式实际配置与应用非常简单。
-
RabbitMQ 部署架构采用双中心模式 (多中心),在两套(或多套)数据中心各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分消息队列消息共享
多活模式架构图
AMQP 核心概念
-
Server : 又称 Broker,接受客户端的链接,实现 AMQP 实体服务
-
Connection:连接,应用程序与 Broker 的网络链接
-
Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道。客户端可建立多个 Channel ,每个 Channel 代表一个会话任务。
-
Message:消息,服务器和应用程序之间传递的数据,由 Properties 和 body 组成。 Properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特性; Body 则是消息的内容。
-
Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个 Exchange 和 Queue,同一个虚拟主机里面不能有相同名称的 Exchange 或 Queue。
-
Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ 常用的交换器常用类型有 direct、topic、fanout、headers 四种,后面详细介绍。
-
Exchange(交换机)属性:
Type:交换机类型 direct、topic、fanout、headers
Durability:是否需要持久化 ,默认为 true 表示持久化
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
Arguments:扩展参数,用于扩展AMQP协议自制定化使用Name:交换机名称
-
-
Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。
-
RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个 RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个 “.” 分割的字符串,例如“com.rabbitmq”。
-
Queue:消息队列,用来保存消息,供消费者消费。
消息如何保障 100% 的投递成功
生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)的确认应答
- 完善的消息进行补偿机制
可行的解决方案
- 消息落库,对消息状态进行标记
- 消息的延迟投递,做二次确认,回调检查
-
对消息状态进行标记
-
消息的延迟投递,做二次确认,回调检查
消费端-幂等性保障
在 Broker 重试投递的情况下,如何避免消息重复消费
- 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息
主流的幂等性保证
- 唯一ID + 指纹码 使用数据库主键去重
- 利用 redis 的原子性去实现
- 唯一ID + 指纹码
- 唯一ID + 指纹码 使用数据库主键去重
- select count(1) from T_ORDER where ID = 唯一ID + 指纹码
- 好处:实现简单
- 坏处:高并发下有数据库写入的性能瓶颈
- 解决方案:根据 ID 进行分库分表进行算法路由
- 利用 redis 的原子性去实现
rabbitMQ 和 Springboot 整合
生产端核心配置
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
消费端的配置
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
@RabbitListener 的使用
rabbit-producer
server:
port: 7000
servlet:
context-path: /
spring:
application:
name: rabbit-producer
rabbitmq:
addresses: 192.168.0.103:5672
username: guest
password: guest
virtual-host: /
# 设置 return 消息模式,注意和 mandatory 一起使用
publisher-returns: true
template:
mandatory: true
publisher-confirm-type: correlated
connection-timeout: 15000
@Configuration
public class RabbitMqConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
private static final Integer TTL_TIME = 1800;
private static final String MES_LOT_HISTORY_SAVE_QUEUE = "MES_LOT_HISTORY_SAVE_QUEUE";
private static final String MES_LOT_HISTORY_SAVE_DLX_QUEUE = "MES_LOT_HISTORY_SAVE_DLX_QUEUE";
private static final String MES_DIRECT_EXCHANGE = "MES_DIRECT_EXCHANGE";
private static final String MES_DIRECT_DLX_EXCHANGE = "MES_DIRECT_DLX_EXCHANGE";
private static final String MES_LOT_HISTORY_SAVE_KEY = "MES_LOT_HISTORY_SAVE_KEY";
private static final String MES_LOT_HISTORY_SAVE_DLX_KEY = "MES_LOT_HISTORY_SAVE_DLX_KEY";
/**
* 中心节点rabbitmq连接
*
* @return
*/
@Bean(name = "connectionFactory")
@Primary
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
// 确认消息发送到交换器
connectionFactory.setPublisherConfirmType(CORRELATED);
connectionFactory.setAddresses(addresses);
// 确认消息发送到队列
connectionFactory.setPublisherReturns(true);
connectionFactory.setChannelCacheSize(50);
return connectionFactory;
}
/**
* rabbitAdmin管理类
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
/**
* rabbitmq监听工厂类
*
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
/**
* rabbitmq json转换器
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}
/**
* 中心节点rabbitmq操作模板类
*
* @param connectionFactory
* @return
*/
@Bean(name = "rabbitTemplate")
public RabbitTemplate rabbitTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
/**
*
* @param correlationData correlation data 唯一标识
* @param ack true for ack, false for nack 是否发送到交换机
* @param cause 异样原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
});
/**
* 确认消息从 broker 发送到 queue
*/
rabbitTemplate.setReturnsCallback(returned -> log.info("[{}],[{}],[{}],[{}],[{}]",
returned.getMessage(),returned.getReplyCode(),
returned.getReplyText(),returned.getExchange(),returned.getRoutingKey()));
return rabbitTemplate;
}
/**
* mes交换机
*
* @return
*/
@Bean
public DirectExchange mesDirectExchange() {
return new DirectExchange(MES_DIRECT_EXCHANGE);
}
/**
* mes死信交换机
*
* @return
*/
@Bean
public DirectExchange mesDirectDlxExchange() {
return new DirectExchange(MES_DIRECT_DLX_EXCHANGE);
}
/**
* mes测试队列
*
* @return
*/
@Bean
public Queue mesLotHistorySaveQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", MES_DIRECT_DLX_EXCHANGE);
arguments.put("x-dead-letter-routing-key", MES_LOT_HISTORY_SAVE_DLX_KEY);
arguments.put("x-message-ttl", TTL_TIME);
return new Queue(MES_LOT_HISTORY_SAVE_QUEUE, true, false, false, arguments);
}
/**
* mes测试队列
*
* @return
*/
@Bean
public Queue mesLotHistorySaveDlxQueue() {
return new Queue(MES_LOT_HISTORY_SAVE_DLX_QUEUE, true, false, false);
}
/**
* mes测试队列key
*
* @return
*/
@Bean
public Binding bindingMesLotHistorySaveQueue() {
return BindingBuilder.bind(mesLotHistorySaveQueue()).to(mesDirectExchange()).with(MES_LOT_HISTORY_SAVE_KEY);
}
/**
* mes测试队列key
*
* @return
*/
@Bean
public Binding bindingMesLotHistorySaveDlxQueue() {
return BindingBuilder.bind(mesLotHistorySaveDlxQueue()).to(mesDirectDlxExchange()).with(MES_LOT_HISTORY_SAVE_DLX_KEY);
}
}
----------------------------------------------------------------------------------
@Component
@Slf4j
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息的方法
* @param message 消息体
* @param properties 额外属性
* @throws Exception
*/
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message, messageHeaders);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = message1 -> {
log.info(JSON.toJSONString(message1));
return message1;
};
rabbitTemplate.convertAndSend("exchange-1","springboot.rabbit",
msg,messagePostProcessor,correlationData);
}
}
一些 rabbit
的配置实例:
https://blog.csdn.net/weixin_43606226/article/details/114372349?spm=1001.2014.3001.5506
rabbit-consumer
server:
port: 7001
servlet:
context-path: /
spring:
application:
name: rabbit-consumer
rabbitmq:
addresses: 192.168.0.103:5672
username: guest
password: guest
virtual-host: /
listener:
simple:
concurrency: 1
max-concurrency: 5
acknowledge-mode: manual
# 每个 consumer 最大未处理消息的积压数
prefetch: 1
retry:
enabled: true
max-attempts: 3
initial-interval: 2000
connection-timeout: 15000
@Component
@Slf4j
public class RabbitConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",durable = "true"),
exchange = @Exchange(value = "exchange-1",durable = "true",
type = "topic",ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
public void onMesssage(Message message, Channel channel) throws Exception {
log.info("message is : [{}]", JSON.toJSONString(message.getPayload()));
// 手动进行消息签收
// 获取 deliveryTag
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);
}
}
RabbitMQ基础组件封装
基础组件要实现的功能点
https://github.com/guankang1314/rabbit-parent
延迟队列插件
step1:upload the ‘rabbitmq_delayed_message_exchange-0.0.1.ez’ file:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
http://www.rabbitmq.com/community-plugins.html
step2:PUT Directory:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/plugins
step3:Then run the following command:
Start the rabbitmq cluster for command ## rabbitmq-server -detached
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
访问地址:http://192.168.1.21:15672/#/exchanges,添加延迟队列
评论区