国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

基于Netty與RabbitMQ的消息服務

2019-11-14 22:45:34
字體:
來源:轉載
供稿:網友
基于Netty與RabbitMQ的消息服務

Netty作為一個高性能的異步網絡開發框架,可以作為各種服務的開發框架。

前段時間的一個項目涉及到硬件設備實時數據的采集,采用Netty作為采集服務的實現框架,同時使用RabbitMQ作為采集服務和各個其他模塊的通信消息隊列,整個服務框架圖如下:

將業務代碼和實際協議解析部分的代碼抽離,得到以上一個簡單的設計圖,代碼開源在GitHub上,簡單介紹下NettyMQServer采集服務涉及到的幾個關鍵技術點:

1、設備TCP消息解析:

NettyMQServer和采集設備Device之間采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息頭和消息體),可以有效的解決TCP消息“粘包”問題。

消息包解析圖如下:

 lengthFieldOffset   =  0 lengthFieldLength   =  2 lengthAdjustment    = -2 (= the length of the Length field) initialBytesToStrip =  0 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes) +--------+----------------+      +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" | +--------+----------------+      +--------+----------------+

代碼中消息長度的存儲采用了4個字節,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解碼,Netty會從接收的數據中頭4個字節中得到消息的長度,進而得到一個TCP消息包。

2、給設備發消息:

首先在連接創建時,要保留TCP的連接:

static final ChannelGroup channels = new DefaultChannelGroup(            GlobalEventExecutor.INSTANCE);    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // A closed channel will be removed from ChannelGroup automatically        channels.add(ctx.channel());    }

在每次一個Channel Active(連接創建)的時候用ChannelGroup保存這個Channel連接,當需要給某個設備發消息的時候,可以遍歷該ChannelGroup,找到對應的Channel,給該Channel發送消息:

for (io.netty.channel.Channel c : EchoServerHandler.channels) {                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());                            c.writeAndFlush(msg);                        }

這里是給所有的連接的設備都發。當連接斷開的時候,ChannelGroup會自動remove掉這個連接,不需要我們手動管理。

3、心跳檢測

當某個設備Device由于斷電或是其他原因導致設備不正常無法采集數據,Netty服務端需要知道該設備是否在正常工作,可以使用Netty的IdleStateHandler,示例代碼如下:

// 3 minutes for read idlech.pipeline().addLast(new IdleStateHandler(3*60,0,0));ch.pipeline().addLast(new HeartBeatHandler());/** * Handler implementation for heart beating. */public class HeartBeatHandler extends ChannelInboundHandlerAdapter{    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)            throws Exception {        if (evt instanceof IdleStateEvent) {            IdleStateEvent event = (IdleStateEvent) evt;            if (event.state() == IdleState.READER_IDLE) {                // Read timeout                System.out.

上面設置3分鐘沒有讀到數據,則觸發一個READER_IDLE事件。

4、RabbitMQ消息接收與發送

NettyMQServer消息發送采用了Spring AMQP,只需要在配置文件中簡單配置一下,就可以方便使用。

NettyMQServer消息接收同樣可以采用Spring AMQP,但由于對Spring相關的配置不是很熟悉,為了更靈活的使用MQ,這里使用了RabbitMQ Client java API來實現:

                    Connection connection = connnectionFactory.newConnection();                    Channel channel = connection.createChannel();                    channel.exchangeDeclare(exchangeName, "direct", true, false, null);                    channel.queueDeclare(queueName, true, false, false, null);                    channel.queueBind(queueName, exchangeName, routeKey);                    // process the message one by one                    channel.basicQos(1);                    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);                    // auto-ack is false                    channel.basicConsume(queueName, false, queueingConsumer);                    while (true) {                        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();                        String message = new String(delivery.getBody());                        log.debug("Mq Receiver get message");                        // Send the message to all connected clients                        // If you want to send to a specified client, just add                        // your own logic and ack manually                        // Be aware that ChannelGroup is thread safe                        log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size()));                        for (io.netty.channel.Channel c : EchoServerHandler.channels) {                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());                            c.writeAndFlush(msg);                        }                        // manually ack to MQ server the message is consumed.                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }

以上代碼從一個Queue中讀取數據,為了有效處理數據,防止異常數據丟失,使用了手動Ack。

RabbitMQ的使用方式:http://m.survivalescaperooms.com/luxiaoxun/p/3918054.html

代碼托管在GitHub上:https://github.com/luxiaoxun/NettyMqServer

參考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 富川| 吉木萨尔县| 成武县| 伊金霍洛旗| 郑州市| 永靖县| 旌德县| 福鼎市| 泰宁县| 苗栗县| 遵义市| 山东省| 桓仁| 孝昌县| 台中县| 景德镇市| 华池县| 泰兴市| 洪湖市| 南充市| 朔州市| 淅川县| 灵寿县| 奉节县| 毕节市| 峨眉山市| 朝阳区| 泗洪县| 永年县| 布尔津县| 额济纳旗| 甘孜县| 嘉祥县| 浮梁县| 梓潼县| 白水县| 孟连| 湖南省| 莒南县| 太仆寺旗| 南安市|