码上敲享录 > RabbitMQ教程(java) > Rabbitmq发布者确认模式

Rabbitmq发布者确认模式

上一章章节目录下一章 2019-12-17已有1630人阅读 评论(0)

Rabbitmq发布者确认模式

目录

idea创建java项目整合rabbitmq

Rabbitmq简单模式

Rabbitmq工作队列模式(任务队列模式)

Rabbitmq发布/订阅模式

Rabbitmq直接(direct)路由模式

Rabbitmq主题(topic)路由模式

Rabbitmq发布者确认模式

Rabbitmq远程过程调用(RPC)模式


发布者确认是实现可靠发布的RabbitMQ扩展。在信道上启用发布者确认后,发布者确认来确保发布的消息已安全到达消息代理,也就是rabbitmq队列。发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下未启用它们。发布者确认在通道级别使用confirmSelect方法启用,channel.confirmSelect(),确认仅应启用一次,而不是对每个已发布的消息都启用。我们将介绍三种发布者确认的策略。

分别发布消息,同步等待确认:简单,但吞吐量非常有限。

批量发布消息,同步等待批量确认:简单,合理的吞吐量,但是很难推断出什么时候出了问题。

异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制。


第一种策略:

分别发布消息,这是最简单的发布确认方法,发布消息通过执行channel.waitForConfirmsOrDie(long)方法或者channel.waitForConfirms(long)等待代理的确认,都具有阻塞性,只是waitForConfirmsOrDie异常后信道被关闭,生产者发布不了消息了这两个个方法的参数就是确认的超时时间。如果未在超时时间内消息代理确认该消息,则该方法将引发超时的异常。异常的处理通常包括记录错误消息和/或重试发送消息。这种策略主要缺点:由于消息的确认会阻止所有后续消息的发布,因此它会大大减慢发布速度。


开始演示1:

执行生产者IndividuallyConfirms.java,发布5条消息到消息代理,waitForConfirmsOrDie的确认超时时间是6,也就是6毫秒。


测试打印的结果:

消息:0,已被消息代理确认

消息:1,已被消息代理确认

消息:2,未被消息代理确认,这里可以记录错误或者重发


演示结论:

发布后6毫秒内消息只有前面2条被代理确认,第三条消息开始发生异常没被确认,后面的消息都不能继续发布。


开始演示2:

执行生产者IndividuallyConfirms.java,发布5条消息到消息代理,waitForConfirms的确认超时时间是1,也就是1毫秒。


测试打印的结果:

消息:0,未被消息代理确认,这里可以记录错误或者重发

消息:1,未被消息代理确认,这里可以记录错误或者重发

消息:2,未被消息代理确认,这里可以记录错误或者重发

消息:3,未被消息代理确认,这里可以记录错误或者重发

消息:4,已被消息代理确认


演示结论:

发布后4毫秒内消息前四个消息未被代理确认,第五个消息被确认了,waitForConfirms发生异常后还可以继续发布消息。


生产者代码:

package com.demo;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.UUID;

public class IndividuallyConfirms {

    public static void main(String[] args) throws Exception {

      ConnectionFactory cf = new ConnectionFactory();

       cf.setHost("127.0.0.1");

       cf.setPort(5672);

       cf.setUsername("guest");

       cf.setPassword("guest");

       try (Connection connection =cf.newConnection()) {

           Channel ch = connection.createChannel();

           String queue = UUID.randomUUID().toString();

           ch.queueDeclare(queue, false, false, true, null);

           ch.confirmSelect();

           for (int i = 0; i < 5; i++) {

               String body = String.valueOf(i);

               ch.basicPublish("", queue, null, body.getBytes());

               try {

                   ch.waitForConfirms(6);//秒

                   System.out.println("消息:"+body+",已被消息代理确认");

               }catch (Exception e){

                   System.out.println("消息:"+body+",未被消息代理确认,这里可以记录错误或者重发");

                   //e.printStackTrace();

               }

           }

       }

   }

}


第二种策略:

批量发布消息,与等待确认单个消息相比,等待一批消息被确认可以极大地提高吞吐量,其实代码和分别发布差不多,只是加了一些批量确认的业务代码,这策略就不多说了。而且该解决方案仍然是同步的,因此它还是减缓消息的发布。


开始演示:

执行生产者BatchConfirms.java,发布12条消息到消息代理,每5条为一批确认,waitForConfirmsOrDie的确认超时时间是5000,也就是5秒。


测试打印的结果:

一批消息已被消息代理确认

一批消息已被消息代理确认

一批消息已被消息代理确认


生产者代码:

package com.demo;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.UUID;

public class BatchConfirms {

    public static void main(String[] args) throws Exception {

      ConnectionFactory cf = new ConnectionFactory();

       cf.setHost("127.0.0.1");

       cf.setPort(5672);

       cf.setUsername("guest");

       cf.setPassword("guest");

       try (Connection connection =cf.newConnection()) {

           Channel ch = connection.createChannel();

           String queue = UUID.randomUUID().toString();

           ch.queueDeclare(queue, false, false, true, null);

           ch.confirmSelect();

           int batchSize = 5;

           int outstandingMessageCount = 0;

           for (int i = 0; i < 12; i++) {

               String body = String.valueOf(i);

               ch.basicPublish("", queue, null, body.getBytes());

               outstandingMessageCount++;

                if (outstandingMessageCount == batchSize) {

                    try {

                        ch.waitForConfirmsOrDie(5000);//秒

                        System.out.println("一批消息已被消息代理确认");

                    }catch (Exception e){

                        System.out.println("一批消息未被消息代理确认,这里可以记录错误或者重发");

                    }

                   outstandingMessageCount = 0;

               }

           }

           if (outstandingMessageCount > 0) {

              try {

                        ch.waitForConfirmsOrDie(5000);//秒

                         System.out.println("一批消息已被消息代理确认");

                    }catch (Exception e){

                        System.out.println("一批消息未被消息代理确认,这里可以记录错误或者重发");

                    }

           }

       }

   }

}


第三种策略:发布者异步确认

生产者端注册一个回调addConfirmListener监听器,即可收到消息代理确认的通知其中监听器中有两种回调方法,回调的次数不固定,由sequenceNumber决定的,假如现在发布了10条消息,如果回调时sequenceNumber10,那么确认10条信息就只回调一次,如果回调时sequenceNumber3,那么这次回调只确认前面3条消息,还有7条下次回调,如果下次回调时8,那么这次回调只确认第4条到第8条,还剩2条下次回调,以此类推

第一个回调方法是在代理正常确认消息的情况下执行。

第二回调方法是在代理认为消息丢失的情况下执行

每个回调都有2个参数:sequenceNumber, multiple

sequenceNumber:上面解释过了。

multiple:如果为false,则仅确认/取消一条消息;如果为true,则确认小于或等于sequenceNumber的所有消息


开始演示:

第一步、执行生产者AsynConfirms.java,发布10条消息到消息代理,同时把所有消息存放到ConcurrentSkipListMap并发集合中来跟踪消息代理未确认的消息,在两个回调方法中都移除ConcurrentSkipListMap中对应的的消息,如果60秒后ConcurrentSkipListMap中还有消息,就说明ConcurrentSkipListMap剩下的消息是消息代理无法确认的消息(杳无音讯,神秘失踪的消息)。


测试打印的结果:

已确认的消息:{1=0, 2=1}

已确认的消息:{3=2, 4=3}

已确认的消息:{5=4, 6=5}

已确认的消息:{9=8, 10=9}


测试结论:

发布了10条信息,代理回调了四次


生产者代码:

package com.demo;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmCallback;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.time.Duration;

import java.util.UUID;

import java.util.concurrent.ConcurrentNavigableMap;

import java.util.concurrent.ConcurrentSkipListMap;

public class AsynConfirms {

    public static void main(String[] args) throws Exception {

      ConnectionFactory cf = new ConnectionFactory();

       cf.setHost("127.0.0.1);

       cf.setPort(5672);

       cf.setUsername("guest");

       cf.setPassword("guest");

       try (Connection connection = cf.newConnection()) {

           Channel ch = connection.createChannel();

           String queue = UUID.randomUUID().toString();

           ch.queueDeclare(queue, false, false, true, null);

           ch.confirmSelect();

           ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

           ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {

               //代理已确认的消息会执行这里

               if (multiple) {

                   ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);

                   System.out.println("已确认的消息:"+confirmed.toString());

                   confirmed.clear();

               } else {

                   outstandingConfirms.remove(sequenceNumber);

               }

           };

           ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {

                //代理认为丢失的消息就会回调执行这里

               String body = outstandingConfirms.get(sequenceNumber);

               System.out.println("消息:"+body+",已丢失");

               cleanOutstandingConfirms.handle(sequenceNumber, multiple);

           });

           for (int i = 0; i < 10; i++) {

               String body = String.valueOf(i);

               outstandingConfirms.put(ch.getNextPublishSeqNo(), body);

               ch.basicPublish("", queue, null, body.getBytes());

           }

           int waited = 0;

           while (!outstandingConfirms.isEmpty() && waited < Duration.ofSeconds(60).toMillis()) {

               Thread.sleep(100L);

               waited = +100;

           }

           if(!outstandingConfirms.isEmpty()){

              System.out.println("60秒内还没确认的消息:"+outstandingConfirms.toString());

           }

       }

   }

}

向大家推荐《Activiti工作流实战教程》:https://xiaozhuanlan.com/activiti
0

有建议,请留言!

  • *您的姓名:

  • *所在城市:

  • *您的联系电话:

    *您的QQ:

  • 咨询问题:

  • 提 交