Python38RabbitMQ消息队列
title: Python38 RabbitMQ
tags: Python学习
grammar_cjkRuby: true
RabbitMQ 消息队列介绍
RabbitMQ是一种消息队列,与线程queue和进程QUEUE作用是一样的。
RabbitMQ是一个中间程序,可以实现不同进程之间的通信(比如python和Java之间,QQ和Word之间等);
普通情况下A进程与B进程之间通信,两者之间需要建立很多连接和单独写一些代码,但是使用RabbitMQ的话就可以实现帮助不同进程之间的数据通信。
A进程交给RabbitMQ,RabbitMQ在交给B,同样B交给RabbitMQ,RabbitMQ在交给A,RabbitMQ可以实现A与B进程之间的连接和信息转换。
使用RabbitMQ可以实现很多个独立进程之间的交互,所有其他独立进程都可以用RabbitMQ作为中间程序。
创新互联建站专注于淮南企业网站建设,成都响应式网站建设公司,商城建设。淮南网站建设公司,为淮南等地区提供建站服务。全流程定制开发,专业设计,全程项目跟踪,创新互联建站专业和态度为您提供的服务
py 消息队列:
线程 queue(同一进程下线程之间进行交互)
进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)
如果是两个完全独立的python程序,也是不能用上面两个queue进行交互的,或者和其他语言交互有哪些实现方式呢。
【Disk、Socket、其他中间件】这里中间件不仅可以支持两个程序之间交互,可以支持多个程序,可以维护好多个程序的队列。
虽然可以通过硬盘的方式实现多个独立进程交互,但是硬盘速度比较慢,而RabbitMQ则能够很好的、快速的帮助两个独立进程实现交互。
像这种公共的中间件有好多成熟的产品:
RabbitMQ
ZeroMQ
ActiveMQ
……
RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha
可以维护很多的队列
其中pika是RabbitMQ常用的模块
RabbitMQ 教程官网:http://www.rabbitmq.com/getstarted.html
几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
RabbitMQ不像之前学的python Queue都在一个队列里实现交互,RabbitMQ有多个队列(图中红色部分代表队列),每个队列都可以将消息发给多个接收端(C是接收端,P是生产消息端)
RabbitMQ基本示例.
1、Rabbitmq 安装
Windos系统
pip install pika
ubuntu系统
install rabbitmq-server # 直接搞定
以下centos系统
1)Install Erlang
# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang
2)Install RabbitMQ Server
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm
3)use RabbitMQ Server
chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start
rabbitmq已经开启,等待传输
2、基本示例
发送端 producer
import pika
# 建立一个实例;相当于建立一个socket。
#通过ctrl+ConnectionParameters可以看到能传很多参数,如果远程还可以传用户名密码。
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672) # 默认端口5672,可不写
)
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明一个叫hello的queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello', # queue名字,将消息发给hello这个queue
body='Hello World!') # 消息内容
print(" [x] Sent 'Hello World!'")
connection.close() # 发完消息后关闭队列
执行结果:
[x] Sent 'Hello World!'
注意一定要开启rabbitmq,否则会报错
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 声明管道
channel = connection.channel()
# 为什么又声明了一个‘hello’队列?
# 如果这个queue确定已经声明了,可以不声明。但是你不知道是生产者还是消费者先运行,所以要声明两次。如果消费者没声明,且消费者先运行的话,就会报错。
# 生产者先声明queue,消费者不声明,但是消费者后运行就不会报错。
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body): # 四个参数为标准格式
print(ch, method, properties) # 打印看一下是什么
# ch是管道内存对象地址;method是内容相关信息 properties后面讲 body消息内容
print(" [x] Received %r" % body)
#time.sleep(15)
#ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume( # 消费消息
'hello', # 你要从哪个队列里收消息
callback, # 如果收到消息,就调用callback函数来处理消息 # 注意调用的函数(callback)以前在basic_consume模块是放在形参第一个位置的,后面修改到第二个位置了,如果放错位置会报错
# auto_ack=True # 写的话,如果接收消息,机器宕机消息就丢了
# 一般不写。宕机则生产者检测到发给其他消费者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 开始消费消息(开始接收消息,一直收,如果没消息就卡主在这里了)
执行结果:
[*] Waiting for messages. To exit press CTRL+C
params=>>>
[x] Received b'Hello World!'
收到了bytes格式的 Hello World!
消费者(接收端)这边看到已经卡主了
如果此时单独在运行一下生产者(发送端),直接可以从消费者看到新收到的消息
rabbitmq 消息分发轮询
重新开启rabbitmq
运行三个接收者(消费者)
运行发送者,可以看到被第一个接收者给收到信息了
第二次运行发送者,第二个接收者收到信息了
第三次运行发送者,第三个接收者收到信息了
上面几次运行说明了,依次的将信息发送每一个接收者
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 声明管道
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# 正常回调函数(callback)执行完成就表示信息接收完成,如果在还没执行完成时就出现异常就表示信息没有正常接收,比如断网、断电等,会导致信息不能正常接收。
# 下面sleep 60秒,在60秒之前就将该模块终止执行来模拟异常情况。
time.sleep(60)
#ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
'hello',
callback,
# auto_ack=True 表示不管消息是否接收(处理)完成,都不会回复确认消息
# 如果producer不关心 comsumer是否处理完,可以使用该参数
# 但是一般都不会使用它
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
在centos中重新执行rabbitmq-server start来清空队列里的消息
然后在pycharm开启三个comsumer,在去运行等待接收消息
再去执行producer来发送消息,执行producer后,立即关闭第一个comsumer,这样消息就会因为第一个comsumer没接收成功跑到第二个comsumer去,以此类推。
关闭第二个comsumer,第三个comsumer收到信息
这张图是将三个comsumer同时都关闭了,这样三个comsumer都收不到消息,说明producer的消息没有被接收,此时再去开启第一个comsumer,这时第一个comsumer会将消息给接收过来。
我们将sleep注释掉,也是这种现象,这是因为comsumer并没有发送确认消息给producer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 声明管道
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
time.sleep(30)
ch.basic_ack(delivery_tag = method.delivery_tag) # 告诉生成者,消息处理完成
channel.basic_consume(
'hello',
callback,
# auto_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
此时的代码:当其中一个comsumer执行完成,并发送确认消息后再去中断,下一个comsumer就不会收到消息;反之,如果还没发送确认消息就中断了,那么消息就会被下一个comsumer接收到。
rabbitmq 消息持久化
如果producer端宕机,那么队列的数据也会消失;这样就需要让队列消息持久化
# durable=True 该代码只是将生成的队列持久化(不是消息),如果producer宕机,队列会存在,单消息会丢
# 要注意需要在producer端和 comsumer端都要 写durable=True
channel.queue_declare(queue='hello',durable=True)
在centos重新开启 rabbitmq-server start
在producer端
将producer代码执行三次,将三个消息放入队列
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672)
)
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
# 下面的代码是让消息持久化
properties = pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent 'Hello World!'")
connection.close()
将producer代码执行三次,将三个消息放入队列
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# time.sleep(30) #注释掉
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
'hello',
callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
可以看到因为producer执行了三次,所以运行comsumer端收到了三条消息
- 协商处理
producer端没有改变
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672)
)
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties = pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent 'Hello World!'")
connection.close()
comsumer 1(消费者:1)
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# time.sleep(30) #注释掉
ch.basic_ack(delivery_tag = method.delivery_tag)
# channel.basic_qos可以使其消费者最多同时多少个消息;如果其中一个消费者处理慢(如:CPU处理性能低下),达到了最多处理的限制的话 生产者就不会再发送给该消费者。
channel.basic_qos(prefetch_count=1) #这里限制最多同时只处理1个消息
channel.basic_consume(
'hello',
callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
此时有两个comsumer模块,comsumer2比comsumer1多用了sleep 30秒来模拟性能处理慢的情况
comsumer 2(消费者:2)
复制一个comsumer模块为comsumer2
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
time.sleep(30) #comsumer2这里sleep30秒
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
'hello',
callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
我们运行两个comsumer后,一直去运行producer。 可以看到comsumer 1接收到了3条信息,而comsumer 2只接收到了1条信息,这是因为comsumer 2 sleep了30秒来模拟信息处理慢的情况;
comsumer 1 和 comsumer 2都指定了同时只能处理1条信息,producer会与comsumer 2协商,因为comsumer2一直没有处理完限制的1条信息,所以信息都被comsumer 1处理了。
Rabbitmq fanout广播模式
新建fanout_publiser模块,用于发送广播的producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# 定义一个转发器叫logs,属于一个中间人的角色,用于将producer的消息转发给消费者(comsumer)
# 定义广播类型使用fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# message = ''.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message) # routing_key为空即可,因为是广播没有定义队列,所以也不需要指定队列,但这里必须要定义为空
print(" [x] Send %r " % message)
connection.close()
新建fanout_consumer模块,用于接收广播的消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费断开后,自动将queue删除
# 就是这里会随机生成一个随机的唯一queue,用完之后会将生成的queue删除
# 这里要写queue='',如果不指定队列名字,但也要写一个空的字符串,不然会报错缺少参数
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue # 拿到随机生成的queue名字
# producer绑定了logs转发器
# 消费者将随机生成的队列也绑定了logs转发器
# 这样producer将消息交给logs转发器,logs转发器将消息交给对应绑定的随机队列,消费者从队列里在拿消息
channel.queue_bind(exchange='logs',queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback
# auto_ack=True # 写的话,如果接收消息,机器宕机消息就丢了
)
channel.start_consuming()
因为是广播,所以两个consumer都收到了发送者发送的消息。
不过有一点要注意!!!!!!!!!
要先运行consumer(接收者),在运行发送者。就好比收音机一样,只有你先打开收音机,发送者才能将信息发给你。 如果发送者先发送,你却没有接收,之前发送的信息,你就不会再接收到了。
Rabbitmq direct广播模式
direct 可以区分广播,将指定的消息发送给指定的接收者;
图中显示了将error级别消息发送给C1,将info、error、warning级别消息发送给C2。
producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 定义消息级别
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ''.join(sys.argv[2:]) or "direct info: Hello World!"
# message = "direct info: Hello World!"
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message) # routing_key为空即可,因为是广播没有定义队列,所以也不需要指定队列,但这里必须要定义为空
print(" [x] Send %r " % message)
connection.close()
consumer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
# 获取参数列表
log_levels = sys.argv[1:]
if not log_levels: # 如果没有参数,就报错,提示要指定消息级别
sys.stderr.write("Usage: %s [info] [warning] [error] \n" % sys.argv[0])
sys.exit(1) # 没有参数就退出程序
# print(log_levels)
for severity in log_levels: # 循环参数列表并绑定
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity
) #所有发送到severity的参数,都接收
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print('[x] %r:%r' % (method.routing_key, body))
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
下面在centos中运行代码
运行C1,只接收error的消息
运行C2,接收 info、warning、error的消息
producer运行,指定发送消息给error,可以看到两个consumer都接收到了error的消息
只有C2接收到了warning的消息
RabbitMQ topic细致的消息过滤广播模式
producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 定义消息级别
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' # 发送*.info的信息
message = ''.join(sys.argv[2:]) or "topic info: Hello World!"
# message = "direct info: Hello World!"
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Send %r:%r " % (routing_key,message))
connection.close()
consumer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
print('[*] Waiting for logs. To exit press CTRL+c')
def callback(ch, method, properties, body):
print('[x %r:%r' % (method.routing_key, body))
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
图中显示过滤中间有".orange."的数据,过滤以rabbit为结尾的数据,过滤以lazy开头的数据。
运行了两个consumer。C1接收.info为结尾的数据,C2接收.error为结尾和MySQL为开头的数据。
在运行publisher(已经定义了发送anonymous.info,相当于以.info为结尾的信息)
C1接收到了信息
执行publisher代码时 后面加上 test.error,然后此时在去看C2
C2 看到test.error相关信息
执行publisher代码 加上 mysql.info,这样C1和C2都可以收到消息了
运行C3,代码后面加一个 '#' 符号,表示C3可以接收所有信息(注意#号要被引号括起来)
在publisher随意发送信息,C3都能收到
当前名称:Python38RabbitMQ消息队列
文章起源:http://myzitong.com/article/ggooeg.html