消息队列-RabbitMq
1.概述消息队列Message Queue简称MQ从字面意思上看本质是个队列FIFO先入先出只不过队列中存放的内容是message而已。其主要用途:不同进程Process/线程Thread之间通信为什么会产生消息队列?有几个原因不同进程process之间传递消息时两个进程之间耦合程度过高改动一个进程引发必须修改另一个进程为了隔离这两个进程在两进程间抽离出一层一个模块所有两进程之间传递的消息都必须通过消息队列来传递单独修改某一个进程不会影响另一个不同进程process之间传递消息时为了实现标准化将消息的格式规范化了并且某一个进程接受的消息太多一下子无法处理完并且也有先后顺序必须对收到的消息进行排队因此诞生了事实上的消息队列MQ框架有很多,比较流行的有 RabbitMq, ActiveMq, ZeroMq, kafka, 以及阿里开源的RocketMq;为什么用mq?1.模块之间耦合度过高,导致一个模块宕机后,全部功能都不能用了2.同步通讯的时间成本问题,mq可以进行消息分发异步处理为什么要用rabbitmq?1.activemq,ROCKETMQ,只支持java语言,kafka可以支持多们语言,rabbitmq支持多种语言2.效率方面:activemq, rocketmq,kafka效率都是毫秒级别,rabbitmq是微妙级别3.消息丢失,消息重复问题:rabbitmq针对消息持久化和重复问题都有比较成熟的解决方案4.学习成本:rabbitmq非常简单rabbitmq是由rabbit公司去研发与维护的,最终是在pivotal维护rabbitmq严格遵循amqp协议,高级消息队列协议,帮助我们在进程之间传递异步消息2.RabbitMq2.1.RabbitMq简介RabbitMQ是消息代理它接受并转发消息。您可以将其视为邮局将您要发布的邮件放在邮箱中时可以确保Mailperson先生或女士最终将邮件传递给您的收件人。以此类推RabbitMQ是一个邮政信箱一个邮局和一个邮递员。RabbitMQ与邮局之间的主要区别在于它不处理纸张而是接收存储和转发数据消息的二进制数据。RabbitMQ和一般的消息传递使用一些术语。生产仅意味着发送。发送消息的程序是生产者。队列是RabbitMQ内部的邮政信箱的名称。尽管消息流经RabbitMQ和您的应用程序但它们只能存储在队列中。甲队列仅由主机的存储器磁盘限制约束它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息许多消费者可以尝试从一个队列接收数据。这就是我们表示队列的方式。消费与接收具有相似的含义。一个消费者是一个程序主要是等待接收信息。请注意生产者消费者和经纪人不必位于同一主机上。实际上在大多数应用程序中却没有。一个应用程序既可以是生产者也可以是消费者。AMQP即Advanced Message Queuing Protocol一个提供统一消息服务的应用层标准高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。2.2 RabbitMq环境搭建安装步骤此处省略,安装完成后,通过 rabbitmq-plugins enable rabbitmq_management 启用管理插件。查看管理界面通过默认账户 guest/guest 登录,登录成功则说明安装成功。2.3 添加用户2.3.1 添加admin用户2.3.2 用户角色超级管理员(administrator)可登陆管理控制台可查看所有的信息并且可以对用户策略(policy)进行操作。监控者(monitoring)可登陆管理控制台同时可以查看节点的相关信息(进程数内存使用情况磁盘使用情况等)策略制定者(policymaker)可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。普通管理者(management)仅可登陆管理控制台无法看到节点信息也无法对策略进行管理。其他无法登陆管理控制台通常就是普通的生产者和消费者。2.3.3 创建Virtual Hosts (虚拟主机)选中admin用户设置权限看到权限已加2.3.4 管理界面中的功能2.4 五种队列RabbitMQ提供了多种消息模型官网上第6种是RPC不属于常规的消息队列。属于消息模型的是前5种Hello World 模型简单的一对一工作队列模型一个生产者将消息分发给多个消费者发布/订阅模型生产者发布消息多个消费者同时收取路由模型生产者通过关键字发送消息给特定消费者主题模型路由模式基础上在关键字里加入了通配符2.4.1 简单队列一对一的队列。生产者P生产消息放入队列这里不是简单地直接放入队列中消费者C消费消息消费者和生产者是一种一对一的关系。2.4.1.1测试demo引入包dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.10.0/version/dependencyRabbitMq连接工具类package com.zjk.demo.common.rabbitmq.util;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/** * author tomatoes * date2021/3/22 * description **/ public class RabbitMqConnectionUtils{public static Connection getConnection()throws Exception{returngetConnection(ip,5672,adminHost,admin,admin);}public static Connection getConnection(String host, int port, String vHost, String userName, String passWord)throws Exception{//1、定义连接工厂 ConnectionFactory factorynew ConnectionFactory();//2、设置服务器地址 factory.setHost(host);//3、设置端口 factory.setPort(port);//4、设置虚拟主机、用户名、密码 factory.setVirtualHost(vHost);factory.setUsername(userName);factory.setPassword(passWord);//5、通过连接工厂获取连接 Connection connectionfactory.newConnection();returnconnection;}}生产者public class RabbitMqProducer{private final static String QUEUE_NAMEhello;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channelconnection.createChannel();//3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义消息内容 String messagehello rabbitmq, my name is tomcatoes ;//5、发布消息 channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println([x] Sent message );//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}消费者public class RabbitMqConsumer{private final static String QUEUE_NAMEhello;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection(47.99.199.41,5672,/,guest,guest);//2、声明通道 Channel channelconnection.createChannel();//3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义队列的消费者 QueueingConsumer queueingConsumernew QueueingConsumer(channel);//5、监听队列 /* true:表示自动确认只要消息从队列中获取无论消费者获取到消息后是否成功消费都会认为消息已经成功消费 false:表示手动确认消费者获取消息后服务器会将该消息标记为不可用状态等待消费者的反馈 如果消费者一直没有反馈那么该消息将一直处于不可用状态并且服务器会认为该消费者已经挂掉不会再给其 发送消息直到该消费者反馈。 */ channel.basicConsume(QUEUE_NAME, true, queueingConsumer);//6、获取消息while(true){QueueingConsumer.Delivery deliveryqueueingConsumer.nextDelivery();String messagenew String(delivery.getBody());System.out.println( [x] Received message );}}}2.4.2 work模式一个生产者、2个消费者。但MQ中一个消息只能被一个消费者获取。即消息要么被C1获取要么被C2获取。这种模式适用于类似集群能者多劳。性能好的可以安排多消费性能低的可以安排低消费。但如果面对我需要多个消费者都对这一消息进行消费的需求这种模式显然就不适用了。那就可以采用发布订阅模式。简而言之 一个生产者多个消费者一个消息只能被一个消费者获取。多个消费者只有一个队列。2.4.2.1 轮询分发策略(round robin)使用工作队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作我们可以通过增加工作者消费者来解决这一问题使得系统的伸缩性更加容易。在默认情况下RabbitMQ采用轮询分发策略将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等且是提前一次性分配并非一个一个分配)。平均每个消费者获得相同数量的消息。生产者public class Producer{private final static String QUEUE_NAMEwork_queue;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channelconnection.createChannel();//3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义消息内容(发布多条消息)for(int i0;i10;i){String messagehello rabbitmq i;//5、发布消息 channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println([z] Sent message );//模拟发送消息延时便于演示多个消费者竞争接受消息 Thread.sleep(i *10);}//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}消费者1public class Consumer1{private final static String QUEUE_NAMEwork_queue;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection();//2、声明通道 Channel channelconnection.createChannel();//3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//同一时刻服务器只会发送一条消息给消费者 //channel.basicQos(1);//4、定义队列的消费者 QueueingConsumer queueingConsumernew QueueingConsumer(channel);//5、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME, false, queueingConsumer);//6、获取消息while(true){QueueingConsumer.Delivery deliveryqueueingConsumer.nextDelivery();String messagenew String(delivery.getBody());System.out.println( [z] Received message );//消费者1接收一条消息后休眠10毫秒 Thread.sleep(10);//返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}}消费者2public class Consumer2{private final static String QUEUE_NAMEwork_queue;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection();//2、声明通道 Channel channelconnection.createChannel();//3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//同一时刻服务器只会发送一条消息给消费者 //channel.basicQos(1);//4、定义队列的消费者 QueueingConsumer queueingConsumernew QueueingConsumer(channel);//5、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME, false, queueingConsumer);//6、获取消息while(true){QueueingConsumer.Delivery deliveryqueueingConsumer.nextDelivery();String messagenew String(delivery.getBody());System.out.println( [z] Received message );//消费者2接收一条消息后休眠1000毫秒 Thread.sleep(1000);//返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}}测试结果分析:首先生产者一次打印从0-9条消息消费者1消费偶数的消息消费者2消费奇数的消息结果分析:消费者1和消费者2获取到的消息内容是不同的也就是说同一个消息只能被一个消费者获取。消费者1和消费者2分别获取奇数条消息和偶数条消息两种获取消息的条数是一样的。前面我们说这种模式是竞争消费者模式一条队列被多个消费者监听这里两个消费者其中消费者1和消费者2在获取消息后分别休眠了10毫秒和1000毫秒也就是说两个消费者获取消息的效率是不一样的但是结果却是两者获得的消息条数是一样的这根本就不构成竞争关系那么我们应该怎么办才能让工作效率高的消费者获取消息更多也就是消费者1获取消息更多呢PS在增加一个消费者其实获取消息条数也是一样的消费者1获取0,3,6,9消费者2获取1,4,7消费者3获取2,5,82.4.2.2 公平分发Fair Dispatch有可能消费者处理消息的能力有差异硬件设备网络原因我们期望处理能力强的消费者多处理消息处理能力弱的消费者少处理消息。通过basicQos(perfetch)和autoAck配合也可以实现。实现:basicQos设置同一时刻服务器只会发perfetch**(此处为1)**条消息给消费者autoAck将自动应答改为手动。就处理完一条消息后手动提交。两个消费者类 Consumer1 与 Consumer2类中新增channel.basicQos(1);修改后测试结果注意使用公平分发必须关闭自动应答ack然后改成手动应答方式。2.4.3 发布订阅模式(publish/subscribe)一个生产者发送的消息可能会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者。注X表示交换器在RabbitMQ中交换器主要有四种类型:direct、fanout、topic、headers。这里采用的是fanout类型。后面会详细介绍这几种交换器。模式特点归纳:一个生产者多个消费者每个消费者都有自己的队列生产者没有直接将消息发送到队列而是发送到交换机Exchange每个队列都要绑定交换机生产者发送到消息经过交换机 -- 到达队列– 可以实现一个消息被多个消费者消费2.4.3.1测试代码public class Consumer1{private final static String EXCHANGE_NAMEfanout_exchange;private final static String QUEUE_NAMEfanout_exchange_queue1;public static void main(String[]args)throws Exception{Connection connectionRabbitMqConnectionUtils.getConnection();Channel channelconnection.createChannel();//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,);// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1);DeliverCallback deliverCallback(consumerTag, delivery)-{String messagenew String(delivery.getBody(),utf-8);System.out.println( [z] Received message );channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//修改为手动应答true为自动应答,false相反 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -{});}}Productpublic class Producer{private final static String EXCHANGE_NAMEfanout_exchange;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channelconnection.createChannel();//3、声明交换器 channel.exchangeDeclare(EXCHANGE_NAME,fanout);for(int i0;i10;i){//4、创建消息 String messagehello rabbitmqi;//5、发布消息 channel.basicPublish(EXCHANGE_NAME,, null, message.getBytes());System.out.println([z] Sent message);}//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}测试结果ProductConsumer1Consumer22.4.4 路由模式(routing模式)生产者将消息发送到direct交换器在绑定队列和交换器的时候有一个路由key生产者发送的消息会指定一个路由key那么消息只会发送到相应key相同的队列接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。2.4.4.1测试代码//绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);ProducerConsumer1Consumer22.4.4.2测试结果ProducerConsumer1Consumer2经过测试可以看出,消费者所监听的队列中只接收到指定路由key的消息。2.4.5 主题模式(topic)上面的路由模式是根据路由key进行完整的匹配完全相等才发送消息这里的通配符模式通俗的来讲就是模糊匹配。发送到topic交换的消息不能具有任意的 routing_key- 它必须是由点(.)分隔的单词列表。单词可以是任何内容但通常它们指定与消息相关的一些功能。一些有效的路由键示例stock.usd.nysenyse.vmwquick.orange.rabbit。路由密钥中可以包含任意数量的字符最多可达255个字节。绑定键有两个重要的特殊特性是用.分割的单词而不是字符*可以替代一个单词。#可以替换零个或多个单词。2.4.5.1测试代码Producerpublic class Producer{private final static String EXCHANGE_NAMEtopic_exchange;public static void main(String[]args)throws Exception{//1、获取连接 Connection connectionRabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channelconnection.createChannel();//3、声明交换器 channel.exchangeDeclare(EXCHANGE_NAME,topic);for(int i0;i10;i){//4、创建消息 String messagehello rabbitmq i;//5、发布消息 channel.basicPublish(EXCHANGE_NAME,routeKey.z, null, message.getBytes());System.out.println([z] Sent message );}//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}Consumer1public class Consumer1{private final static String EXCHANGE_NAMEtopic_exchange;private final static String QUEUE_NAMEtopic_exchange_queue1;public static void main(String[]args)throws Exception{Connection connectionRabbitMqConnectionUtils.getConnection();Channel channelconnection.createChannel();//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,*.*);// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1);DeliverCallback deliverCallback(consumerTag, delivery)-{String messagenew String(delivery.getBody(),utf-8);System.out.println( [z] Received message );channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//修改为手动应答true为自动应答,false相反 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -{});}}Consumer2public class Consumer2{private final static String EXCHANGE_NAMEtopic_exchange;private final static String QUEUE_NAMEtopic_exchange_queue2;public static void main(String[]args)throws Exception{Connection connectionRabbitMqConnectionUtils.getConnection();Channel channelconnection.createChannel();//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,routeKey.z);// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1);DeliverCallback deliverCallback(consumerTag, delivery)-{String messagenew String(delivery.getBody(),utf-8);System.out.println( [z] Received message );channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//修改为手动应答true为自动应答,false相反 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -{});}}2.4.5.2测试结果2.5 四种交换器Exchange前面五种队列模式介绍完了但是实际上只有三种第一种简单队列第二种工作模式剩下的三种都是和交换器绑定的合起来称为一种这小节我们就来详细介绍交换器。交换器分为四种分别是direct、fanout、topic和 headers。前面三种分别对应路由模式、发布订阅模式和通配符模式headers 交换器允许匹配 AMQP 消息的 header 而非路由键除此之外header 交换器和 direct 交换器完全一致但是性能却差很多因此基本上不会用到该交换器这里也不详细介绍。2.5.1 direct如果路由键完全匹配的话消息才会被投放到相应的队列。2.5.2 fanout当发送一条消息到fanout交换器上时它会把消息投放到所有附加在此交换器上的队列。2.5.3 topic设置模糊的绑定方式“*”操作符将“.”视为分隔符匹配单个字符“#”操作符没有分块的概念它将任意“.”均视为关键字的匹配部分能够匹配多个字符。2.6 总结关于 RabbitMQ 的五种队列其实实际使用最多的是最后一种主题模式通过模糊匹配使得操作更加自如。那么我们总结一下有交换器参与的队列最后三种队列工作方式如下