本文介紹了Node.Js中實(shí)現(xiàn)端口重用原理詳解,分享給大家,具體如下:
起源,從官方實(shí)例中看多進(jìn)程共用端口
const cluster = require('cluster');const http = require('http');const numCPUs = require('os').cpus().length;if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died`); });} else { http.createServer((req, res) => { res.writeHead(200); res.end('hello world/n'); }).listen(8000); console.log(`Worker ${process.pid} started`);}執(zhí)行結(jié)果:
$ node server.js
Master 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started
了解http.js模塊:
我們都只有要?jiǎng)?chuàng)建一個(gè)http服務(wù),必須引用http模塊,http模塊最終會(huì)調(diào)用net.js實(shí)現(xiàn)網(wǎng)絡(luò)服務(wù)
// lib/net.js'use strict'; ...Server.prototype.listen = function(...args) { ... if (options instanceof TCP) { this._handle = options; this[async_id_symbol] = this._handle.getAsyncId(); listenInCluster(this, null, -1, -1, backlogFromArgs); // 注意這個(gè)方法調(diào)用了cluster模式下的處理辦法 return this; } ...};function listenInCluster(server, address, port, addressType,backlog, fd, exclusive) {// 如果是master 進(jìn)程或者沒有開啟cluster模式直接啟動(dòng)listenif (cluster.isMaster || exclusive) { //_listen2,細(xì)心的人一定會(huì)發(fā)現(xiàn)為什么是listen2而不直接使用listen // _listen2 包裹了listen方法,如果是Worker進(jìn)程,會(huì)調(diào)用被hack后的listen方法,從而避免出錯(cuò)端口被占用的錯(cuò)誤 server._listen2(address, port, addressType, backlog, fd); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 };// 是fork 出來的進(jìn)程,獲取master上的handel,并且監(jiān)聽,// 現(xiàn)在是不是很好奇_getServer方法做了什么 cluster._getServer(server, serverQuery, listenOnMasterHandle);} ...答案很快就可以通過cluster._getServer 這個(gè)函數(shù)找到
// lib/internal/cluster/child.jscluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: 'queryServer', // 關(guān)鍵點(diǎn):構(gòu)建一個(gè)queryServer的消息 index: indexes[indexesKey], data: null }, options); message.address = address;// 發(fā)送queryServer消息給master進(jìn)程,master 在收到這個(gè)消息后,會(huì)創(chuàng)建一個(gè)開始一個(gè)server,并且listen send(message, (reply, handle) => { rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = address && address.port || options.port; send(message); });}; //... // Round-robin. Master distributes handles across workers.function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; // 這里hack 了listen方法 // 子進(jìn)程調(diào)用的listen方法,就是這個(gè),直接返回0,所以不會(huì)報(bào)端口被占用的錯(cuò)誤 function listen(backlog) { return 0; } // ... const handle = { close, listen, ref: noop, unref: noop }; handles[key] = handle; // 這個(gè)cb 函數(shù)是net.js 中的listenOnMasterHandle 方法 cb(0, handle);}// lib/net.js/*function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); server._handle = handle; // _listen2 函數(shù)中,調(diào)用的handle.listen方法,也就是上面被hack的listen server._listen2(address, port, addressType, backlog, fd); }*/master進(jìn)程收到queryServer消息后進(jìn)行啟動(dòng)服務(wù)
// lib/internal/cluster/master.jsfunction queryServer(worker, message) { const args = [ message.address, message.port, message.addressType, message.fd, message.index ]; const key = args.join(':'); var handle = handles[key]; // 如果地址沒被監(jiān)聽過,通過RoundRobinHandle監(jiān)聽開啟服務(wù) if (handle === undefined) { var constructor = RoundRobinHandle; if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } handles[key] = handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); } // 如果地址已經(jīng)被監(jiān)聽,直接綁定handel到已經(jīng)監(jiān)聽到服務(wù)上,去消費(fèi)請(qǐng)求 // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ errno: errno, key: key, ack: message.seq, data: handles[key].data }, reply); if (errno) delete handles[key]; // Gives other workers a chance to retry. send(worker, reply, handle); });}看到這一步,已經(jīng)很明顯,我們知道了多進(jìn)行端口共享的實(shí)現(xiàn)原理
那現(xiàn)在問題來了,既然Worker進(jìn)程是如何獲取到master進(jìn)程監(jiān)聽服務(wù)接收到的connect呢?
// lib/internal/cluster/round_robin_handle.jsfunction RoundRobinHandle(key, address, port, addressType, fd) { this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; // 監(jiān)聽onconnection方法 this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; });}RoundRobinHandle.prototype.add = function (worker, send) { // ...};RoundRobinHandle.prototype.remove = function (worker) { // ...};RoundRobinHandle.prototype.distribute = function (err, handle) { // 負(fù)載均衡地挑選出一個(gè)worker this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker);};RoundRobinHandle.prototype.handoff = function (worker) { const handle = this.handles.shift(); const message = { act: 'newconn', key: this.key }; // 向work進(jìn)程其發(fā)送newconn內(nèi)部消息和客戶端的句柄handle sendHelper(worker.process, message, handle, (reply) => { // ... this.handoff(worker); });};下面讓我們看看Worker進(jìn)程接收到newconn消息后進(jìn)行了哪些操作
// lib/child.jsfunction onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); }// Round-robin connection.// 接收連接,并且處理function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle);}總結(jié)
分享出于共享學(xué)習(xí)的目的,如有錯(cuò)誤,歡迎大家留言指導(dǎo),不喜勿噴。也希望大家多多支持武林網(wǎng)。
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注