springboot+rabbitmq延迟一段时间后再发送到队列
解决方法:
1.声明队列、交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue smsWithTimeCacheQueue() { //创建缓存队列SmsWithTimeCacheQueue
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","SmsExchange");//缓存队列过期后,发送消息到对应的交换机SmsExchange上
arguments.put("x-dead-letter-routing-key","SmsRoutingKey");//缓存队列通过SmsRoutingKey,传递到实际消费队列
return new Queue("SmsWithTimeCacheQueue",true,false,false,arguments);
}
@Bean
public DirectExchange smsWithTimeCacheExchange() {//创建缓存交换机SmsWithTimeCacheExchange
return new DirectExchange("SmsWithTimeCacheExchange");
}
@Bean
Binding bingSmsWithTimeCacheQueue() {//声明SmsWithTimeCacheQueue作为 routingkey 并将其绑定交换机和队列
return BindingBuilder.bind(smsWithTimeCacheQueue()).to(smsWithTimeCacheExchange()).with("SmsWithTimeCacheQueue");
}
}
2.发送消息到缓存队列,延迟1分钟发送消息到SmsExchange交换机绑定的队列上
rabbitTemplate.convertAndSend("SmsWithTimeCacheQueue", "消息内容", new SurvetMessagePostProcessor(1000 * 60 * 1));
3.SurvetMessagePostProcessor类代码
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class SurvetMessagePostProcessor implements MessagePostProcessor {
private Integer expiration;//过期时间
public SurvetMessagePostProcessor(Integer expiration){
this.expiration=expiration;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(this.expiration.toString());
return message;
}
}