博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
3.rabbitmq--发布订阅模式
阅读量:5836 次
发布时间:2019-06-18

本文共 8707 字,大约阅读时间需要 29 分钟。

rabbitmq-----发布订阅模式

 模型组成

一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。

 

Exchange

相比较于前两种模型Hello World和Work,这里多一个一个Exchange。其实Exchange是RabbitMQ的标配组成部件之一,前两种没有提到Exchange是为了简化模型,即使模型中没有看到Exchange的声明,其实还是声明了一个默认的Exchange。

RabbitMQ中实际发送消息并不是直接将消息发送给消息队列,消息队列也没那么聪明知道这条消息从哪来要到哪去。RabbitMQ会先将消息发送个Exchange,Exchange会根据这条消息打上的标记知道该条消息从哪来到哪去。

Exchange凭什么知道消息的何去何从,因为Exchange有几种类型:direct,fanout,topic和headers。这里说的订阅者模式就可以认为是fanout模式了。

RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 

RabbitMQ提供了四种Exchangefanout,direct,topic,header .发布/订阅模式就是是基于fanout Exchange实现的。

    • fanout这种模式不需要指定队列名称,需要将Exchangequeue绑定,他们之间的关系是‘多对多’的关系 
      任何发送到fanout Exchange的消息都会被转发到与该Exchange绑定的queue上面。

订阅者模式有何不同

订阅者模式相对前面的Work模式有和不同?Work也有多个消费者,但是只有一个消息队列,并且一个消息只会被某一个消费者消费。但是订阅者模式不一样,它有多个消息队列,也有多个消费者,而且一条消息可以被多个消费者消费,类似广播模式。下面通过实例代码看看这种模式是如何收发消息的。

1 package com.maozw.mq.pubsub; 2  3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.rabbit.connection.Connection; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.beans.factory.annotation.Autowired;10 import org.springframework.web.bind.annotation.PathVariable;11 import org.springframework.web.bind.annotation.RequestMapping;12 import org.springframework.web.bind.annotation.RestController;13 14 import java.io.IOException;15 import java.util.concurrent.TimeoutException;16 17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK;18 19 /**20  * work 模式21  * 两种分发: 轮询分发 + 公平分发22  * 轮询分发:消费端:自动确认消息;boolean autoAck = true;23  * 公平分发: 消费端:手动确认消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);24  *25  * @author MAOZW26  * @Description: ${todo}27  * @date 2018/11/26 15:0628  */29 @RestController30 @RequestMapping("/publish")31 public class PublishProducer {32     private static final Logger LOGGER = LoggerFactory.getLogger(PublishProducer.class);33     @Autowired34     RabbitConfig rabbitConfig;35 36 37     @RequestMapping("/send/{exchangeName}/{queueName}")38     public String send(@PathVariable String exchangeName, @PathVariable String queueName) throws IOException, TimeoutException {39         Connection connection = null;40         Channel channel= null;41         try {42             ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();43             connection = connectionFactory.createConnection();44             channel = connection.createChannel(false);45 46             /**47              * 申明交换机48              */49             channel.exchangeDeclare(exchangeName,"fanout");50 51             /**52              * 发送消息53              * 每个消费者 发送确认消息之前,消息队列不会发送下一个消息给消费者,一次只处理一个消息54              * 自动模式无需设置下面设置55              */56             int prefetchCount = 1;57             channel.basicQos(prefetchCount);58 59             String Hello = ">>>> Hello Simple <<<<";60             for (int i = 0; i < 5; i++) {61                 String message = Hello + i;62                 channel.basicPublish(RabbitConfig.EXCHANGE_AAAAA, "", null, message.getBytes());63                 LOGGER.info("生产消息: " + message);64             }65             return "OK";66         }catch (Exception e) {67 68         } finally {69             connection.close();70             channel.close();71             return OK;72         }73     }74 }

 

 订阅1

1 package com.maozw.mq.pubsub; 2  3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory;10 import org.springframework.amqp.rabbit.connection.Connection;11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;12 13 import java.io.IOException;14 15 /**16  * @author MAOZW17  * @Description: ${todo}18  * @date 2018/11/26 15:0619  */20 21 public class SubscribeConsumer {22     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer.class);23 24     public static void main(String[] args) throws IOException {25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();26         Connection connection = connectionFactory.createConnection();27         Channel channel = connection.createChannel(false);28         /**29          * 创建队列申明30          */31         boolean durable = true;32         channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT, durable, false, false, null);33         /**34          * 绑定队列到交换机35          */36         channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT, RabbitConfig.EXCHANGE_AAAAA,"");37 38         /**39          * 改变分发规则40          */41         channel.basicQos(1);42         DefaultConsumer consumer = new DefaultConsumer(channel) {43             @Override44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {45                 super.handleDelivery(consumerTag, envelope, properties, body);46                 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));47                 try {48                     Thread.sleep(300);49                 } catch (InterruptedException e) {50                     e.printStackTrace();51                 } finally {52                     System.out.println("[2] done!");53                     //消息应答:手动回执,手动确认消息54                     channel.basicAck(envelope.getDeliveryTag(),false);55                 }56             }57         };58         //监听队列59         /**60          * autoAck 消息应答61          *  默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。62          *  使用公平分发需要关闭autoAck:false  需要手动发送回执63          */64         boolean autoAck = false;65         channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT,autoAck, consumer);66     }67     68 }
1 package com.maozw.mq.pubsub; 2  3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory;10 import org.springframework.amqp.rabbit.connection.Connection;11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;12 13 import java.io.IOException;14 15 /**16  * @author MAOZW17  * @Description: ${todo}18  * @date 2018/11/26 15:0619  */20 21 public class SubscribeConsumer2 {22     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer2.class);23 24     public static void main(String[] args) throws IOException {25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();26         Connection connection = connectionFactory.createConnection();27         Channel channel = connection.createChannel(false);28         /**29          * 创建队列申明30          */31         boolean durable = true;32         channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT2, durable, false, false, null);33         /**34          * 绑定队列到交换机35          */36         channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT2, RabbitConfig.EXCHANGE_AAAAA,"");37 38         /**39          * 改变分发规则40          */41         channel.basicQos(1);42         DefaultConsumer consumer = new DefaultConsumer(channel) {43             @Override44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {45                 super.handleDelivery(consumerTag, envelope, properties, body);46                 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));47                 try {48                     Thread.sleep(400);49                 } catch (InterruptedException e) {50                     e.printStackTrace();51                 } finally {52                     System.out.println("[2] done!");53                     //消息应答:手动回执,手动确认消息54                     channel.basicAck(envelope.getDeliveryTag(),false);55                 }56             }57         };58         //监听队列59         /**60          * autoAck 消息应答61          *  默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。62          *  使用公平分发需要关闭autoAck:false  需要手动发送回执63          */64         boolean autoAck = false;65         channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT2,autoAck, consumer);66     }67 }

 

转载于:https://www.cnblogs.com/Mao-admin/p/10033999.html

你可能感兴趣的文章
python debug
查看>>
java 连接数据库之一个完整的函数
查看>>
mysql脚本
查看>>
OllyDBG 入门系列教学--让你瞬间成为破解高手
查看>>
Dubbo点滴(2)之集群容错
查看>>
检测不到兼容的键盘驱动程序
查看>>
listbox用法
查看>>
冲刺第九天 1.10 THU
查看>>
传值方式:ajax技术和普通传值方式
查看>>
Linux-网络连接-(VMware与CentOS)
查看>>
寻找链表相交节点
查看>>
AS3——禁止swf缩放
查看>>
linq 学习笔记之 Linq基本子句
查看>>
[Js]布局转换
查看>>
Hot Bath
查看>>
国内常用NTP服务器地址及
查看>>
Java annotation 自定义注释@interface的用法
查看>>
Apache Spark 章节1
查看>>
phpcms与discuz的ucenter整合
查看>>
Linux crontab定时执行任务
查看>>