当前位置: 首页 > news >正文

网站建设培训珠海温县住房和城乡建设局网站

网站建设培训珠海,温县住房和城乡建设局网站,合肥建设工程质量监督局网站,宁波seo排名方案优化1.RabbitMQ简介AMQP#xff0c;即Advanced Message Queuing Protocol#xff0c;高级消息队列协议#xff0c;是应用层协议的一个开放标准#xff0c;为面向消息的中间件设计。消息中间件主要用于组件之间的解耦#xff0c;消息的发送者无需知道消息使用者的存在#xff…1.RabbitMQ简介AMQP即Advanced Message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。消息中间件主要用于组件之间的解耦消息的发送者无需知道消息使用者的存在反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。2.RabbitMQ能为你做些什么消息系统允许软件、应用相互连接和扩展这些应用可以相互链接起来组成一个更大的应用或者将用户设备和数据进行连接消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶或许你正在考虑进行数据投递非阻塞操作或推送通知。或许你想要实现发布订阅异步处理或者工作队列。所有这些都可以通过消息系统实现。RabbitMQ是一个消息代理- 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台并且保证消息在传输过程中的安全。3.RabbitMQ 安装使用4.Python应用RabbitMQpython操作RabbitMQ的模块有三种pikaCeleryHaigha。本文使用的是pika。RabbitMQ-生产者。importpika声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()定义一个queue定义queue名称标识channel.queue_declare(queuehello)定义queue中的消息内容channel.basic_publish(exchange,routing_keyhello,bodyHello World!)print([x] Sent Hello World!)RabbitMQ-消费者。importpika声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()定义一个queue定义queue名称标识与生产者队列中对应channel.queue_declare(queuehello)defcallback(ch,method,properties,body):print(rev--,ch,method,properties,body)print(rev messages--,body)消费接收消息...channel.basic_consume(consumer_callbackcallback, #如果收到消息则回调这个函数处理消息queuehello, #queue_declare(queuehello) 对应no_ackTrue)消费者会一直监听这queue,如果队列中没有消息则会卡在这里等待消息队列中生成消息。print(waiting for meassages, to exit press CTRLC)channel.start_consuming()5.RabbitMQ消息持久化importpikaqueue_name xiaoxi_声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()定义一个queue定义queue名称标识queue,durable 持久化channel.queue_declare(queuequeue_name)whileTrue:input_value input(:).strip()ifinput_value:定义queue中的消息内容print(producer messages:{0}.format(input_value))channel.basic_publish(exchange,routing_keyqueue_name,bodyinput_value,propertiespika.BasicProperties( #消息持久化.....delivery_mode2,))continueproducer.pyimportpika,timequeue_name xiaoxi_声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()定义一个queue定义queue名称标识channel.queue_declare(queuequeue_name)defcallback(ch,method,properties,body):print(rev--,ch,method,properties,body)#time.sleep(5) # 模拟消费者丢失生产者发送的消息,生产者消息队列中的这一条消息则不会删除。print(rev messages--,body)手动向生产者确认收到消息#ch.basic_ack(delivery_tagmethod.delivery_tag)消费接收消息...channel.basic_consume(consumer_callbackcallback, #如果收到消息则回调这个函数处理消息queuequeue_name,#no_ackTrue #接收到消息主动向生产者确认已经接收到消息。)print(waiting for meassages, to exit press CTRLC)channel.start_consuming()consumer.py6.RabbitMQ消息公平分发importpikaqueue_name xiaoxi_1声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()定义一个queue定义queue名称标识queue,durable 持久化channel.queue_declare(queuequeue_name)whileTrue:input_value input(:).strip()ifinput_value:定义queue中的消息内容print(producer messages:{0}.format(input_value))channel.basic_publish(exchange,routing_keyqueue_name,bodyinput_value,)continueproducer.pyimportpika,timequeue_name xiaoxi_1声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()定义一个queue定义queue名称标识queue,durable 持久化channel.queue_declare(queuequeue_name)defcallback(ch,method,properties,body):print(rev--,ch,method,properties,body)print(rev messages--,body)模拟处理消息快慢速度time.sleep(1)ch.basic_ack(delivery_tagmethod.delivery_tag)根据消费者处理消息的快慢公平分发消息channel.basic_qos(prefetch_count1)消费接收消息...channel.basic_consume(consumer_callbackcallback, #如果收到消息则回调这个函数处理消息queuequeue_name,#no_ackTrue #接收到消息主动向生产者确认已经接收到消息。)print(waiting for meassages, to exit press CTRLC)channel.start_consuming()consumer.py7.RabbitMQ-广播模式。消息的发送模式类型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式所有的consumer都能收到。2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。表达式符号说明#代表一个或多个字符*代表任何字符例#.a会匹配a.aaa.aaaa.a等*.a会匹配a.ab.ac.a等注使用RoutingKey为#Exchange Type为topic的时候相当于使用fanout4.headers: 通过headers 来决定把消息发给哪些queue (少用)7.1 topic 广播模式。importpika声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。exchange_name topic_messages1routing_key my_topic定义exchage模式 direct广播模式channel.exchange_declare(exchangeexchange_name,exchange_typetopic)消息的发送模式类型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式所有的consumer都能收到。2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。表达式符号说明#代表一个或多个字符*代表任何字符例#.a会匹配a.aaa.aaaa.a等*.a会匹配a.ab.ac.a等注使用RoutingKey为#Exchange Type为topic的时候相当于使用fanout4.headers: 通过headers 来决定把消息发给哪些queue (少用)whileTrue:input_value input(:).strip()ifinput_value:定义queue中的消息内容print(producer messages:{0}.format(input_value))channel.basic_publish(exchangeexchange_name,routing_keyrouting_key,bodyinput_value,)continueproducer.pyimportpika,time声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。exchange_name topic_messages1routing_key my_topicchannel.exchange_declare(exchangeexchange_name,exchange_typetopic)不指定queue名字,rabbit会随机分配一个名字,exclusiveTrue会在使用此queue的消费者断开后,自动将queue删除res channel.queue_declare(exclusiveTrue)queue_nameres.method.queuechannel.queue_bind(exchangeexchange_name,queuequeue_name,routing_keyrouting_key)print(direct_key:{0}.format(routing_key))print(queue_name:{0}.format(queue_name))defcallback(ch,method,properties,body):print(rev--,ch,method,properties,body)print(rev messages--,body)ch.basic_ack(delivery_tagmethod.delivery_tag)消费接收消息...channel.basic_consume(consumer_callbackcallback, #如果收到消息则回调这个函数处理消息queuequeue_name,)print(waiting for meassages, to exit press CTRLC)channel.start_consuming()consumer.py7.2 direct 广播模式importpikaconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))channelconnection.channel()通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。exchange_name direct_messagesrouting_key my_direct定义exchage模式 direct广播模式消息的发送模式类型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式所有的consumer都能收到。2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。表达式符号说明#代表一个或多个字符*代表任何字符例#.a会匹配a.aaa.aaaa.a等*.a会匹配a.ab.ac.a等注使用RoutingKey为#Exchange Type为topic的时候相当于使用fanout4.headers: 通过headers 来决定把消息发给哪些queue (少用)channel.exchange_declare(exchangeexchange_name,exchange_typedirect)channel.basic_publish(exchangeexchange_name,routing_keyrouting_key,bodyhello word!,)#while True:#input_value input(:).strip()#if input_value:#定义queue中的消息内容#print(producer messages:{0}.format(input_value))#channel.basic_publish(#exchangeexchange_name,#routing_keyrouting_key,#bodyinput_value,#)#continueproducer.pyimportpika,timeconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))channelconnection.channel()exchange_name direct_messagesrouting_key my_directchannel.exchange_declare(exchangeexchange_name,exchange_typedirect)不指定queue名字,rabbit会随机分配一个名字,exclusiveTrue会在使用此queue的消费者断开后,自动将queue删除res channel.queue_declare(exclusiveTrue)queue_nameres.method.queuechannel.queue_bind(exchangeexchange_name,queuequeue_name,routing_keyrouting_key)print(direct_key:{0}.format(routing_key))print(queue_name:{0}.format(queue_name))defcallback(ch,method,properties,body):print(rev--,ch,method,properties,body)print(rev messages--,body)ch.basic_ack(delivery_tagmethod.delivery_tag)消费接收消息...channel.basic_consume(consumer_callbackcallback, #如果收到消息则回调这个函数处理消息queuequeue_name,)print(waiting for meassages, to exit press CTRLC)channel.start_consuming()consumer.py7.3 fanout 广播模式importpika声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()exchange_name messages定义exchage模式 fanout广播模式channel.exchange_declare(exchangeexchange_name,exchange_typefanout)消息的发送模式类型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式所有的consumer都能收到。2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 指定唯一的。3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。表达式符号说明#代表一个或多个字符*代表任何字符例#.a会匹配a.aaa.aaaa.a等*.a会匹配a.ab.ac.a等注使用RoutingKey为#Exchange Type为topic的时候相当于使用fanout4.headers: 通过headers 来决定把消息发给哪些queue (少用)whileTrue:input_value input(:).strip()ifinput_value:定义queue中的消息内容print(producer messages:{0}.format(input_value))channel.basic_publish(exchangeexchange_name,routing_key,bodyinput_value,)continueproducer.pyimportpika,time声明socketconnectionpika.BlockingConnection(pika.ConnectionParameters(localhost))声明一个管道channelconnection.channel()exchange_name messageschannel.exchange_declare(exchangeexchange_name,exchange_typefanout)不指定queue名字,rabbit会随机分配一个名字,exclusiveTrue会在使用此queue的消费者断开后,自动将queue删除res channel.queue_declare(exclusiveTrue)queue_nameres.method.queuechannel.queue_bind(exchangeexchange_name,queuequeue_name)每一个消费者随机一个唯一的queue_nameprint(queue_name:{0},format(queue_name))defcallback(ch,method,properties,body):print(rev--,ch,method,properties,body)print(rev messages--,body)ch.basic_ack(delivery_tagmethod.delivery_tag)消费接收消息...channel.basic_consume(consumer_callbackcallback, #如果收到消息则回调这个函数处理消息queuequeue_name,#no_ackTrue #接收到消息主动向生产者确认已经接收到消息。)print(waiting for meassages, to exit press CTRLC)channel.start_consuming()consumer.py8 RabbitMQ 实现 RPCRabbitMQ-生产者。利用rabbitMQ 实现一个能收能发的RPC小程序。重点需要注意的是queue的绑定。接收的一端必选预先绑定queue生成队列,发送端才能根据queue发送。importpika,uuid,timeclassrabbitmqClient(object):def __init__(self,rpc_queue):self.rpc_queuerpc_queueself.app_idstr(uuid.uuid4())self.connection pika.BlockingConnection(pika.ConnectionParameters(localhost))self.channelself.connection.channel()生成一个自动queue传过去server,server再往这个自动queue回复数据autoqueue self.channel.queue_declare(exclusiveTrue)self.callback_queueautoqueue.method.queue先定义一个接收回复的动作self.channel.basic_consume(self.on_response, no_ackTrue,queueself.callback_queue)defon_response(self,ch,method,properties,body):if self.app_id properties.app_id:self.responsebodydefsend(self,msg):self.responseNoneself.channel.basic_publish(exchange,routing_keyself.rpc_queue,propertiespika.BasicProperties(reply_toself.callback_queue,app_idself.app_id,),bodystr(msg))#发送完消息进入接收模式。while self.response isNone:#print(callback_queue:{0} app_id:{1} wait....format(self.callback_queue,self.app_id))self.connection.process_data_events()#time.sleep(0.5)returnself.responserpc_request_queue rpc_request_queuerbrabbitmqClient(rpc_request_queue)whileTrue:msg input(input :).strip()ifmsg:print(rpc_queue:{0} app_id:{1}.format(rb.rpc_queue,rb.app_id))print(send msg:{}.format(msg))reponsesrb.send(msg)print(reponses msg:{}.format(reponses.decode(utf-8)))continueclient.pyRabbitMQ-消费者。importpikaclassrabbitmqServer(object):def __init__(self,rpc_queue):self.rpc_queuerpc_queueself.connection pika.BlockingConnection(pika.ConnectionParameters(localhost))self.channelself.connection.channel()self.channel.queue_declare(queueself.rpc_queue)defon_reponses(self,ch,method,properties,body):ifbody:#reponser ...ch.basic_publish(exchange,routing_keyproperties.reply_to,propertiespika.BasicProperties(reply_toproperties.reply_to,app_idproperties.app_id,),bodyreponses ok! msg is:{}.format(body.decode(utf-8)))defstart_consuming(self):self.channel.basic_consume(consumer_callbackself.on_reponses,queueself.rpc_queue,no_ackTrue)print(waiting for meassages, to exit press CTRLC)self.channel.start_consuming()rpc_request_queue rpc_request_queuerd_serverrabbitmqServer(rpc_request_queue)rd_server.start_consuming()server.py
http://www.pierceye.com/news/785815/

相关文章:

  • asp 网站名字免费的公众号排版工具
  • 郑州响应式建站查企业的信息在哪个官网
  • 大型企业网站开发怎么使用免费的wordpress
  • 大连做网站大公司建设项目咨询公司网站
  • 教育培训网站建设方案鞍山建设信息网站
  • 重庆网站建设哪家强平台如何做推广
  • 安徽省建设安全监督站的网站网站建设公司一般多少钱
  • 服装网站建设策划书3000字软件开发包含网站开发吗
  • 免费网站的建设绵阳网站建设制作
  • 学生处网站建设招标公告网站包括哪些主要内容
  • 成都门户网站建设多少钱聚合广告联盟
  • 坦克大战网站开发课程设计报告软文营销的本质
  • 美食网站开发网站登录验证码是怎么做的
  • 电子商务网站排名辽宁省建设工程信息网业绩公示
  • 天津建设科技杂志的官方网站wordpress cnzz插件
  • 滨州建设网站太原网站建设优化
  • 记事本做网站怎么改字体包装设计模板设计素材
  • 下载软件的网站推荐thinkphp和wordpress
  • 青海省城乡和住房建设厅网站合肥小吃培训网页设计
  • 财经门户网站建设django校园网站开发
  • 泉州网站建设报价广东建设厅网站
  • 建设网站的源代码的所有权wordpress网站打开慢
  • 印度外贸网站有哪些家居小程序源码下载
  • 上海网站建设中心pc官方网站
  • 深圳企业网站制作公司查询西安网站设计哪家好
  • 大埔做网站手机qq邮箱发布了wordpress
  • 寻找南昌网站设计单位网站建设 中企动力医院
  • 中间商可以做网站吗平面广告设计师的工作内容
  • 网站建设用户分析做网站有什么软件
  • 洛阳网站建设启辰网络wordpress怎么破解查看