良许Linux教程网 干货合集 Linux下部署分布式消息系统RocketMQ

Linux下部署分布式消息系统RocketMQ

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。

image-20211127111503927

一、本篇所需文件下载

链接:https://pan.baidu.com/s/17iUB1lBOjv4CBAEQFvn65A 提取码:v0sn

在这里插入图片描述

一、Linux环境搭建

1、安装 jdk环境

RocketMQ java编写,需要jdk环境

下载jdk 1.7.0_80 上传到linux ,必须64位,32位RocketMQ不支持

tar -zxvf  jdk-7u80-linux-x64.tar.gz        //解压

修改环境变量 vim /etc/profile

export JAVA_HOME=/usr/local/jdk1.7.0_80
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

刷新配置

source /etc/profile

或jdk1.8下载安装教程:https://blog.csdn.net/qq_41463655/article/details/99173682

2、安装RocketMQ

2.1、上传alibaba-rocketmq-3.2.6.tar.gz 上传到linux解压安装

tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local                //解压到 /usr/local
mv /usr/local/alibaba-rocketmq /usr/local/alibaba-rocketmq-3.2.6      //重命名
ln -s /usr/local/alibaba-rocketmq-3.2.6 rocketmq                      //安装

安装好了 在这里插入图片描述2.2、创建存储路径

cd  /usr/local/rocketmq
mkdir store
mkdir store/commitlog
mkdir store/consumequeue
mkdir store/index

2.3、日志配置

cd  /usr/local/rocketmq
mkdir logs        
cd conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

2.4、配置 broker-a.properties / broker-b.properties /usr/local/rocketmq/conf/2m-noslave/ 目录下

2.4.1、broker-a.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2.4.2、broker-b.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

两个配置文件需修改处

brokerName=broker-a|broker-b      集群a服务器配置修改为   brokerName=broker-a
brokerName=broker-a|broker-b      集群b服务器配置修改为   brokerName=broker-b

2.5、修改启动参数 /rocketm/bin下 (jvm)

runbroker.sh 的JAVA_OPT runserver.sh 的JAVA_OPT

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
修改为
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"

2.6、启动 NameServer 安装目录 /usr/local/ /rocketmq/bin 目录下

nohup sh mqnamesrv &

2.7、启动 BrokerServer /rocketmq/bin 目录下

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
netstat -ntlp

查看启动状态

jps

结果如下启动成功 在这里插入图片描述

3.修改linux 服务器host

本机ip,配置域名

192.168.177.128 rocketmq-nameserver1
192.168.177.128 rocketmq-master1
192.168.111.129 rocketmq-nameserver2
192.168.111.129 rocketmq-master2

图片 在这里插入图片描述

4.安装后台管理平台

解压安装 tomcat 7.0到 /usr/local/

tar -zxvf apache-tomcat-7.0.65.tar.gz -C /usr/local

rocketmq-web-console.war 复制到apache-tomcat-7.0.65 的webapps 目录下 在这里插入图片描述 启动tomcat 自动解压,然后修改config /rocketmq-web-console/WEB-INF/classes 的 config.properties 配置 修改ip 在这里插入图片描述

单服务器
rocketmq.namesrv.addr=192.168.177.128:9876

多服务器  
rocketmq.namesrv.addr=192.168.177.128:9876;192.168.177.129:9876

关闭tomcat / 重启tomcat

关闭防火墙

systemctl disable firewalld   或  chkconfig iptables off

访问 —-》 ip:8080/rocketmq-web-console 出现下方界面就ok了 在这里插入图片描述

java 操作

1、生产者

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {

   public static void main(String[] args) throws MQClientException {
       DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
       producer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876");
       producer.setInstanceName("producer");
       producer.start();
       try {
           for (int i = 0; i "test-topic",
                       "TagA",
                       ("test-topic-"+i).getBytes()
               );
               SendResult sendResult = producer.send(msg);
               System.out.println(sendResult.toString());
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
       producer.shutdown();
   }

}

2、消费者

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
   public static void main(String[] args) throws MQClientException {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

       consumer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876");
       consumer.setInstanceName("consumer");
       consumer.subscribe("test-topic""TagA");

       consumer.registerMessageListener(new MessageListenerConcurrently() {
           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
               for (MessageExt msg : msgs) {
                   System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
               }
               //返回成功消费状态
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
       consumer.start();
       System.out.println("Consumer Started.");
   }
}

会出现幂等问题,使用全局id,或者时间戳,业务的唯一id 进行判断,使用redis等日志记录判断是否存在,存在表示已经成功消费

以上就是良许教程网为各位朋友分享的Linu系统相关内容。想要了解更多Linux相关知识记得关注公众号“良许Linux”,或扫描下方二维码进行关注,更多干货等着你 !

img
本文由 良许Linux教程网 发布,可自由转载、引用,但需署名作者且注明文章出处。如转载至微信公众号,请在文末添加作者公众号二维码。
良许

作者: 良许

良许,世界500强企业Linux开发工程师,公众号【良许Linux】的作者,全网拥有超30W粉丝。个人标签:创业者,CSDN学院讲师,副业达人,流量玩家,摄影爱好者。
上一篇
下一篇

发表评论

联系我们

联系我们

公众号:良许Linux

在线咨询: QQ交谈

邮箱: yychuyu@163.com

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部