這次我們試試publish / subscribe模式,也就是將一個消息發(fā)送給多個consumer。
這里用一個簡單的小程序來說明publish / subscribe。由一個PRovider提供消息,這個消息會被多個consumer接收。consumer對同一個消息做出不同的反應,比如打印、保存到文件、數據庫什么的。
之前的例子可能會給人這種感覺:producer將消息發(fā)送到隊列中,消息緩沖在隊列中,consumer從隊列獲得消息。
但這并不正確。在rabbit中,producer從來不會直接將消息發(fā)送到隊列中。producer根本無從得知消息是否會發(fā)送到某個隊列中。
事實上,producer只能將消息發(fā)送到exchange中。這么一說雖然感覺多了個東西,但exchange并不復雜。exchange只是從producer獲取消息并將消息推送到隊列中。
但為什么多了這么個步驟?比如exchange收到消息后,它應該將消息推送給某個特定的隊列? 或者可以將消息推送給多個隊列? 再或者直接拋棄該消息? 這些規(guī)則取決于exchange的類型。
以下是一些可用的exchange type(org.springframework.amqp.core.ExchangeTypes):
public static final String DIRECT = "direct";public static final String TOPIC = "topic";public static final String FANOUT = "fanout";public static final String HEADERS = "headers";public static final String SYSTEM = "system";
我們可以用以下方式定義一個exchange:
channel.exchangeDeclare("logs", "fanout");正如其名,fanout就是將收到的消息發(fā)送給所有可訪問的隊列。
如何查看已定義的exchange?查看已定義的exchange,我們可以用rabbitmqctl list_exchanges命令,如圖:

圖中名為amq.*和沒有名字的exchange都是默認自帶的。(PS:之前的例子中我們還沒有用到exchange的概念,但仍然成功地將消息發(fā)送到了隊列中。這是因為我們使用的是默認的exchange。)
我們需要將消息發(fā)送到指定的exchange中。basicPublish的第一個參數就是exchange的名稱(重寫的幾個都是)。空字符串表示默認的exchange:
channel.basicPublish( "logs", "", null, message.getBytes());
隊列的命名很重要,比如多個worker共享一個隊列,producer和consumer的關系用隊列名維系。但并不是所有的場景都需要我們親自去命名。比如我們需要獲得所有消息,而不是它的某個子集。或者我們更關心最新的消息,而不是更早放到隊列的那些。
我們需要讓server隨機命名隊列,并且隊列在consumer連接斷開時自動刪除。
我們只需要一行代碼來做這些:
String queueName = channel.queueDeclare().getQueue();
調用不帶參數的queueDeclare()可以創(chuàng)建一個臨時隊列。
到此我們就創(chuàng)建好了exchange和隊列。我們需要用什么東西將他們聯(lián)系起來,這個東西叫作"binding"。
通過以下代碼將他們聯(lián)系起來:
channel.queueBind(queueName, "logs", "");
正如查看exchange那樣,我們可以用rabbitmqctl list_bindings命令查看binding。如圖:
從producer到queue的關系圖如下:
寫了個Channel靜態(tài)工廠,寫的不好。我打算在靜態(tài)初始化塊中定義兩個exchange:
final class ChannelFactory { //consumer的temporary queue與這兩個exchange綁定 final static String EXCHANGE_NAME = "log"; final static String EXCHANGE_NAME_ = "log2"; private static final ConnectionFactory factory = new ConnectionFactory(); static{ try { Channel temp = getChannel(); temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); temp.exchangeDeclare(EXCHANGE_NAME_, ExchangeTypes.FANOUT); closeChannel(temp); } catch (IOException e) { e.printStackTrace(); } } private ChannelFactory() { } public static Channel getChannel() { try { return factory.newConnection().createChannel(); } catch (IOException e) { e.printStackTrace(); } return null; } public static Channel getChannel(int channelNumber) { try { return factory.newConnection().createChannel(); } catch (IOException e) { e.printStackTrace(); } return null; } public static void closeChannel(Channel channel) { try { channel.close(); channel.getConnection().close(); } catch (IOException e) { e.printStackTrace(); } } }producer類,同一個producer給兩個exchange發(fā)消息:
class Publisher { public static void main(String[] args) throws IOException { Channel channel = ChannelFactory.getChannel(); String message = "Here is the content"; channel.basicPublish(ChannelFactory.EXCHANGE_NAME, StringUtils.EMPTY, null, ("EXCHANGE_NAME 1:::"+message).getBytes()); channel.basicPublish(ChannelFactory.EXCHANGE_NAME_, StringUtils.EMPTY, null, ("EXCHANGE_NAME 2:::"+message).getBytes()); ChannelFactory.closeChannel(channel); }}consumer類,臨時隊列需要和兩個exchange進行綁定:
class Logger { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { Channel channel = ChannelFactory.getChannel(); String queue = channel.queueDeclare().getQueue(); System.out.println("temporary queue name::"+queue); channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME, ""); channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME_, ""); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue, true, consumer); while (true) { System.out.println(new String(consumer.nextDelivery().getBody())); } }}由于使用的是臨時隊列,需要先運行consumer再運行producer。運行結果輸出:
新聞熱點
疑難解答