這次我們試著實(shí)現(xiàn)這樣一個(gè)小程序:
嗯,就是任務(wù)隊(duì)列(task queue)。不是將任務(wù)集中在一堆并一直等到所有任務(wù)一并完成為止,而是將每一個(gè)任務(wù)封裝為一個(gè)消息,并將其發(fā)送到隊(duì)列,后臺的workers就從隊(duì)列中分擔(dān)工作。web應(yīng)用尤其喜歡這種處理方式,比如面對一個(gè)請求時(shí)我們有一大堆復(fù)雜邏輯需要處理,而我們卻不需要立即響應(yīng)處理結(jié)果,那就放到后面慢慢弄。(PS:另外也有直接對任務(wù)進(jìn)行持久化,然后用scheduler什么的去定時(shí)處理。無論如何,沒有銀彈。)
對于復(fù)雜的任務(wù),我們可以用Thread.sleep模擬一下。比如 provider也簡單模擬一下,一次塞個(gè)20個(gè)消息到隊(duì)列: 有一個(gè)需要注意的地方,就是consumer攬了活后沒干完就死掉了。我需要其他還活著的consumer替死者完成工作。RabbitMQ支持消息應(yīng)答,如果Worder沒有做出應(yīng)答卻死掉了,provider則會將消息重新發(fā)給其他活著的consumer。但這個(gè)和timeout無關(guān),只有在worker的connection斷掉時(shí)才會重新發(fā)送。 如果調(diào)用了沒有autoAck參數(shù)的basicConsume,消息應(yīng)答默認(rèn)是啟用的,也就是autoAck=false。 當(dāng)autoAck==false時(shí)需要我們顯示調(diào)用channel.basicAck方法將接收的消息ack一下。如果接收了消息卻不顯示調(diào)用應(yīng)答方法,就不能再接收新的消息,這就造成了浪費(fèi)。另外,如果設(shè)置了autoAck就不要顯示進(jìn)行應(yīng)答,否則會來一個(gè)com.rabbitmq.client.ShutdownSignalException。 consumer死了有其他人處理后事,那整個(gè)server死掉了怎么辦?為了讓消息不丟失,我們需要將隊(duì)列和消息標(biāo)記為durable。 好了,這樣即使重啟RabbitMQ服務(wù)也不會丟失隊(duì)列。 但這并不保證消息不會丟失,為了保證這一點(diǎn),我們在provider發(fā)布消息時(shí)加了essageProperties.PERSISTENT_TEXT_PLAIN: 雖然這種方式并不完美,我們還需要做其他的一些工作,但暫時(shí)先到這里。 最后一個(gè)問題是,如何做到給consumer公平分配任務(wù)。如果沒有做這個(gè)處理,會出現(xiàn)這樣一種情況。舉個(gè)例子:provider發(fā)送了20個(gè)消息,隨即啟動的consumer_1把這20個(gè)消息全都獨(dú)占了。在consumer_1工作期間又有consumer_2被啟動,但此時(shí)consumer_2沒有任何任務(wù)。此時(shí)provider又發(fā)送了20個(gè)消息,這時(shí)consumer_2會得到10個(gè)任務(wù)。 我們可以使用channel.basicQos(int prefetchCount)方法限制預(yù)獲取的數(shù)量,比如prefetchCount==1就是返回應(yīng)答后可以再獲得1個(gè)消息。 好了,consumer代碼如下:public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = "Hello..."; for (int i = 0; i < 20; i++) { channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.concat(i+1+"").getBytes()); System.out.println(" [x] Sent '" + message + (i + 1) + "' " + (i + 1) + " times"); } channel.close(); connection.close();}
boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.concat(i+1+"").getBytes());public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
新聞熱點(diǎn)
疑難解答
圖片精選