RocketMQ入门手册

网友投稿 809 2022-11-03

RocketMQ入门手册

RocketMQ入门手册

前言

继我上一篇博客后​分布式消息队列RocketMQ学习教程① 上一篇博客最主要介绍了几种常用的MQ,所以本博客再简单介绍一下RocketMQ的原理和简单的例子,基于Java实现,希望可以帮助学习者

RoketMQ搭建Linux版

“工于利其事,必先利其器”,所以我们首先需要搭建好RocketMQ, 考虑到学习者不一定有Linux系统的服务器,所以本博客介绍一下Linux和Window系统的两种安装方法,以补充上一篇博客

因为阿里已经将RocketMQ捐给Apache了,所以现在我们需要去Apache官网-​​​RocketMQ官网​​

注意RocketMQ是基于Java开发的,所以安装前必须安装JDK,Linux JDK安装的可以看分布式消息队列RocketMQ学习教程① -文件解压后,可以看到conf文件夹里有2m-noslave、2m-2s-async、2m-2s-sync文件夹

2m-noslave 两主,无从的配置

2m-2s-async 两主,两从,同步复制数据的配置

2m-2s-sync 两主,两从,异步复制数据的配置

我们找到2m-noslave的broker-a.properties文件,修改完善配置 broker-a.properties

#所属集群名字 brokerClusterName=DefaultCluster#broker名字,注意此处不同的配置文件填写的不一样brokerName=broker-a#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=127.0.0.1:9876#关键brokerIP1=127.0.0.1#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=48#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#这里是我的 日志配置#存储路径storePathRootDir=/usr/local/rocketmq/store#commitLog 存储路径storePathCommitLog=/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/usr/local/rocketmq/store/abort#限制的消息大小 maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128

先介绍一下linux系统的 一般将压缩文件解压到/usr/local

cd /usr/localtar -xzf apache-rocketmq.tar.gzmv apache-rocketmq rocketmqmkdir /usr/rocketmq/logs

环境变量配置

vim /etc/profile

修改如下配置

export JAVA_HOME=/usr/java/jdk1.8.0_102export ROCKETMQ_HOME=/usr/local/rocketmqexport PATH=$PATH:$JAVA_HOME/bin:/usr/local/src/redis-3.2.8/bin:$ROCKETMQ_HOME/bin:$PATHexport CLASSPATH=.:$JAVA_HOME/lib

启动mqnamesrv

cd /usr/local/rocketmq/binnohup sh /usr/local/rocketmq/bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &

启动Broker

nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties > /usr/local/rocketmq/logs/mqbroker.log 2>&1 &

要设置自动创建Topic,需要加上 autoCreateTopicEnable=true

关闭Broker服务 sh mqshutdown broker

启动成功可以用jps查看

这里写图片描述

RocketMQ搭建Window版

1、-RocketMQ后,解压到D:\alibaba-rocketmq

2、在D:\alibaba-rocketmq,Ctrl+Shift,右键,打开dom界面,输入如下命令行 start /b bin/mqnamesrv.exe >D:\alibaba-rocketmq\logs\mqnamesrv.log 查看nameserver是否启动 jps -v

3、启动Broker

start /b bin/mqbroker.exe -n "127.0.0.1:9876" autoCreateTopicEnable=true >D:\alibaba-rocketmq\logs\mqbroker.log

Caused by: com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, huang_1See for further details. at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:525) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1011) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:970) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90) ~[rocketmq-client-3.5.3.jar:na] at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107) ~[ons-client-1.2.3.jar:na]

出现以上异常启动时添加autoCreateTopicEnable=true

4、查看topic命令:mqadmin topicList -n "127.0.0.1:9876"

cd 到bin目录,执行下面命令 mqadmin updateTopic -t test_1 -b "127.0.0.1:10911" -n "127.0.0.1:9876" 添加如下参数到eclipse启动工程的VM参数里 -Drocketmq.namesrv.addr=127.0.0.1:9876

RocketMq监控平台搭建

需要去github-,-链接​​​rocketmq-console​​

-后在rocketmq-console文件夹里,ctrl+shift,右键,在此处打开命令窗口,打开cmd窗口,主要要先搭建好maven环境

mvn clean package -Dmaven.test.skip=true

打包完成之后,我们去target文件夹找到rocketmq-console-ng-1.0.0.jar 然后

mkdir rocketmq-consolecd /usr/local/rocketmq-console

使用xftp上传rocketmq-console-ng-1.0.0.jar到/usr/local/rocketmq-console

nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876 >/usr/local/rocketmq-console/run.log 2>&1 &

端口检查

netstat -anp|grep 12581

部署成功,打开 com.alibaba.rocketmq rocketmq-client 3.0.10 com.alibaba.rocketmq rocketmq-all 3.0.10 pom

消息队列消费者消费消息实例

package com.mq.test;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;public class MQConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "mq-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("RocketMQ Consumer Started..."); }}

消息队列生产者产生消息实例

package com.mq.test;import java.util.Date;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class MQProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("mq-group");// producer.setNamesrvAddr("123.207.63.192:9876"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 10; i++) { Thread.sleep(1000); //MQ每隔一秒发送一条消息 Message msg = new Message("TopicA-test",// topic "TagA",// tag ("RocketMQ message"+i) .getBytes()// body ); SendResult sendResult = producer.send(msg);//发送消息 } } catch (Exception e) { e.printStackTrace(); } producer.shutdown();//关闭消息生产者 }}

下面是来自github wiki的学习例子

Filter网络架构,以CPU资源换取宝贵的网卡流量资源

screenshot

启动Broker时,增加以下配置,可以自动加载Filter Server进程

filterServerNums=1

Filter样本(Consumer仅负责将代码上传到Filter Server,由Filter Server编译后执行)

package com.alibaba.rocketmq.example.filter;import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { String property = msg.getUserProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); if ((id % 3) == 0 && (id > 10)) { return true; } } return false; }}

Consumer例子

public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start(); System.out.println("Consumer Started."); }

附录

RocketMQ原理与安装教程

RocketMQ实例

阿里RocketMQ Quick Start

RocketMQ集群安装

rocketMq监控平台rocketmq-console搭建

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:webpack 多页面脚手架,基于webpack的前端工程化开发解决方案
下一篇:XBlink- 轻量级的序列化/反序列化工具
相关文章

 发表评论

暂时没有评论,来抢沙发吧~