Rocketmq学习笔记
一、安装教程1.创建目录mkdir -p ~/rocketmq-docker cd ~/rocketmq-docker2.创建brokeer.conf配置文件cat broker.conf EOF brokerClusterName DefaultCluster brokerName broker-a brokerId 0 deleteWhen 04 fileReservedTime 48 brokerRole ASYNC_MASTER flushDiskType ASYNC_FLUSH #虚拟机地址 brokerIP1 192.x.x.x EOF3.创建dokcer-compose.yml文件cat docker-compose.yml EOF version: 3 services: namesrv: image: apache/rocketmq:4.9.6 container_name: rmqnamesrv ports: - 9876:9876 environment: - JAVA_OPT_EXT-Xms512m -Xmx512m command: sh mqnamesrv networks: - rocketmq broker: image: apache/rocketmq:4.9.6 container_name: rmqbroker ports: - 10909:10909 - 10911:10911 environment: - JAVA_OPT_EXT-Xms1g -Xmx1g - NAMESRV_ADDRnamesrv:9876 volumes: - ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf depends_on: - namesrv networks: - rocketmq shm_size: 512m dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard ports: - 8080:8080 environment: - JAVA_OPTS-Xms512m -Xmx512m -Drocketmq.namesrv.addrnamesrv:9876 depends_on: - namesrv networks: - rocketmq networks: rocketmq: driver: bridge EOF4.拉取镜像docker pull apache/rocketmq:4.9.6 docker pull apacherocketmq/rocketmq-dashboard:1.0.05.启动服务俩种方式composerun)docker-compose up -d6.run启动# 1. 先创建网络 docker network create rocketmq # 2. 启动 NameServer docker run -d \ --name rmqnamesrv \ --network rocketmq \ -p 9876:9876 \ -e JAVA_OPT_EXT-Xms512m -Xmx512m \ apache/rocketmq:4.9.6 \ sh mqnamesrv # 3. 启动 Broker docker run -d \ --name rmqbroker \ --network rocketmq \ -p 10911:10911 \ -p 10909:10909 \ -e JAVA_OPT_EXT-Xms1g -Xmx1g \ -e NAMESRV_ADDRrmqnamesrv:9876 \ -v $(pwd)/broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf \ --shm-size512m \ apache/rocketmq:4.9.6 \ sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf # 4. 启动 Dashboard docker run -d \ --name rocketmq-dashboard \ --network rocketmq \ -p 8080:8080 \ -e JAVA_OPTS-Xms512m -Xmx512m -Drocketmq.namesrv.addrrmqnamesrv:9876 \ apacherocketmq/rocketmq-dashboard:latest7.访问呢客户端http://localhost:8080如果有重复的端口重新删除设置# 停止并删除 docker stop rocketmq-dashboard docker rm rocketmq-dashboard # 重新启动映射到 8087 端口 docker run -d \ --name rocketmq-dashboard \ --network rocketmq \ -p 8087:8080 \ -e JAVA_OPTS-Xms512m -Xmx512m -Drocketmq.namesrv.addrrmqnamesrv:9876 \ apacherocketmq/rocketmq-dashboard:latest二、基础使用1.同步发送发送方orderpackage com.test.rocketdemo.demo; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class SimpleProducer { public static void main(String[] args) throws Exception { // 1. 创建生产者指定生产者组名 DefaultMQProducer producer new DefaultMQProducer(demo-producer-group); // 2. 设置 NameServer 地址使用你的宿主机IP producer.setNamesrvAddr(192.168.137.1:9876); // 3. 启动生产者 producer.start(); System.out.println(生产者启动成功); // 4. 发送 10 条消息 for (int i 1; i 10; i) { // 创建消息主题、标签、消息内容 String body Hello RocketMQ, 这是第 i 条消息; Message msg new Message(DemoTopic, TagA, body.getBytes()); // 发送消息同步发送 SendResult sendResult producer.send(msg); System.out.println(发送结果: sendResult.getSendStatus() , msgId: sendResult.getMsgId()); } // 5. 关闭生产者 producer.shutdown(); System.out.println(生产者已关闭); } }接收方注册监听器1.顺序的Messagelistenerorderly 2.并发的接收Messagelistenerconcurrentlypackage com.test.rocketdemo.demo; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; public class OrderConsumer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, IOException { DefaultMQPushConsumer order new DefaultMQPushConsumer(order); order.setNamesrvAddr(192.168.137.1:9876); order.subscribe(order,*); // order.registerMessageListener(new MessageListenerOrderly() { // Override // public ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) { // for (MessageExt msg : list) { // String body new String(msg.getBody()); // System.out.println(收到消息: body // , msgId: msg.getMsgId() // , 标签: msg.getTags()); // } // // return ConsumeOrderlyStatus.SUCCESS; // } // }); order.registerMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { String body new String(msg.getBody()); System.out.println(收到消息: body , msgId: msg.getMsgId() , 标签: msg.getTags()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); order.start(); System.in.read(); System.out.println(消息接收成功---); order.shutdown(); } }2.广播接收发送代码package com.test.rocketdemo.demo.guangbo; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; public class guangboProducer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, IOException { DefaultMQProducer order new DefaultMQProducer(order); order.setNamesrvAddr(192.168.137.1:9876); order.start(); for (int i 0; i 5; i) { String body发送的消息_ i _序号-- i; Message message new Message(order,taga,body.getBytes(StandardCharsets.UTF_8)); SendResult sendResult order.send(message); System.out.println(消息发送成功---sendResult); } order.shutdown(); } }1.群广播接收order.setMessageModel(MessageModel.BROADCASTING);2.集群接收会把消息分隔开分别发送增强性能。package com.test.rocketdemo.demo.guangbo; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.IOException; import java.util.List; public class guangboConsumer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, IOException { DefaultMQPushConsumer order new DefaultMQPushConsumer(order); order.setNamesrvAddr(192.168.137.1:9876); order.subscribe(order,*); order.setMessageModel(MessageModel.CLUSTERING); order.registerMessageListener(new MessageListenerOrderly() { Override public ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt msg : list) { String body new String(msg.getBody()); System.out.println(收到消息: body , msgId: msg.getMsgId() , 标签: msg.getTags()); } return ConsumeOrderlyStatus.SUCCESS; } }); order.start(); System.out.println(消息者启动成功---); } }3.延迟消息