文章目录一、本篇前言理论落地从部署到代码实操二、前置准备项目环境必备配置1. 基础环境要求2. 导入RocketMQ核心Maven依赖三、核心基础RocketMQ消息核心对象说明1. DefaultMQProducer消息生产者核心类2. DefaultMQPushConsumer消息消费者核心类3. Message消息实体对象四、Java实操一三种常用生产者消息发送Demo1. 同步发送消息生产最常用金融/订单核心业务2. 异步发送消息高并发吞吐业务3. 单向发送消息极低优先级无需确认业务五、Java实操二消费者订阅消费消息完整Demo六、代码运行顺序控制台验证步骤七、新手常见踩坑问题快速排查一、本篇前言理论落地从部署到代码实操前面两篇我们已经搞定了RocketMQ核心概念工作原理、单机集群环境安装部署服务已经稳稳跑在服务器上。环境搭好只是基础真正开发工作中我们都是通过Java代码对接RocketMQ实现消息生产发送、订阅消费业务逻辑。本篇零基础新手跟着步骤复制代码就能快速跑通创建Topic、发消息、收消息全链路彻底弄懂Java和RocketMQ的基础交互逻辑为后续SpringBoot整合、高阶消息类型使用打好编码根基。二、前置准备项目环境必备配置1. 基础环境要求已搭建完成RocketMQ单机/集群环境NameServer、Broker正常启动运行Java开发环境JDK8及以上IDEA/Eclipse开发工具Maven项目工程普通Java项目即可无需Spring框架服务器防火墙开放9876、10911端口本地电脑能正常连通RocketMQ服务。2. 导入RocketMQ核心Maven依赖在pom.xml文件中引入RocketMQ官方Java客户端依赖版本和服务端版本保持一致即可兼容性拉满稳定无冲突。!-- RocketMQ Java客户端核心依赖 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client-java/artifactIdversion5.1.4/version/dependency依赖刷新下载完成后即可开始编写生产者、消费者核心代码所有API均为官方原生标准接口无第三方封装简单易懂好上手。三、核心基础RocketMQ消息核心对象说明写代码前先记三个核心基础对象所有后续编码都围绕这三个对象展开不用死记看懂用途即可1. DefaultMQProducer消息生产者核心类负责连接RocketMQ服务、创建生产实例、发送各类业务消息必须指定生产组名称和NameServer地址启动后才能正常投递消息。2. DefaultMQPushConsumer消息消费者核心类业务开发最常用的消费者模式消费者主动监听订阅的TopicBroker推送消息回调处理自动负载均衡、自动维护消费偏移量无需手动管控消费进度开箱即用。3. Message消息实体对象消息封装载体构造方法核心四个参数Topic消息主题、Tag消息标签、Key业务唯一标识、Body消息体真实业务数据字节数组精准匹配之前学的核心概念。四、Java实操一三种常用生产者消息发送DemoRocketMQ原生Java API提供三种核心消息发送模式适配不同业务场景下面逐个编写可直接运行的完整代码附带场景说明和详细注释。1. 同步发送消息生产最常用金融/订单核心业务适用场景支付下单、订单创建、资金扣款等必须保证消息发送成功的核心业务发送消息后阻塞等待Broker返回发送结果确认成功再执行业务后续逻辑可靠性最高。importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 同步消息生产者Demo * 场景核心业务必须确认消息发送成功 */publicclassSyncProducerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建生产者实例指定生产组名称自定义同业务生产者组名一致DefaultMQProducerproducernewDefaultMQProducer(order_sync_producer_group);// 2. 设置RocketMQ NameServer地址替换为自己服务器IP:9876producer.setNamesrvAddr(127.0.0.1:9876);// 3. 启动生产者producer.start();System.out.println(同步生产者启动成功);// 4. 循环发送5条测试消息for(inti1;i5;i){// 5. 构建消息实体Topic主题、Tag标签、业务Key、消息体内容MessagemessagenewMessage(order_test_topic,// 消息主题自定义命名order_create_tag,// 消息标签订单创建标签order_key_00i,// 业务唯一Key用于消息排查追踪(订单编号00i订单创建成功).getBytes(RemotingHelper.DEFAULT_CHARSET));// 6. 同步发送消息等待Broker返回发送结果SendResultsendResultproducer.send(message);// 打印发送结果状态、消息ID等信息System.out.println(第i条消息发送结果sendResult);}// 7. 发送完成后关闭生产者实际项目常驻服务无需关闭producer.shutdown();}}2. 异步发送消息高并发吞吐业务适用场景日志埋点、短信通知、运营推送等高并发、不等待响应业务发送消息后不阻塞主线程通过回调接口接收发送成功或失败结果吞吐量远高于同步发送。importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 异步消息生产者Demo * 场景高并发业务无需同步等待响应追求吞吐量 */publicclassAsyncProducerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建生产者实例指定生产组DefaultMQProducerproducernewDefaultMQProducer(order_async_producer_group);// 2. 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9876);// 3. 启动生产者producer.start();System.out.println(异步生产者启动成功);// 4. 循环发送5条异步消息for(inti1;i5;i){MessagemessagenewMessage(order_test_topic,order_notice_tag,notice_key_00i,(短信通知用户00i支付成功).getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 异步发送注册回调函数处理发送结果producer.send(message,newSendCallback(){// 发送成功回调OverridepublicvoidonSuccess(SendResultsendResult){System.out.println(异步消息发送成功sendResult.getMsgId());}// 发送失败回调处理异常重试、日志记录OverridepublicvoidonException(Throwablee){System.err.println(异步消息发送失败异常信息e.getMessage());e.printStackTrace();}});}// 异步发送无需等待短暂休眠保证回调执行完成Thread.sleep(1000);producer.shutdown();}}3. 单向发送消息极低优先级无需确认业务适用场景系统日志统计、简单埋点上报等无需确认发送结果、不关心是否投递成功的低优先级业务只管发送无需响应性能极致最高。importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 单向发送生产者Demo * 场景日志埋点、简单统计无需确认发送结果 */publicclassOneWayProducerDemo{publicstaticvoidmain(String[]args)throwsException{DefaultMQProducerproducernewDefaultMQProducer(log_oneway_producer_group);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();System.out.println(单向生产者启动成功);// 发送埋点日志消息MessagemessagenewMessage(log_test_topic,log_click_tag,click_key_001,用户页面点击行为埋点日志.getBytes(RemotingHelper.DEFAULT_CHARSET));// 单向发送无返回值、无回调producer.sendOneway(message);System.out.println(单向消息发送完成无需确认结果);producer.shutdown();}}五、Java实操二消费者订阅消费消息完整Demo生产者发送消息后必须通过消费者订阅对应Topic和Tag才能拉取并处理业务消息。生产环境默认使用PushConsumer模式代码如下常驻运行持续监听消息。importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.common.message.MessageExt;importjava.util.List;/** * 消息消费者Demo * 订阅order_test_topic主题消费对应消息 */publicclassDefaultConsumerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建消费者实例指定消费组名称DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(order_consumer_group);// 2. 设置NameServer连接地址consumer.setNamesrvAddr(127.0.0.1:9876);// 3. 订阅需要消费的Topic和Tag*代表订阅该主题下所有Tag消息consumer.subscribe(order_test_topic,*);// 4. 注册消息监听回调收到消息后执行业务处理consumer.registerMessageListener((ListMessageExtmessageExtList,ConsumeConcurrentlyContextcontext)-{// 循环处理每一条消费到的消息for(MessageExtmessageExt:messageExtList){// 获取消息主题、标签、业务Key、消息体内容StringtopicmessageExt.getTopic();StringtagmessageExt.getTags();StringmsgKeymessageExt.getKeys();StringmsgBodynewString(messageExt.getBody());// 打印消费到的消息信息模拟业务处理逻辑System.out.println(收到RocketMQ消息);System.out.println(消息Topictopic);System.out.println(消息Tagtag);System.out.println(业务KeymsgKey);System.out.println(消息内容msgBody);System.out.println();}// 返回消费成功状态Broker更新消费偏移量不再重复消费returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5. 启动消费者常驻监听消息consumer.start();System.out.println(消费者启动成功持续监听消费消息中...);}}消费核心关键点代码最后返回CONSUME_SUCCESS代表消费成功Broker记录消费位置如果消费异常返回RECONSUME_LATERRocketMQ会自动重试消费重试多次失败后自动转入死信队列和之前讲的死信概念完美对应。六、代码运行顺序控制台验证步骤第一步确保RocketMQ NameServer、Broker全部正常启动无报错日志第二步先运行消费者代码常驻监听Topic消息等待消息投递第三步运行任意一个生产者代码发送测试消息第四步查看消费者控制台正常打印消息内容代表生产消费全链路通第五步打开RocketMQ可视化Dashboard查看Topic消息生产数量、消费堆积、死信情况可视化验证运行状态。七、新手常见踩坑问题快速排查连接不上NameServerIP地址写错、9876端口防火墙未开放、RocketMQ服务未启动生产者发消息失败报错Broker未关联NameServer、autoCreateTopicEnable未开启Topic不存在消费者收不到消息订阅Topic名称和生产者不一致、消费组名称重复、消费者启动晚于生产者程序启动内存报错本地开发无需修改JVM内存服务端已在上篇安装时优化配置。