码上敲享录 > rabbitmq常见问题详解 > springboot+rabbitmq实现延迟队列

springboot+rabbitmq实现延迟队列

上一章章节目录下一章 2020-07-23已有1849人阅读 评论(0)

springboot+rabbitmq延迟一段时间后再发送到队列


解决方法:

1.声明队列、交换机

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;


@Configuration

public class DirectRabbitConfig {  

   @Bean

   public Queue smsWithTimeCacheQueue() {  //创建缓存队列SmsWithTimeCacheQueue

       Map<String, Object> arguments = new HashMap<>();

       arguments.put("x-dead-letter-exchange","SmsExchange");//缓存队列过期后,发送消息到对应的交换机SmsExchange上

       arguments.put("x-dead-letter-routing-key","SmsRoutingKey");//缓存队列通过SmsRoutingKey,传递到实际消费队列

       return new Queue("SmsWithTimeCacheQueue",true,false,false,arguments);

   }

 

   @Bean

   public DirectExchange smsWithTimeCacheExchange() {//创建缓存交换机SmsWithTimeCacheExchange

       return new DirectExchange("SmsWithTimeCacheExchange");

   }


   @Bean

   Binding bingSmsWithTimeCacheQueue() {//声明SmsWithTimeCacheQueue作为 routingkey 并将其绑定交换机和队列

       return BindingBuilder.bind(smsWithTimeCacheQueue()).to(smsWithTimeCacheExchange()).with("SmsWithTimeCacheQueue");

   }

 

}


2.发送消息到缓存队列,延迟1分钟发送消息到SmsExchange交换机绑定的队列上

rabbitTemplate.convertAndSend("SmsWithTimeCacheQueue", "消息内容", new SurvetMessagePostProcessor(1000 * 60 * 1));


3.SurvetMessagePostProcessor类代码

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;


public class SurvetMessagePostProcessor implements MessagePostProcessor {


   private Integer expiration;//过期时间

   public SurvetMessagePostProcessor(Integer expiration){

       this.expiration=expiration;

   }

   @Override

   public Message postProcessMessage(Message message) throws AmqpException {

       message.getMessageProperties().setExpiration(this.expiration.toString());

       return message;

   }

}


0

有建议,请留言!

  • *您的姓名:

  • *所在城市:

  • *您的联系电话:

    *您的QQ:

  • 咨询问题:

  • 提 交