• 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html
  • 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html

Spring Boot 中使用 RocketMQ

技术杂谈 勤劳的小蚂蚁 3个月前 (01-29) 83次浏览 已收录 0个评论 扫描二维码

本文快速入门,RocketMQ消息系统的安装部署,发送,和接收消息,监控消息,的详细说明。

环境需要

64位操作系统,建议使用Linux / Unix /
  • CentOs7.3
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git 1.8.3.1

环境安装

请参考我的另一篇文章
搭建 Apache RocketMQ 单机环境

新加项目

新建一个 maven 项目,这里就不详细操作了,大家都会的
不过也可以下载我的示例源码,下载地址如下
GitHub 源码:https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rocketmq

添加依赖

在POM 中添加如下依赖
  1. <!-- RocketMq客户端相关依赖 -->
  2. <dependency>
  3.    <groupId>org.apache.rocketmq</groupId>
  4.    <artifactId>rocketmq-client</artifactId>
  5.    <version>4.1.0-incubating</version>
  6. </dependency>
  7. <dependency>
  8.    <groupId>org.apache.rocketmq</groupId>
  9.    <artifactId>rocketmq-common</artifactId>
  10.    <version>4.1.0-incubating</version>
  11. </dependency>

配置文件

在配置文件 application.properties 添加一下内容
  1. # 消费者的组名
  2. apache.rocketmq.consumer.PushConsumer=PushConsumer
  3. # 生产者的组名
  4. apache.rocketmq.producer.producerGroup=Producer
  5. # NameServer地址
  6. apache.rocketmq.namesrvAddr=192.168.252.121:9876

消息生产者

  1. @Component
  2. publicclassProducer{
  3.    /**
  4.     * 生产者的组名
  5.     */
  6.    @Value("${apache.rocketmq.producer.producerGroup}")
  7.    privateString producerGroup;
  8.    /**
  9.     * NameServer 地址
  10.     */
  11.    @Value("${apache.rocketmq.namesrvAddr}")
  12.    privateString namesrvAddr;
  13.    @PostConstruct
  14.    publicvoid defaultMQProducer(){
  15.        //生产者的组名
  16.        DefaultMQProducer producer =newDefaultMQProducer(producerGroup);
  17.        //指定NameServer地址,多个地址以 ; 隔开
  18.        producer.setNamesrvAddr(namesrvAddr);
  19.        try{
  20.            /**
  21.             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
  22.             * 注意:切记不可以在每次发送消息时,都调用start方法
  23.             */
  24.            producer.start();
  25.            for(int i =0; i <100; i++){
  26.                String messageBody ="我是消息内容:"+ i;
  27.                String message =newString(messageBody.getBytes(),"utf-8");
  28.                //构建消息
  29.                Message msg =newMessage("PushTopic"/* PushTopic */,"push"/* Tag  */,"key_"+ i /* Keys */, message.getBytes());
  30.                //发送消息
  31.                SendResult result = producer.send(msg);
  32.                System.out.println("发送响应:MsgId:"+ result.getMsgId()+",发送状态:"+ result.getSendStatus());
  33.            }
  34.        }catch(Exception e){
  35.            e.printStackTrace();
  36.        }finally{
  37.            producer.shutdown();
  38.        }
  39.    }
  40. }

消息消费者

  1. @Component
  2. publicclassConsumer{
  3.    /**
  4.     * 消费者的组名
  5.     */
  6.    @Value("${apache.rocketmq.consumer.PushConsumer}")
  7.    privateString consumerGroup;
  8.    /**
  9.     * NameServer地址
  10.     */
  11.    @Value("${apache.rocketmq.namesrvAddr}")
  12.    privateString namesrvAddr;
  13.    @PostConstruct
  14.    publicvoid defaultMQPushConsumer(){
  15.        //消费者的组名
  16.        DefaultMQPushConsumer consumer =newDefaultMQPushConsumer(consumerGroup);
  17.        //指定NameServer地址,多个地址以 ; 隔开
  18.        consumer.setNamesrvAddr(namesrvAddr);
  19.        try{
  20.            //订阅PushTopic下Tag为push的消息
  21.            consumer.subscribe("PushTopic","push");
  22.            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  23.            //如果非第一次启动,那么按照上次消费的位置继续消费
  24.            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  25.            consumer.registerMessageListener(newMessageListenerConcurrently(){
  26.                @Override
  27.                publicConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext context){
  28.                    try{
  29.                        for(MessageExt messageExt : list){
  30.                            System.out.println("messageExt: "+ messageExt);//输出消息内容
  31.                            String messageBody =newString(messageExt.getBody(),"utf-8");
  32.                            System.out.println("消费响应:Msg: "+ messageExt.getMsgId()+",msgBody: "+ messageBody);//输出消息内容
  33.                        }
  34.                    }catch(Exception e){
  35.                        e.printStackTrace();
  36.                        returnConsumeConcurrentlyStatus.RECONSUME_LATER;//稍后再试
  37.                    }
  38.                    returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;//消费成功
  39.                }
  40.            });
  41.            consumer.start();
  42.        }catch(Exception e){
  43.            e.printStackTrace();
  44.        }
  45.    }
  46. }

启动服务

  1. @SpringBootApplication
  2. public class SpringBootRocketmqApplication{
  3.    public static void main(String[] args){
  4.        SpringApplication.run(SpringBootRocketmqApplication.class, args);
  5.    }
  6. }
控制台会有响应
  1. 发送响应:MsgId:0AFF015E556818B4AAC208A0504F0063,发送状态:SEND_OK
  2. messageExt:MessageExt[queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message[topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]]
  3. 消费响应:Msg:0AFF015E556818B4AAC208A0504F0063,msgBody:我是消息内容:99
  4. ...

监控服务

RocketMQ web界面监控RocketMQ-Console-Ng部署

下载并且编译

下载并且 maven 编译
  1. git clone https://github.com/apache/rocketmq-externals.git
  2. rocketmq-externals/rocketmq-console/
  3. mvn clean package -Dmaven.test.skip=true

启动监控服务

rocketmq.config.namesrvAddr NameServer 地址,默认启动端口8080
  1. nohup java -jar target/rocketmq-console-ng-1.0.0.jar--rocketmq.config.namesrvAddr=127.0.0.1:9876

访问监控服务

GitHub 源码:https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rocketmq
Gitee 源码:https://gitee.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

Contact

  • 作者:鹏磊
  • 出处:http://www.ymq.io/2018/02/02/spring-boot-rocketmq-example
  • Email:admin@souyunku.com
  • 版权归作者所有,转载请注明出处
  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享

丨极客文库, 版权所有丨如未注明 , 均为原创丨
本网站采用知识共享署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议进行授权
转载请注明原文链接:Spring Boot 中使用 RocketMQ
喜欢 (0)
[247507792@qq.com]
分享 (0)
勤劳的小蚂蚁
关于作者:
温馨提示:本文来源于网络,转载文章皆标明了出处,如果您发现侵权文章,请及时向站长反馈删除。

您必须 登录 才能发表评论!

  • 精品技术教程
  • 编程资源分享
  • 问答交流社区
  • 极客文库知识库

客服QQ


QQ:2248886839


工作时间:09:00-23:00