本文介紹了Node.Js中實現端口重用原理詳解,分享給大家,具體如下:
起源,從官方實例中看多進程共用端口
const cluster = require('cluster');const http = require('http');const numCPUs = require('os').cpus().length;if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); for (let i = 0; i < numCPUs; i++) {  cluster.fork(); } cluster.on('exit', (worker, code, signal) => {  console.log(`worker ${worker.process.pid} died`); });} else { http.createServer((req, res) => {  res.writeHead(200);  res.end('hello world/n'); }).listen(8000); console.log(`Worker ${process.pid} started`);}執行結果:
$ node server.js
Master 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started
了解http.js模塊:
我們都只有要創建一個http服務,必須引用http模塊,http模塊最終會調用net.js實現網絡服務
// lib/net.js'use strict'; ...Server.prototype.listen = function(...args) {  ... if (options instanceof TCP) {   this._handle = options;   this[async_id_symbol] = this._handle.getAsyncId();   listenInCluster(this, null, -1, -1, backlogFromArgs); // 注意這個方法調用了cluster模式下的處理辦法   return this;  }  ...};function listenInCluster(server, address, port, addressType,backlog, fd, exclusive) {// 如果是master 進程或者沒有開啟cluster模式直接啟動listenif (cluster.isMaster || exclusive) {  //_listen2,細心的人一定會發現為什么是listen2而不直接使用listen // _listen2 包裹了listen方法,如果是Worker進程,會調用被hack后的listen方法,從而避免出錯端口被占用的錯誤  server._listen2(address, port, addressType, backlog, fd);  return; } const serverQuery = {  address: address,  port: port,  addressType: addressType,  fd: fd,  flags: 0 };// 是fork 出來的進程,獲取master上的handel,并且監聽,// 現在是不是很好奇_getServer方法做了什么 cluster._getServer(server, serverQuery, listenOnMasterHandle);} ...答案很快就可以通過cluster._getServer 這個函數找到
// lib/internal/cluster/child.jscluster._getServer = function(obj, options, cb) { // ... const message = util._extend({  act: 'queryServer',  // 關鍵點:構建一個queryServer的消息  index: indexes[indexesKey],  data: null }, options); message.address = address;// 發送queryServer消息給master進程,master 在收到這個消息后,會創建一個開始一個server,并且listen send(message, (reply, handle) => {   rr(reply, indexesKey, cb);       // Round-robin. }); obj.once('listening', () => {  cluster.worker.state = 'listening';  const address = obj.address();  message.act = 'listening';  message.port = address && address.port || options.port;  send(message); });}; //... // Round-robin. Master distributes handles across workers.function rr(message, indexesKey, cb) {  if (message.errno) return cb(message.errno, null);  var key = message.key;  // 這里hack 了listen方法  // 子進程調用的listen方法,就是這個,直接返回0,所以不會報端口被占用的錯誤  function listen(backlog) {    return 0;  }  // ...  const handle = { close, listen, ref: noop, unref: noop };  handles[key] = handle;  // 這個cb 函數是net.js 中的listenOnMasterHandle 方法  cb(0, handle);}// lib/net.js/*function listenOnMasterHandle(err, handle) {  err = checkBindError(err, port, handle);  server._handle = handle;  // _listen2 函數中,調用的handle.listen方法,也就是上面被hack的listen  server._listen2(address, port, addressType, backlog, fd); }*/master進程收到queryServer消息后進行啟動服務
// lib/internal/cluster/master.jsfunction queryServer(worker, message) {  const args = [    message.address,    message.port,    message.addressType,    message.fd,    message.index  ];  const key = args.join(':');  var handle = handles[key];  // 如果地址沒被監聽過,通過RoundRobinHandle監聽開啟服務  if (handle === undefined) {    var constructor = RoundRobinHandle;    if (schedulingPolicy !== SCHED_RR ||      message.addressType === 'udp4' ||      message.addressType === 'udp6') {      constructor = SharedHandle;    }    handles[key] = handle = new constructor(key,      address,      message.port,      message.addressType,      message.fd,      message.flags);  }  // 如果地址已經被監聽,直接綁定handel到已經監聽到服務上,去消費請求  // Set custom server data  handle.add(worker, (errno, reply, handle) => {    reply = util._extend({      errno: errno,      key: key,      ack: message.seq,      data: handles[key].data    }, reply);    if (errno)      delete handles[key]; // Gives other workers a chance to retry.    send(worker, reply, handle);  });}看到這一步,已經很明顯,我們知道了多進行端口共享的實現原理
那現在問題來了,既然Worker進程是如何獲取到master進程監聽服務接收到的connect呢?
// lib/internal/cluster/round_robin_handle.jsfunction RoundRobinHandle(key, address, port, addressType, fd) {  this.server = net.createServer(assert.fail);  if (fd >= 0)    this.server.listen({ fd });  else if (port >= 0)    this.server.listen(port, address);  else    this.server.listen(address); // UNIX socket path.  this.server.once('listening', () => {    this.handle = this.server._handle;    // 監聽onconnection方法    this.handle.onconnection = (err, handle) => this.distribute(err, handle);    this.server._handle = null;    this.server = null;  });}RoundRobinHandle.prototype.add = function (worker, send) {  // ...};RoundRobinHandle.prototype.remove = function (worker) {  // ...};RoundRobinHandle.prototype.distribute = function (err, handle) {  // 負載均衡地挑選出一個worker  this.handles.push(handle);  const worker = this.free.shift();  if (worker) this.handoff(worker);};RoundRobinHandle.prototype.handoff = function (worker) {  const handle = this.handles.shift();  const message = { act: 'newconn', key: this.key };  // 向work進程其發送newconn內部消息和客戶端的句柄handle  sendHelper(worker.process, message, handle, (reply) => {  // ...    this.handoff(worker);  });};下面讓我們看看Worker進程接收到newconn消息后進行了哪些操作
// lib/child.jsfunction onmessage(message, handle) {  if (message.act === 'newconn')   onconnection(message, handle);  else if (message.act === 'disconnect')   _disconnect.call(worker, true); }// Round-robin connection.// 接收連接,并且處理function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle);}總結
分享出于共享學習的目的,如有錯誤,歡迎大家留言指導,不喜勿噴。也希望大家多多支持VeVb武林網。
新聞熱點
疑難解答