在publish/subscribe模式中使用fanout類型有個缺陷,就是不能選擇性接收的消息。我們可以讓consumer獲得所有已發布的消息中指定的幾個消息。
在之前的例子中我們這樣綁定exchange和隊列:
channel.queueBind(queueName, EXCHANGE_NAME, "");暫且不論該代碼中綁定的exchange類型,這里空著的參數就是routing key。routing key的意義與exchange類型有關,比如使用fanout類型就會忽略掉routing key。
而解決這一問題的就是direct類型。direct exchange并不復雜,只不過是PRoducer和consumer雙方的exchange對應時還需要對應routing key。
以下代碼中,同一個exchange和兩個隊列進行綁定,兩個隊列分別和不同的binding key綁定。(PS:當然,我們也可以將同一個routing key綁定給不同的隊列也沒有問題。)另外,SERVERITY變量是rounting數組,假設將日志通過exchange發送出去,consumer根據自己的需要獲取不同級別的日志:
final class ChannelFactory_{ private final static ConnectionFactory connFactory = new ConnectionFactory(); public final static String EXCHANGE_NAME = "direct_exchange"; public final static String[] SEVERITY = {"info","warning","error"}; static { Channel temp = getChannel(); try { temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); } catch (IOException e) { e.printStackTrace(); } } public static Channel getChannel(int channelNumber){ try { Connection connection = connFactory.newConnection(); return connection.createChannel(channelNumber); } catch (IOException e) { e.printStackTrace(); }return null; } public static Channel getChannel(){ try { Connection connection = connFactory.newConnection(); return connection.createChannel(); } catch (IOException e) { e.printStackTrace(); }return null; } public static void closeChannel(Channel channel) throws IOException { channel.close(); channel.getConnection().close(); } }確認定義:

consumer只需要warning和error級別(routing)的日志消息:
public static void main(String[] args) throws IOException, InterruptedException { Channel channel = ChannelFactory_.getChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning"); channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName,true,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } }producer將所有級別的日志都發送出去:
public static void main(String[] args) throws IOException { Channel channel = ChannelFactory_.getChannel(); String content = "message "+new Date(); for (int i = 0; i <ChannelFactory_.SEVERITY.length ; i++) { channel.basicPublish(EXCHANGE_NAME,ChannelFactory_.SEVERITY[i],null,content.getBytes()); } ChannelFactory_.closeChannel(channel); }運行結果:

direct exchange可以讓我們有選擇性地接受消息。但這樣做仍然有缺陷。雖然我可以只要求error和warning級別的日志,但是我不能再進行細分。比如我只想要數據庫相關的error和warning級別的日志。
為了實現這一點,我們需要使用另一個exchange類型——Topic。exchange類型為topic時,routing key是一組用"."隔開的詞,但僅限255bytes。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"
topic和direct的不同點還有在consumer中定義routing key時我們可以使用通配符,比如:符號'*':可以匹配某一個詞。符號'#':可以匹配0~N個詞。

舉個例子說明,假設我們用rounting key描述一個動物。格式為: <性格>.<顏色>.<種類>用符號'*',我想要得到桔***的動物,即:"*.orange.*"用符號'#',我想要得到懶散的動物,即:"lazy.#"如果使用過程中有人破壞了格式,即使rounting key為"lazy.orange.male.rabbit"也可以匹配"lazy.#"。
稍微修改上面的代碼,首先定義一個topic exchange。
public final static String EXCHANGE_NAME = "topic_exchange";temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);確認定義:

發送sql相關的log:
public static void main(String[] args) throws IOException { Channel channel = ChannelFactory_.getChannel(); String content = "message #$#$#$#$#$#$"; channel.basicPublish(EXCHANGE_NAME,"warning.sql.connection.close",null,content.getBytes()); channel.basicPublish(EXCHANGE_NAME,"error.sql.syntax",null,content.getBytes()); ChannelFactory_.closeChannel(channel); }consumer接收所有sql相關的warning和所有error:
public static void main(String[] args) throws IOException, InterruptedException { Channel channel = ChannelFactory_.getChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning.sql.#"); channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error.#"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName,true,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } }運行結果:

新聞熱點
疑難解答