Rabbitmq发布者确认模式
目录
发布者确认是实现可靠发布的RabbitMQ扩展。在信道上启用发布者确认后,发布者确认来确保发布的消息已安全到达消息代理,也就是rabbitmq队列。发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下未启用它们。发布者确认在通道级别使用confirmSelect方法启用,channel.confirmSelect(),确认仅应启用一次,而不是对每个已发布的消息都启用。我们将介绍三种发布者确认的策略。
分别发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,同步等待批量确认:简单,合理的吞吐量,但是很难推断出什么时候出了问题。
异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制。
第一种策略:
分别发布消息,这是最简单的发布确认方法,发布消息后通过执行channel.waitForConfirmsOrDie(long)方法或者channel.waitForConfirms(long)等待代理的确认,都具有阻塞性,只是waitForConfirmsOrDie异常后信道被关闭,生产者发布不了消息了,这两个个方法的参数就是确认的超时时间。如果未在超时时间内消息代理确认该消息,则该方法将引发超时的异常。异常的处理通常包括记录错误消息和/或重试发送消息。这种策略主要缺点:由于消息的确认会阻止所有后续消息的发布,因此它会大大减慢发布速度。
开始演示1:
执行生产者IndividuallyConfirms.java,发布5条消息到消息代理,waitForConfirmsOrDie的确认超时时间是6,也就是6毫秒。
测试打印的结果:
消息:0,已被消息代理确认
消息:1,已被消息代理确认
消息:2,未被消息代理确认,这里可以记录错误或者重发
演示结论:
发布后6毫秒内消息只有前面2条被代理确认,第三条消息开始发生异常没被确认,后面的消息都不能继续发布。
开始演示2:
执行生产者IndividuallyConfirms.java,发布5条消息到消息代理,waitForConfirms的确认超时时间是1,也就是1毫秒。
测试打印的结果:
消息:0,未被消息代理确认,这里可以记录错误或者重发
消息:1,未被消息代理确认,这里可以记录错误或者重发
消息:2,未被消息代理确认,这里可以记录错误或者重发
消息:3,未被消息代理确认,这里可以记录错误或者重发
消息:4,已被消息代理确认
演示结论:
发布后4毫秒内消息前四个消息未被代理确认,第五个消息被确认了,waitForConfirms发生异常后还可以继续发布消息。
生产者代码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.UUID;
public class IndividuallyConfirms {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);
cf.setUsername("guest");
cf.setPassword("guest");
try (Connection connection =cf.newConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
for (int i = 0; i < 5; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
try {
ch.waitForConfirms(6);//秒
System.out.println("消息:"+body+",已被消息代理确认");
}catch (Exception e){
System.out.println("消息:"+body+",未被消息代理确认,这里可以记录错误或者重发");
//e.printStackTrace();
}
}
}
}
}
第二种策略:
批量发布消息,与等待确认单个消息相比,等待一批消息被确认可以极大地提高吞吐量,其实代码和分别发布差不多,只是加了一些批量确认的业务代码,这策略就不多说了。而且该解决方案仍然是同步的,因此它还是减缓消息的发布。
开始演示:
执行生产者BatchConfirms.java,发布12条消息到消息代理,每5条为一批确认,waitForConfirmsOrDie的确认超时时间是5000,也就是5秒。
测试打印的结果:
一批消息已被消息代理确认
一批消息已被消息代理确认
一批消息已被消息代理确认
生产者代码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.UUID;
public class BatchConfirms {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);
cf.setUsername("guest");
cf.setPassword("guest");
try (Connection connection =cf.newConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
int batchSize = 5;
int outstandingMessageCount = 0;
for (int i = 0; i < 12; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
try {
ch.waitForConfirmsOrDie(5000);//秒
System.out.println("一批消息已被消息代理确认");
}catch (Exception e){
System.out.println("一批消息未被消息代理确认,这里可以记录错误或者重发");
}
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
try {
ch.waitForConfirmsOrDie(5000);//秒
System.out.println("一批消息已被消息代理确认");
}catch (Exception e){
System.out.println("一批消息未被消息代理确认,这里可以记录错误或者重发");
}
}
}
}
}
第三种策略:发布者异步确认
生产者端注册一个回调addConfirmListener监听器,即可收到消息代理确认的通知,其中监听器中有两种回调方法,回调的次数不固定,由sequenceNumber决定的,假如现在发布了10条消息,如果回调时sequenceNumber是10,那么确认10条信息就只回调一次,如果回调时sequenceNumber是3,那么这次回调只确认前面3条消息,还有7条下次回调,如果下次回调时8,那么这次回调只确认第4条到第8条,还剩2条下次回调,以此类推。
第一个回调方法是在代理正常确认消息的情况下执行。
第二种回调方法是在代理认为消息丢失的情况下执行。
每个回调都有2个参数:sequenceNumber, multiple
sequenceNumber:上面解释过了。
multiple:如果为false,则仅确认/取消一条消息;如果为true,则确认小于或等于sequenceNumber的所有消息。
开始演示:
第一步、执行生产者AsynConfirms.java,发布10条消息到消息代理,同时把所有消息存放到ConcurrentSkipListMap并发集合中来跟踪消息代理未确认的消息,在两个回调方法中都移除ConcurrentSkipListMap中对应的的消息,如果60秒后ConcurrentSkipListMap中还有消息,就说明ConcurrentSkipListMap剩下的消息是消息代理无法确认的消息(杳无音讯,神秘失踪的消息)。
测试打印的结果:
已确认的消息:{1=0, 2=1}
已确认的消息:{3=2, 4=3}
已确认的消息:{5=4, 6=5}
已确认的消息:{9=8, 10=9}
测试结论:
发布了10条信息,代理回调了四次
生产者代码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class AsynConfirms {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1);
cf.setPort(5672);
cf.setUsername("guest");
cf.setPassword("guest");
try (Connection connection = cf.newConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
//代理已确认的消息会执行这里
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
System.out.println("已确认的消息:"+confirmed.toString());
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
//代理认为丢失的消息就会回调执行这里
String body = outstandingConfirms.get(sequenceNumber);
System.out.println("消息:"+body+",已丢失");
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
for (int i = 0; i < 10; i++) {
String body = String.valueOf(i);
outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
ch.basicPublish("", queue, null, body.getBytes());
}
int waited = 0;
while (!outstandingConfirms.isEmpty() && waited < Duration.ofSeconds(60).toMillis()) {
Thread.sleep(100L);
waited = +100;
}
if(!outstandingConfirms.isEmpty()){
System.out.println("60秒内还没确认的消息:"+outstandingConfirms.toString());
}
}
}
}