国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

RabbitMQ

2019-11-14 21:54:39
字體:
供稿:網(wǎng)友
RabbitMQ - 任務(wù)隊(duì)列

這次我們試著實(shí)現(xiàn)這樣一個(gè)小程序:

嗯,就是任務(wù)隊(duì)列(task queue)。不是將任務(wù)集中在一堆并一直等到所有任務(wù)一并完成為止,而是將每一個(gè)任務(wù)封裝為一個(gè)消息,并將其發(fā)送到隊(duì)列,后臺的workers就從隊(duì)列中分擔(dān)工作。web應(yīng)用尤其喜歡這種處理方式,比如面對一個(gè)請求時(shí)我們有一大堆復(fù)雜邏輯需要處理,而我們卻不需要立即響應(yīng)處理結(jié)果,那就放到后面慢慢弄。(PS:另外也有直接對任務(wù)進(jìn)行持久化,然后用scheduler什么的去定時(shí)處理。無論如何,沒有銀彈。)

對于復(fù)雜的任務(wù),我們可以用Thread.sleep模擬一下。比如

provider也簡單模擬一下,一次塞個(gè)20個(gè)消息到隊(duì)列:

public static void main(String[] argv) throws java.io.IOException {     ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);     String message = "Hello...";     for (int i = 0; i < 20; i++) {        channel.basicPublish("", TASK_QUEUE_NAME,                MessageProperties.PERSISTENT_TEXT_PLAIN, message.concat(i+1+"").getBytes());        System.out.println(" [x] Sent '" + message + (i + 1) + "'  "                + (i + 1) + " times");    }     channel.close();    connection.close();}

有一個(gè)需要注意的地方,就是consumer攬了活后沒干完就死掉了。我需要其他還活著的consumer替死者完成工作。RabbitMQ支持消息應(yīng)答,如果Worder沒有做出應(yīng)答卻死掉了,provider則會將消息重新發(fā)給其他活著的consumer。但這個(gè)和timeout無關(guān),只有在worker的connection斷掉時(shí)才會重新發(fā)送。

如果調(diào)用了沒有autoAck參數(shù)的basicConsume,消息應(yīng)答默認(rèn)是啟用的,也就是autoAck=false。

boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

當(dāng)autoAck==false時(shí)需要我們顯示調(diào)用channel.basicAck方法將接收的消息ack一下。如果接收了消息卻不顯示調(diào)用應(yīng)答方法,就不能再接收新的消息,這就造成了浪費(fèi)。另外,如果設(shè)置了autoAck就不要顯示進(jìn)行應(yīng)答,否則會來一個(gè)com.rabbitmq.client.ShutdownSignalException。

consumer死了有其他人處理后事,那整個(gè)server死掉了怎么辦?為了讓消息不丟失,我們需要將隊(duì)列和消息標(biāo)記為durable。

boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);

好了,這樣即使重啟RabbitMQ服務(wù)也不會丟失隊(duì)列。

但這并不保證消息不會丟失,為了保證這一點(diǎn),我們在provider發(fā)布消息時(shí)加了essageProperties.PERSISTENT_TEXT_PLAIN:

channel.basicPublish("", TASK_QUEUE_NAME,                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.concat(i+1+"").getBytes());

雖然這種方式并不完美,我們還需要做其他的一些工作,但暫時(shí)先到這里。

最后一個(gè)問題是,如何做到給consumer公平分配任務(wù)。如果沒有做這個(gè)處理,會出現(xiàn)這樣一種情況。舉個(gè)例子:provider發(fā)送了20個(gè)消息,隨即啟動的consumer_1把這20個(gè)消息全都獨(dú)占了。在consumer_1工作期間又有consumer_2被啟動,但此時(shí)consumer_2沒有任何任務(wù)。此時(shí)provider又發(fā)送了20個(gè)消息,這時(shí)consumer_2會得到10個(gè)任務(wù)。

我們可以使用channel.basicQos(int prefetchCount)方法限制預(yù)獲取的數(shù)量,比如prefetchCount==1就是返回應(yīng)答后可以再獲得1個(gè)消息。

好了,consumer代碼如下:

public class Worker {    private static final String TASK_QUEUE_NAME = "task_queue";     public static void main(String[] argv) throws java.io.IOException,            java.lang.InterruptedException {         ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");         channel.basicQos(1);         QueueingConsumer consumer = new QueueingConsumer(channel);        boolean autoAck = false;        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);         while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());             System.out.println(" [x] Received '" + message + "'");            doWork(message);            System.out.println(" [x] Done");            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }     private static void doWork(String task) throws InterruptedException {        for (char ch : task.toCharArray()) {            if (ch == '.')                Thread.sleep(1000);        }    } }


上一篇:RabbitMQ

下一篇:RabbitMQ

發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 曲靖市| 黎城县| 留坝县| 车险| 阳原县| 滕州市| 新河县| 新泰市| 福州市| 洞头县| 贵港市| 平遥县| 崇明县| 电白县| 宜州市| 灵武市| 巫溪县| 荔波县| 武山县| 南昌市| 桐庐县| 太和县| 西青区| 淳化县| 武山县| 江阴市| 巴南区| 和林格尔县| 山东省| 陆丰市| 本溪市| 乌兰县| 都昌县| 柳江县| 买车| 合水县| 宜章县| 慈利县| 望奎县| 通海县| 无棣县|