共计 9832 个字符,预计需要花费 25 分钟才能阅读完成。

公司内部有多个RocketMQ实例,自建的或是阿里云的上的都有,生产基本是云上的实例,其他自建要么是测试的要么是以前还没迁移的。对于RocketMQ的运维过程中,我这这边最常出现的就消息堆积严重,多则400w左右,似乎这些业务对这个消息的可用性和即时性要求不高😥。。。正好兴趣使然,就用python来折腾下rocketMQ的使用
快速部署一个rocketMQ
# 安装docker
xadocker@xadocker-virtual-machine:~$ curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
# 安装docker-compose
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 创建rocketmq目录
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ mkdir -p broker/{conf,logs,store} namesrv/{logs,store}
# 创建broker.conf
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ cat >broker/conf/broker.conf<<-'EOF'
#所属集群名字
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
#在broker-b.properties使用:broker-b
brokerName=broker-a
#0 表示Master,>0 表示Slave
brokerId=0
#nameServer地址,分号分割
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=mall4cloud-rocketmq-namesrv:9876
#启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.44.166
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog 存储路径
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#消费队列存储
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#消息索引存储路径
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint 文件存储路径
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort 文件存储路径
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
EOF
# 创建docker-compose.yaml文件
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ cat >docker-compose.yaml<<-'EOF'
version: '3'
services:
rocketmq-namesrv:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-namesrv
restart: always
ports:
- 9876:9876
volumes:
- ./namesrv/logs:/home/rocketmq/logs
- ./namesrv/store:/home/rocketmq/store
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
command: ["sh","mqnamesrv"]
networks:
rocketmq_net:
aliases:
- rocketmq-namesrv
rocketmq-broker:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-broker
restart: always
ports:
- 10909:10909
- 10911:10911
volumes:
- ./broker/logs:/home/rocketmq/logs
- ./broker/store:/home/rocketmq/store
- ./broker/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf","-n","rocketmq-namesrv:9876","autoCreateTopicEnable=true"]
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-broker
rocketmq-console:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console
restart: always
ports:
- 8180:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-console
networks:
rocketmq_net:
driver: bridge
EOF
# 启动rocketmq
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo docker-compse up -d
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
rocketmq-broker sh mqbroker -c /etc/rocket ... Up 0.0.0.0:10909->10909/tcp,:::10909->10909/tcp, 0.0.0.0:10911->10911/tcp,:::10911->10911/tcp, 10912/tcp,
9876/tcp
rocketmq-console sh -c java $JAVA_OPTS -jar ... Up 0.0.0.0:8180->8080/tcp,:::8180->8080/tcp
rocketmq-namesrv sh mqnamesrv Up 10909/tcp, 10911/tcp, 10912/tcp, 0.0.0.0:9876->9876/tcp,:::9876->9876/tcp
在python中使用rocketMQ
准备环境依赖
# 安装librocketmq,参考链接:https://github.com/apache/rocketmq-client-python
# 若不安装则会在使用时报以下错误:
# raise ImportError('rocketmq dynamic library not found')
# 博主这里是ubuntu,所以按此方式安装即可
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
# 此处安装rocketmq-client-python==2.0.0
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ pip3 install rocketmq-client-python==2.0.0
测试python使用mq收发消息
producer生产者代码
cat >mq-test-producer.py<<-'EOF'
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from faker import Faker
from rocketmq.client import Producer, Message
producer = Producer('PID-XXX')
producer.set_name_server_address('192.168.44.166:9876')
producer.start()
faker = Faker(locale='zh_CN')
for i in range(100):
msg = Message('testff')
msg.set_keys('abc'+str(time.time()))
msg.set_tags('abcd')
msg.set_body(faker.name()+','+faker.address()+','+faker.email()+','+faker.phone_number())
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
EOF
# 测试运行
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ python3.8 mq-test-producer.py
SendStatus.OK 7F00010170230E6144137934D8010000 0
.....
consumer消费者代码
cat >mq-test-consumer.py<<-'EOF'
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import PushConsumer, ConsumeStatus
def callback(msg):
print(msg.id, str(msg.body,'UTF-8'))
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer('PID-CCC')
consumer.set_name_server_address('192.168.44.166:9876')
consumer.subscribe('testff', callback)
consumer.start()
while True:
time.sleep(120)
consumer.shutdown()
EOF
# 测试运行
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ python3.8 mq-test-consumer.py
7F00010170230E6144137934D8010000 何宁,上海市天津县魏都黄街z座 253178,tzheng@example.net,14503234314
....
普通消息发送
同步消息发送和接受
参考上面例子
Oneway单向消息
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
from rocketmq.client import Producer, Message
# 初始化生产者,并设置生产组信息
producer = Producer('group1')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 启动生产者
producer.start()
# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic1')
# 设置keys
msg.set_keys('yourKey')
# 设置tags
msg.set_tags('yourTags')
# 消息内容
msg.set_body('This is a oneway message1.')
# 发送单向消息
producer.send_oneway(msg)
# 模拟业务
print('Send oneway message.')
producer.shutdown()
顺序消息发送
producer生产者
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
from rocketmq.client import Producer, Message
"""
顺序消息可使用 顺序类型的topic 来实现。顺序类型有全局顺序和局部顺序两种,可根据业务类型选择顺序类型
"""
# 初始化生产者,并设置生产组信息
producer = Producer('group2')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
# 启动生产者
producer.start()
for i in range(10):
# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic2')
# 消息内容
msg.set_body('This is a new message' + str(i) + '.')
# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
consumer消费者
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import PushConsumer, ConsumeStatus
"""
顺序消息可使用 顺序类型的topic 来实现。顺序类型有全局顺序和局部顺序两种,可根据业务类型选择顺序类型
"""
# 消息处理回调
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费失败返回RECONSUME_LATER,该消息将会被重新消费
# return ConsumeStatus.RECONSUME_LATER
# 初始化消费者,并设置消费者组信息
consumer = PushConsumer('rocketmq-xxx|namespace_python%group22')
# 设置服务地址
consumer.set_name_server_address('192.168.44.166:9876')
# 订阅topic
consumer.subscribe('rocketmq-xxx|namespace_python%topic2', callback)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
延迟消息发送
producer生产者
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import Producer, Message
# 初始化生产者,并设置生产组信息
producer = Producer('group1')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
producer.set_session_credentials(
'eyJrZXlJZC......',
'admin',
''
)
# 启动生产者
producer.start()
# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic1')
# 设置keys
msg.set_keys('yourKey')
# 设置tags
msg.set_tags('yourTags')
# 设置消息延迟级别
# 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
msg.set_delay_time_level(2)
# 消息内容
msg.set_body('This is a new message1.')
# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
consumer消费模式
广播模式
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调
from rocketmq.ffi import MessageModel
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费失败返回RECONSUME_LATER,该消息将会被重新消费
# return ConsumeStatus.RECONSUME_LATER
# 初始化消费者,并设置消费者组信息
consumer = PushConsumer('rocketmq-xxx|namespace_python%group11')
# 设置服务地址
consumer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
consumer.set_session_credentials(
'eyJrZXlJZC......',
'admin',
''
)
# 设置为集群消息模式
consumer.set_message_model(MessageModel.BROADCASTING)
# 订阅topic
consumer.subscribe('rocketmq-xxx|namespace_python%topic1', callback)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
集群模式
默认模式,略
正文完
隐私政策
留言板
金色传说
kubernetes
terraform
云生原
helm
代码编程
Java
Python
Shell
DevOps
Ansible
Gitlab
Jenkins
运维
老司机
Linux 杂锦
Nginx
数据库
elasticsearch
监控
上帝视角
DJI FPV
DJI mini 3 pro
关于本站