国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

rabbitmq學(xué)習(xí)2:Work Queues

2019-11-08 03:10:07
字體:
供稿:網(wǎng)友

非常感謝http://wubin850219.iteye.com/blog/1003840

在前面的已經(jīng)提到了一對(duì)一的情況;現(xiàn)在一個(gè)生產(chǎn)者與多個(gè)消費(fèi)者的情況(Work Queues)。Work Queues的示意圖如下:

 

對(duì)于上圖的模型中對(duì)于c端的worker來說。RabbitMQ服務(wù)器可能一直發(fā)送多個(gè)消息給一個(gè)worker,而另一個(gè)可能幾乎不做任何事情。這樣就會(huì)導(dǎo)致一個(gè)worker很忙,而另一個(gè)卻很空閑。這種情況可能都不想出現(xiàn)。如何解決這個(gè)問題呢。當(dāng)然最理想的情況是均勻分配消息給每個(gè)worker。我們可能通過channel . basicQos(1)方法(PRefetchCount = 1 )來設(shè)置同一時(shí)間每次發(fā)給一個(gè)消息給一個(gè)worker。示意圖如下:

 

P端的程序如下:

java代碼  收藏代碼package com.abin.rabbitmq;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.MessageProperties;    public class NewTask {      private static final String TASK_QUEUE_NAME = "task_queue";        public static void main(String[] argv) throws Exception {            ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          //聲明此隊(duì)列并且持久化          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);            String message = getMessage(argv);            channel.basicPublish("", TASK_QUEUE_NAME,                  MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//持久化消息          System.out.println(" [x] Sent '" + message + "'");            channel.close();          connection.close();      }        private static String getMessage(String[] strings) {          if (strings.length < 1)              return "Hello World!";          return joinStrings(strings, " ");      }        private static String joinStrings(String[] strings, String delimiter) {          int length = strings.length;          if (length == 0)              return "";          StringBuilder Words = new StringBuilder(strings[0]);          for (int i = 1; i < length; i++) {              words.append(delimiter).append(strings[i]);          }          return words.toString();      }  }  

    多次運(yùn)行此程序并傳入的參數(shù)分別為“First message ”,“Secondmessage ”,“Third message ”,“Fourth message ”,“Fifth message ”

 

C端的程序如下:

Java代碼  收藏代碼package com.abin.rabbitmq;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class Worker {      private static final String TASK_QUEUE_NAME = "task_queue";      public static void main(String[] argv) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          //聲明此隊(duì)列并且持久化          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);          System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            channel.basicQos(1);//告訴RabbitMQ同一時(shí)間給一個(gè)消息給消費(fèi)者          /* We're about to tell the server to deliver us the messages from the queue.           * Since it will push us messages asynchronously,           * we provide a callback in the form of an object that will buffer the messages           * until we're ready to use them. That is what QueueingConsumer does.*/          QueueingConsumer consumer = new QueueingConsumer(channel);          /*           把名字為TASK_QUEUE_NAME的Channel的值回調(diào)給QueueingConsumer,即使一個(gè)worker在處理消息的過程中停止了,這個(gè)消息也不會(huì)失效         */          channel.basicConsume(TASK_QUEUE_NAME, false, consumer);            while (true) {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();//得到消息傳輸信息              String message = new String(delivery.getBody());                System.out.println(" [x] Received '" + message + "'");              doWork(message);              System.out.println(" [x] Done");                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//下一個(gè)消息          }      }        private static void doWork(String task) throws InterruptedException {          for (char ch : task.toCharArray()) {              if (ch == '.')                  Thread.sleep(1000);//這里是假裝我們很忙          }      }  }  

  開啟兩個(gè)worker分別運(yùn)行。運(yùn)行結(jié)果如:

c1的結(jié)果:

Java代碼  收藏代碼[*] Waiting for messages. To exit press CTRL+C   [x] Received 'First message'   [x] Received 'Third message'   [x] Received 'Fifth message'  

 c2的結(jié)果

Java代碼  收藏代碼[*] Waiting for messages. To exit press CTRL+C   [x] Received 'Second message'   [x] Received 'Fourth message'   

 


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 蓝山县| 湾仔区| 嵊州市| 寻乌县| 体育| 玉田县| 乌兰县| 行唐县| 枝江市| 桑植县| 宁强县| 高雄市| 常熟市| 怀集县| 浠水县| 临沧市| 昌宁县| 大石桥市| 柳州市| 仙游县| 北川| 浦江县| 开平市| 济南市| 本溪| 哈尔滨市| 宝坻区| 梁山县| 阳江市| 蓝田县| 西盟| 双鸭山市| 高邑县| 菏泽市| 双城市| 小金县| 开原市| 开原市| 新巴尔虎右旗| 子洲县| 天峻县|