国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 學(xué)院 > 開(kāi)發(fā)設(shè)計(jì) > 正文

RabbitMQ發(fā)布/訂閱

2019-11-08 03:21:23
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

非常感謝  http://blog.csdn.net/lmj623565791/article/details/37657225

轉(zhuǎn)發(fā)請(qǐng)標(biāo)明出處:http://blog.csdn.net/lmj623565791/article/details/37657225

本系列教程主要來(lái)自于官網(wǎng)入門(mén)教程的翻譯,然后自己進(jìn)行了部分的修改與實(shí)驗(yàn),內(nèi)容僅供參考。 

上一篇博客中,我們實(shí)現(xiàn)了工作隊(duì)列,并且我們的工作隊(duì)列中的一個(gè)任務(wù)只會(huì)發(fā)給一個(gè)工作者,除非某個(gè)工作者未完成任務(wù)意外被殺死,會(huì)轉(zhuǎn)發(fā)給另外的工作者,如果你還不了解:RabbitMQ (二)工作隊(duì)列。這篇博客中,我們會(huì)做一些改變,就是把一個(gè)消息發(fā)給多個(gè)消費(fèi)者,這種模式稱之為發(fā)布/訂閱(類似觀察者模式)。

         為了驗(yàn)證這種模式,我們準(zhǔn)備構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。這個(gè)系統(tǒng)包含兩類程序,一類程序發(fā)動(dòng)日志,另一類程序接收和處理日志。

         在我們的日志系統(tǒng)中,每一個(gè)運(yùn)行的接收者程序都會(huì)收到日志。然后我們實(shí)現(xiàn),一個(gè)接收者將接收到的數(shù)據(jù)寫(xiě)到硬盤(pán)上,與此同時(shí),另一個(gè)接收者把接收到的消息展現(xiàn)在屏幕上。

         本質(zhì)上來(lái)說(shuō),就是發(fā)布的日志消息會(huì)轉(zhuǎn)發(fā)給所有的接收者。

1、轉(zhuǎn)發(fā)器(Exchanges)

前面的博客中我們主要的介紹都是發(fā)送者發(fā)送消息給隊(duì)列,接收者從隊(duì)列接收消息。下面我們會(huì)引入Exchanges,展示RabbitMQ的完整的消息模型。

RabbitMQ消息模型的核心理念是生產(chǎn)者永遠(yuǎn)不會(huì)直接發(fā)送任何消息給隊(duì)列,一般的情況生產(chǎn)者甚至不知道消息應(yīng)該發(fā)送到哪些隊(duì)列。

相反的,生產(chǎn)者只能發(fā)送消息給轉(zhuǎn)發(fā)器(Exchange)。轉(zhuǎn)發(fā)器是非常簡(jiǎn)單的,一邊接收從生產(chǎn)者發(fā)來(lái)的消息,另一邊把消息推送到隊(duì)列中。轉(zhuǎn)發(fā)器必須清楚的知道消息如何處理它收到的每一條消息。是否應(yīng)該追加到一個(gè)指定的隊(duì)列?是否應(yīng)該追加到多個(gè)隊(duì)列?或者是否應(yīng)該丟棄?這些規(guī)則通過(guò)轉(zhuǎn)發(fā)器的類型進(jìn)行定義。

下面列出一些可用的轉(zhuǎn)發(fā)器類型:

Direct

Topic

Headers

Fanout

目前我們關(guān)注最后一個(gè)fanout,聲明轉(zhuǎn)發(fā)器類型的代碼:

channel.exchangeDeclare("logs","fanout");

fanout類型轉(zhuǎn)發(fā)器特別簡(jiǎn)單,把所有它介紹到的消息,廣播到所有它所知道的隊(duì)列。不過(guò)這正是我們前述的日志系統(tǒng)所需要的。

2、匿名轉(zhuǎn)發(fā)器(nameless exchange)

前面說(shuō)到生產(chǎn)者只能發(fā)送消息給轉(zhuǎn)發(fā)器(Exchange),但是我們前兩篇博客中的例子并沒(méi)有使用到轉(zhuǎn)發(fā)器,我們?nèi)匀豢梢园l(fā)送和接收消息。這是因?yàn)槲覀兪褂昧艘粋€(gè)默認(rèn)的轉(zhuǎn)發(fā)器,它的標(biāo)識(shí)符為””。之前發(fā)送消息的代碼:

channel.basicPublish("", QUEUE_NAME,MessagePRoperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一個(gè)參數(shù)為轉(zhuǎn)發(fā)器的名稱,我們?cè)O(shè)置為”” : 如果存在routingKey(第二個(gè)參數(shù)),消息由routingKey決定發(fā)送到哪個(gè)隊(duì)列。

現(xiàn)在我們可以指定消息發(fā)送到的轉(zhuǎn)發(fā)器:

channel.basicPublish( "logs","", null, message.getBytes());

3、臨時(shí)隊(duì)列(Temporary queues)

前面的博客中我們都為隊(duì)列指定了一個(gè)特定的名稱。能夠?yàn)殛?duì)列命名對(duì)我們來(lái)說(shuō)是很關(guān)鍵的,我們需要指定消費(fèi)者為某個(gè)隊(duì)列。當(dāng)我們希望在生產(chǎn)者和消費(fèi)者間共享隊(duì)列時(shí),為隊(duì)列命名是很重要的。不過(guò),對(duì)于我們的日志系統(tǒng)我們并不關(guān)心隊(duì)列的名稱。我們想要接收到所有的消息,而且我們也只對(duì)當(dāng)前正在傳遞的數(shù)據(jù)的感興趣。為了滿足我們的需求,需要做兩件事:第一, 無(wú)論什么時(shí)間連接到Rabbit我們都需要一個(gè)新的空的隊(duì)列。為了實(shí)現(xiàn),我們可以使用隨機(jī)數(shù)創(chuàng)建隊(duì)列,或者更好的,讓服務(wù)器給我們提供一個(gè)隨機(jī)的名稱。第二, 一旦消費(fèi)者與Rabbit斷開(kāi),消費(fèi)者所接收的那個(gè)隊(duì)列應(yīng)該被自動(dòng)刪除。java中我們可以使用queueDeclare()方法,不傳遞任何參數(shù),來(lái)創(chuàng)建一個(gè)非持久的、唯一的、自動(dòng)刪除的隊(duì)列且隊(duì)列名稱由服務(wù)器隨機(jī)產(chǎn)生。String queueName = channel.queueDeclare().getQueue();一般情況這個(gè)名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。

4、綁定(Bindings)

我們已經(jīng)創(chuàng)建了一個(gè)fanout轉(zhuǎn)發(fā)器和隊(duì)列,我們現(xiàn)在需要通過(guò)binding告訴轉(zhuǎn)發(fā)器把消息發(fā)送給我們的隊(duì)列。channel.queueBind(queueName, “l(fā)ogs”, ””)參數(shù)1:隊(duì)列名稱 ;參數(shù)2:轉(zhuǎn)發(fā)器名稱
5、完整的例子
日志發(fā)送端:[java] view plain copy 在CODE上查看代碼片package com.zhy.rabbit._03_bindings_exchanges;    import java.io.IOException;  import java.util.Date;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;    public class EmitLog  {      private final static String EXCHANGE_NAME = "ex_log";        public static void main(String[] args) throws IOException      {          // 創(chuàng)建連接和頻道          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          // 聲明轉(zhuǎn)發(fā)器和類型          channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );                    String message = new Date().toLocaleString()+" : log something";          // 往轉(zhuǎn)發(fā)器上發(fā)送消息          channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());            System.out.println(" [x] Sent '" + message + "'");            channel.close();          connection.close();        }    }  沒(méi)什么太大的改變,聲明隊(duì)列的代碼,改為聲明轉(zhuǎn)發(fā)器了,同樣的消息的傳遞也交給了轉(zhuǎn)發(fā)器。接收端1%20:ReceiveLogsToSave.java:[java] view%20plain copy package com.zhy.rabbit._03_bindings_exchanges;    import java.io.File;  import java.io.FileNotFoundException;  import java.io.FileOutputStream;  import java.io.IOException;  import java.text.SimpleDateFormat;  import java.util.Date;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class ReceiveLogsToSave  {      private final static String EXCHANGE_NAME = "ex_log";        public static void main(String[] argv) throws java.io.IOException,              java.lang.InterruptedException      {          // 創(chuàng)建連接和頻道          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");          // 創(chuàng)建一個(gè)非持久的、唯一的且自動(dòng)刪除的隊(duì)列          String queueName = channel.queueDeclare().getQueue();          // 為轉(zhuǎn)發(fā)器指定隊(duì)列,設(shè)置binding          channel.queueBind(queueName, EXCHANGE_NAME, "");            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            QueueingConsumer consumer = new QueueingConsumer(channel);          // 指定接收者,第二個(gè)參數(shù)為自動(dòng)應(yīng)答,無(wú)需手動(dòng)應(yīng)答          channel.basicConsume(queueName, true, consumer);            while (true)          {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());                print2File(message);          }        }        private static void print2File(String msg)      {          try          {              String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();              String logFileName = new SimpleDateFormat("yyyy-MM-dd")                      .format(new Date());              File file = new File(dir, logFileName+".txt");              FileOutputStream fos = new FileOutputStream(file, true);              fos.write((msg + "/r/n").getBytes());              fos.flush();              fos.close();          } catch (FileNotFoundException e)          {              e.printStackTrace();          } catch (IOException e)          {              e.printStackTrace();          }      }  }  隨機(jī)創(chuàng)建一個(gè)隊(duì)列,然后將隊(duì)列與轉(zhuǎn)發(fā)器綁定,然后將消費(fèi)者與該隊(duì)列綁定,然后寫(xiě)入日志文件。

接收端2:ReceiveLogsToConsole.java

[java] view%20plain copy package com.zhy.rabbit._03_bindings_exchanges;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class ReceiveLogsToConsole  {      private final static String EXCHANGE_NAME = "ex_log";        public static void main(String[] argv) throws java.io.IOException,              java.lang.InterruptedException      {          // 創(chuàng)建連接和頻道          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");          // 創(chuàng)建一個(gè)非持久的、唯一的且自動(dòng)刪除的隊(duì)列          String queueName = channel.queueDeclare().getQueue();          // 為轉(zhuǎn)發(fā)器指定隊(duì)列,設(shè)置binding          channel.queueBind(queueName, EXCHANGE_NAME, "");            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            QueueingConsumer consumer = new QueueingConsumer(channel);          // 指定接收者,第二個(gè)參數(shù)為自動(dòng)應(yīng)答,無(wú)需手動(dòng)應(yīng)答          channel.basicConsume(queueName, true, consumer);            while (true)          {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              System.out.println(" [x] Received '" + message + "'");            }        }    }  隨機(jī)創(chuàng)建一個(gè)隊(duì)列,然后將隊(duì)列與轉(zhuǎn)發(fā)器綁定,然后將消費(fèi)者與該隊(duì)列綁定,然后打印到控制臺(tái)。

現(xiàn)在把兩個(gè)接收端運(yùn)行,然后運(yùn)行3次發(fā)送端:

輸出結(jié)果:

發(fā)送端:

 [x]%20Sent%20'2014-7-10%2016:04:54%20:%20log%20something'

 [x]%20Sent%20'2014-7-10%2016:04:58%20:%20log%20something'

 [x]%20Sent%20'2014-7-10%2016:05:02%20:%20log%20something'

接收端1:

接收端2:

 [*] Waiting for messages. To exit press CTRL+C [x] Received '2014-7-10 16:04:54 : log something' [x] Received '2014-7-10 16:04:58 : log something' [x] Received '2014-7-10 16:05:02 : log something'

這個(gè)例子實(shí)現(xiàn)了我們文章開(kāi)頭所描述的日志系統(tǒng),利用了轉(zhuǎn)發(fā)器的類型:fanout。

本篇說(shuō)明了,生產(chǎn)者將消息發(fā)送至轉(zhuǎn)發(fā)器,轉(zhuǎn)發(fā)器決定將消息發(fā)送至哪些隊(duì)列,消費(fèi)者綁定隊(duì)列獲取消息。


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 家居| 两当县| 阿荣旗| 星子县| 北川| 女性| 潮安县| 海林市| 青铜峡市| 旺苍县| 莒南县| 都匀市| 上蔡县| 无为县| 岳普湖县| 荆州市| 长沙县| 灵台县| 义马市| 西丰县| 武宁县| 湘潭市| 白城市| 调兵山市| 莲花县| 绥阳县| 获嘉县| 湖州市| 白城市| 南乐县| 象州县| 榕江县| 宜章县| 富阳市| 舒兰市| 桂平市| 泗阳县| 淳化县| 射洪县| 澳门| 伽师县|