网站建设保密协议范本,wordpress不能上传到,搭建模板,wordpress的域名不备案简介 MessageQueue用于解决跨进程、跨线程、跨应用、跨网络的通信问题。 RabbitMQ使用erlang开发#xff0c;在windows上使用时要先安装erlang。 官方的示例比较容易理解#xff0c;可以点这里去看看。 结构 生产者 --- exchange --- queue --- 消费者。 生产者负… 简介 MessageQueue用于解决跨进程、跨线程、跨应用、跨网络的通信问题。 RabbitMQ使用erlang开发在windows上使用时要先安装erlang。 官方的示例比较容易理解可以点这里去看看。 结构 生产者 --- exchange --- queue --- 消费者。 生产者负责提供消息exchange负责分发消息到指定queuequeue存储消息默认临时可设置持久化消费者接收处理消息。 基本模型 流程 建立到rabbitmq的连接建立通道声明使用的队列生产者和消费者都要声明因为不能确定两者谁先运行生产/消费持续监听/关闭连接python中使用pika模块来处理与rabbitmq的连接。 # 生产者
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost)
)
channel connection.channel()
r channel.queue_declare(queuename, exclusiveFalse, durableFalse) # exclusive设置True是随机生成一个queue名字并返回durable设置True是队列持久化
queue_name r.method.queuechannel.basic_publish(exchange , # 使用默认分发器routing_key queue_name,properties pika.BasicProperties(delivery_mode 2 # 这个参数用于设置消息持久化),body [data]
)connection.close()# 消费者
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost)
)
channel connection.channel()
r channel.queue_declare(queuename, exclusiveFalse, durableFalse)
queue_name r.method.queuedef callback(channel, method, properties, body):pass# channel.basic_ack(delivery_tag method.delivery_tag) 在回调函数最后调用手工应答表示消息处理完毕queue可以删除消息了channel.basic_consume(callback, # 这是个回调函数接收生产者发来的bodyqueue queue_name,no_ack True # 设置True表示消息一经获取就被从queue中删除如果这时消费者崩溃则这条消息将永久丢失所以去掉这个属性在回调函数中手工应答比较安全
)channel.basic_qos(prefetch_count [num]) # 设置消费者的消费能力数字越大则说明该消费者能力越强往往与设备性能成正比channel.start_consuming() # 阻塞模式获取消息
# connection.process_data_events() 非阻塞模式获取消息 发布订阅模型 类似收音机广播订阅者只要打开收音机就能收听信息但接收不到它打开之前的消息。 包括发布订阅模型以及接下来的一些模型都是通过exchange和routing_key这两个属性来控制的。直接用官网的源码来做注释。 流程 发布者设置发布频道收听者配置频道信息收听者通过随机queue绑定频道接收消息# 发布者
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()# 创建一个命名exchange并设置其type为fanout表示广播
channel.exchange_declare(exchangelogs,exchange_typefanout)# 从命令行接收输入
message .join(sys.argv[1:]) or info: Hello World!# 由于是广播模式任意消费者只要设置同样的exchange就能以任意queue来接收消息所以这里routing_key置空
channel.basic_publish(exchangelogs,routing_key,bodymessage)
print( [x] Sent %r % message)
connection.close()# 收听者
#!/usr/bin/env python
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()# 这里使用同样的exchange配置就像调节收音机频道
channel.exchange_declare(exchangelogs,exchange_typefanout)# 在基础模型中提到过设置exclusiveTrue表示生成随机的queue
result channel.queue_declare(exclusiveTrue)
queue_name result.method.queue# 生成了queue还要将它与exchange进行绑定这样消息才能通过exchange进入queue
channel.queue_bind(exchangelogs,queuequeue_name)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r % body)channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming() 路由/级别模型 将消息发送到指定的路由处类似于logging模块的分级日志消息。 主要利用channel.queue_bind(routing_key[route])这个方法来为queue增加路由。 流程 生产者向指定路由发送消息消费者绑定路由根据路由接收到不同的消息# 生产者
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()# 同样使用命名exchange主要是type为direct
channel.exchange_declare(exchangedirect_logs,exchange_typedirect)# 将命令行输入的路由作为接收消息的queue的属性只有匹配的才能接收到消息
severity sys.argv[1] if len(sys.argv) 2 else info
message .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchangedirect_logs,routing_keyseverity, bodymessage)
print( [x] Sent %r:%r % (severity, message))
connection.close()# 消费者
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangedirect_logs,exchange_typedirect)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queue# 指定该消费者接收的消息路由
severities sys.argv[1:]
if not severities:sys.stderr.write(Usage: %s [info] [warning] [error]\n % sys.argv[0])sys.exit(1)# 对该消费者的queue绑定路由
for severity in severities:channel.queue_bind(exchangedirect_logs,queuequeue_name,routing_keyseverity)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming() 细目模型/更细致的划分 这个模型比前几种更强大但是原理与路由模型是相同的。 如果routing_key#它就相当于发布订阅模式向所有queue发送消息如果routing_key值中不包含*#则相当于路由模型。 该模型主要是实现了模糊匹配。 # 生产者
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangetopic_logs,exchange_typetopic)routing_key sys.argv[1] if len(sys.argv) 2 else anonymous.info
message .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchangetopic_logs,routing_keyrouting_key,bodymessage)
print( [x] Sent %r:%r % (routing_key, message))
connection.close()# 消费者
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangetopic_logs,exchange_typetopic)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queuebinding_keys sys.argv[1:]
if not binding_keys:sys.stderr.write(Usage: %s [binding_key]...\n % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchangetopic_logs,queuequeue_name,routing_keybinding_key)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming() RPC模型 前面的几种模型都只能是一端发消息另一端接收RPC模型实现的就是单端收发功能。 主要是通过两个队列实现一个发一个收。 流程 客户端发送消息到约定队列并且附带返回队列的名称和验证id服务器接到消息将处理过的消息发送给指定队列并附带验证id客户端接到消息先验证id通过则处理消息# 服务器
#!/usr/bin/env python
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuerpc_queue)def fib(n):if n 0:return 0elif n 1:return 1else:return fib(n-1) fib(n-2)def on_request(ch, method, props, body):n int(body)print( [.] fib(%s) % n)response fib(n)ch.basic_publish(exchange,routing_keyprops.reply_to,propertiespika.BasicProperties(correlation_id \props.correlation_id),bodystr(response))ch.basic_ack(delivery_tag method.delivery_tag)channel.basic_qos(prefetch_count1)
channel.basic_consume(on_request, queuerpc_queue)print( [x] Awaiting RPC requests)
channel.start_consuming()# 客户端
#!/usr/bin/env python
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))self.channel self.connection.channel()result self.channel.queue_declare(exclusiveTrue)self.callback_queue result.method.queueself.channel.basic_consume(self.on_response, no_ackTrue,queueself.callback_queue)def on_response(self, ch, method, props, body):if self.corr_id props.correlation_id:self.response bodydef call(self, n):self.response Noneself.corr_id str(uuid.uuid4())self.channel.basic_publish(exchange,routing_keyrpc_queue,propertiespika.BasicProperties(reply_to self.callback_queue,correlation_id self.corr_id,),bodystr(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc FibonacciRpcClient()print( [x] Requesting fib(30))
response fibonacci_rpc.call(30)
print( [.] Got %r % response) 转载于:https://www.cnblogs.com/ikct2017/p/9434468.html