国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python RabbitMQ 使用詳細(xì)介紹(小結(jié))

2020-01-04 14:07:04
字體:
供稿:網(wǎng)友

上節(jié)回顧

主要講了協(xié)程、進(jìn)程、異步IO多路復(fù)用。 

協(xié)程和IO多路復(fù)用都是單線程的。

epoll  在linux下通過這個(gè)模塊libevent.so實(shí)現(xiàn)
gevent  在底層也是用了libevent.so

gevent可以理解為一個(gè)更上層的封裝。 

使用select或者selectors,每接收或發(fā)送數(shù)據(jù)一次都要select一次

twisted異步網(wǎng)絡(luò)框架,強(qiáng)大又龐大,不支持python3 (代碼量python中排top3)。幾乎把所有的網(wǎng)絡(luò)服務(wù)都重寫了一遍。

一、RabbitMQ 消息隊(duì)列介紹

RabbitMQ也是消息隊(duì)列,那RabbitMQ和之前python的Queue有什么區(qū)別么?

py 消息隊(duì)列:
    線程 queue(同一進(jìn)程下線程之間進(jìn)行交互)
    進(jìn)程 Queue(父子進(jìn)程進(jìn)行交互 或者 同屬于同一進(jìn)程下的多個(gè)子進(jìn)程進(jìn)行交互)

如果是兩個(gè)完全獨(dú)立的python程序,也是不能用上面兩個(gè)queue進(jìn)行交互的,或者和其他語言交互有哪些實(shí)現(xiàn)方式呢。 

【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個(gè)程序之間交互,可以支持多個(gè)程序,可以維護(hù)好多個(gè)程序的隊(duì)列。

像這種公共的中間件有好多成熟的產(chǎn)品: 
RabbitMQ 
ZeroMQ 
ActiveMQ 
……

RabbitMQ:erlang語言 開發(fā)的。 

Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務(wù)隊(duì)列) 、haigha 

可以維護(hù)很多的隊(duì)列

RabbitMQ 教程官網(wǎng):http://www.rabbitmq.com/getstarted.html

幾個(gè)概念說明:

Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。 
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。 
Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。 
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。 
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。 
vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。 
producer:消息生產(chǎn)者,就是投遞消息的程序。 
consumer:消息消費(fèi)者,就是接受消息的程序。 
channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)

二、RabbitMQ基本示例.

1、Rabbitmq 安裝

ubuntu系統(tǒng)

install rabbitmq-server # 直接搞定

以下centos系統(tǒng) 

1)Install Erlang

# For EL5:rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm# For EL6:rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm# For EL7:rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpmyum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.ascyum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server onservice rabbitmq-server stop/start

2、基本示例

發(fā)送端 producer

import pika# 建立一個(gè)實(shí)例connection = pika.BlockingConnection(  pika.ConnectionParameters('localhost',5672) # 默認(rèn)端口5672,可不寫  )# 聲明一個(gè)管道,在管道里發(fā)消息channel = connection.channel()# 在管道里聲明queuechannel.queue_declare(queue='hello')# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.channel.basic_publish(exchange='',           routing_key='hello', # queue名字           body='Hello World!') # 消息內(nèi)容print(" [x] Sent 'Hello World!'")connection.close() # 隊(duì)列關(guān)閉

接收端 consumer

import pikaimport time# 建立實(shí)例connection = pika.BlockingConnection(pika.ConnectionParameters(        'localhost'))# 聲明管道channel = connection.channel()# 為什么又聲明了一個(gè)‘hello'隊(duì)列?# 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個(gè)機(jī)器先運(yùn)行,所以要聲明兩次。channel.queue_declare(queue='hello')def callback(ch, method, properties, body): # 四個(gè)參數(shù)為標(biāo)準(zhǔn)格式  print(ch, method, properties) # 打印看一下是什么  # 管道內(nèi)存對象 內(nèi)容相關(guān)信息 后面講  print(" [x] Received %r" % body)  time.sleep(15)  ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,消息處理完成channel.basic_consume( # 消費(fèi)消息    callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息    queue='hello', # 你要從那個(gè)隊(duì)列里收消息    # no_ack=True # 寫的話,如果接收消息,機(jī)器宕機(jī)消息就丟了    # 一般不寫。宕機(jī)則生產(chǎn)者檢測到發(fā)給其他消費(fèi)者    )print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming() # 開始消費(fèi)消息

3、RabbitMQ 消息分發(fā)輪詢

上面的只是一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,能不能一個(gè)生產(chǎn)者多個(gè)消費(fèi)者呢? 

可以上面的例子,多啟動(dòng)幾個(gè)消費(fèi)者consumer,看一下消息的接收情況。 

采用輪詢機(jī)制;把消息依次分發(fā)

假如消費(fèi)者處理消息需要15秒,如果當(dāng)機(jī)了,那這個(gè)消息處理明顯還沒處理完,怎么處理? 

(可以模擬消費(fèi)端斷了,分別注釋和不注釋 no_ack=True 看一下) 

你沒給我回復(fù)確認(rèn),就代表消息沒處理完。

上面的效果消費(fèi)端斷了就轉(zhuǎn)到另外一個(gè)消費(fèi)端去了,但是生產(chǎn)者怎么知道消費(fèi)端斷了呢? 

因?yàn)樯a(chǎn)者和消費(fèi)者是通過socket連接的,socket斷了,就說明消費(fèi)端斷開了。

上面的模式只是依次分發(fā),實(shí)際情況是機(jī)器配置不一樣。怎么設(shè)置類似權(quán)重的操作? 

RabbitMQ怎么辦呢,RabbitMQ做了簡單的處理就能實(shí)現(xiàn)公平的分發(fā)。 

就是RabbitMQ給消費(fèi)者發(fā)消息的時(shí)候檢測下消費(fèi)者里的消息數(shù)量,如果超過指定值(比如1條),就不給你發(fā)了。

只需要在消費(fèi)者端,channel.basic_consume前加上就可以了。

channel.basic_qos(prefetch_count=1) # 類似權(quán)重,按能力分發(fā),如果有一個(gè)消息,就不在給你發(fā)channel.basic_consume( # 消費(fèi)消息

三、RabbitMQ 消息持久化(durable、properties)

1、RabbitMQ 相關(guān)命令

rabbitmqctl list_queues # 查看當(dāng)前queue數(shù)量及queue里消息數(shù)量

2、消息持久化

如果隊(duì)列里還有消息,RabbitMQ 服務(wù)端宕機(jī)了呢?消息還在不在? 

把RabbitMQ服務(wù)重啟,看一下消息在不在。 

上面的情況下,宕機(jī)了,消息就久了,下面看看如何把消息持久化。 

每次聲明隊(duì)列的時(shí)候,都加上durable,注意每個(gè)隊(duì)列都得寫,客戶端、服務(wù)端聲明的時(shí)候都得寫。

# 在管道里聲明queuechannel.queue_declare(queue='hello2', durable=True)

測試結(jié)果發(fā)現(xiàn),只是把隊(duì)列持久化了,但是隊(duì)列里的消息沒了。 

durable的作用只是把隊(duì)列持久化。離消息持久話還差一步: 

發(fā)送端發(fā)送消息時(shí),加上properties

properties=pika.BasicProperties(  delivery_mode=2, # 消息持久化  )

發(fā)送端 producer

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        'localhost',5672)) # 默認(rèn)端口5672,可不寫channel = connection.channel()#聲明queuechannel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個(gè)名字#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.channel.basic_publish(exchange='',           routing_key='hello2',           body='Hello World!',           properties=pika.BasicProperties(             delivery_mode=2, # make message persistent             )           )print(" [x] Sent 'Hello World!'")connection.close()

接收端 consumer

import pikaimport timeconnection = pika.BlockingConnection(pika.ConnectionParameters(        'localhost'))channel = connection.channel()channel.queue_declare(queue='hello2', durable=True)def callback(ch, method, properties, body):  print(" [x] Received %r" % body)  time.sleep(10)  ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生產(chǎn)者,消息處理完成channel.basic_qos(prefetch_count=1) # 類似權(quán)重,按能力分發(fā),如果有一個(gè)消息,就不在給你發(fā)channel.basic_consume( # 消費(fèi)消息           callback, # 如果收到消息,就調(diào)用callback           queue='hello2',           # no_ack=True # 一般不寫,處理完接收處理結(jié)果。宕機(jī)則發(fā)給其他消費(fèi)者           )print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

四、RabbitMQ 廣播模式(exchange)

前面的效果都是一對一發(fā),如果做一個(gè)廣播效果可不可以,這時(shí)候就要用到exchange了 

exchange必須精確的知道收到的消息要發(fā)給誰。exchange的類型決定了怎么處理, 

類型有以下幾種:

  • fanout: 所有綁定到此exchange的queue都可以接收消息
  • direct: 通過routingKey和exchange決定的那個(gè)唯一的queue可以接收消息
  • topic: 所有符合routingKey(此時(shí)可以是一個(gè)表達(dá)式)的routingKey所bind的queue可以接收消息

1、fanout 純廣播、all

需要queue和exchange綁定,因?yàn)橄M(fèi)者不是和exchange直連的,消費(fèi)者是連在queue上,queue綁定在exchange上,消費(fèi)者只會(huì)在queu里度消息

     |------------------------|     |      /—— queue <—|—> consumer1producer —|—exchange1 <bind    |             / |      /—— queue <—|—> consumer2    /-|-exchange2  ……    |     |------------------------|

發(fā)送端 publisher 發(fā)布、廣播

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()# 注意:這里是廣播,不需要聲明queuechannel.exchange_declare(exchange='logs', # 聲明廣播管道             type='fanout')# message = ' '.join(sys.argv[1:]) or "info: Hello World!"message = "info: Hello World!"channel.basic_publish(exchange='logs',           routing_key='', # 注意此處空,必須有           body=message)print(" [x] Sent %r" % message)connection.close()

接收端 subscriber 訂閱

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',             type='fanout')# 不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,exclusive=True會(huì)在使用此queue的消費(fèi)者斷開后,自動(dòng)將queue刪除result = channel.queue_declare(exclusive=True)# 獲取隨機(jī)的queue名字queue_name = result.method.queueprint("random queuename:", queue_name)channel.queue_bind(exchange='logs', # queue綁定到轉(zhuǎn)發(fā)器上          queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):  print(" [x] %r" % body)channel.basic_consume(callback,           queue=queue_name,           no_ack=True)channel.start_consuming()

注意:廣播,是實(shí)時(shí)的,收不到就沒了,消息不會(huì)存下來,類似收音機(jī)。

2、direct 有選擇的接收消息

接收者可以過濾消息,只收我想要的消息 

發(fā)送端publisher

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs',             type='direct')# 重要程度級別,這里默認(rèn)定義為 infoseverity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='direct_logs',           routing_key=severity,           body=message)print(" [x] Sent %r:%r" % (severity, message))connection.close()

接收端subscriber

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs',             type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 獲取運(yùn)行腳本所有的參數(shù)severities = sys.argv[1:]if not severities:  sys.stderr.write("Usage: %s [info] [warning] [error]/n" % sys.argv[0])  sys.exit(1)# 循環(huán)列表去綁定for severity in severities:  channel.queue_bind(exchange='direct_logs',            queue=queue_name,            routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):  print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,           queue=queue_name,           no_ack=True)channel.start_consuming()

運(yùn)行接收端,指定接收級別的參數(shù),例:

python direct_sonsumer.py info warning 
python direct_sonsumer.py warning error

3、topic 更細(xì)致的過濾

比如把error中,apache和mysql的分別或取出來

發(fā)送端publisher

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='topic_logs',             type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='topic_logs',           routing_key=routing_key,           body=message)print(" [x] Sent %r:%r" % (routing_key, message))connection.close()

接收端 subscriber

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='topic_logs',             type='topic')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuebinding_keys = sys.argv[1:]if not binding_keys:  sys.stderr.write("Usage: %s [binding_key].../n" % sys.argv[0])  sys.exit(1)for binding_key in binding_keys:  channel.queue_bind(exchange='topic_logs',            queue=queue_name,            routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):  print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,           queue=queue_name,           no_ack=True)channel.start_consuming()

運(yùn)行接收端,指定接收哪些消息,例:

python topic_sonsumer.py *.infopython topic_sonsumer.py *.error mysql.*python topic_sonsumer.py '#' # 接收所有消息# 接收所有的 logs run:# python receive_logs_topic.py "#"# To receive all logs from the facility "kern":# python receive_logs_topic.py "kern.*"# Or if you want to hear only about "critical" logs:# python receive_logs_topic.py "*.critical"# You can create multiple bindings:# python receive_logs_topic.py "kern.*" "*.critical"# And to emit a log with a routing key "kern.critical" type:# python emit_log_topic.py "kern.critical" "A critical kernel error"

4、RabbitMQ RPC 實(shí)現(xiàn)(Remote procedure call)

不知道你有沒有發(fā)現(xiàn),上面的流都是單向的,如果遠(yuǎn)程的機(jī)器執(zhí)行完返回結(jié)果,就實(shí)現(xiàn)不了了。 

如果返回,這種模式叫什么呢,RPC(遠(yuǎn)程過程調(diào)用),snmp就是典型的RPC 

RabbitMQ能不能返回呢,怎么返回呢?既是發(fā)送端又是接收端。 

但是接收端返回消息怎么返回?可以發(fā)送到發(fā)過來的queue里么?不可以。 

返回時(shí),再建立一個(gè)queue,把結(jié)果發(fā)送新的queue里 

為了服務(wù)端返回的queue不寫死,在客戶端給服務(wù)端發(fā)指令的的時(shí)候,同時(shí)帶一條消息說,你結(jié)果返回給哪個(gè)queue

RPC client

import pikaimport uuidimport timeclass FibonacciRpcClient(object):  def __init__(self):    self.connection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))    self.channel = self.connection.channel()    result = self.channel.queue_declare(exclusive=True)    self.callback_queue = result.method.queue    self.channel.basic_consume(self.on_response, # 只要一收到消息就調(diào)用on_response                  no_ack=True,                  queue=self.callback_queue) # 收這個(gè)queue的消息  def on_response(self, ch, method, props, body): # 必須四個(gè)參數(shù)    # 如果收到的ID和本機(jī)生成的相同,則返回的結(jié)果就是我想要的指令返回的結(jié)果    if self.corr_id == props.correlation_id:      self.response = body  def call(self, n):    self.response = None # 初始self.response為None    self.corr_id = str(uuid.uuid4()) # 隨機(jī)唯一字符串    self.channel.basic_publish(        exchange='',        routing_key='rpc_queue', # 發(fā)消息到rpc_queue        properties=pika.BasicProperties( # 消息持久化          reply_to = self.callback_queue, # 讓服務(wù)端命令結(jié)果返回到callback_queue          correlation_id = self.corr_id, # 把隨機(jī)uuid同時(shí)發(fā)給服務(wù)器        ),        body=str(n)    )    while self.response is None: # 當(dāng)沒有數(shù)據(jù),就一直循環(huán)      # 啟動(dòng)后,on_response函數(shù)接到消息,self.response 值就不為空了      self.connection.process_data_events() # 非阻塞版的start_consuming()      # print("no msg……")      # time.sleep(0.5)    # 收到消息就調(diào)用on_response    return int(self.response)if __name__ == '__main__':  fibonacci_rpc = FibonacciRpcClient()  print(" [x] Requesting fib(7)")  response = fibonacci_rpc.call(7)  print(" [.] Got %r" % response)

RPC server

import pikaimport timedef fib(n):  if n == 0:    return 0  elif n == 1:    return 1  else:    return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):  n = int(body)  print(" [.] fib(%s)" % n)  response = fib(n)  ch.basic_publish(      exchange='', # 把執(zhí)行結(jié)果發(fā)回給客戶端      routing_key=props.reply_to, # 客戶端要求返回想用的queue      # 返回客戶端發(fā)過來的correction_id 為了讓客戶端驗(yàn)證消息一致性      properties=pika.BasicProperties(correlation_id = props.correlation_id),      body=str(response)  )  ch.basic_ack(delivery_tag = method.delivery_tag) # 任務(wù)完成,告訴客戶端if __name__ == '__main__':  connection = pika.BlockingConnection(pika.ConnectionParameters(      host='localhost'))  channel = connection.channel()  channel.queue_declare(queue='rpc_queue') # 聲明一個(gè)rpc_queue ,  channel.basic_qos(prefetch_count=1)  # 在rpc_queue里收消息,收到消息就調(diào)用on_request  channel.basic_consume(on_request, queue='rpc_queue')  print(" [x] Awaiting RPC requests")  channel.start_consuming()

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持VEVB武林網(wǎng)。


注:相關(guān)教程知識閱讀請移步到python教程頻道。
發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 阳江市| 大冶市| 扎兰屯市| 和静县| 普陀区| 天峻县| 民权县| 伊宁县| 桐庐县| 崇礼县| 祁连县| 西昌市| 丹巴县| 手机| 黔西县| 华宁县| 石城县| 霍邱县| 体育| 琼海市| 阳泉市| 柳江县| 仙游县| 英德市| 班玛县| 贵定县| 镇沅| 无锡市| 高邑县| 巍山| 五原县| 闽清县| 芦山县| 丁青县| 安庆市| 巴马| 祁门县| 聊城市| 万州区| 于田县| 鹤庆县|