风萧萧兮易水寒

Spring Boot整合RabbitMQ入门

前言

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

安装完MQ,就该写个例子研究研究了。
新建Spring Boot 项目。

修改pom文件

新增RabbitMQ 支持

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

修改配置文件

默认网页guest用户是不允许访问的,需要修改一下权限。上篇有写

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.21.129
    port: 5672
    username: guest
    password: guest
    virtual-host: /

四种交换机

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息。

交换机有四种类型:Direct, topic, Headers and Fanout

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

Direct

创建配置类


/**
 * 消息队列配置类
 * @author GMaya
 * @dateTime 2020/4/9 16:02
 */
@Configuration
public class MQConfig {

    public static final String QUEUE = "hello-top-gmaya";

    /**
     * Direct Exchange(交换机最简单的方式)
     */
    @Bean
    public Queue queue() {
        // durable = true 开启持久化
        return new Queue(QUEUE, true);
    };

};

创建生产者

用来提供消息产生的


/**
 * 消息队列生产者,比如订单下完,需要生产一个邮件推送的消息
 * @author GMaya
 * @dateTime 2020/4/9 15:42
 */
@Component
@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sender(String msg) {
        log.info("生产者生产消息。。。。。{};", msg);
        // 向key = MQConfig.QUEUE 中发送消息
        this.amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    };

};

创建消费者

用来消费消息的


/**
 * 消息列队 , 消费者, 用来消费队列中的消息
 * @author GMaya
 * @dateTime 2020/4/9 15:58
 */
@Component
@Slf4j
public class Receiver {
    // 接收key = MQConfig.QUEUE 中消息
    @RabbitListener(queues = MQConfig.QUEUE)
    public void receiver(String msg) {
        log.info("消费者消费。。。。。。{};", msg);
    };

};

测试

写个hello,然后调用。

@RestController
@Slf4j
public class HelloController {

    @Autowired
    private Sender sender;

    @RequestMapping("hello")
    public String getHello(@RequestParam String msg) {
        log.info("hello自己的业务。。。");
        sender.sender(msg);
        return "hello rabbitmq!!!";
    };

};

浏览器访问 http://localhost:8088/hello?msg=你好啊

topic

修改配置类


/**
 * 消息队列配置类
 * @author GMaya
 * @dateTime 2020/4/9 16:02
 */
@Configuration
public class MQConfig {

    public static final String QUEUE = "hello-top-gmaya";
    public static final String QUEUE_A = "topic-a";
    public static final String QUEUE_B = "topic-b";
    public static final String ROUTING_KEYA = "topic.gmaya";
    public static final String ROUTING_KEYB = "topic.#";
    public static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE"; // topic 交换机名称

    // Direct Exchange(交换机最简单的方式)----begin
    @Bean
    public Queue queue() {
        // durable = true 开启持久化
        return new Queue(QUEUE, true);
    };
    // Direct Exchange(交换机最简单的方式)----end

    // Topic Exchange 可以用通配符 ---begin
    @Bean
    public Queue topicQueueA() {
        return new Queue(QUEUE_A, true);
    };
    @Bean
    public Queue topicQueueB() {
        return new Queue(QUEUE_B, true);
    };
    // 定义topic交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE);
    };
    // routing_key为topic.gmaya,就是完全匹配
    // 其实就相当于,你发送的时候 routing_key 和我的一样, 我就进入我的队列中,不然就不进
    @Bean
    public Binding topicBindingA(){
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(ROUTING_KEYA);
    };
    // routing_key为topic.#,就是模糊匹配
    // 其实就相当于,你发送的时候 routing_key 满足我的routing_key, 就进入我的队列中,不然就不进
    @Bean
    public Binding topicBindingB(){
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(ROUTING_KEYB);
    };
    // Topic Exchange 可以用通配符 ---begin

};

修改生产者

/**
 * 消息队列生产者,比如订单下完,需要生产一个邮件推送的消息
 * @author GMaya
 * @dateTime 2020/4/9 15:42
 */
@Component
@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sender(String msg) {
        log.info("生产者生产消息。。。。。{};", msg);
        // 向key = MQConfig.QUEUE 中发送消息
        this.amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    };

    public void topicSender(String msg) {
        log.info("topic生产者生产消息。。。。。{};", msg);
        // 交换机中发送消息
        // 这次是完全匹配, 结果应该a和b都能接收到
        // 因为b是模糊, 此条件满足模糊的条件,所有b会接收
        this.amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,MQConfig.ROUTING_KEYA, msg + "完全匹配");
        // 这次是模糊匹配,不满足a的条件,所以只有b模糊可以接收
//        this.amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.xixihaha", msg + "模糊匹配");
        // 这次是routing_key 不满足a,也不满足b。所以两个都接收不到。
//        this.amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"gmaya.top", msg + "不存在");

    };

};

修改消费者

/**
 * 消息列队 , 消费者, 用来消费队列中的消息
 * @author GMaya
 * @dateTime 2020/4/9 15:58
 */
@Component
@Slf4j
public class Receiver {
    // 接收key = MQConfig.QUEUE 中消息
    @RabbitListener(queues = MQConfig.QUEUE)
    public void receiver(String msg) {
        log.info("消费者消费。。。。。。{};", msg);
    };

    @RabbitListener(queues = MQConfig.QUEUE_A)
    public void receiverA(String msg) {
        log.info("QUEUE_A消费者消费。。。。。。{};", msg);
    };

    @RabbitListener(queues = MQConfig.QUEUE_B)
    public void receiverB(String msg) {
        log.info("QUEUE_B消费者消费。。。。。。{};", msg);
    };
};

测试

完全匹配

模糊匹配

Fanout

修改配置类

topic配置类基础上新增代码

    // Fanout Exchange 消息广播的模式 ---begin

    // 定义Fanout交换机名称
    // 不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    };

    @Bean
    public Binding fanoutBindingA(){
        return BindingBuilder.bind(topicQueueA()).to(fanoutExchange());
    };
    @Bean
    public Binding fanoutBindingB(){
        return BindingBuilder.bind(topicQueueB()).to(fanoutExchange());
    };

    // Fanout Exchange 消息广播的模式 ---end

消费者不需要更改(因为是使用的topic中的两个,已经写过了)

修改生产者

    /**
     * fanout 模式
     * @param msg
     */
    public void fanoutSender(String msg) {
        log.info("fanout生产者生产消息。。。。。{};", msg);
        // 会把消息发送给 所有绑定到此交换机的全部列队;routing_key会被忽略。
        this.amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
    };

测试

@RestController
@Slf4j
public class HelloController {

    @Autowired
    private Sender sender;

    @RequestMapping("hello")
    public String getHello(@RequestParam String msg) {
        log.info("hello自己的业务。。。");
//        sender.sender(msg);
//        sender.topicSender(msg);
        sender.fanoutSender(msg);
        return "hello rabbitmq!!!";
    };

};

延迟队列(死信队列)

模拟订单创建完,如果30分钟后还没有支付,则取消订单。
要实现的办法很多, 使用RabbitMQ延迟队列是其中一个办法!

使用死信队列大致思路:

订单MQ配置类


/**
 * 订单业务配置
 * 死信对列使用的 orderTTLExchange,orderTTLQueue,orderTTLKey
 * 正常对列使用的 orderExchange,orderQueue,orderKey
 * 在orderTTLQueue中设置,当消息设置的时间到了,消失了,那么我就去调用orderQueue,orderKey,通知它。
 * 其实没有对应的rderTTLQueue消费类,肯定到期就没了,然后就实现了一定时间后把消息传给某个队列。
 * @author GMaya
 * @dateTime 2020/4/10 15:37
 */
@Configuration
public class OrderMQConfig {

    /**
     * 创建延迟队列(死信对列)交换机orderTTLExchange
     * @return
     */
    @Bean
    public DirectExchange orderTTLExchange() {
        return new DirectExchange("orderTTLExchange", true, false);
    };

    /**
     * 创建实际消费交换机orderExchange
     * @return
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("orderExchange", true, false);
    };

    /**
     * 创建延迟队列(死信对列)orderTTLQueue
     * @return
     */
    @Bean
    public Queue orderTTLQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", "orderExchange"); // 到期后转发的交换机
        map.put("x-dead-letter-routing-key", "orderKey"); // 到期后转发的路由key
        return QueueBuilder.durable("orderTTLQueue").withArguments(map).build();
    };

    /**
     * 创建实际orderQueue
     * @return
     */
    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue", true);
    };

    /**
     * 将(延迟队列orderTTLQueue)和(交换机orderTTLExchange)绑定
     * @return
     */
    @Bean
    public Binding orderTTLBinding() {
        return BindingBuilder.bind(orderTTLQueue()).to(orderTTLExchange()).with("orderTTLKey");
    };

    /**
     * 将(延迟队列orderQueue)和(交换机orderExchange)绑定
     * @return
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("orderKey");
    };

};

订单生产者

/**
 * 订单消息生产着
 * @author GMaya
 * @dateTime 2020/4/10 15:48
 */
@Slf4j
@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String msg) {
        log.info("生产者===订单号:{};", msg);
        // 这里把消息生产出来,传到TTL的队列中去,那么到期还没有被消费,就认为死信息,就会调用设置好的队列了
        amqpTemplate.convertAndSend("orderTTLExchange", "orderTTLKey", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置失效时间,毫秒,一般订单设置30分钟,30*60*1000
                // 过期直接转发到指定的路由
                // 由于模拟设置20秒
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setContentEncoding("utf-8");
                messageProperties.setExpiration("20000");
                return message;
            };
        };);
    };

};

订单消费者

/**
 * 订单消息消费者
 * 主要用于检查订单状态是否支付
 * @author GMaya
 * @dateTime 2020/4/10 15:54
 */
@Slf4j
@Component
public class OrderReceiver {
    // 这里只是消费了orderQueue。没有人去消费orderTTLQueue.一旦写了TTL那么就是实时消费了,不叫死信了。
    @RabbitListener(queues = "orderQueue")
    public void orderMsg(String msg) throws IOException {
        log.info("消费者===订单号:{};", msg);
        // TODO 处理判断此订单状态是否支付,如果没有支付则取消订单!
    };
};

调用测试

/**
 *
 * @author GMaya
 * @dateTime 2020/4/9 15:37
 */
@RestController
@Slf4j
public class OrderController {

    @Autowired
    private OrderSender orderSender;

    @RequestMapping("order")
    public String getOrder(@RequestParam String msg) {
        log.info("Order创建成功。。。");
        orderSender.sendMsg(msg);
        return "order创建成功!";
    };

};

模拟20秒后进行实际调用处理
20秒换成30分钟,实现超过30分钟还未支付的订单处理


上述代码全部上传到gitee
项目地址:https://gitee.com/GMaya/springboot-rabbitmq
欢迎star

坚持原创技术分享,您的支持将鼓励我继续创作!