国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > JavaScript > 正文

node.js中TCP Socket多進程間的消息推送示例詳解

2019-11-19 13:31:23
字體:
供稿:網(wǎng)友

前言

前段時間接到了一個支付中轉(zhuǎn)服務(wù)的需求,即支付數(shù)據(jù)通過http接口傳到中轉(zhuǎn)服務(wù)器,中轉(zhuǎn)服務(wù)器將支付數(shù)據(jù)發(fā)送到異構(gòu)后臺(Lua)的指定tcp socket。

一開始評估的時候感覺蠻簡單的,就是http server和tcp server間的通信,不是一個Event實例就能解決的狀態(tài)管理問題嗎?注冊一個事件A用于消息傳遞,在socket連接時注冊唯一的ID,然后在http接收到數(shù)據(jù)時,emit事件A;在監(jiān)聽到事件A時,在tcp server中尋找指定ID對應(yīng)的socket處理該數(shù)據(jù)即可。

盡管node.js在高并發(fā)方面有不錯的性能,但是單個tcp server實例的承載能力有限,為避免服務(wù)器過載,node.js 單進程的內(nèi)存有上限(默認2G),能容納的長連接客戶端數(shù)不多。但隨著業(yè)務(wù)的擴大,我們需要考慮多機集群部署,客戶端可以連接到任一節(jié)點,并發(fā)送消息。如何做到多節(jié)點的同時推送,我們需要建立一套多節(jié)點之間的消息分發(fā)/訂閱架構(gòu)。常用的第三方消息管理庫有 RabbitMQ和Redis等。在這里,我用的是Redis的訂閱發(fā)布服務(wù)。

redis.io有一個比較成熟的redis消息中轉(zhuǎn)庫socket.io-redis (本地下載)。但我們項目中異構(gòu)后臺用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs實現(xiàn)并不難,就手寫了。

redis在該項目中主要起到一個消息分發(fā)中心(publish/subscribe)的作用。當http請求的支付數(shù)據(jù)發(fā)送過來時,則通過redis的publish功能往所有的channel推送消息,這樣所有訂閱該channel的socket server就能收到回調(diào),然后推送到指定客戶端。在應(yīng)用層看跟Event事件消息的處理差不多。

const redis = require("redis"), redisClient = redis.createClient, REDIS_CFG = {  host: '127.0.0.1',  port: 6379 }, sub = redisClient(REDIS_CFG), pub = redisClient(REDIS_CFG), PAY_MQ_CHANNEL = 'pay_mq_channel';// 監(jiān)聽頻道的消息回調(diào)sub.on('message', function(channel, message) { switch (channle){  case PAY_MQ_CHANNEL:   console.log('notification received:', message);   // 廣播消息到指定socket   break; }});// 訂閱頻道sub.subscribe(PAY_MQ_CHANNEL);// 當接收到支付數(shù)據(jù)時,推送頻道消息pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

由于redis的sub/pub的channel訂閱數(shù)有上限,所以建議一類消息使用一個channel,一個channel下使用map、set或數(shù)組來存儲訂閱時的回調(diào)函數(shù),在接收到訂閱消息時遍歷執(zhí)行回調(diào)函數(shù)。

下面是我封裝好的Redis組件(RedisMQProxy.js):

/* * redis 訂閱/發(fā)布 */const _ = require('lodash'), redis = require("redis"), REDIS_CFG = {  host: '127.0.0.1',  port: 6379 }, sub = redisClient(REDIS_CFG), pub = redisClient(REDIS_CFG);let SubListenerFuns = {}; // channel的回調(diào)函數(shù)列表let RedisMQProxy = { // 訂閱channel on(channel, cb, errorCb, once = false) {  sub.subscribe(channel); // 訂閱channel消息  // 將回調(diào)函數(shù)存放數(shù)組中  SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];  SubListenerFuns[channel].push({   once, cb, errorCb  }); }, // 監(jiān)聽一次性的channel回調(diào)函數(shù) once(channel, cb, errorCb) {  this.on(channel, cb, errorCb, true); }, // 發(fā)送channel消息 emit(channel, message) {  if(!_.isString(message)) {   message = JSON.stringify(message);  }  pub.publish(channel, message); }, // 移除channel上的監(jiān)聽函數(shù) removeListener(channel, func) {  let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];  for(let i = 0, l = channelHandlers.length; i < l; i++) {   let handler = channelHandlers[i] || {};   let cb = handler.cb;   if(func && func == cb) {    channelHandlers.splice(i, 1);    return false;   }  } }};RedisMQProxy.SubListeners = SubListenerFuns;pub.on('error', onError);sub.on('error', onError);// 監(jiān)聽redis的訂閱消息sub.on("message", function(channel, message) { // 遍歷執(zhí)行channel的回調(diào)函數(shù) try {  message = JSON.parse(message); } catch(e) {} broadcastToChannel(channel, message);});// 廣播消息到指定頻道function broadcastToChannel(channel, message, isError) { let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel]; for(let i = 0, l = channelHandlers.length; i < l; i++) {  let handler = channelHandlers[i] || {};  let isOnce = handler.once || false;  let func = handler.cb;  let errorFunc = handler.errorCb;  _.isFunction(func) && func(message);  isError && _.isFunction(errorFunc) && errorFunc(message);  isOnce && channelHandlers.splice(i, 1); // 移除一次性監(jiān)聽的函數(shù) }}function broadcastToAllChannels(message, isError) { for(let channel in SubListenerFuns) {  broadcastToChannel(channel, message, isError); }}function onError(err) { err = err || {}; err.msg = err.msg || 'redis sub/pub fail'; // 通知所有channel執(zhí)行錯誤回調(diào)函數(shù) broadcastToAllChannels(err, true);}module.exports = RedisMQProxy;

在使用時就可以比較方便地調(diào)用了:

const RedisMQProxy = require('./RedisMQProxy'), PAY_MQ_CHANNEL = 'pay_mq_channel';// 訂閱channelRedisMQ.on(PAY_MQ_CHANNEL, function(message) { console.log('notification received:', message); // 廣播消息到指定socket // ...});// 訂閱一次性的channelRedisMQ.once(PAY_MQ_CHANNEL, function(message) { // ...});// 當接收到支付數(shù)據(jù)時,推送頻道消息RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

目前該項目已經(jīng)健康運行了一個多月。由于socket server的多進程間消息推送依賴于redis的消息中轉(zhuǎn),而Redis使用的是單進程,未能充分利用CPU。當業(yè)務(wù)膨脹的時候,redis就要考慮分布集群了。

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對武林網(wǎng)的支持。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 张家口市| 和平县| 石泉县| 平和县| 康定县| 吉安县| 攀枝花市| 宣汉县| 鄢陵县| 临西县| 清镇市| 龙川县| 平利县| 平乡县| 礼泉县| 册亨县| 乐昌市| 房山区| 汉阴县| 武宁县| 孟连| 泾川县| 韶山市| 长治县| 开远市| 连城县| 武定县| 肥城市| 沙湾县| 伊金霍洛旗| 陇川县| 策勒县| 吴堡县| 阳东县| 安丘市| 巧家县| 老河口市| 广德县| 深水埗区| 抚宁县| 铜山县|