博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 之 订阅模式 Publish/Subscribe
阅读量:6997 次
发布时间:2019-06-27

本文共 5281 字,大约阅读时间需要 17 分钟。

模型图

我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。

举列:    

类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)

那么咱们来看一下图,我们学过前两种有一些不一样,work 模式 是不是同一个队列 多个消费者,而 ps 这种模式呢,是一个队列对应一个消费者,pb 模式还多了一个 X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息

解读:

1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

生产者

1 package cn.wh.simple; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6  7 import cn.wh.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 11 public class Send {12 13     private static final String  EXCHANGE_NAME="test_exchange_fanout";14     public static void main(String[] args) throws IOException, TimeoutException {15         16         Connection connection = RabbitMqConnectionUtil.getConnection();17         18         Channel channel = connection.createChannel();19         20         //声明交换机21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发22         23         //发送消息24         String msg="hello ps";25         26         channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());27         28         System.out.println("Send :"+msg);29         30         channel.close();31         connection.close();32     }33 }

消费者1

1 package cn.wh.simple; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import cn.wh.util.RabbitMqConnectionUtil; 7  8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.Consumer;11 import com.rabbitmq.client.DefaultConsumer;12 import com.rabbitmq.client.Envelope;13 import com.rabbitmq.client.AMQP.BasicProperties;14 15 public class Recv1 {16     17     private static final String QUEUE_NAME="test_queue_fanout_email";18     private static final String  EXCHANGE_NAME="test_exchange_fanout";19     public static void main(String[] args) throws IOException, TimeoutException {20         Connection connection = RabbitMqConnectionUtil.getConnection();21         final Channel channel = connection.createChannel();22         23         //队列声明24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);25         26         //绑定队列到交换机 转发器27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");28         29         30         channel.basicQos(1);//保证一次只分发一个  31         32         //定义一个消费者33         Consumer consumer=new DefaultConsumer(channel){34             //消息到达 触发这个方法35             @Override36             public void handleDelivery(String consumerTag, Envelope envelope,37                     BasicProperties properties, byte[] body) throws IOException {38              39                 String msg=new String(body,"utf-8");40                 System.out.println("[1] Recv msg:"+msg);41                 42                 try {43                     Thread.sleep(2000);44                 } catch (InterruptedException e) {45                     e.printStackTrace();46                 }finally{47                     System.out.println("[1] done ");48                     channel.basicAck(envelope.getDeliveryTag(), false);49                 }50             }51         };52         53         boolean autoAck=false;//自动应答 false54         channel.basicConsume(QUEUE_NAME,autoAck , consumer);55     }56 }

消费者2

package cn.wh.simple;import java.io.IOException;import java.util.concurrent.TimeoutException;import cn.wh.util.RabbitMqConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;public class Recv2 {        private static final String QUEUE_NAME="test_queue_fanout_sms";    private static final String  EXCHANGE_NAME="test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitMqConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        //队列声明        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //绑定队列到交换机 转发器        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        channel.basicQos(1);//保证一次只分发一个        //定义一个消费者        Consumer consumer=new DefaultConsumer(channel){            //消息到达 触发这个方法            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                    BasicProperties properties, byte[] body) throws IOException {                String msg=new String(body,"utf-8");                System.out.println("[2] Recv msg:"+msg);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally{                    System.out.println("[2] done ");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        boolean autoAck=false;//自动应答 false        channel.basicConsume(QUEUE_NAME,autoAck , consumer);    }}

 测试

一个消息 可以被多个消费者

 

转载于:https://www.cnblogs.com/wh1520577322/p/10065914.html

你可能感兴趣的文章