国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

RabbitMQ

2019-11-14 22:00:30
字體:
來源:轉載
供稿:網友
RabbitMQ - exchange

總結一下幾種ExchangeTypes。

之前寫發布/訂閱模式時第一次提到了exchange type。即PRoducer不是將消息直接放到隊列中,而是先到exchange中,exchange主要用于控制消息到隊列的路由,根據具體的exchange type將消息傳給需要的隊列或者直接廢棄。在這一篇中總結一下那些用到的exchange type。

一.Direct Exchangedirect exchange算是最基本的了。direct exchange用于將帶上routing key的消息傳值擁有相同routing key的隊列中。

當我們想用一個簡單的標識符區別所有傳入同一個exchange中的消息時direct exchange就非常合適。

private static String DIRECT_EXCHANGE = "DIRECT_EXCHAGNE";     static class FanoutProducer {        public static void main(String[] args) throws IOException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel= connection.createChannel();;             String content = "I miss the conversation";            channel.exchangeDeclare(DIRECT_EXCHANGE, ExchangeTypes.DIRECT);            channel.basicPublish(DIRECT_EXCHANGE, "alvez", null, content.getBytes());        }    }     static class FanoutConsumer {        public static void main(String[] args) throws IOException, InterruptedException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel= connection.createChannel();             String queueName = channel.queueDeclare().getQueue();            channel.queueBind(queueName, DIRECT_EXCHANGE, "alvez");             QueueingConsumer consumer = new QueueingConsumer(channel);            String s = channel.basicConsume(queueName, true, consumer);            System.out.println(s);            while (true) {                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String message = new String(delivery.getBody());                String routingKey = delivery.getEnvelope().getRoutingKey();                 System.out.println("From:" + routingKey + "':'" + message + "'");            }         }  }

二.Fanout Exchangefanout和routing key無關,它將消息無差別地(indiscriminately)傳送給所有隊列。fanout exchange通常用于發布/訂閱模式。將消息傳送給不同的隊列,不同的隊列對同一種消息采取不同的行為。比如,現在有一個客戶訂單消息被三個隊列接收,隊列1完成該訂單,隊列2將訂單寫入日志,隊列3將訂單發給別的部門什么的。比如下面的代碼,消費者可以獲得routing key并輸出,但能否獲取與routing key無關:

    private static String FANOUT_EXCHANGE = "FANOUT_EXCHANGE";     static class DirectProducer {        public static void main(String[] args) throws IOException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel= connection.createChannel();;             String content = "I miss the conversation";            channel.exchangeDeclare(FANOUT_EXCHANGE, ExchangeTypes.FANOUT);            channel.basicPublish(FANOUT_EXCHANGE, "alvez", null, content.getBytes());        }    }     static class DirectConsumer {        public static void main(String[] args) throws IOException, InterruptedException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel= connection.createChannel();             String queueName = channel.queueDeclare().getQueue();            channel.queueBind(queueName, FANOUT_EXCHANGE, "");             QueueingConsumer consumer = new QueueingConsumer(channel);            String s = channel.basicConsume(queueName, true, consumer);            System.out.println(s);            while (true) {                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String message = new String(delivery.getBody());                String routingKey = delivery.getEnvelope().getRoutingKey();                 System.out.println("From:" + routingKey + "':'" + message + "'");            }         }     }

三.Topic Exchange如果根據topic exchange用法說明其特征的話反而更麻煩。topic exchange正如其名,就是根據某種主題而不是特定的標題,也就是可以匹配routing key的一部分或者全部。topic exchange的routing key可以有多個詞組成,詞用'.'分隔。routing key中可以包括'*'或者'#','*'表示一個詞,'#'表示0~N個詞。

比如消息發布時routing key為"honda.civic.navy",能接收消息的隊列的routing key可以是"honda.civic.navy"或"*.civic.*"或"honda.#"或"#",但不能是"honda.accord.navy"或"honda.accord.silver"或"*.accord.*"或"ford.#"。

    private static String TOPIC_EXCHANGE = "TOPIC_EXCHAGNE";     static class TopicProducer {        public static void main(String[] args) throws IOException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel= connection.createChannel();;             String content = "I miss the conversation";            channel.exchangeDeclare(TOPIC_EXCHANGE, ExchangeTypes.TOPIC);            channel.basicPublish(TOPIC_EXCHANGE, "alvez.dep.FBI.map", null, content.getBytes());        }    }     static class TopicConsumer {        public static void main(String[] args) throws IOException, InterruptedException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel= connection.createChannel();             String queueName = channel.queueDeclare().getQueue();            channel.queueBind(queueName, TOPIC_EXCHANGE, "alvez.#");             QueueingConsumer consumer = new QueueingConsumer(channel);            String s = channel.basicConsume(queueName, true, consumer);            System.out.println(s);            while (true) {                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String message = new String(delivery.getBody());                String routingKey = delivery.getEnvelope().getRoutingKey();                 System.out.println("From:" + routingKey + "':'" + message + "'");            }         }  }

四.Headers Exchange即消息頭和隊列中聲明的消息頭匹配時可以通信,似乎不是很常用。就可以定義多個條件進行匹配這一點來說,headers exchange和topic exchange有些相似。有時候會給人"為什么會有這種東西?"的感覺,相比topic exchage有什么優勢?如果僅僅說"headers exchange是基于headers的,topic exchange是基于routing key的",這種回答沒什么意義。

代碼如下,可以看到producer和consumer的routing key是不同的,producer的header通過properties對象傳輸:

  private static String HEADERS_EXCHANGE = "HEADERS_EXCHANGE";     static class HeadersProducer {        public static void main(String[] args) throws IOException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel = connection.createChannel();             String content = "I miss the conversation";            channel.exchangeDeclare(HEADERS_EXCHANGE, ExchangeTypes.HEADERS);            AMQP.BasicProperties properties = new AMQP.BasicProperties();            Map<String,Object> map = new HashMap<>();            map.put("key1","val1");            properties.setHeaders(map);             channel.basicPublish(HEADERS_EXCHANGE, "alvez", properties, content.getBytes());         }    }     static class HeadersConsumer {        public static void main(String[] args) throws IOException, InterruptedException {            ConnectionFactory connectionFactory = new ConnectionFactory();            Connection connection = connectionFactory.newConnection();            Channel channel = connection.createChannel();             String queueName = channel.queueDeclare().getQueue();            Map<String, Object> headers = new HashMap<>();            headers.put("key1","val1");            headers.put("key2","val2");            headers.put("key3","val3");            headers.put("key4","val4");            channel.queueBind(queueName, HEADERS_EXCHANGE, "",headers);             QueueingConsumer consumer = new QueueingConsumer(channel);            String s = channel.basicConsume(queueName, true, consumer);            System.out.println(s);            while (true) {                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String message = new String(delivery.getBody());                String routingKey = delivery.getEnvelope().getRoutingKey();                 System.out.println("From:" + routingKey + "':'" + message + "'");            }         }     }

(ps:圖不錯,感謝圖片作者。)


上一篇:RabbitMQ

下一篇:Java版打字練習游戲源碼

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 修文县| 高安市| 商城县| 屯门区| 广安市| 德清县| 寿光市| 日照市| 都兰县| 永州市| 长治市| 德格县| 南丹县| 桐城市| 安岳县| 香港| 逊克县| 鸡西市| 新田县| 蓬莱市| 黄平县| 龙里县| 横山县| 丽江市| 安宁市| 全椒县| 天长市| 乃东县| 虞城县| 体育| 湖北省| 大宁县| 海晏县| 惠州市| 缙云县| 石门县| 湘潭县| 加查县| 山东省| 舞钢市| 淮南市|