Rabbitmq直接(direct)路由模式
目录
从前面可知发布/订阅模式将一个消息群发给所有和交换机绑定的消息队列,交换机的类型是fanout,而直接路由模式把一个消息只群发和路由键绑定的队列上,交换机的类型是direct。
开始演示:
第一步、先执行消费者ReceiveLogsDirect.java,生产者把消息发布到交换机direct_logs,交换机再把消息存储在和路由键routeKey1、routeKey2绑定的临时队列中给消费者消费。
第二步、执行生产者EmitLogDirect.java,发布10条消息到交换机direct_logs,交换机将其中5条消息转发到和路由键routeKey1绑定的队列中,5条消息转发到和路由键routeKey2绑定的队列中。
消费者打印的结果:
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey1':' Hello World!'
消费者:amq.ctag-3iBERFABW1EOfyVnzuCEVw,路由键和消息:routeKey2':' Hello World!'
演示结论:
消息通过路由键存储在相应的队列中。
生产者代码:
package com.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.List;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
List<String> routeKeys = new ArrayList<>();
routeKeys.add("routeKey1");
routeKeys.add("routeKey2");
for(int i=0;i<5;i++) {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = " Hello World!";
for(String routeKey: routeKeys){
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
}
}
}
}
}
消费者代码:
package com.demo;
import com.rabbitmq.client.*;
import java.util.ArrayList;
import java.util.List;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
List<String> routeKeys = new ArrayList<>();
routeKeys.add("routeKey1");
routeKeys.add("routeKey2");
for (String routeKey : routeKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, routeKey);
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者:"+ consumerTag+",路由键和消息:"+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}