文章目录1. RPC 通信2. 引入依赖3. 客户端代码编写3.1 声明队列3.2 发送请求消息3.3 使用阻塞队列存储回调结果3.4 获取回调结果3.5 完整代码4. 服务端代码编写4.1 声明队列4.2 设置同时最多只能获取一个消息4.3 接收消息并做出相应处理4.4 完整代码5. 运行程序1. RPC 通信RPCRemote Procedure Call即远程过程调用。它是一种通过网络从远程计算机上请求服务而不需要了解底层网络的技术。类似于 HTTP 远程调用。RabbitMQ 实现 RPC 通信的过程大概是通过两个队列实现一个可回调的过程。大概流程如下1、客户端发送消息到一个指定的队列并在消息属性中设置 replyTo 字段这个字段指定了一个回调队列服务端处理后会把响应结果发送到这个队列。2、服务端接收到请求后处理请求并发送响应消息到 replyTo 指定的回调队列。3、客户端在回调队列上等待响应消息。一旦收到响应客户端会检查消息的 correlationId 属性以确保它是所期望的响应。接下来我们看看 RPC 模式的实现步骤1、引入依赖2、编写客户端3、编写服务端2. 引入依赖先引入 rabbitmq 的依赖!-- Source: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/versionscopecompile/scope/dependency那么先去 Constants.java 里面定义队列。// RPC模式publicstaticfinalStringRPC_REQUEST_QUEUErpc.request.queue;publicstaticfinalStringRPC_RESPONSE_QUEUErpc.response.queue;3. 客户端代码编写客户端代码主要流程如下1、声明两个队列包含回调队列 replyQueueName声明本次请求的唯一标志 corrId。2、将 replyQueueName 和 corrId 配置到要发送的消息队列中。3、使用阻塞队列来阻塞当前进程监听回调队列中的消息把请求放到阻塞队列中。4、阻塞队列有消息后主线程被唤醒打印返回内容。3.1 声明队列代码如下所示channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);3.2 发送请求消息代码如下所示// 3. 发生请求Stringmsghello rpc......;// 设置请求的唯一标识StringcorrelationIdUUID.randomUUID().toString();// 设置请求的相关属性AMQP.BasicPropertiespropsnewAMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish(,Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());如下所示3.3 使用阻塞队列存储回调结果代码如下所示// 4. 接收响应// 使用阻塞队列来存储响应信息其实就是等待响应完成finalBlockingQueueStringmsgQueuenewArrayBlockingQueue(1);DefaultConsumerconsumernewDefaultConsumer(channel){OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{StringrespMsgnewString(body);System.out.println(接收到回调消息: respMsg);if(correlationId.equals(properties.getCorrelationId())){// 如果correlationId校验一致, 说明就是我们想要的响应msgQueue.offer(respMsg);}}};3.4 获取回调结果代码如下所示StringresultmsgQueue.take();System.out.println([RPC Client 响应结果]: result);3.5 完整代码代码如下所示packagerpc;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.TimeoutException;/* rpc 客户端 1. 发生请求 2. 接收响应 */publicclassRpcClient{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,InterruptedException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 发生请求Stringmsghello rpc......;// 设置请求的唯一标识StringcorrelationIdUUID.randomUUID().toString();// 设置请求的相关属性AMQP.BasicPropertiespropsnewAMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish(,Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());// 4. 接收响应// 使用阻塞队列来存储响应信息其实就是等待响应完成finalBlockingQueueStringmsgQueuenewArrayBlockingQueue(1);DefaultConsumerconsumernewDefaultConsumer(channel){OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{StringrespMsgnewString(body);System.out.println(接收到回调消息: respMsg);if(correlationId.equals(properties.getCorrelationId())){// 如果correlationId校验一致, 说明就是我们想要的响应msgQueue.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);StringresultmsgQueue.take();System.out.println([RPC Client 响应结果]: result);}}4. 服务端代码编写服务端代码主要流程如下1、接收消息。2、根据消息内容进行响应处理把应答结果返回到回调队列中。4.1 声明队列代码如下所示channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);4.2 设置同时最多只能获取一个消息如果不设置 basicQosRabbitMQ 会使用默认的 QoS 设置其 prefetchCount 默认值为 0。当 prefetchCount 为 0 时RabbitMQ 会根据内部实现和当前的网络状况等因素可能会同时发送多条消息给消费者。这意味着在默认情况下消费者可能会同时接收到多条消息但具体数量不是严格保证的可能会有所波动。在 RPC 模式下通常期望的是一对一的消息处理即一个请求对应一个响应。消费者在处理完一个消息并确认之后才会接收到下一条消息。代码如下所示channel.basicQos(1);System.out.println(Awaiting RPC request...);4.3 接收消息并做出相应处理代码如下所示DefaultConsumerconsumernewDefaultConsumer(channel){OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{StringrequestnewString(body,UTF-8);System.out.println(接收到请求: request);Stringresponse针对request: request, 响应成功;AMQP.BasicPropertiesbasicPropertiesnewAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish(,Constants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);RabbitMQ 消息确定机制在RabbitMQ中basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息队列确认消息。自动确认autoAcktrue消息队列在将消息发送给消费者后会立即从内存中删除该消息。这意味着如果消费者处理消息失败消息将丢失因为消息队列认为消息已经被成功消费。手动确认autoAckfalse消息队列在将消息发送给消费者后需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性确保消息不会被意外丢失适用于消息处理重要且需要确保每个消息都被正确处理的场景。4.4 完整代码代码如下所示packagerpc;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/* * RPC 服务端 * 1. 接收请求 * 2. 发生响应 */publicclassRpcServer{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 接收请求channel.basicQos(1);System.out.println(Awaiting RPC request...);DefaultConsumerconsumernewDefaultConsumer(channel){OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{StringrequestnewString(body,UTF-8);System.out.println(接收到请求: request);Stringresponse针对request: request, 响应成功;AMQP.BasicPropertiesbasicPropertiesnewAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish(,Constants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);}}5. 运行程序先运行客户端代码可以看到在 request_queue 中有一条消息它就相当于是生产者点击去然后可以看到详细消息另外可以看到在 response_queue 中有一个消费者它就相当于是消费者然后再运行服务端代码可以看到已经接收到请求然后再回到客户端可以看到已经收到响应了