码上敲享录 > RabbitMQ教程(java) > Rabbitmq直接(direct)路由模式

Rabbitmq直接(direct)路由模式

上一章章节目录下一章 2019-12-17已有1722人阅读 评论(0)

Rabbitmq直接(direct)路由模式

目录

idea创建java项目整合rabbitmq

Rabbitmq简单模式

Rabbitmq工作队列模式(任务队列模式)

Rabbitmq发布/订阅模式

Rabbitmq直接(direct)路由模式

Rabbitmq主题(topic)路由模式

Rabbitmq发布者确认模式

Rabbitmq远程过程调用(RPC)模式


从前面可知发布/订阅模式将一个消息群发给所有和交换机绑定的消息队列,交换机的类型是fanout,而直接路由模式把一个消息只群发和路由键绑定的队列上,交换机的类型是direct。


开始演示:

第一步、先执行消费者ReceiveLogsDirect.java,生产者把消息发布到交换机direct_logs,交换机再把消息存储在和路由键routeKey1、routeKey2绑定的临时队列中给消费者消费。

第二步、执行生产者EmitLogDirect.java,发布10条消息到交换机direct_logs,交换机将其中5条消息转发到和路由键routeKey1绑定的队列中,5条消息转发到和路由键routeKey2绑定的队列中。


消费者打印的结果:

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'

消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'


演示结论:

消息通过路由键存储在相应的队列中。


生产者代码:

package com.demo;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.ArrayList;

import java.util.List;

public class EmitLogDirect {

   private static final String EXCHANGE_NAME = "direct_logs";

   public static void main(String[] argv) throws Exception {

       //创建连接

       ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("127.0.0.1");

       factory.setPort(5672);

       factory.setUsername("guest");

       factory.setPassword("guest");

       List<String> routeKeys = new ArrayList<>();

       routeKeys.add("routeKey1");

       routeKeys.add("routeKey2");

       for(int i=0;i<5;i++) {

           try (Connection connection = factory.newConnection();

                Channel channel = connection.createChannel()) {

               channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

               String message = " Hello World!";

               for(String routeKey: routeKeys){

                  channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));

               }

           }

       }

   }

}


消费者代码:

package com.demo;

import com.rabbitmq.client.*;

import java.util.ArrayList;

import java.util.List;

public class ReceiveLogsDirect {

   private static final String EXCHANGE_NAME = "direct_logs";

   public static void main(String[] argv) throws Exception {

      ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("127.0.0.1");

       factory.setPort(5672);

       factory.setUsername("guest");

       factory.setPassword("guest");

       Connection connection = factory.newConnection();

       Channel channel = connection.createChannel();

       channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

       String queueName = channel.queueDeclare().getQueue();

       List<String> routeKeys = new ArrayList<>();

       routeKeys.add("routeKey1");

       routeKeys.add("routeKey2");

       for (String routeKey : routeKeys) {

           channel.queueBind(queueName, EXCHANGE_NAME, routeKey);

       }

       DeliverCallback deliverCallback = (consumerTag, delivery) -> {

           String message = new String(delivery.getBody(), "UTF-8");

           System.out.println("消费者:"+ consumerTag+",路由键和消息:"+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

       };

       channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

       });

   }

}


0

有建议,请留言!

  • *您的姓名:

  • *所在城市:

  • *您的联系电话:

    *您的QQ:

  • 咨询问题:

  • 提 交