共计 9832 个字符,预计需要花费 25 分钟才能阅读完成。
公司内部有多个RocketMQ实例,自建的或是阿里云的上的都有,生产基本是云上的实例,其他自建要么是测试的要么是以前还没迁移的。对于RocketMQ的运维过程中,我这这边最常出现的就消息堆积严重,多则400w左右,似乎这些业务对这个消息的可用性和即时性要求不高😥。。。正好兴趣使然,就用python来折腾下rocketMQ的使用
快速部署一个rocketMQ
# 安装docker
xadocker@xadocker-virtual-machine:~$ curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
# 安装docker-compose
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 创建rocketmq目录
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ mkdir -p broker/{conf,logs,store} namesrv/{logs,store}
# 创建broker.conf
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ cat >broker/conf/broker.conf<<-'EOF'
#所属集群名字
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
#在broker-b.properties使用:broker-b
brokerName=broker-a
#0 表示Master,>0 表示Slave
brokerId=0
#nameServer地址,分号分割
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=mall4cloud-rocketmq-namesrv:9876
#启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.44.166
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog 存储路径
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#消费队列存储
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#消息索引存储路径
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint 文件存储路径
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort 文件存储路径
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
EOF
# 创建docker-compose.yaml文件
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ cat >docker-compose.yaml<<-'EOF'
version: '3'
services:
rocketmq-namesrv:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-namesrv
restart: always
ports:
- 9876:9876
volumes:
- ./namesrv/logs:/home/rocketmq/logs
- ./namesrv/store:/home/rocketmq/store
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
command: ["sh","mqnamesrv"]
networks:
rocketmq_net:
aliases:
- rocketmq-namesrv
rocketmq-broker:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-broker
restart: always
ports:
- 10909:10909
- 10911:10911
volumes:
- ./broker/logs:/home/rocketmq/logs
- ./broker/store:/home/rocketmq/store
- ./broker/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf","-n","rocketmq-namesrv:9876","autoCreateTopicEnable=true"]
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-broker
rocketmq-console:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console
restart: always
ports:
- 8180:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-console
networks:
rocketmq_net:
driver: bridge
EOF
# 启动rocketmq
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo docker-compse up -d
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
rocketmq-broker sh mqbroker -c /etc/rocket ... Up 0.0.0.0:10909->10909/tcp,:::10909->10909/tcp, 0.0.0.0:10911->10911/tcp,:::10911->10911/tcp, 10912/tcp,
9876/tcp
rocketmq-console sh -c java $JAVA_OPTS -jar ... Up 0.0.0.0:8180->8080/tcp,:::8180->8080/tcp
rocketmq-namesrv sh mqnamesrv Up 10909/tcp, 10911/tcp, 10912/tcp, 0.0.0.0:9876->9876/tcp,:::9876->9876/tcp
在python中使用rocketMQ
准备环境依赖
# 安装librocketmq,参考链接:https://github.com/apache/rocketmq-client-python
# 若不安装则会在使用时报以下错误:
# raise ImportError('rocketmq dynamic library not found')
# 博主这里是ubuntu,所以按此方式安装即可
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
# 此处安装rocketmq-client-python==2.0.0
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ pip3 install rocketmq-client-python==2.0.0
测试python使用mq收发消息
producer生产者代码
cat >mq-test-producer.py<<-'EOF'
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from faker import Faker
from rocketmq.client import Producer, Message
producer = Producer('PID-XXX')
producer.set_name_server_address('192.168.44.166:9876')
producer.start()
faker = Faker(locale='zh_CN')
for i in range(100):
msg = Message('testff')
msg.set_keys('abc'+str(time.time()))
msg.set_tags('abcd')
msg.set_body(faker.name()+','+faker.address()+','+faker.email()+','+faker.phone_number())
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
EOF
# 测试运行
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ python3.8 mq-test-producer.py
SendStatus.OK 7F00010170230E6144137934D8010000 0
.....
consumer消费者代码
cat >mq-test-consumer.py<<-'EOF'
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import PushConsumer, ConsumeStatus
def callback(msg):
print(msg.id, str(msg.body,'UTF-8'))
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer('PID-CCC')
consumer.set_name_server_address('192.168.44.166:9876')
consumer.subscribe('testff', callback)
consumer.start()
while True:
time.sleep(120)
consumer.shutdown()
EOF
# 测试运行
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ python3.8 mq-test-consumer.py
7F00010170230E6144137934D8010000 何宁,上海市天津县魏都黄街z座 253178,tzheng@example.net,14503234314
....
普通消息发送
同步消息发送和接受
参考上面例子
Oneway单向消息
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
from rocketmq.client import Producer, Message
# 初始化生产者,并设置生产组信息
producer = Producer('group1')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 启动生产者
producer.start()
# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic1')
# 设置keys
msg.set_keys('yourKey')
# 设置tags
msg.set_tags('yourTags')
# 消息内容
msg.set_body('This is a oneway message1.')
# 发送单向消息
producer.send_oneway(msg)
# 模拟业务
print('Send oneway message.')
producer.shutdown()
顺序消息发送
producer生产者
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
from rocketmq.client import Producer, Message
"""
顺序消息可使用 顺序类型的topic 来实现。顺序类型有全局顺序和局部顺序两种,可根据业务类型选择顺序类型
"""
# 初始化生产者,并设置生产组信息
producer = Producer('group2')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
# 启动生产者
producer.start()
for i in range(10):
# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic2')
# 消息内容
msg.set_body('This is a new message' + str(i) + '.')
# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
consumer消费者
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import PushConsumer, ConsumeStatus
"""
顺序消息可使用 顺序类型的topic 来实现。顺序类型有全局顺序和局部顺序两种,可根据业务类型选择顺序类型
"""
# 消息处理回调
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费失败返回RECONSUME_LATER,该消息将会被重新消费
# return ConsumeStatus.RECONSUME_LATER
# 初始化消费者,并设置消费者组信息
consumer = PushConsumer('rocketmq-xxx|namespace_python%group22')
# 设置服务地址
consumer.set_name_server_address('192.168.44.166:9876')
# 订阅topic
consumer.subscribe('rocketmq-xxx|namespace_python%topic2', callback)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
延迟消息发送
producer生产者
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import Producer, Message
# 初始化生产者,并设置生产组信息
producer = Producer('group1')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
producer.set_session_credentials(
'eyJrZXlJZC......',
'admin',
''
)
# 启动生产者
producer.start()
# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic1')
# 设置keys
msg.set_keys('yourKey')
# 设置tags
msg.set_tags('yourTags')
# 设置消息延迟级别
# 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
msg.set_delay_time_level(2)
# 消息内容
msg.set_body('This is a new message1.')
# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
consumer消费模式
广播模式
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调
from rocketmq.ffi import MessageModel
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费失败返回RECONSUME_LATER,该消息将会被重新消费
# return ConsumeStatus.RECONSUME_LATER
# 初始化消费者,并设置消费者组信息
consumer = PushConsumer('rocketmq-xxx|namespace_python%group11')
# 设置服务地址
consumer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
consumer.set_session_credentials(
'eyJrZXlJZC......',
'admin',
''
)
# 设置为集群消息模式
consumer.set_message_model(MessageModel.BROADCASTING)
# 订阅topic
consumer.subscribe('rocketmq-xxx|namespace_python%topic1', callback)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
集群模式
默认模式,略
正文完