前言
前面学习了消息队列RocketMQ的基本理论,现在来动手实操一下。
注意:
- RocketMQ和微服务有强相关的联系,在下载之前一定要核对一下版本
- JDK
- Spring Cloud/Spring cloud Alibaba
- RocketMQ
由于只是测试,我只用SpringBoot简单搭建一下。
- 环境
- JDK 17.0.12
- RocketMQ 5.3.0
- SpringBoot 3.3.4
查看RocketMQ所支持的Spring Cloud版本:https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明
需要注意的是RocketMQ 4.x的版本都不支持JDK17,所以只能下载5.x
下载链接:https://rocketmq.apache.org/zh/download
开始
RocketMQ的启动
下载好RocketMQ的二进制包以后,解压在一个顺眼的目录。
配置系统环境变量(不嫌麻烦也可以不用配置,直接在bin目录中打开cmd),打开cmd
先启动名字服务器,启动NameServer的命令(默认监听端口9876):
mqnamesrv
再启动代理服务器,命令:
broker -n 127.0.0.1:9876
-n的意思是指定名字服务器的ip和端口号
需要注意的是,代理服务器启动时,如果本地存在虚拟网卡,会默认选择虚拟网卡进行启动,对于我这个强迫症患者来说就很难受。
解决方案:
随便找个地方新建一个broker.properties文件,指定一下代理服务器启动的ip
brokerIP1=192.168.5.19
然后启动命令改为
mqbroker -n 127.0.0.1:9876 -c "C:\Users\G\Documents\rocketmq-all-5.3.0-bin-release\broker.properties"
-c的意思是指定一个配置文件路径
生产者和消费者程序的创建
IDEA里新建两个SpringBoot项目,分别代表生产者和消费者
在各自的pom.xml中引入RocketMQ启动器的坐标
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version> <----版本号根据情况填
</dependency>生产者的配置
application.properties的配置
server.port=9999 #指定生产者服务启动的端口
rocketmq.name-server=localhost:9876 #指定NameServer的IP和端口
rocketmq.producer.group=PRO_GROUP #指定生产者的组创建一个类进行测试
public class Produce {
private final RocketMQTemplate rocketMQTemplate;
String produce(){
Map<String, Object> m = new HashMap<>();
m.put("id", UUID.randomUUID().toString());
m.put("price",9.9);
m.put("des","哈哈哈哈");
rocketMQTemplate.convertAndSend("DEMO",m);
return "success\r======"+m;
}
}@AllArgsConstructor ——Lombok的注解,意思是创建了该类的所有全局变量的构造函数以便进行bean注入
RocketMQTemplate ——RocketMQ启动器提供的工具类
convertAndSend方法——发布消息,”DEMO”为主题,m为消息的内容(这一步实际上先进行了序列化的操作再进行发布)
消费者的测试代码
application.properties的配置
server.port=10000 #指定消费者服务启动的端口
rocketmq.name-server=localhost:9876 #指定NameServer的IP和端口创建一个类进行测试
public class Consumer implements RocketMQListener<MessageMQ> {
public void onMessage(MessageMQ messageMQ) {
System.out.println("主题:"+messageMQ.getId()+"价格:"+messageMQ.getPrice()+"描述:"+messageMQ.getDes());
}
}@RocketMQMessageListener(consumerGroup = “CON_GROUP”,topic = “DEMO”)——分别指定要监听的生产者的组和消息主题
RocketMQListener<?>——要想监听消息队列必须也要重写onMessage方法
MessageMQ ——用于存放接收到的内容的实体类
测试
最后,同时启动NameServer、Broker、生产者、消费者;由于测试代码使用的GET请求,所以此处直接使用浏览器进行测试
看起来,实际上的消息内容是一个json格式的字符串。此时回到IDEA的控制台。发现消费者一端接收到了消息
一个简单的部署和测试大功告成!
可视化面板的搭建
为了更方便我们观察,可以使用rocketmq-dashboard这个工具进行可视化管理。
部署流程
由于在我系统的WSL上有docker环境,所以直接使用docker进行部署
编写一个docker-compose.yml文件
version: '3' # 指定 Docker Compose 文件的版本
services:
rocketmq-dashboard:
image: apacherocketmq/rocketmq-dashboard:latest # 使用的镜像
container_name: rocketmq-dashboard # 容器名称
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=192.168.5.19:9876" # 环境变量指定NameServer
ports:
- "8080:8080" # 端口映射在终端直接运行”docker-compose up -d”命令完成部署
最后,访问WSL的IP:8080就可以很清楚地看到消息的主题、消费者等信息了