Rabbitmq发布/订阅模式
目录
从前面可知工作队列模式的主要将每个消息都只能给一个消息队列。而发布/订阅模式将一个消息群发给和交换机绑定的消息队列。生产者和队列通过交换机来通信,这个模式的交换机的类型是fanout,忽略路由键routeKey。由于发布的消息只能存储在消息队列中,不能存储在交换机中,所以在生产者发布消息之前,要先保证消费者端声明的队列是存在的,否则生产者发布的消息无地方存放,导致消息丢失。
如果群发的消息不是很重要,消费者可以设置自动确认,消费者创建的队列可以使用临时队列,关闭或者重启消费者项目时就会自动删除。如果要消息很重要而且不能丢失那就要像工作队列模式那样手动确认,而且队列和消息要持久化。
开始演示:
第一步、先执行消费者ReceiveLogs .java,监听logs消息队列。
第二步、执行生产者EmitLog .java,往logs队列中添加10条消息。
消费者1打印的结果:
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者:amq.ctag-SWmpZ3xLnpmFYzQgg0AlTw 接收的消息:Hello World!
消费者2打印的结果:
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
消费者:amq.ctag-EBP8i9CJc6e-PogDNqC0fQ 接收的消息:Hello World!
演示结论:
消费者1和消费者2各自获取了完成10个消息。生产者发布的消息都群发给和交换机绑定的队列。
生产者代码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
for(int i=0;i<10;i++) {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = " Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
}
}
}
}
消费者代码:
package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者:"+consumerTag+" 接收的消息:" + message );
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}