非常感謝 http://wubin850219.iteye.com/blog/1004973
在前面的《rabbitmq學(xué)習(xí)4:Routing 》中使用一般的名字的路由,現(xiàn)在想通過一些路由規(guī)則讓消費(fèi)者來接受符合規(guī)則的消息?那應(yīng)當(dāng)怎么樣呢?那就要用到類型為topic的Exchange了。
Topics的工作示意圖如下:

我們可能從圖中看到有*和#兩個通配符。*表示通配一個詞;#表示通配0個或多個詞。
下面讓我們來看看Topics的程序如何實現(xiàn)的吧!
P端的程序如下 :
java代碼
package com.abin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { 運(yùn)行結(jié)果可能如下:
Java代碼
[x] Sent 'logs.error.one':'this is one error logs:0' [x] Sent 'logs.error.one':'this is one error logs:1' ################################ [x] Sent 'logs.error.two':'this is two error logs:0' [x] Sent 'logs.error.two':'this is two error logs:1' [x] Sent 'logs.error.two':'this is two error logs:2' ################################ [x] Sent 'logs.info.one':'this is one info logs:0' [x] Sent 'logs.info.one':'this is one info logs:1' [x] Sent 'logs.info.one':'this is one info logs:2' [x] Sent 'logs.info.one':'this is one info logs:3'
第一個C端的代碼如下:
Java代碼
package com.abin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs";// 定義Exchange名稱 public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 聲明topic類型的Exchange String queueName = "queue_topic_logs1";// 定義隊列名為“queue_topic_logs1”的Queue channel.queueDeclare(queueName, false, false, false, null); // String routingKeyOne = "*.error.two";// "error"路由規(guī)則 // channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);// 把Queue、Exchange及路由綁定 String routingKeyTwo = "logs.*.one";//通配所有l(wèi)ogs下第三詞(最后一個)詞為one的消息 channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo); System.out.println(" [*] Waiting for messages."); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
第一個C端的運(yùn)行結(jié)果如下:
Java代碼
[*] Waiting for messages. [x] Received 'logs.error.one':'this is one error logs:0' [x] Received 'logs.error.one':'this is one error logs:1' [x] Received 'logs.info.one':'this is one info logs:0' [x] Received 'logs.info.one':'this is one info logs:1' [x] Received 'logs.info.one':'this is one info logs:2' [x] Received 'logs.info.one':'this is one info logs:3'
第二個C端的程序如下:
Java代碼
package com.abin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopicTwo { private static final String EXCHANGE_NAME = "topic_logs";//定義Exchange名稱 public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明topic類型的Exchange String queueName = "queue_topic_logs2";//定義隊列名為“queue_topic_logs2”的Queue channel.queueDeclare(queueName, false, false, false, null); String routingKeyOne = "logs.#";//通配所有l(wèi)ogs下的消息 channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);//把Queue、Exchange及路由綁定 System.out.println(" [*] Waiting for messages."); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
第二個C端的運(yùn)行結(jié)果如下:
Java代碼
[*] Waiting for messages. [x] Received 'logs.error.one':'this is one error logs:0' [x] Received 'logs.error.one':'this is one error logs:1' [x] Received 'logs.error.two':'this is two error logs:0' [x] Received 'logs.error.two':'this is two error logs:1' [x] Received 'logs.error.two':'this is two error logs:2' [x] Received 'logs.info.one':'this is one info logs:0' [x] Received 'logs.info.one':'this is one info logs:1' [x] Received 'logs.info.one':'this is one info logs:2' [x] Received 'logs.info.one':'this is one info logs:3'
新聞熱點
疑難解答