国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

RabbitMQ(python實現)學習之二:Producer發送消息至多個消息隊列queue(廣播消息)

2019-11-14 17:10:32
字體:
來源:轉載
供稿:網友

1.1本部分內容簡介

這部分我們將要發送一個消息到多個Consumer,這部分稱之為“publish/subscribe”

我們實現的方式就是發送端,發送一個消息,與此同時,多個接收端將同時接收到消息并打印在屏幕上面。

1.2exchange簡介

在前面的博文中,我們的講解是:發送端發送消息至消息隊列,接收端從消息隊列獲取消息。現在我們來介紹一下rabbitmq的完整消息傳送模型。

>PRoducer:用來發送消息的應用程序

>queue:用來存儲消息的緩存

>Consumer:用來接收消息的應用程序

消息傳送模型的核心是,Producer從不會直接將消息傳送給queue,而是,將消息傳送給exchange,exchange是個很簡單的東西,在一側,他接收來自Producer的消息,另一側將消息傳送給queue。exchange將消息傳送給你個queue,還是傳送給多個queue,這主要是由exchange的type決定。模型圖如下:

        

 

exchange有很多type可用,如下:direct、topic、headers、fanout。本博客針對fanout講解,后續博文對其他類型有所講解,讓我們創建一個exchange,type為fanout,名字為logs,代碼如下:

channel.exchange(exchange='logs',type='fanout')

對于type為fanout的exchange,理解起來非常簡單,它將接收到的消息,廣播給他所知道的所有的queue,即所有和他建立連接的queue。前面的博文降到了命令行查看list_exchanges的命令如下:

 $ :sudo rabbitmqctl list_exchanges Listing exchanges ... logs      fanout amq.direct      direct amq.topic       topic amq.fanout      fanout amq.headers     headers ...done.

對于上圖中,你會看到很多amq.*的exchange,這些是系統默認建立的,在你不建立exchange時,系統默認建立上面幾個。

對于消息的發布函數basic_publish()也隨之變為:

channel.basic_publish(exchange='logs',routing_key='',body=message)

1.3臨時queue

 正如你前面學到的,對于一個queue,會有自己的名字(hello什么的),

首先:

result = channel.queue_declare()

然后通過result.method.queue,系統會隨機給queue命名。

如果我們想Producer與Consumer斷開連接時,隊列queue刪除,那么需要改成下面的代碼:

result = channel.queue_declare(exclusive=True)

1.4Bingings(將queue與exchange綁定)

模型圖如下:

        

我們已經創建了一個type為fanout的exchange,現在,我們要告訴exchange,將消息發送給我們自己定義的queue,在exchange與queue之間建立連接的是binding,代碼如下:

channel.queue_bind(exchange='logs',queue=result.method.queue)

在命令行查看binding的列表,命令如下:

$: sudo rabbitmqctl list_bindings

 1.5最終代碼

最終的模型如下:

           

send.py代碼如下:

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',                         type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!"  #如果鍵盤有輸入,message為鍵盤輸入,如果鍵盤沒有輸入,消息message="info: Hello World!"; channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()

 

receive.py代碼

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',                         type='fanout')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuechannel.queue_bind(exchange='logs',                   queue=queue_name)print ' [*] Waiting for logs. To exit press CTRL+C'def callback(ch, method, properties, body):    print " [x] %r" % (body,)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

1.6代碼測試

開啟一個命令行窗口,運行send.py:

$: python send.py   #(此時你傳送的內容為info: Hello World!)或者  

$: python send.py message
#message為你想發送的內容

開啟兩個命令行窗口,分別運行receive.py,兩個窗口你會看到有相同的消息輸出:

$: python receive.py

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 封开县| 漳浦县| 平果县| 吉首市| 遵义市| 锡林浩特市| 柘荣县| 平定县| 西藏| 鄂温| 远安县| 河间市| 阆中市| 五台县| 菏泽市| 兰州市| 黄浦区| 秭归县| 江达县| 墨竹工卡县| 乌拉特中旗| 友谊县| 栖霞市| 宁国市| 中山市| 广饶县| 湟源县| 浮梁县| 会泽县| 铜鼓县| 红桥区| 弥渡县| 平山县| 泰来县| 广德县| 江都市| 开阳县| 长宁县| 慈溪市| 枣强县| 碌曲县|