Skip to content

rabbitmq

rabbitmq是一种分布式消息中间件, 常用与上下游的“逻辑解耦+物理解耦”(异步)。

异步调用的优势

  • 耦合度低,拓展性强
  • 异步调用,无需等待,性能好
  • 故障隔离,下游服务故障不影响上游业务
  • 缓存消息,流量削谷填峰

异步调用的问题

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于Broker的可靠性

启动命令

shell
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:latest

提示

rabbitmq的后台管理页面默认是关闭的,需要手动开启插件:rabbitmq-plugins enable rabbitmq_management

消息发送模型

rabbitmq_01

  • 消息发送者
  • 交换机
  • 队列
  • 消息消费者
  • 路由键
  • 虚拟主机

消费模型

  1. work模型
  • 多个消费者绑定到一个队列,可以加块消息处理速度。
  • 同一个消息只会被一个消费者处理。
  • 通过设置prefetch来控制消费者预取消息的数量,处理完一条再处理下一条(可以一定程度上解决消息堆积问题)。

交换机

Fanout(广播)

  • 将消息广播到每一个跟其绑定的queue

Direct(定向)

  • 会将接收到的消息按照RoutingKey投递到指定的队列
  • 每个Queue都应该与Exchange绑定一个BindingKey

Topic(主题)

  • RoutingKey是多个单词列表以.分割
  • Queue与Exchange指定BindingKey时可以使用通配符
    • #: 0个或者多个单词
    • *: 一个单词

消息可靠性问题

rabbitmq_03

在正常的业务流程中可能会存在一系列问题

  • publisher和mq之间交互出现故障
  • mq宕机导致消息没有投递出去
  • consumer收到消息处理的时候异常

发送者的可靠性

  1. 生产者重连 由于网络波动可能出现客户端连接mq失败的情况,可以通过配置开启连接失败后的重连机制
yaml
spring:
  application:
    name: publisher
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: user
    password: password
    virtual-host: /warmwind
    listener:
      simple:
        prefetch: 1 # 每次只处理一个消息
    connection-timeout: 1s #超时时间
    template:
      retry:
        enabled: true #开启重试
        initial-interval: 1s # 失败后的初始等待时间
        multiplier: 1 # 失败后下次等待时长倍数, 下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

提示

当网络不稳定的时候,可以利用重试机制提高消息发送的成功率,不过springamqp的重试是阻塞式的会影响业务性能, 如果对业务性能有要求,建议禁用重试机制,如果一定要使用,需要配置合理的等待时长和重试次数,也可以考虑用异步线程来执行发送消息的业务

  1. 生产者确认

rabbitmq提供了publisher confirmpublisher return两种确认机制,开启确认机制后,在mq成功收到消息后会返回确认消息给生产者

  • 消息投递到了mq,但是路由失败,此时会通过publisher return返回路由异常原因,然后返回ack,告知投递成功(出现这种情况要么是交换机没有正常绑定队列,要么是代码写的有问题,可以在开发层面避免,一般不会出现)
  • 临时消息投递到了mq,并且入队成功,返回ack,告知投递成功
  • 持久化消息投递到了mq,并且入队完成持久化成功,返回ack,告知投递成功
  • 其他情况都会返回nack,告知投递失败
yaml
spring: 
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制(none: 关闭,simple:同步阻塞等待mq回执,correlated:mq异步回调)
    publisher-returns: true # 开启publisher returns机制(一般情况不用开启)

publisher return(每个RabbitTemplate只能配置一个ReturnsCallback):

java
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息被退回:" + returnedMessage));
    }
}

publisher confirm(发送消息,指定消息id,消息ConfirmCallback):

java
@Test
void contextLoads() {
    String uuid = IdUtil.fastSimpleUUID();
    CorrelationData cd = new CorrelationData(uuid);
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            System.out.println("send msg success,id: " + correlationData.getId());
        } else {
            System.out.println("send msg err,id: " + cd.getId() + ", reason: " + cause);
        }
    });
    rabbitTemplate.convertAndSend("ww.direct.exchange", "notice", "test notice", cd);;
}

提示

生产者确认机制需要额外的网络和系统资源开销,一般不使用
如果一定要使用无需开启publisher return机制,路由失败一般是业务问题
对于nack消息可以使用有限重试,依然失败则记录异常消息

mq的可靠性

在默认情况下,mq将收到的消息放到内存里,降低消息的延迟,但是会导致一些问题:

  • mq挂掉后消息丢失
  • 内存空间有限,当消费者故障或者处理太慢的情况下会造成消息积压,引发mq阻塞

解决方案:

数据持久化

  • 交换机持久化
  • 队列持久化
  • 消息持久化(springboot发送消息默认是持久化的)

Lazy Queue(rabbitmq3.6.0后引入,3.12.0后默认启用无法更改)

  • 接收到消息直接写入磁盘而非内存,内存中只保留最近的消息默认2048条
  • 消费者需要消费时才会从磁盘从取出并加载
  • 支持百万消息的存储

消费者的可靠性

为了确认消费者是否成功处理消息,rabbitmq提供了消费者确认机制,当消费者处理消息后,应该向mq 发送一个回执,告知mq消息处理完毕,回执有三种状态:

  • ack:成功处理消息,rabbitmq中队列中删除消息
  • nack:消息处理失败,rabbitmq需要再次投递消息
  • reject:消息处理失败并拒绝消息,rabbitmq从队列中删除消息

sringamqp以已经默认实现了,通过配置文件选择ack处理方式:

  • none:不处理,消息投递后消费者立马返回ack,消息立刻从mq删除,非常不安全,不建议使用
  • manual:手动模式,需要在业务代码中调用api,发送ack或者reject,存在业务入侵,但是更灵活
  • auto:自动模式,springamqp,利用aop对消息处理逻辑做了增强,当业务执行正常的时候返回ack,当业务出现异常时,根据异常返回不同结果
    • 业务异常:返回nack
    • 消息处理或者校验异常:返回reject
yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动确认

失败重试机制:当消费者消费失败并且返回nack时,消息会不断重新入队,然后再次异常,无限循环,导致mq的消息处理彪升,带来不必要的压力, 可以利用spring的本地消费重试机制,而不是无限mq队列

yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true #开启重试
        initial-interval: 1s # 失败后的初始等待时间
        multiplier: 1 # 失败后下次等待时长倍数, 下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
        stateless: true # 如果业务有事务这里为false

在开启消费重试之后,重试次数耗尽,如果消息依然失败,则由MessageRecoverer接口处理,包含三种实现

  • RejectAndDontRequeueRecoverer:重试耗尽,直接reject,丢弃消息,默认
  • ImmediateRequeueMessageRecoverer:重试耗尽,返回nack,重新入队
  • RepublishMessageRecoverer:重试耗尽,失败消息投递到指定的交换机

业务幂等性

同一个业务,执行一次或者执行多次对业务状态的影响是一致的

唯一消息id

  1. 给每条消息都生产一个唯一的业务标识,与消息一起投递给消费者
  2. 消费者收到消息后,处理自己的业务,业务处理成功后将处理成功的消息id保存到数据库
  3. 如果下次又收到相同的消息,去数据库判断是否存在,存在则为重复消息直接放弃
java
// 通过MessageConverter实现,如果后续需要改为如雪花id,则可以直接配置后置处理器
@Bean
public MessageConverter messageConverter() {
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

基于业务本身判断是否有幂等性

延迟消息

  • 生产者发送消息时,指定一个时间,在一段时间后才会收到消息
  • 在一段时间后才执行的任务

死信交换机

当一个队列中的任意一条消息满足下列情况之一就会成为死信:

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的request参数设为false
  • 消息是一个过期消息,达到了队列或消息设置的时间,超时无人消费
  • 要投递的队列满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到该交换机中。可以用来实现延时任务

  1. 消费者投递一条延时消息
  2. 当消息在队列里过期后被投递到死信交换机
  3. 死信交换机投递到死信队列
  4. 消费者消费

rabbitmq_04.png

延迟消息插件

rabbitmq官方提供的插件,原生支持延迟消息,原理是设计了一种支持延迟消息的交换机,当消息投递到交换机可以暂存一段时间, 到期后再投递到队列

java
// consumer
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue"), declare = "true",
        exchange = @Exchange(name = "delay.direct", type = ExchangeTypes.DIRECT, delayed = "true"),
        key = {"delay"}
))
public void listenDlxQueue(String message) {
    System.err.println("延迟消息:" + message);
}

// publisher
@Test
void sendTTLMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay", "delay message", message -> {
        message.getMessageProperties().setDelayLong(10000L);
        return message;
    });
}

// 或者 
private static Message postProcessMessage(Message message) {
    message.getMessageProperties().setDelayLong(10000L);
    return message;
}

@Test
void sendTTLMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay", "delay message", PublisherApplicationTests::postProcessMessage);
}

// 上述方法调用等价于
// message -> PublisherApplicationTests.postProcessMessage(message)

延迟消息弊端

基本上所有的的定时功能(redis除外)都会有一些性能损耗,因为在程序内部维护一个时钟,每隔一段时间往前跳一次, 当定时任务很多的时候每个定时任务都会维护一个自己的时钟,就需要cpu不断计算,定时任务越多,对于cpu占用越多,所以定时任务属于cpu密集型任务, 所以采用延迟消息就会给服务器的压力带来额外的压力,特别是如果有非常多的延迟消息的时候,就会给服务器带来很多的压力, 所以定时任务(延迟消息)不能将时间设置的很长,所以mq的延迟消息只适合延迟时间较短的场景

延迟消息引申

rabbitmq_05.png

  • 用户下单后,到交易服务,交易服务新增一个15分钟的延迟消息,到mq,但是后续会有一些问题,在大多数情况下,用户下完单后会马上付款, 所以如果每一个订单都发15分钟的延时消息就会浪费很多cpu
  • 根据业务考虑可以将15分钟的定时任务拆成一个个小的定时任务比如10s,如果用户10s后还是没有支付,那么就再重新发布一个延时消息, 可以逐步增加这个延时消息的时常, 这样可以减轻mq的压力

rabbitmq_06.png