码上敲享录 > RabbitMQ教程(java) > Rabbitmq在Springboot中的用法

Rabbitmq在Springboot中的用法

上一章章节目录 2020-01-13已有2459人阅读 评论(0)

RabbitmqSpringboot中的用法


前面的几种模式实际使用中不是独立使用的,以下使用工作队列模式、直接路由、发布者确认结合springboot使用。


1.pom.xml添加rabbitmq的依赖包:

<dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

由于我使用的springboot的版本是1.5.8.RELEASE,所以上面依赖不指定版本的话添加的依赖包也是1.5.8.RELEASE版本


2.application.xml配置:

spring:

    rabbitmq:

       host: 127.0.0.1

       port: 5672

       username: guest

       password: guest

       publisher-confirms: true #发布消息到交换机时回调ConfirmCallback的confirm,ack属性为标准,true到达,反之进入黑洞

       publisher-returns: true #消息没有正确到达队列时触发回调,ReturnCallback的returnedMessage方法,如果正确到达队列不执行

       template:

          mandatory: true #开启mandatory: true, 才会执行returnedMessage方法

       listener:

         simple:

           acknowledge-mode: manual #手动ACK

           default-requeue-rejected: false

           concurrency: 1

           max-concurrency: 1

           prefetch: 1

           retry:

             enabled: false #为true时配置的重发是在消费端应用内处理的,不是rabbitqq重发,默认执行@RabbitListener3次,都不成功就不再消费消息


3.项目启动时在rabbitmq服务器创建交换机、路由键、队列并通过路由键绑定交换机和队列

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class DirectRabbitConfig {

   @Bean

   public Queue smsQueue(){    //创建队列

       return new Queue("SmsQueue");

   }

   @Bean

   public DirectExchange smsExchange() {//创建交换机

       return new DirectExchange("SmsExchange");

   }

   @Bean

   Binding bingSmsQueue() {//标识routingkey并将其绑定交换机和队列

       return BindingBuilder.bind(smsQueue()).to(smsExchange()).with("SmsRoutingKey");

   }

}


4.生产者

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.support.CorrelationData;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.time.LocalDate;

import java.time.LocalDateTime;

/**

 * 生产者

*/

@Component

public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

   @Autowired

   RabbitTemplate rabbitTemplate;

   @PostConstruct

   public void init() {//在构造方法后执行

       rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this);  

}

   /**

    * 发布消息到SmsExchange交换机

   */

   public void send(Object message){

       rabbitTemplate.convertAndSend("SmsExchange","SmsRoutingKey",message);

   }

/**

   发布消息到交换机时回调ConfirmCallback的confirm,ack属性为true表示到达交换机,反之进入黑洞

   */

   @Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {      

if (ack) {

            logger.info("消息发送到交换机成功");

       } else {

            logger.info("消息发送到交换机失败:"+cause);

       }

   }

/**

   消息没有正确到达队列时触发回调,returnedMessage方法,如果正确到达队列不执行

   */

   @Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

logger.info("消息没有正确到达队列:"+new String(message.getBody()));

   }

}


5.消费者

@Component

public class Receiver {

   @Autowired

   RabbitTemplate rabbitTemplate;

   @RabbitListener(queues = "${SmsQueue}") //监听队列

   public void smsReceiver(Message message, Channel channel) throws UnsupportedEncodingException {

       try {

          //业务代码

       } catch (Exception e) {

           

       }finally {

           try {//手动确认

              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

           } catch (IOException e) {

               e.printStackTrace();

           }

       }

   }

}


结论:

生产者开启了发布确认,消息发送到交换机时无论成功与否都会回调执行confirm方法,在这个方法中你可以重新发布操作或者记录日志等。当消息找不到队列或者不能到达所绑定的队列时才执行returnedMessage方法。

消费者启动了手动确认模式和前面所说的工作队列模式一样,可以启动多个消费者消费消息,能者多劳,一个消息只能给一个消费者消费,不能重复消费。如果消费者发生异常时你想把消息返回队列重新消费时可以设置配置文件的retry:enabled为true,就会默认重试3次,并把手动确认代码

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)不要放在finally中,放到代码最后就行,因为发生异常是就不会执行这行代码,消费者才会重试消费这条消息,值得注意的是如果重试了3次之后还是异常这个消费者就不在消费这个队列的其他消息,重启项目才会继续消费。


向大家推荐《Activiti工作流实战教程》:https://xiaozhuanlan.com/activiti
0

有建议,请留言!

  • *您的姓名:

  • *所在城市:

  • *您的联系电话:

    *您的QQ:

  • 咨询问题:

  • 提 交