試著用RabbitMQ進行RPC。
其實用RabbitMQ搞RPC也沒什么特別的。只是我們需要在請求中再加入一個callback queue。比如這樣:
callbackQueueName = channel.queueDeclare().getQueue(); BasicPRoperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build();channel.basicPublish("", "rpc_queue", props, message.getBytes());
剩下的工作就是等待對方處理完成再從callback隊列中讀取響應消息。
上面用到了BasicProperties。(注意:是com.rabbitmq.client.AMQP.BasicProperties 不是 com.rabbitmq.client.BasicProperties)關于Message properties,AMQP協議為消息預定義了14種屬性。
private String contentType; private String contentEncoding; private Map<String,Object> headers; private Integer deliveryMode; private Integer priority; private String correlationId; private String replyTo; private String expiration; private String messageId; private Date timestamp; private String type; private String userId; private String appId; private String clusterId;
通常我們只需要使用其中一小部分:·deliveryMode: 將消息設置為持久或者臨時,2為持久,其余為臨時。·contentType: 指定mime-type,比如要使用JSON就是application/json·replyTo: 指定callback queue的名字·correlationId: 用來關聯RPC請求和響應的標識。上面那段代碼中就是用到了correlationId。
另外需要說明這個correlationId。其實在上面的代碼中我們為每一個RPC請求都創建了一個回調隊列。但這樣明顯不效率,我們可以為每一個客戶端只創建一個回調隊列。
但這樣我們又需要考慮另一個問題:<當我們將收到的消息放到隊列時,如何確定該消息是屬于哪個請求?>
這時我們可以使用correlationId解決這個問題。我們可以用它來為每一個請求加上標識,獲取信息時對比這個標識,以對應請求和響應。如果我們收到了無法識別的correlationId,即該響應不與任何請求匹配,那么這個消息將會廢除。

好了,代碼比較簡單。
class RPCServer{ private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }}由于是共享隊列,這里我們就不用exchange和routing了。另外,有時我們可能需要運行多個服務,為了讓多個服務端負載均衡,我們可以使用prefetchCount。這個屬性在之前任務隊列的例子里也用過,也就是
workerChannel.basicQos(1);
即讓多個worker一次獲取一個任務。用basicConsume方法進入隊列后循環等待請求,發現有請求到達時根據隊列和CorrelationId對相應請求作出響應。
另外需要注意的一點,server中basicConsume的第二個參數是false。其意義為是否自動作出回應,即:true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements于是循環時需要顯示調用basicAck進行回應。
class RPCClient{ private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }}callback隊列只是一個匿名隊列,但切記需要將其設置到BasicProperties中。corrId的生成方法有很多種,在這里使用UUID。call方法中通過調用basicPublish進行RPC請求,參數中帶著BasicProperties。
新聞熱點
疑難解答