国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發(fā)設計 > 正文

RabbitMQ

2019-11-14 21:54:39
字體:
來源:轉載
供稿:網友
RabbitMQ - 發(fā)布訂閱

這次我們試試publish / subscribe模式,也就是將一個消息發(fā)送給多個consumer。

這里用一個簡單的小程序來說明publish / subscribe。由一個PRovider提供消息,這個消息會被多個consumer接收。consumer對同一個消息做出不同的反應,比如打印、保存到文件、數據庫什么的。

之前的例子可能會給人這種感覺:producer將消息發(fā)送到隊列中,消息緩沖在隊列中,consumer從隊列獲得消息。

但這并不正確。在rabbit中,producer從來不會直接將消息發(fā)送到隊列中。producer根本無從得知消息是否會發(fā)送到某個隊列中。

事實上,producer只能將消息發(fā)送到exchange中。這么一說雖然感覺多了個東西,但exchange并不復雜。exchange只是從producer獲取消息并將消息推送到隊列中。

但為什么多了這么個步驟?比如exchange收到消息后,它應該將消息推送給某個特定的隊列? 或者可以將消息推送給多個隊列? 再或者直接拋棄該消息? 這些規(guī)則取決于exchange的類型。

以下是一些可用的exchange type(org.springframework.amqp.core.ExchangeTypes):

public static final String DIRECT = "direct";public static final String TOPIC = "topic";public static final String FANOUT = "fanout";public static final String HEADERS = "headers";public static final String SYSTEM = "system";

我們可以用以下方式定義一個exchange:

channel.exchangeDeclare("logs", "fanout");

正如其名,fanout就是將收到的消息發(fā)送給所有可訪問的隊列。

如何查看已定義的exchange?查看已定義的exchange,我們可以用rabbitmqctl list_exchanges命令,如圖:

圖中名為amq.*和沒有名字的exchange都是默認自帶的。(PS:之前的例子中我們還沒有用到exchange的概念,但仍然成功地將消息發(fā)送到了隊列中。這是因為我們使用的是默認的exchange。)

我們需要將消息發(fā)送到指定的exchange中。basicPublish的第一個參數就是exchange的名稱(重寫的幾個都是)。空字符串表示默認的exchange:

channel.basicPublish( "logs", "", null, message.getBytes());

隊列的命名很重要,比如多個worker共享一個隊列,producer和consumer的關系用隊列名維系。但并不是所有的場景都需要我們親自去命名。比如我們需要獲得所有消息,而不是它的某個子集。或者我們更關心最新的消息,而不是更早放到隊列的那些。

我們需要讓server隨機命名隊列,并且隊列在consumer連接斷開時自動刪除。

我們只需要一行代碼來做這些:

String queueName = channel.queueDeclare().getQueue();

調用不帶參數的queueDeclare()可以創(chuàng)建一個臨時隊列。

到此我們就創(chuàng)建好了exchange和隊列。我們需要用什么東西將他們聯(lián)系起來,這個東西叫作"binding"。

通過以下代碼將他們聯(lián)系起來:

channel.queueBind(queueName, "logs", "");

正如查看exchange那樣,我們可以用rabbitmqctl list_bindings命令查看binding。如圖:

從producer到queue的關系圖如下:

寫了個Channel靜態(tài)工廠,寫的不好。我打算在靜態(tài)初始化塊中定義兩個exchange:

final class ChannelFactory {         //consumer的temporary queue與這兩個exchange綁定    final static String EXCHANGE_NAME = "log";    final static String EXCHANGE_NAME_ = "log2";         private static final ConnectionFactory factory = new ConnectionFactory();         static{        try {            Channel temp = getChannel();            temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);            temp.exchangeDeclare(EXCHANGE_NAME_, ExchangeTypes.FANOUT);            closeChannel(temp);        } catch (IOException e) {            e.printStackTrace();        }    }     private ChannelFactory() {    }     public static Channel getChannel() {        try {            return factory.newConnection().createChannel();        } catch (IOException e) {            e.printStackTrace();        }        return null;    }     public static Channel getChannel(int channelNumber) {        try {            return factory.newConnection().createChannel();        } catch (IOException e) {            e.printStackTrace();        }        return null;    }     public static void closeChannel(Channel channel) {        try {            channel.close();            channel.getConnection().close();        } catch (IOException e) {            e.printStackTrace();        }    }     }

producer類,同一個producer給兩個exchange發(fā)消息:

class Publisher {    public static void main(String[] args) throws IOException {         Channel channel = ChannelFactory.getChannel();         String message = "Here is the content";        channel.basicPublish(ChannelFactory.EXCHANGE_NAME, StringUtils.EMPTY, null,                ("EXCHANGE_NAME 1:::"+message).getBytes());        channel.basicPublish(ChannelFactory.EXCHANGE_NAME_, StringUtils.EMPTY, null,                ("EXCHANGE_NAME 2:::"+message).getBytes());                 ChannelFactory.closeChannel(channel);    }}

consumer類,臨時隊列需要和兩個exchange進行綁定:

class Logger {    public static void main(String[] args) throws IOException,            ShutdownSignalException, ConsumerCancelledException,            InterruptedException {        Channel channel = ChannelFactory.getChannel();         String queue = channel.queueDeclare().getQueue();        System.out.println("temporary queue name::"+queue);         channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME, "");        channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME_, "");         QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(queue, true, consumer);         while (true) {            System.out.println(new String(consumer.nextDelivery().getBody()));        }    }}

由于使用的是臨時隊列,需要先運行consumer再運行producer。運行結果輸出:


上一篇:RabbitMQ

下一篇:RabbitMQ

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 新巴尔虎左旗| 奉节县| 浑源县| 武冈市| 尼木县| 临泽县| 徐闻县| 即墨市| 潼南县| 井陉县| 庆元县| 大庆市| 扎鲁特旗| 丹东市| 图片| 崇州市| 苏州市| 科尔| 静海县| 焦作市| 清徐县| 榆林市| 长沙市| 涞源县| 鹤庆县| 和林格尔县| 新昌县| 瓮安县| 大同县| 新津县| 崇仁县| 南丰县| 双辽市| 庆城县| 时尚| 邛崃市| 武定县| 青铜峡市| 安远县| 九江市| 平阳县|