Rabbitmq工作队列模式(任务队列模式)
目录
从前面可知简单模式的缺点是队列和消息都没持久化,如果如果rabbitmq服务器重启,那些声明的队列和发布的消息就会丢失;消费者把消息加载到代码缓存耗内存,重启代码服务的话消息也会丢失;如果有多个消费者,消息都平均加载到各个消费者的代码缓存中,消息在消费者之间不共享,导致有的消费者闲置,有的消费者还有多个任务在等待执行。所以工作队列模式就是来解决这些问题的。
工作队列模式特点:
循环分发:多个消费者一起工作时,在消费者自动确认的情况下,RabbitMQ将每个消息依次发送给下一个使用者。每个消费者都会收到相同数量的消息,这种分发消息的方式称为循环。
持久性:队列和消息都持久化到磁盘,rabbitmq服务重启数据不会丢失。生产者代码channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)第二个参数true就是设置队列持久化。MessageProperties.PERSISTENT_TEXT_PLAIN设置消息也持久化。
消息确认:如果消费者把自己消息一下子全部加载到自己的代码缓存中,代码服务器宕机的话消息就会丢失。消费者手动消息确认可以保证这个消息被正常消费之后才会获取下一个消息。
消费者代码中 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {})的basicConsume方法第二个参数false就是设置为手动确认,channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)就是手动确认的方法。
公平派遣:有多个消费者的情况下实现能者多劳,也就是已经处理完记得消息的消费者可以帮忙消费别的消费者的消息,保证资源得到很好的利用。channel.basicQos(1,false);和手动确认配置实现公平派遣。
开始演示:
第一步、先执行消费者Worker.java,再复制Worker.java类命名为Worker2.java(作为第二个消费者),修改Worker2.java的睡眠时间长一点为5秒,Thread.sleep(5000),然后执行Worker2.java,这样的话就会有两个消费者同时监听task_queue队列,生产者一发布消息到task_queue队列,消息就会被公平派遣到这两个消费者。
第二步、执行生产者NewTask.java,往task_queue队列中添加10条消息。
消费者1打印的结果:
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者2打印的结果:
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
演示结论:
消费者1完成8个任务,消费者2完成2个任务,多个消费者能者多劳,相互协助完成整个任务,不会让任务等待很久才执行。
生产者源码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
for(int i=0;i<10;i++){
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//第二个参数true设置队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
//将队列设置为持久化之后,还需要将消息也设为可持久化的,
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
}
}
}
}
消费者源码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
//创建tcp连接rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//在tcp连接中创建一个信道来通信
Channel channel = connection.createChannel();
//第二个参数true设置队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//手动确认,如果设置为手动确认而不执行basicAck的话就会导致循环重复消费这些消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者:"+consumerTag+" 接收的消息:" + message );
};
//每次从队列中获取一个消息,设置为手动确认才有效
channel.basicQos(1,false);
//第二个参数设置为手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}