非常感謝 http://blog.csdn.net/lmj623565791/article/details/37657225
轉(zhuǎn)發(fā)請(qǐng)標(biāo)明出處:http://blog.csdn.net/lmj623565791/article/details/37657225
本系列教程主要來(lái)自于官網(wǎng)入門(mén)教程的翻譯,然后自己進(jìn)行了部分的修改與實(shí)驗(yàn),內(nèi)容僅供參考。
上一篇博客中,我們實(shí)現(xiàn)了工作隊(duì)列,并且我們的工作隊(duì)列中的一個(gè)任務(wù)只會(huì)發(fā)給一個(gè)工作者,除非某個(gè)工作者未完成任務(wù)意外被殺死,會(huì)轉(zhuǎn)發(fā)給另外的工作者,如果你還不了解:RabbitMQ (二)工作隊(duì)列。這篇博客中,我們會(huì)做一些改變,就是把一個(gè)消息發(fā)給多個(gè)消費(fèi)者,這種模式稱之為發(fā)布/訂閱(類似觀察者模式)。
為了驗(yàn)證這種模式,我們準(zhǔn)備構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。這個(gè)系統(tǒng)包含兩類程序,一類程序發(fā)動(dòng)日志,另一類程序接收和處理日志。
在我們的日志系統(tǒng)中,每一個(gè)運(yùn)行的接收者程序都會(huì)收到日志。然后我們實(shí)現(xiàn),一個(gè)接收者將接收到的數(shù)據(jù)寫(xiě)到硬盤(pán)上,與此同時(shí),另一個(gè)接收者把接收到的消息展現(xiàn)在屏幕上。
本質(zhì)上來(lái)說(shuō),就是發(fā)布的日志消息會(huì)轉(zhuǎn)發(fā)給所有的接收者。
1、轉(zhuǎn)發(fā)器(Exchanges)
前面的博客中我們主要的介紹都是發(fā)送者發(fā)送消息給隊(duì)列,接收者從隊(duì)列接收消息。下面我們會(huì)引入Exchanges,展示RabbitMQ的完整的消息模型。
RabbitMQ消息模型的核心理念是生產(chǎn)者永遠(yuǎn)不會(huì)直接發(fā)送任何消息給隊(duì)列,一般的情況生產(chǎn)者甚至不知道消息應(yīng)該發(fā)送到哪些隊(duì)列。
相反的,生產(chǎn)者只能發(fā)送消息給轉(zhuǎn)發(fā)器(Exchange)。轉(zhuǎn)發(fā)器是非常簡(jiǎn)單的,一邊接收從生產(chǎn)者發(fā)來(lái)的消息,另一邊把消息推送到隊(duì)列中。轉(zhuǎn)發(fā)器必須清楚的知道消息如何處理它收到的每一條消息。是否應(yīng)該追加到一個(gè)指定的隊(duì)列?是否應(yīng)該追加到多個(gè)隊(duì)列?或者是否應(yīng)該丟棄?這些規(guī)則通過(guò)轉(zhuǎn)發(fā)器的類型進(jìn)行定義。

下面列出一些可用的轉(zhuǎn)發(fā)器類型:
Direct
Topic
Headers
Fanout
目前我們關(guān)注最后一個(gè)fanout,聲明轉(zhuǎn)發(fā)器類型的代碼:
channel.exchangeDeclare("logs","fanout");
fanout類型轉(zhuǎn)發(fā)器特別簡(jiǎn)單,把所有它介紹到的消息,廣播到所有它所知道的隊(duì)列。不過(guò)這正是我們前述的日志系統(tǒng)所需要的。
2、匿名轉(zhuǎn)發(fā)器(nameless exchange)
前面說(shuō)到生產(chǎn)者只能發(fā)送消息給轉(zhuǎn)發(fā)器(Exchange),但是我們前兩篇博客中的例子并沒(méi)有使用到轉(zhuǎn)發(fā)器,我們?nèi)匀豢梢园l(fā)送和接收消息。這是因?yàn)槲覀兪褂昧艘粋€(gè)默認(rèn)的轉(zhuǎn)發(fā)器,它的標(biāo)識(shí)符為””。之前發(fā)送消息的代碼:
channel.basicPublish("", QUEUE_NAME,MessagePRoperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
第一個(gè)參數(shù)為轉(zhuǎn)發(fā)器的名稱,我們?cè)O(shè)置為”” : 如果存在routingKey(第二個(gè)參數(shù)),消息由routingKey決定發(fā)送到哪個(gè)隊(duì)列。
現(xiàn)在我們可以指定消息發(fā)送到的轉(zhuǎn)發(fā)器:
channel.basicPublish( "logs","", null, message.getBytes());
3、臨時(shí)隊(duì)列(Temporary queues)
前面的博客中我們都為隊(duì)列指定了一個(gè)特定的名稱。能夠?yàn)殛?duì)列命名對(duì)我們來(lái)說(shuō)是很關(guān)鍵的,我們需要指定消費(fèi)者為某個(gè)隊(duì)列。當(dāng)我們希望在生產(chǎn)者和消費(fèi)者間共享隊(duì)列時(shí),為隊(duì)列命名是很重要的。不過(guò),對(duì)于我們的日志系統(tǒng)我們并不關(guān)心隊(duì)列的名稱。我們想要接收到所有的消息,而且我們也只對(duì)當(dāng)前正在傳遞的數(shù)據(jù)的感興趣。為了滿足我們的需求,需要做兩件事:第一, 無(wú)論什么時(shí)間連接到Rabbit我們都需要一個(gè)新的空的隊(duì)列。為了實(shí)現(xiàn),我們可以使用隨機(jī)數(shù)創(chuàng)建隊(duì)列,或者更好的,讓服務(wù)器給我們提供一個(gè)隨機(jī)的名稱。第二, 一旦消費(fèi)者與Rabbit斷開(kāi),消費(fèi)者所接收的那個(gè)隊(duì)列應(yīng)該被自動(dòng)刪除。java中我們可以使用queueDeclare()方法,不傳遞任何參數(shù),來(lái)創(chuàng)建一個(gè)非持久的、唯一的、自動(dòng)刪除的隊(duì)列且隊(duì)列名稱由服務(wù)器隨機(jī)產(chǎn)生。String queueName = channel.queueDeclare().getQueue();一般情況這個(gè)名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。
4、綁定(Bindings)
我們已經(jīng)創(chuàng)建了一個(gè)fanout轉(zhuǎn)發(fā)器和隊(duì)列,我們現(xiàn)在需要通過(guò)binding告訴轉(zhuǎn)發(fā)器把消息發(fā)送給我們的隊(duì)列。channel.queueBind(queueName, “l(fā)ogs”, ””)參數(shù)1:隊(duì)列名稱 ;參數(shù)2:轉(zhuǎn)發(fā)器名稱5、完整的例子
日志發(fā)送端:[java] view plain copy 
package com.zhy.rabbit._03_bindings_exchanges; import java.io.IOException; import java.util.Date; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private final static String EXCHANGE_NAME = "ex_log"; public static void main(String[] args) throws IOException { // 創(chuàng)建連接和頻道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 聲明轉(zhuǎn)發(fā)器和類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String message = new Date().toLocaleString()+" : log something"; // 往轉(zhuǎn)發(fā)器上發(fā)送消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } 沒(méi)什么太大的改變,聲明隊(duì)列的代碼,改為聲明轉(zhuǎn)發(fā)器了,同樣的消息的傳遞也交給了轉(zhuǎn)發(fā)器。接收端1%20:ReceiveLogsToSave.java:[java] view%20plain copy ![在CODE上查看代碼片]()
package com.zhy.rabbit._03_bindings_exchanges; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsToSave { private final static String EXCHANGE_NAME = "ex_log"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 創(chuàng)建連接和頻道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 創(chuàng)建一個(gè)非持久的、唯一的且自動(dòng)刪除的隊(duì)列 String queueName = channel.queueDeclare().getQueue(); // 為轉(zhuǎn)發(fā)器指定隊(duì)列,設(shè)置binding channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定接收者,第二個(gè)參數(shù)為自動(dòng)應(yīng)答,無(wú)需手動(dòng)應(yīng)答 channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); print2File(message); } } private static void print2File(String msg) { try { String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd") .format(new Date()); File file = new File(dir, logFileName+".txt"); FileOutputStream fos = new FileOutputStream(file, true); fos.write((msg + "/r/n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } 隨機(jī)創(chuàng)建一個(gè)隊(duì)列,然后將隊(duì)列與轉(zhuǎn)發(fā)器綁定,然后將消費(fèi)者與該隊(duì)列綁定,然后寫(xiě)入日志文件。接收端2:ReceiveLogsToConsole.java
[java] view%20plain copy ![在CODE上查看代碼片]()
package com.zhy.rabbit._03_bindings_exchanges; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsToConsole { private final static String EXCHANGE_NAME = "ex_log"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 創(chuàng)建連接和頻道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 創(chuàng)建一個(gè)非持久的、唯一的且自動(dòng)刪除的隊(duì)列 String queueName = channel.queueDeclare().getQueue(); // 為轉(zhuǎn)發(fā)器指定隊(duì)列,設(shè)置binding channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定接收者,第二個(gè)參數(shù)為自動(dòng)應(yīng)答,無(wú)需手動(dòng)應(yīng)答 channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } 隨機(jī)創(chuàng)建一個(gè)隊(duì)列,然后將隊(duì)列與轉(zhuǎn)發(fā)器綁定,然后將消費(fèi)者與該隊(duì)列綁定,然后打印到控制臺(tái)。現(xiàn)在把兩個(gè)接收端運(yùn)行,然后運(yùn)行3次發(fā)送端:
輸出結(jié)果:
發(fā)送端:
[x]%20Sent%20'2014-7-10%2016:04:54%20:%20log%20something'
[x]%20Sent%20'2014-7-10%2016:04:58%20:%20log%20something'
[x]%20Sent%20'2014-7-10%2016:05:02%20:%20log%20something'
接收端1:
![]()
接收端2:
[*] Waiting for messages. To exit press CTRL+C [x] Received '2014-7-10 16:04:54 : log something' [x] Received '2014-7-10 16:04:58 : log something' [x] Received '2014-7-10 16:05:02 : log something'
這個(gè)例子實(shí)現(xiàn)了我們文章開(kāi)頭所描述的日志系統(tǒng),利用了轉(zhuǎn)發(fā)器的類型:fanout。
本篇說(shuō)明了,生產(chǎn)者將消息發(fā)送至轉(zhuǎn)發(fā)器,轉(zhuǎn)發(fā)器決定將消息發(fā)送至哪些隊(duì)列,消費(fèi)者綁定隊(duì)列獲取消息。