国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

rabbitmq學習3:Publish/Subscribe

2019-11-08 03:09:56
字體:
來源:轉載
供稿:網友

非常感謝 http://wubin850219.iteye.com/blog/1004921

在前面的Work Queue中的消息是均勻分配消息給消費者;如果我想把消息分發給所有的消費者呢?那應當怎么操作呢?這就是要下面提到的Publish/Subscribe(分布/訂閱)。讓我們開始Publish/Subscribe之旅吧!

Publish/Subscribe的工作示意圖如下:

在上圖中的X表示Exchange(交換區);Exchange的類型有:direct , topic , headers 和 fanout

Publish/Subscribe的Exchang的類型為fanout;聲明Publish/Subscribe的Exchang代碼如下:

java代碼  收藏代碼channel.exchangeDeclare("logs", "fanout");  

 

對于Work Queue中提到的發布消息的代碼如下:

Java代碼  收藏代碼channel.basicPublish("", queueName,   null, message.getBytes());  

 但對于Publish/Subscribe中發布消息中的Queue的使用的是默認的;代碼如下:

Java代碼  收藏代碼channel.basicPublish( "logs", "", null, message.getBytes());  

 

Exchange和各Queue之間是如何通信的呢?主要是通過把Exchange和各Queue綁定(binding);示意代碼如下:

Java代碼  收藏代碼channel.queueBind(queueName, exchangeName, "");  

Publish/Subscribe加入綁定的工作示意圖如下:

 

那我們就開始程序代碼吧:P端的代碼如下:

 

Java代碼  收藏代碼package com.abin.rabbitmq;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;    public class EmitLog {      PRivate static final String EXCHANGE_NAME = "logs";        public static void main(String[] argv) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//聲明Exchange          for (int i = 0; i <= 2; i++) {              String message = "hello Word!" + i;              channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());              System.out.println(" [x] Sent '" + message + "'");          }          channel.close();          connection.close();      }    }  

 運行結果如下:

Java代碼  收藏代碼[x] Sent 'hello word!0'  [x] Sent 'hello word!1'  [x] Sent 'hello word!2'  

 

C端的代碼如下:

Java代碼  收藏代碼package com.abin.rabbitmq;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class ReceiveLogsOne {      private static final String EXCHANGE_NAME = "logs";        public static void main(String[] argv) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          channel.exchangeDeclare(EXCHANGE_NAME, "fanout");          String queueName = "log-fb1";          channel.queueDeclare(queueName, false, false, false, null);          channel.queueBind(queueName, EXCHANGE_NAME, "");//把Queue、Exchange綁定          QueueingConsumer consumer = new QueueingConsumer(channel);          channel.basicConsume(queueName, true, consumer);          while (true) {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              System.out.println(" [x] Received '" + message + "'");          }      }  }  

 

對于C端的代碼我寫了二個差不多的程序,只需要修改一下queueName。這樣就形成了二個Queue;運行結果相同;

運行結果可能如下:

Java代碼  收藏代碼[x] Received 'hello word!0'  [x] Received 'hello word!1'  [x] Received 'hello word!2'  

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 黑山县| 大埔县| 孝感市| 西乡县| 高雄市| 安福县| 兰考县| 建水县| 沧源| 元朗区| 丰都县| 贵阳市| 江川县| 高清| 青川县| 洛隆县| 象山县| 德清县| 临漳县| 衡阳县| 秦安县| 玛曲县| 池州市| 乌兰县| 正蓝旗| 盐源县| 林州市| 汝州市| 五大连池市| 湾仔区| 长沙县| 济宁市| 措勤县| 伊吾县| 巴东县| 独山县| 易门县| 舒城县| 淅川县| 焉耆| 阿克陶县|