1. 程式人生 > >【nodejs原理&原始碼賞析(6)】深度剖析cluster模組原始碼與node.js多程序(下)

【nodejs原理&原始碼賞析(6)】深度剖析cluster模組原始碼與node.js多程序(下)

目錄

  • 一. 引言
  • 二.server.listen方法
  • 三.cluster._getServer( )方法
  • 四.跨程序通訊工具方法Utils
  • 五.act:queryServer訊息
  • 六.輪詢排程Round-Robin-Handle
  • 七. 圖解叢集建立過程的邏輯跳轉

示例程式碼託管在:http://www.github.com/dashnowords/blogs

部落格園地址:《大史住在大前端》原創博文目錄

華為雲社群地址:【你要的前端打怪升級指南】

閱讀本章需要先閱讀本系列前兩章內容預熱一下。

一. 引言

前兩篇博文中已經分別介紹了使用cluster模組建立叢集時主程序執行cluster.fork( )方法時的執行邏輯,以及net模組在不同場景下建立通訊的基本原理。本篇繼續分析cluster模組,從第一個子程序開始建立伺服器講起,cluster基本用法示例程式碼再來一遍:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主程序 ${process.pid} 正在執行`);

  // 衍生工作程序。
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作程序 ${worker.process.pid} 已退出`);
  });
} else {
  // 工作程序可以共享任何 TCP 連線。
  // 在本例子中,共享的是 HTTP 伺服器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作程序 ${process.pid} 已啟動`);
}

程式碼是足夠精簡的,實現過程也確實是很龐大的工程。每一個子程序中執行的邏輯都是http.createServer().listen(),我們來看看它是如何一步一步運作而最終建立通訊機制的,你會發現它和上一節中的簡易模型非常相似。

二.server.listen方法

http模組的原始碼中很容易找到http.createServer( )方法的邏輯就是透傳引數生成了一個net.Server例項,這個例項在上一節中就已經介紹過,實際上就只是生成了一個server的例項,所以這裡跳轉到net.Server.prototype.listen()net.js檔案1306-1404行),基本邏輯如下:

Server.prototype.listen = function(...args){
    const normalized = normalizeArgs(args);
    var options = normalized[0];
    /*..獲取監聽引數中的控制代碼物件..*/
     options = options._handle || options.handle || options;
    
    //如果options上有控制代碼,控制代碼是一個TCP例項
    if(options instanceof TCP){
        //......
        listenInCluster(......);
    }
                        
    //如果配置引數中有fd(file descriptor)
    if(typeof options.fd === 'number' && options.fd >=0){
            //......
        listenInCluster(......);
    }
                        
    //如果引數中有port埠號
    if(typeof options.port === 'number' || typeof options.port === 'string'){
         //.....
         listenInCluster(......);
    }
                         
    //如果引數中有port埠號 或 字元型的pipe名稱
    if(typeof options.port === 'number' || typeof options.port === 'string'){
         //.....
         listenInCluster(......);
    }
}

這裡不難看出它的邏輯就和net模組官方文件中描述的server.listen( )的幾種場景對應,可以監聽帶有非空handle屬性的控制代碼物件,數字型埠號,字串型命名管道地址,或者直接傳入配置引數合集options,然後分別根據幾種不同的情況來呼叫listenInCluster方法(叢集功能的邏輯主線是數字型port,假設傳入了12315)。

listenInCluster方法定義如下:

大致可以看出,如果是主程序,就直接呼叫server._listen2()方法然後return了,否則(也就是在工作程序中的邏輯,敲黑板!!!這裡是重點了),構造一個serverQuery的引數集,可以看到裡面記錄了以各種不同姿勢呼叫這個方法時傳入的引數,所以有的引數為null也很正常,然後呼叫了cluster._getServer( )方法,這就是工作程序在引用cluster模組時引入的child.js中定義並掛載在cluster上的方法,最後一個引數listenOnMasterHandle是一個回撥函式,也是一個錯誤前置風格的函式,可以看到,它接收了一個控制代碼物件,並把這個控制代碼物件掛載在了子程序這個server例項的_handle屬性上,接著也呼叫了server._listen2( )方法,可以看到兩種情況下呼叫這個方法時傳入的引數是一樣的。接著來到server._listen2( )方法,它綁定了setupListenHandle方法(別抓狂,這是net模組中相關邏輯的最後一步了),簡化程式碼如下:

function setupListenHandle(......){
  if (this._handle) {
    //工作程序在執行上一步邏輯時,在cluster._getServer()回撥函式中把一個handle傳遞給了server._handle
    debug('setupListenHandle: have a handle already');
  } else {
    //主程序會執行的邏輯
    debug('setupListenHandle: create a handle');
      //......
       rval = createServerHandle(address, port, addressType, fd, flags);
      //......
      this._handle = rval;
}
  //......
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;
  //....
}

工作程序通過cluster._getServer( )方法拿到了一個handle,所以不會再生成,而主程序server.listen(port)執行時會走到else分支,然後生成一個新的綁定了埠號的特殊的socket控制代碼然後掛載到主程序server._handle上,這裡對控制代碼的connection事件回撥邏輯進行了修改,相關程式碼如下:

這裡需要注意的是,server._handleconnection事件和serverconnection事件是兩碼事,server._handle指向的是一個綁定了埠的特殊的socket控制代碼,當客戶端connect一個server時實際上底層是客戶端socket與服務端這個socket的對接,所以需要在server._handle這個的connection回撥函式中,將客戶端的socket控制代碼clientHandle重新包裝,然後再通過觸發serverconnection事件將其轉發給server例項。所以在使用server例項時可以直接新增connectionListener:

let server = net.createServer(socket=>{
    /*這個回撥函式就是server的connection事件回撥
    * 這裡接收到的socket就是server._handle的connection收到的客戶端控制代碼clientHandle封裝成的socket例項
    */
})

無論是主程序還是子程序都會觸發這個邏輯,只需要看成是一種功能性質的封裝即可,並不影響業務邏輯。

三.cluster._getServer( )方法

下面回到cluster模組繼續,_getServer( )方法只存在於子程序程式碼中,原始碼位於lib/internal/cluster/child.js,方法定義在54-106行,基本邏輯如下:

cluster._getServer = function(obj, options, cb){
 /* 這裡的obj就是子程序中執行上面listenInCluster方法中傳入的server,
  * options就是serverQuery,
  * cb就是最後要把主程序handle傳回去的回撥函式listenOnMasterHandler   
  */
  
  //先構建index然後進行了一通記錄,就是根據監聽的引數來構建一個識別這個server的索引
  //然後構建訊息
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };
    
  //......
    
  /* 傳送act:queryServer訊息,並傳一個回撥函式,
   * 從形參命名就可以看出,這個回撥函式被呼叫時會被傳入一個控制代碼,
   * 最後根據不同的排程策略來執行不同的函式,這裡主要看Round-robin
  */ 
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
    
  //......
}

rr方法將響應reply和前一個呼叫者傳入的回撥函式cb進行了透傳,rr的函式體就是實現listen方法偷樑換柱的地方了:

// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) {
    return 0;
  }

  function close() {
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle); //這裡的cb其實就是listenInCluster方法中定義的那個listenOnMasterHandler回撥
}

可以看到rr方法中構建了一個假的handle控制代碼,並呼叫cb將它傳了回去,然後執行邏輯回回到net模組,前文已經提這個handle在回撥函式中被掛載在了server._handle上,於是setupListenHandle( )的邏輯中也不會重新構建控制代碼。

重新梳理一下這部分的邏輯,就是子程序中呼叫listen方法時,會通過cluster._getServer( )拿到一個假控制代碼,然後執行一個空的listen方法,這樣就避免了埠的重複監聽。所以我們可以推測,cluster._getServer( )必然會觸發主程序啟動一個監聽埠的伺服器,並建立對子程序的排程,程序之間的IPC通訊可以直接通過process物件來完成,不需要再重新構建跨程序通訊管道。

四.跨程序通訊工具方法Utils

繼續進行後續內容前,先來看一個獨立的跨程序通訊工具,原始碼放在lib/internal/cluster/utils.js

它是cluster模組傳送跨程序訊息的內部代理,這個模組對外暴露了訊息傳送方法sendHelper和內部訊息監聽器的預處理方法internal,原始碼很短就不貼了。當子程序呼叫sendHelper傳送訊息時,utils內部會把這條訊息處理完成後需要執行的回撥函式先快取起來,然後給訊息新增一些包裝標記,然後再發出去;internal會對傳入的內部訊息監聽器進行代理,過濾掉非NODE_CLUSTER類別的訊息,如果訊息攜帶的message物件沒有ack屬性則最終會執行繫結監聽時傳入的回撥函式,否則會從快取中找出之前暫存的回撥函式來執行。

發個訊息為什麼要搞這麼複雜呢?這個ack屬性又是哪來的呢?其實這個utils模組主要是在跨程序的雙向訊息通訊時實現了方法複用,同一個message從工作程序發往主程序時和主程序發回給工作程序時是由同一個事件名internalMessage攜帶的,那如何來區分訊息傳送的方向呢,就是ack屬性,如果訊息帶有ack屬性,就表示它是由主程序發給子程序的,那麼就要呼叫子程序中的後續處理方法,這個方法其實就是子程序傳送訊息給主程序之前暫存在utils內部callbacks裡的方法,也就是child.jscluster._getServer()中呼叫send方法時傳入的回撥方法,也就是net模組中listenInCluster( )方法中的listenOnMasterHandle方法,這個方法漂洋過海透傳了N個函式,的確不容易看懂,“回撥地獄”也的確不是鬧著玩的。再看看沒有ack屬性的情況,沒有這個屬性時訊息是從子程序發給主程序的,自然要呼叫主程序的方法,從邏輯裡不難看出,這種情況下方法引用的就是internal方法執行時傳入的第二個引數(master.js原始碼213行執行的internal(worker, onmessage)onmessage這個函式),原始碼中就是利用高階函式這種分步執行的特點實現了引用。

五.act:queryServer訊息

故事再回到第三節工作程序中發出act:'queryServer的訊息後,來看主程序master.js中的程式碼,主程序中在呼叫cluster.fork( )時就綁定了對worker執行緒internalMessage的監聽,對於act:queryServer型別的叢集訊息,主程序已經定義了queryServer這個方法來處理。這段原始碼的主要邏輯如下:

1.根據重要引數組拼接出一個唯一的key
2.1.根據key查詢是否有已經存在的排程控制代碼round-robin-handle,如果有則直接進行後續邏輯
2.2.如果沒有已經存在的排程控制代碼,則選擇排程策略,例項化一個排程控制代碼,並把它新增進記錄裡
3.把訊息資料message.data掛載在排程控制代碼的handle.data欄位上
4.執行排程控制代碼的add方法,把子程序和一個回撥方法傳進例項,回撥方法被執行時會從排程控制代碼中取得資料,並組裝返回訊息(帶有ack屬性和其他資料的訊息)發給子程序,子程序收到這個訊息後執行的方法,就是前文分析過的返回假控制代碼給net模組中的`listenInCluster()`邏輯。

從開篇的多程序程式碼可以看到,每個子程序中執行的listen方法監聽的埠號都是一樣的,所以每個子程序傳送queryServer訊息給主程序並執行這段邏輯時,其實對應的key都是一樣的,所以排程物件RoundRobinHandle只會例項化一次,在之後的過程中,每一個子程序會根據key獲取到同一個排程例項,並呼叫add方法將worker物件和一個回撥函式新增進排程例項,可以看到回撥函式執行時,就會將原message中的seq屬性的值新增給ack屬性再掛載上處理後的資料併發送給子程序。那麼剩下的事情,就剩下排程物件RoundRobinHandle的原始碼了。

我們不妨來推測一下,它的主要邏輯就是在主程序中建立真正監聽目標埠的伺服器,並添加當客戶端請求到達時對於工作程序的排程程式碼,下一節我們就一起來驗證一下。

六.輪詢排程Round-Robin-Handle

排程方法的原始碼是internal/cluster/round_robin_handle.js,另一種shared_handle.js是windows下使用的排程策略,先不做分析(主要是沒研究過,不敢瞎說)。先從建構函式開始:

16行,bingo,終於看到主程序啟動伺服器了。接著就是根據引數而分流的監聽方法,叢集程式碼中對應的是20行的帶有有效port引數的情況,所以伺服器就在主程序啟動了,最後來看看server開始觸發listening事件時執行的邏輯(此處呼叫的是once方法,所以只會執行一次):

1.將主程序server的內部_handle控制代碼,掛載給round-robin-handle例項
2.當這個控制代碼被連線時(也就是客戶端socket執行connect方法連線後),會觸發它的`connection`事件,回撥函式會呼叫`distribute`方法來分發這個客戶端socket控制代碼,注意32行後面半句的箭頭函式方法,這裡的handle就是指客戶端`socket`例項。
3.將server._handle指向null
4.將server屬性指向null

如果你還記得net模組中listen方法的邏輯的話可能會有印象,_handleconnection事件回撥其實原本已經被複寫過一次了,也就是說單程序執行的程式在建立伺服器時,server._handleconnection事件會觸發server例項的connection事件,而在叢集模式下,主程序中排程例項中伺服器控制代碼server._handleconnection再次被複寫,將邏輯改變為分發socket,而子程序中的server._handle還是保持原來的邏輯。

最後一步指向null的邏輯還涉及到add方法,繼續看主程序中呼叫的add方法:

這個send形參實際上就是主程序中傳入的最終向子程序傳送返回訊息的那個回撥函式,它被封裝進了done函式,這裡需要著重看一下55行的邏輯,this.server === null這個條件實際上對應的就是建構函式中伺服器開始監聽的事件,所以55-59行的程式碼以及建構函式中新增的listening事件的回撥函式需要聯合在一起來理解,也就是每個子程序的send方法都被包裹在一個獨立的done函式中,這個函式會在主程序的server處於listening狀態後觸發執行,並且只觸發一次。當它觸發時,會從例項的handle屬性(也就是server_handle控制代碼)上取得socket名稱然後呼叫send方法,這個特殊socket的名稱在回撥函式中對應reply形參,最終掛載在message中發回了子程序。

至此其實主程序和子程序建立伺服器的訊息已經完成了閉環。最後再看一下RoundRobinHandle中最後兩個方法:

當客戶端socket執行connect方法連線到主程序server的控制代碼後,主程序會呼叫round-robin-handle例項的distribute方法,這個方法的邏輯比較簡單,把這個客戶端控制代碼加入到待處理佇列,然後從空閒程序佇列頭部取出一個worker程序,把它作為引數傳給handoff方法。

handoff方法中,從客戶端請求控制代碼佇列的頭部取出下一個待處理的socket,如果已經沒有要處理的請求,就把傳進來的worker放回空閒子程序佇列free中。在add方法內部封裝的done方法中也執行了這個handoff方法,現在再回過頭來看這個add方法的作用,就是當主程序處於監聽狀態後,將每一個子程序物件worker依次新增到空閒程序佇列free中。最後夠早了一個新的act:newconn訊息,並通過排程選出的worker.process物件實現跨程序通訊來將待處理控制代碼和【新連線】訊息傳送給子程序。

七. 圖解叢集建立過程的邏輯跳轉

叢集建立過程的邏輯大致的跳轉路徑如下,細節部分直接參考前文的講解即可。

相關推薦

nodejs原理&amp;原始碼賞析6深度剖析cluster模組原始碼node.js程序

目錄 一. 引言 二.server.listen方法 三.cluster._getServer( )方法 四.跨程序通訊工具方法Utils 五.act:queryServer訊息

nodejs原理&amp;原始碼賞析4深度剖析cluster模組原始碼node.js程序

目錄 一. 概述 二. 執行緒與程序 三. cluster模組原始碼解析 3.1 起步 3.2 入口 3.3 主程序模組master.js 3.4 子程序模組c

# nodejs原理&amp;原始碼賞析3欣賞手術級的原型鏈加工藝術

目錄 一. 概述 二. 原型鏈基礎知識 三. Worker類的原型鏈加工 四. 例項的生成 五. 最後一個問題 六. 一些心得 示例程式碼託

nodejs原理&amp;原始碼賞析5net模組通訊的實現

目錄 一. net模組簡介 二. Client-Server的通訊 2.1 server的建立 2.2 Socket的建立 三. IPC通訊 四. 擼一個簡易

nodejs原理&amp;原始碼賞析7Node.js中的事件迴圈,定時器和process.nextTick

目錄 Event Loop 是什麼? Event Loop 基本解釋 事件迴圈階段概覽 事件迴圈細節 timers pending callbacks poll

nodejs原理&amp;原始碼賞析9node-ssh實現輕量級自動化部署

目錄 一. 需求描述 二. 預備知識 IP+埠訪問 域名訪問 三. Nodejs應用的手動部署 四. 基於nodejs的自動部署 4.1 pa

nodejs原理&amp;原始碼雜記8Timer模組基於二叉堆的定時器

目錄 一.概述 二. 資料結構 2.1 連結串列 2.2 二叉堆 三. 從setTimeout理解Timer模組原始碼 3.1 timers.js中的定義

KoaNode.js開發實戰2——使用Koa中介軟體獲取響應時間視訊演示

學習架構:     在實戰專案中,經常需要記錄下伺服器的響應時間,也就是從伺服器接收到HTTP請求,到最終返回給客戶端之間所耗時長。在Koa應用中,利用中介軟體機制可以很方便的實現這一功能。程式碼如下所示: 01 const koa = require('koa');

KoaNode.js開發實戰3——Nunjucks模板在Koa中的應用視訊演示

技術架構: ​   在Koa中應用Nunjucks,需要先把Nunjucks整合為符合Koa規格的中介軟體(Middleware),從本質上來講,整合後的中介軟體的作用是給上下文物件繫結一個render(view, model)方法,這樣,後面的Controller就可以呼叫這個方法來渲染模板

nodejs原理&原始碼賞析5net模組通訊的實現

  一. net模組簡介      二. Client-Server的通訊      2.1 server的建立      2.2

LeetCode &amp; 劍指offer刷題陣列題5:3 陣列中重複的數字287. Find the Duplicate Number

【LeetCode & 劍指offer 刷題筆記】目錄(持續更新中...) 287 .   Find the Duplicate Number Given an array   nums

LeetCode &amp; 劍指offer刷題矩陣題1:4 有序矩陣中的查詢 74. Search a 2D Matrix 系列

【LeetCode & 劍指offer 刷題筆記】目錄(持續更新中...) 74. Search a 2D Matrix Write an efficient algorithm that searches for a va

LeetCode &amp; 劍指offer刷題分治法題1:16 數值的整數次方50. Pow(x, n)

【LeetCode & 劍指offer 刷題筆記】目錄(持續更新中...) 50. Pow(x, n) Implement   pow( x ,   n ) , which calculates&n

通知▁▂▃ Himi 著作《Android遊戲程式設計之從零開始》★書籍原始碼+第4/6/7樣章★部落格系列原始碼整理打包->免費下載★ ▃▂▁

第1章 Android 平臺介紹與環境搭建1.1 Android平臺簡介21.1.1    Android作業系統平臺的優勢和趨勢21.1.2    Android SDK與Android NDK21.2 Android開發環境的搭建31.2.1    搭配環境前的準備工作31.2.2    安裝和配置環境6

安富萊專題教程第6SEGGER的J-Scope波形上位機軟件,RTT模式波形上傳速度可狂飆到500KB/S左右

static str 數值 height size oat 工作方式 調整 設置 說明:1、在實際項目中,很多時候,我們需要將傳感器或者ADC的數值以波形的形式顯示。通常的解決辦法是用串口上位機,USB接口上位機或者MDK的邏輯分析儀功能,使用這三種方式都比較繁瑣。本期專題

深入Java虛擬機1:Java內存區域內存溢出

count 遇到 leak 分析 對象類型 深度 分配內存 解釋執行 尋址 內存區域 Java虛擬機在執行Java程序的過程中會把他所管理的內存劃分為若幹個不同的數據區域。Java虛擬機規範將JVM所管理的內存分為以下幾個運行時數據區:程序計數器、Java虛擬機棧、本地方法

火爐煉AI深度學習009-用Keras遷移學習提升性能分類問題

tro ray size array 全連接 步驟 loss pytho numpy 【火爐煉AI】深度學習009-用Keras遷移學習提升性能(多分類問題) (本文所使用的Python庫和版本號: Python 3.6, Numpy 1.14, scikit-learn

SparkSQL 之 Shuffle Join 核心原理及應用深度剖析-Spark商業原始碼實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何商業交流,可隨時聯絡。

Python 執行緒、程序 原始碼執行流程、GIL

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊 Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒 一、python程式的執行原理 許多時候,在執行一個python檔案的時候,會發現在同一目錄下會出現一個__

《TensorFlow:實戰Google深度學習框架》——6.2 卷積神經網路簡介卷積神經網路的基本網路結構及其全連線神經網路的差異

下圖為全連線神經網路與卷積神經網路的結構對比圖: 由上圖來分析兩者的差異:                  全連線神經網路與卷積網路相同點   &nb