国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

RabbitMQ

2019-11-14 21:54:48
字體:
來源:轉載
供稿:網友
RabbitMQ - topic

在publish/subscribe模式中使用fanout類型有個缺陷,就是不能選擇性接收的消息。我們可以讓consumer獲得所有已發布的消息中指定的幾個消息。

在之前的例子中我們這樣綁定exchange和隊列:

channel.queueBind(queueName, EXCHANGE_NAME, "");

暫且不論該代碼中綁定的exchange類型,這里空著的參數就是routing key。routing key的意義與exchange類型有關,比如使用fanout類型就會忽略掉routing key。

而解決這一問題的就是direct類型。direct exchange并不復雜,只不過是PRoducer和consumer雙方的exchange對應時還需要對應routing key。

以下代碼中,同一個exchange和兩個隊列進行綁定,兩個隊列分別和不同的binding key綁定。(PS:當然,我們也可以將同一個routing key綁定給不同的隊列也沒有問題。)另外,SERVERITY變量是rounting數組,假設將日志通過exchange發送出去,consumer根據自己的需要獲取不同級別的日志:

final class ChannelFactory_{    private final static ConnectionFactory connFactory = new ConnectionFactory();     public final static String EXCHANGE_NAME = "direct_exchange";    public final static String[] SEVERITY = {"info","warning","error"};     static {        Channel temp = getChannel();        try {            temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);        } catch (IOException e) {            e.printStackTrace();        }    }     public static Channel getChannel(int channelNumber){        try {            Connection connection = connFactory.newConnection();            return connection.createChannel(channelNumber);        } catch (IOException e) {            e.printStackTrace();        }return null;    }     public static Channel getChannel(){        try {            Connection connection = connFactory.newConnection();            return connection.createChannel();        } catch (IOException e) {            e.printStackTrace();        }return null;    }     public static void  closeChannel(Channel channel) throws IOException {        channel.close();        channel.getConnection().close();    } }

確認定義:

consumer只需要warning和error級別(routing)的日志消息:

public static void main(String[] args) throws IOException, InterruptedException {        Channel channel = ChannelFactory_.getChannel();         String queueName = channel.queueDeclare().getQueue();        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning");        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error");         QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(queueName,true,consumer);        while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            String routingKey = delivery.getEnvelope().getRoutingKey();             System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");        }     }

producer將所有級別的日志都發送出去:

public static void main(String[] args) throws IOException {        Channel channel = ChannelFactory_.getChannel();        String content = "message "+new Date();         for (int i = 0; i <ChannelFactory_.SEVERITY.length ; i++) {            channel.basicPublish(EXCHANGE_NAME,ChannelFactory_.SEVERITY[i],null,content.getBytes());        }        ChannelFactory_.closeChannel(channel);    }

運行結果:

direct exchange可以讓我們有選擇性地接受消息。但這樣做仍然有缺陷。雖然我可以只要求error和warning級別的日志,但是我不能再進行細分。比如我只想要數據庫相關的error和warning級別的日志。

為了實現這一點,我們需要使用另一個exchange類型——Topic。exchange類型為topic時,routing key是一組用"."隔開的詞,但僅限255bytes。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

topic和direct的不同點還有在consumer中定義routing key時我們可以使用通配符,比如:符號'*':可以匹配某一個詞。符號'#':可以匹配0~N個詞。

舉個例子說明,假設我們用rounting key描述一個動物。格式為: <性格>.<顏色>.<種類>用符號'*',我想要得到桔***的動物,即:"*.orange.*"用符號'#',我想要得到懶散的動物,即:"lazy.#"如果使用過程中有人破壞了格式,即使rounting key為"lazy.orange.male.rabbit"也可以匹配"lazy.#"。

稍微修改上面的代碼,首先定義一個topic exchange。

public  final static String EXCHANGE_NAME = "topic_exchange";

temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

確認定義:

發送sql相關的log:

public static void main(String[] args) throws IOException {        Channel channel = ChannelFactory_.getChannel();        String content = "message #$#$#$#$#$#$";         channel.basicPublish(EXCHANGE_NAME,"warning.sql.connection.close",null,content.getBytes());        channel.basicPublish(EXCHANGE_NAME,"error.sql.syntax",null,content.getBytes());         ChannelFactory_.closeChannel(channel);    }

consumer接收所有sql相關的warning和所有error:

public static void main(String[] args) throws IOException, InterruptedException {        Channel channel = ChannelFactory_.getChannel();         String queueName = channel.queueDeclare().getQueue();        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning.sql.#");        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error.#");         QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(queueName,true,consumer);        while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            String routingKey = delivery.getEnvelope().getRoutingKey();             System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");        }     }

運行結果:


上一篇:RabbitMQ

下一篇:Java異常(1)

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 浦县| 三门县| 河西区| 富平县| 青河县| 克拉玛依市| 清水县| 尼木县| 阳信县| 盖州市| 孝感市| 新竹县| 普陀区| 阳山县| 西安市| 原平市| 新沂市| 丹阳市| 榆社县| 申扎县| 汕头市| 龙南县| 阿拉善右旗| 江北区| 阆中市| 南通市| 县级市| 八宿县| 海晏县| 海兴县| 衡山县| 扶风县| 邢台县| 酉阳| 长白| 大埔县| 玛纳斯县| 铜梁县| 塔河县| 正阳县| 达拉特旗|