總結一下幾種ExchangeTypes。
之前寫發布/訂閱模式時第一次提到了exchange type。即PRoducer不是將消息直接放到隊列中,而是先到exchange中,exchange主要用于控制消息到隊列的路由,根據具體的exchange type將消息傳給需要的隊列或者直接廢棄。在這一篇中總結一下那些用到的exchange type。
一.Direct Exchangedirect exchange算是最基本的了。direct exchange用于將帶上routing key的消息傳值擁有相同routing key的隊列中。
當我們想用一個簡單的標識符區別所有傳入同一個exchange中的消息時direct exchange就非常合適。
private static String DIRECT_EXCHANGE = "DIRECT_EXCHAGNE"; static class FanoutProducer { public static void main(String[] args) throws IOException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel= connection.createChannel();; String content = "I miss the conversation"; channel.exchangeDeclare(DIRECT_EXCHANGE, ExchangeTypes.DIRECT); channel.basicPublish(DIRECT_EXCHANGE, "alvez", null, content.getBytes()); } } static class FanoutConsumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel= connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, DIRECT_EXCHANGE, "alvez"); QueueingConsumer consumer = new QueueingConsumer(channel); String s = channel.basicConsume(queueName, true, consumer); System.out.println(s); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("From:" + routingKey + "':'" + message + "'"); } } }二.Fanout Exchangefanout和routing key無關,它將消息無差別地(indiscriminately)傳送給所有隊列。
fanout exchange通常用于發布/訂閱模式。將消息傳送給不同的隊列,不同的隊列對同一種消息采取不同的行為。比如,現在有一個客戶訂單消息被三個隊列接收,隊列1完成該訂單,隊列2將訂單寫入日志,隊列3將訂單發給別的部門什么的。比如下面的代碼,消費者可以獲得routing key并輸出,但能否獲取與routing key無關:
private static String FANOUT_EXCHANGE = "FANOUT_EXCHANGE"; static class DirectProducer { public static void main(String[] args) throws IOException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel= connection.createChannel();; String content = "I miss the conversation"; channel.exchangeDeclare(FANOUT_EXCHANGE, ExchangeTypes.FANOUT); channel.basicPublish(FANOUT_EXCHANGE, "alvez", null, content.getBytes()); } } static class DirectConsumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel= connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, FANOUT_EXCHANGE, ""); QueueingConsumer consumer = new QueueingConsumer(channel); String s = channel.basicConsume(queueName, true, consumer); System.out.println(s); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("From:" + routingKey + "':'" + message + "'"); } } }三.Topic Exchange如果根據topic exchange用法說明其特征的話反而更麻煩。topic exchange正如其名,就是根據某種主題而不是特定的標題,也就是可以匹配routing key的一部分或者全部。topic exchange的routing key可以有多個詞組成,詞用'.'分隔。routing key中可以包括'*'或者'#','*'表示一個詞,'#'表示0~N個詞。

比如消息發布時routing key為"honda.civic.navy",能接收消息的隊列的routing key可以是"honda.civic.navy"或"*.civic.*"或"honda.#"或"#",但不能是"honda.accord.navy"或"honda.accord.silver"或"*.accord.*"或"ford.#"。
private static String TOPIC_EXCHANGE = "TOPIC_EXCHAGNE"; static class TopicProducer { public static void main(String[] args) throws IOException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel= connection.createChannel();; String content = "I miss the conversation"; channel.exchangeDeclare(TOPIC_EXCHANGE, ExchangeTypes.TOPIC); channel.basicPublish(TOPIC_EXCHANGE, "alvez.dep.FBI.map", null, content.getBytes()); } } static class TopicConsumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel= connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, TOPIC_EXCHANGE, "alvez.#"); QueueingConsumer consumer = new QueueingConsumer(channel); String s = channel.basicConsume(queueName, true, consumer); System.out.println(s); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("From:" + routingKey + "':'" + message + "'"); } } }四.Headers Exchange即消息頭和隊列中聲明的消息頭匹配時可以通信,似乎不是很常用。就可以定義多個條件進行匹配這一點來說,headers exchange和topic exchange有些相似。有時候會給人"為什么會有這種東西?"的感覺,相比topic exchage有什么優勢?如果僅僅說"headers exchange是基于headers的,topic exchange是基于routing key的",這種回答沒什么意義。

代碼如下,可以看到producer和consumer的routing key是不同的,producer的header通過properties對象傳輸:
private static String HEADERS_EXCHANGE = "HEADERS_EXCHANGE"; static class HeadersProducer { public static void main(String[] args) throws IOException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String content = "I miss the conversation"; channel.exchangeDeclare(HEADERS_EXCHANGE, ExchangeTypes.HEADERS); AMQP.BasicProperties properties = new AMQP.BasicProperties(); Map<String,Object> map = new HashMap<>(); map.put("key1","val1"); properties.setHeaders(map); channel.basicPublish(HEADERS_EXCHANGE, "alvez", properties, content.getBytes()); } } static class HeadersConsumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); Map<String, Object> headers = new HashMap<>(); headers.put("key1","val1"); headers.put("key2","val2"); headers.put("key3","val3"); headers.put("key4","val4"); channel.queueBind(queueName, HEADERS_EXCHANGE, "",headers); QueueingConsumer consumer = new QueueingConsumer(channel); String s = channel.basicConsume(queueName, true, consumer); System.out.println(s); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("From:" + routingKey + "':'" + message + "'"); } } }(ps:圖不錯,感謝圖片作者。)
新聞熱點
疑難解答