SpringBoot+RabbitMQ削峰入门


前言

当大量的客户访问请求打到后端,去访问数据库等,瞬间会爆炸的。
经过前端或者其他的方案进行限流外。
还是有大量的请求,这个时候需要削峰了。

简单的削峰例子

先设置小一点,然后循环往队列里面放消息,消费的时候延迟2

spring:
  rabbitmq:
    host: 192.168.21.129
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      type: simple
      simple:
        prefetch: 1 # 消费者每次从队列获取的消息数量
        concurrency: 1 # 消费者数量
        max-concurrency: 1 # 启动消费者最大数量

调用生产者

        for (int i=0;i<10;i++){
            // 调用消息生产者
            sender.sender(msg+i);
        }

消费者

    @RabbitListener(queues = MQConfig.QUEUE)
    public void receiver(String msg) throws InterruptedException {
        Thread.sleep(2000L); // 模拟处理需要2秒
        log.info("消费者消费。。。。。。{}", msg);
    }

页面访问:

http://localhost:8088/hello?msg=GMaya

此时消息会全部放到列队,但是会一条一条消费。简单的实现了削峰处理


调整消费者的数量

        prefetch: 1 # 消费者每次从队列获取的消息数量
        concurrency: 2 # 消费者数量
        max-concurrency: 10 # 启动消费者最大数量

此时就会有两个消费者同时去消费队列中的消息。所以这个消费者数量需要根据实际的情况去设置所能承受的一个值,也就是峰值。

重试策略

如果说消费者在消费的过程中失败了,那么会一直消费,一直到成功为止。

但是也可以添加重试策略,比如失败三次就不在消费了。

    listener:
      type: simple
      simple:
        prefetch: 1 # 消费者每次从队列获取的消息数量
        concurrency: 2 # 消费者数量
        max-concurrency: 10 # 启动消费者最大数量
        # 重试策略相关配置
        retry:
          enabled: true #开启消费者重试
          max-attempts: 2 #最大重试次数
          initial-interval: 2000 #重试间隔时间

模拟异常

    @RabbitListener(queues = MQConfig.QUEUE)
    public void receiver(String msg) throws InterruptedException {
        Thread.sleep(2000L); // 模拟处理需要2秒
        if("GMaya8".equals(msg)){
            System.out.println(1/0);
        }
        log.info("消费者消费。。。。。。{}", msg);
    }

此时的结果就是其他的消息被消费,但是这个GMaya8没了,失败三次之后就没了。队列中也消失了。

如何保证rabbitmq消息不丢失

丢失数据场景:

  1. 生产者没有生产成功,即生产者丢失
  2. rabbitmq丢失了
  3. 消费端丢失,即消费端没消费成功。

开启confirm回调,启动手动确定消息消费。

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.21.129
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-returns: true # 实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发
    publisher-confirm-type: correlated
    listener:
      type: simple
      simple:
        acknowledge-mode: manual # 消息消费确认,可以手动确认
        prefetch: 1 # 消费者每次从队列获取的消息数量
        concurrency: 2 # 消费者数量
        max-concurrency: 10 # 启动消费者最大数量
        # 重试策略相关配置
        retry:
          enabled: true #开启消费者重试
          max-attempts: 3 #最大重试次数
          initial-interval: 2000 #重试间隔时间
    template:
      #在消息没有被路由到合适队列情况下会将消息返还给消息发布者
      #当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
      # 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,
      # 出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
      # 否则就将消息return给发送者;
      #: true # 启用强制信息
      mandatory: true

使用交换机模式
生产者

@Component
@Slf4j
public class Sender implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public Sender(RabbitTemplate rabbitTemplate){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * fanout 模式
     * @param msg
     */
    public void fanoutSender(String msg) {
        log.info("fanout生产者生产消息。。。。。{}", msg);
        // 会把消息发送给 所有绑定到此交换机的全部列队;routing_key会被忽略。
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg, correlationData);
    }

    @Override public void confirm(CorrelationData correlationData, boolean b, String s) {
        if(b){
            log.info("消息生产成功");
        }else{
            log.info("消息生产失败");
            // 可以自己写重新发送消息
        }
    }
}

消费者

    @RabbitListener(queues = MQConfig.QUEUE_A)
    public void receiverA(String msg,Message message, Channel channel) throws IOException {
        log.info("QUEUE_A消费者消费。。。。。。{}", msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    @RabbitListener(queues = MQConfig.QUEUE_B)
    public void receiverB(String msg,Message message, Channel channel) throws IOException {
        log.info("QUEUE_B消费者消费。。。。。。{}", msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

这个时候已经解决出现问题,倒是消息消失的情况。当处理好问题之后,重启就会重新消费


文章作者: GMaya
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 GMaya !
评论
 上一篇
分布式雪花算法工具类 分布式雪花算法工具类
SnowFlake 算法,是 Twitter 开源的分布式 id 生成算法。 SnowFlake算法的优点: 高性能高可用:生成时不依赖于数据库,完全在内存中生成。 容量大:每秒中能生成数百万的自增ID。 ID自增:存入数据库中,索引
2020-04-21
下一篇 
给hexo博客加404公益页面 给hexo博客加404公益页面
腾讯公益404页面,寻找丢失儿童,让大家一起关注此项公益事业!效果如:https://gmaya.top/404.html 目的:如果访问网站不存在的页面,则返回腾讯公益404页面 使用方法,新建 404.html 页面,放到主题的 s
2020-04-19
  目录