Rabbitmq主题(topic)路由模式
目录
从前面可知直接路由模式(交换机的类型是direct)把一个消息只群发和路由键绑定的队列上,但是不是很灵活,生产者稍微对路由键更换一下,发布的消息就可能到达不了消息队列中,主题路由模式(交换机的类型是topic)就是解决这个问题的,路由键中的*可以代替一个单词,#可以替代零个或多个单词。
开始演示:
第一步、先执行消费者ReceiveLogsTopic.java,消费者声明的临时队列绑定的路由键为*.*.routeKey.#
第二步、执行生产者EmitLogTopic.java,生产者把10条消息发布到已绑定one.two.routeKey.three.four和one2.two2.routeKey.three2.four2路由键的交换机topic_logs。
消费者打印的结果:
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'
消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'
演示结论:
消息通过路由键存储在相应的队列中。
生产者代码:
package com.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.List;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
List<String> routeKeys = new ArrayList<>();
routeKeys.add("one.two.routeKey.three.four");
for(int i=0;i<5;i++) {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = " Hello World!";
for(String routeKey: routeKeys){
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
}
}
}
}
}
消费者代码:
package com.demo;
import com.rabbitmq.client.*;
import java.util.ArrayList;
import java.util.List;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.routeKey.#");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者:"+ consumerTag+",路由键和消息:"+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}