模型图
我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型
在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。
举列:
类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
那么咱们来看一下图,我们学过前两种有一些不一样,work 模式 是不是同一个队列 多个消费者,而 ps 这种模式呢,是一个队列对应一个消费者,pb 模式还多了一个 X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息
解读:1、1 个生产者,多个消费者2、每一个消费者都有自己的一个队列3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)4、每个队列都要绑定到交换机5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
生产者
1 package cn.wh.simple; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 7 import cn.wh.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 11 public class Send {12 13 private static final String EXCHANGE_NAME="test_exchange_fanout";14 public static void main(String[] args) throws IOException, TimeoutException {15 16 Connection connection = RabbitMqConnectionUtil.getConnection();17 18 Channel channel = connection.createChannel();19 20 //声明交换机21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发22 23 //发送消息24 String msg="hello ps";25 26 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());27 28 System.out.println("Send :"+msg);29 30 channel.close();31 connection.close();32 }33 }
消费者1
1 package cn.wh.simple; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import cn.wh.util.RabbitMqConnectionUtil; 7 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.Consumer;11 import com.rabbitmq.client.DefaultConsumer;12 import com.rabbitmq.client.Envelope;13 import com.rabbitmq.client.AMQP.BasicProperties;14 15 public class Recv1 {16 17 private static final String QUEUE_NAME="test_queue_fanout_email";18 private static final String EXCHANGE_NAME="test_exchange_fanout";19 public static void main(String[] args) throws IOException, TimeoutException {20 Connection connection = RabbitMqConnectionUtil.getConnection();21 final Channel channel = connection.createChannel();22 23 //队列声明24 channel.queueDeclare(QUEUE_NAME, false, false, false, null);25 26 //绑定队列到交换机 转发器27 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");28 29 30 channel.basicQos(1);//保证一次只分发一个 31 32 //定义一个消费者33 Consumer consumer=new DefaultConsumer(channel){34 //消息到达 触发这个方法35 @Override36 public void handleDelivery(String consumerTag, Envelope envelope,37 BasicProperties properties, byte[] body) throws IOException {38 39 String msg=new String(body,"utf-8");40 System.out.println("[1] Recv msg:"+msg);41 42 try {43 Thread.sleep(2000);44 } catch (InterruptedException e) {45 e.printStackTrace();46 }finally{47 System.out.println("[1] done ");48 channel.basicAck(envelope.getDeliveryTag(), false);49 }50 }51 };52 53 boolean autoAck=false;//自动应答 false54 channel.basicConsume(QUEUE_NAME,autoAck , consumer);55 }56 }
消费者2
package cn.wh.simple;import java.io.IOException;import java.util.concurrent.TimeoutException;import cn.wh.util.RabbitMqConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;public class Recv2 { private static final String QUEUE_NAME="test_queue_fanout_sms"; private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机 转发器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1);//保证一次只分发一个 //定义一个消费者 Consumer consumer=new DefaultConsumer(channel){ //消息到达 触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg=new String(body,"utf-8"); System.out.println("[2] Recv msg:"+msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("[2] done "); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck=false;//自动应答 false channel.basicConsume(QUEUE_NAME,autoAck , consumer); }}
测试
一个消息 可以被多个消费者