RabbitMQ队列

网友投稿 938 2022-09-27

RabbitMQ队列

RabbitMQ队列

rabbit MQ 基本

基础准备:

# -rabbitmq-server-3.6.9.tar.xzwget # 对于-xz包进行解压,首先先-xz压缩工具:yum install xz # 对rabbitmq包进行解压:xz -d xz -d rabbitmq-server-generic-unix-3.6.9.tar.xz tar -xvf rabbitmq-server-generic-unix-3.6.9.tar # 将压的文件移动至/usr/local/下 改名rabbitmq:cp -r rabbitmq_server-3.6.9 /usr/local/rabbitmq # 这种-的方式解压后直接可以使用,无需再编译安装; 进入到rabbit文件内,其命令文件存在于sbin文件夹下,因此需要将sbin文件夹的路径添加到PATH中:修改/etc/profile export PATH=/usr/local/rabbitmq/sbin:$PATH 执行source /etc/profile使得PATH路径更新,rabbitMQ安装成功。 # 启用MQ管理方式:rabbitmq-plugins enable rabbitmq_management #启动后台管理系统 rabbitmq-server -detached #后台运行rabbitmq netstat -tulnp # 查看运行中的进程信息iptables -I INPUT -p tcp --dport 15672 -j ACCEPT # 设置端口号,可供外部访问# 添加用户和权限:(默认网页guest用户是不允许访问的,需要增加一个用户修改一下权限)sudo rabbitmqctl add_user tom 123 #在rabbitmq server上创建一个用户(服务器重启后需要重新注册用户)sudo rabbitmqctl set_permissions -p / tom ".*" ".*" ".*" # 配置权限,允许从外面访问(服务器重启后需要重新配置权限)rabbitmqctl set_user_tags tom administrator # 修改用户角色

然后就可以远程访问了,可直接配置用户权限等信息。

rabbitMQ基本操作:

rabbitmqctl list_queues # 显示当前队列列表信息

向rabbitMQ软件发送消息:

安装python rabbitMQ modulepip install pikaoreasy_install pikaor

实现最简单的队列通信:

队列rabbit MQ 基本队列

produce.py 发送方:

consumer.py接收端:

import pika# 客户端连接的时候需要配置认证参数credentials = pika.PlainCredentials('tom', '123') # 凭证paramenters = pika.ConnectionParameters('192.168.121.128', credentials = credentials)connection = pika.BlockingConnection(paramenters)channel = connection.channel() # 队列连接到通道def callback(ch, method, properties, body): # 回调函数 print(" [x] Received %r" % body)channel.basic_consume(callback, # 消费端取到消息后,调用callback 函数 queue='task123', no_ack=True) #接收端接收消息后,不向rabbit-server发送确认接收信息,发送方将自动完成消息的销毁channel.start_consuming() # 阻塞模式,发送方发送的消息可以实时的进行接收

队列rabbit MQ 消息持久化

produce.py

consumer.py

import pikaimport time# 客户端连接的时候需要配置认证参数credentials = pika.PlainCredentials('tom', '123') # 凭证paramenters = pika.ConnectionParameters('192.168.121.128', credentials = credentials)connection = pika.BlockingConnection(paramenters)channel = connection.channel() # 队列连接到通道def callback(ch, method, properties, body): # 回调函数 # time.sleep(5) # 用于模拟接收端未完成消息接收的状况 print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认,向发送方发送已接收消息,让发送方将消息丢弃,与 no_ack=True形成对应关系channel.basic_consume(callback, # 消费端取到消息后,调用callback 函数 queue='task123',) #no_ack=True ) # no_ack=True表示消息处理后,不向rabbit-server确认消息已消费完毕channel.start_consuming() # 阻塞模式

队列rabbit MQ 消息广播之公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

producer.py

略……与上面的代码相同

consumer.py

import pikaimport timecredentials = pika.PlainCredentials('tom', '123') # 凭证paramenters = pika.ConnectionParameters('192.168.121.128', credentials = credentials)connection = pika.BlockingConnection(paramenters)channel = connection.channel() # 队列连接到通道def callback(ch, method, properties, body): # 回调函数 # time.sleep(5) # 用于模拟接收端未完成消息接收的状况 print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认,向发送方发送已接收消息,让发送方将消息丢弃,与 no_ack=True形成对应关系channel.basic_consume(callback, # 消费端取到消息后,调用callback 函数 queue='task123',) #no_ack=True ) # no_ack=True表示消息处理后,不向rabbit-server确认消息已消费完毕channel.basic_qos(prefetch_count=1) # 公平分发channel.start_consuming() # 阻塞模式

队列rabbit MQ 消息广播fanout(参考微博)

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了.

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。fanout: 所有bind到此exchange的queue都可以接收消息direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息   表达式符号说明:#代表一个或多个字符,*代表任何字符 例:#.a会匹配a.a,aa.a,aaa.a等 *.a会匹配a.a,b.a,c.a等 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout headers:

消息publisher

消息subscriber

队列rabbit MQ 消息组播direct

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

publisher

__author__ = 'Administrator'import pikaimport syscredentials = pika.PlainCredentials('tom', '123')parameters = pika.ConnectionParameters(host='192.168.121.128',credentials=credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel() #队列连接通道channel.exchange_declare(exchange='direct_log',exchange_type='direct')log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' # sys.argv[1]代表从命令行接收的参数,作为关键字message = ' '.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange='direct_log', routing_key=log_level, # 关键字 body=message)print(" [x] Sent %r" % message)connection.close()

consumer:

__author__ = 'Administrator'import pika,syscredentials = pika.PlainCredentials('tom', '123')parameters = pika.ConnectionParameters(host='192.168.121.128',credentials=credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel() #队列连接通道queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除queue_name = queue_obj.method.queueprint('queue name',queue_name,queue_obj)log_levels = sys.argv[1:] # 接收的参数 info warning errrif not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)for level in log_levels: channel.queue_bind(exchange='direct_log', queue=queue_name, routing_key=level) #绑定队列到Exchangeprint(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name, no_ack=True)channel.start_consuming()

队列rabbit MQ 消息组播topic(规则播)

publish

__author__ = 'Administrator'import pikaimport syscredentials = pika.PlainCredentials('tom', '123')parameters = pika.ConnectionParameters(host='192.168.121.128',credentials=credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel() #队列连接通道channel.exchange_declare(exchange='topic_log',exchange_type='topic')#log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"channel.basic_publish(exchange='topic_log', routing_key=log_level, body=message)print(" [x] Sent %r" % message)connection.close()

consumer

__author__ = 'Administrator'import pika,syscredentials = pika.PlainCredentials('tom', '123')parameters = pika.ConnectionParameters(host='192.168.121.128',credentials=credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel() #队列连接通道queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除queue_name = queue_obj.method.queuelog_levels = sys.argv[1:] # info warning errrif not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)for level in log_levels: channel.queue_bind(exchange='topic_log', queue=queue_name, routing_key=level) #绑定队列到Exchangeprint(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name, no_ack=True)channel.start_consuming()

consumer接收的规则:

To receive all the logs run: python receive_logs_topic.py “#” To receive all logs from the facility “kern”:

python receive_logs_topic.py “kern.*” Or if you want to hear only about “critical” logs:

python receive_logs_topic.py “*.critical” You can create multiple bindings:

python receive_logs_topic.py “kern.” “.critical” And to emit a log with a routing key “kern.critical” type:

python emit_log_topic.py “kern.critical” “A critical kernel error”

RPC(由client消费方先发送数据)

client:

server:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springboot连接sqllite遇到的坑及解决
下一篇:【K8S运维知识汇总】第2天6:安装部署主控节点服务——etcd配置存储
相关文章

 发表评论

暂时没有评论,来抢沙发吧~