国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

RabbitMQ

2019-11-14 22:00:22
字體:
來源:轉載
供稿:網友
RabbitMQ - 遠程過程調用

試著用RabbitMQ進行RPC。

其實用RabbitMQ搞RPC也沒什么特別的。只是我們需要在請求中再加入一個callback queue。比如這樣:

callbackQueueName = channel.queueDeclare().getQueue(); BasicPRoperties props = new BasicProperties                            .Builder()                            .replyTo(callbackQueueName)                            .build();channel.basicPublish("", "rpc_queue", props, message.getBytes());

剩下的工作就是等待對方處理完成再從callback隊列中讀取響應消息。

上面用到了BasicProperties。(注意:是com.rabbitmq.client.AMQP.BasicProperties 不是 com.rabbitmq.client.BasicProperties)關于Message properties,AMQP協議為消息預定義了14種屬性。

        private String contentType;        private String contentEncoding;        private Map<String,Object> headers;        private Integer deliveryMode;        private Integer priority;        private String correlationId;        private String replyTo;        private String expiration;        private String messageId;        private Date timestamp;        private String type;        private String userId;        private String appId;        private String clusterId;

通常我們只需要使用其中一小部分:·deliveryMode: 將消息設置為持久或者臨時,2為持久,其余為臨時。·contentType: 指定mime-type,比如要使用JSON就是application/json·replyTo: 指定callback queue的名字·correlationId: 用來關聯RPC請求和響應的標識。上面那段代碼中就是用到了correlationId。

另外需要說明這個correlationId。其實在上面的代碼中我們為每一個RPC請求都創建了一個回調隊列。但這樣明顯不效率,我們可以為每一個客戶端只創建一個回調隊列。

但這樣我們又需要考慮另一個問題:<當我們將收到的消息放到隊列時,如何確定該消息是屬于哪個請求?>

這時我們可以使用correlationId解決這個問題。我們可以用它來為每一個請求加上標識,獲取信息時對比這個標識,以對應請求和響應。如果我們收到了無法識別的correlationId,即該響應不與任何請求匹配,那么這個消息將會廢除。

好了,代碼比較簡單。

class RPCServer{    private static final String RPC_QUEUE_NAME = "rpc_queue";    public static void main(String[] args) throws Exception {         ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");         Connection connection = factory.newConnection();        Channel channel = connection.createChannel();         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);         channel.basicQos(1);         QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);         System.out.println(" [x] Awaiting RPC requests");         while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();             BasicProperties props = delivery.getProperties();            BasicProperties replyProps = new BasicProperties                    .Builder()                    .correlationId(props.getCorrelationId())                    .build();             String message = new String(delivery.getBody());            int n = Integer.parseInt(message);             System.out.println(" [.] fib(" + message + ")");            String response = "" + fib(n);             channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }     }     private static int fib(int n) throws Exception {        if (n == 0) return 0;        if (n == 1) return 1;        return fib(n-1) + fib(n-2);    }}

由于是共享隊列,這里我們就不用exchange和routing了。另外,有時我們可能需要運行多個服務,為了讓多個服務端負載均衡,我們可以使用prefetchCount。這個屬性在之前任務隊列的例子里也用過,也就是

workerChannel.basicQos(1);

即讓多個worker一次獲取一個任務。用basicConsume方法進入隊列后循環等待請求,發現有請求到達時根據隊列和CorrelationId對相應請求作出響應。

另外需要注意的一點,server中basicConsume的第二個參數是false。其意義為是否自動作出回應,即:true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements于是循環時需要顯示調用basicAck進行回應。

class RPCClient{     private Connection connection;    private Channel channel;    private String requestQueueName = "rpc_queue";    private String replyQueueName;    private QueueingConsumer consumer;     public RPCClient() throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        connection = factory.newConnection();        channel = connection.createChannel();         replyQueueName = channel.queueDeclare().getQueue();        consumer = new QueueingConsumer(channel);        channel.basicConsume(replyQueueName, true, consumer);    }     public String call(String message) throws Exception {        String response = null;        String corrId = java.util.UUID.randomUUID().toString();         BasicProperties props = new BasicProperties                .Builder()                .correlationId(corrId)                .replyTo(replyQueueName)                .build();         channel.basicPublish("", requestQueueName, props, message.getBytes());         while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            if (delivery.getProperties().getCorrelationId().equals(corrId)) {                response = new String(delivery.getBody());                break;            }        }         return response;    }     public void close() throws Exception {        connection.close();    }}

callback隊列只是一個匿名隊列,但切記需要將其設置到BasicProperties中。corrId的生成方法有很多種,在這里使用UUID。call方法中通過調用basicPublish進行RPC請求,參數中帶著BasicProperties。


上一篇:jdk動態代理源碼學習

下一篇:RabbitMQ

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 梨树县| 鹿邑县| 茌平县| 鹤岗市| 确山县| 广宁县| 高安市| 高唐县| 故城县| 绩溪县| 绥阳县| 马公市| 神池县| 乌什县| 白朗县| 林西县| 扶绥县| 长治县| 崇州市| 平阴县| 乌拉特后旗| 上蔡县| 永康市| 托克托县| 威宁| 三门县| 龙里县| 灵丘县| 昭苏县| 五大连池市| 满城县| 江油市| 辉县市| 大埔县| 巢湖市| 涞源县| 平远县| 津市市| 长武县| 山东省| 巴南区|