從原始碼分析Node的Cluster模組
前段時間,公司的老哥遇到一個問題,大概就是本機有個node的http伺服器,但是每次請求這個伺服器的埠返回的資料都報錯,一看返回的資料根本不是http的報文格式,然後經過一番排查發現是另外一個伺服器同時監聽了http伺服器的這個埠。這個時候老哥就很奇怪,為啥我這個埠明明使用了,卻還是可以啟動呢?這個時候我根據以前看libuv原始碼的經驗解釋了這個問題,因為uv__tcp_bind中,對socket會設定SO_REUSEADDR
選項,使得埠可以複用,但是tcp中地址不能複用,因為那兩個監聽雖然是同一個埠,但是地址不同,所以可以同時存在。這個問題讓我不禁想到了之前看一篇文章裡有人留言說這個選項是cluster內部複用埠的原因,當時沒有細細研究以為說的是SO_REUSEPORT
也就沒有細想,但是這次因為這個問題仔細看了下結果是設定的SO_REUSEADDR
選項,這個選項雖然能複用埠,但是前提是每個ip地址不同,比如可以同時監聽'0.0.0.0'和'192.168.0.12'的埠,但不能兩個都是'0.0.0.0'的同一個 埠,如果cluster是用這個來實現的,那要是多起幾個子程序很明顯ip地址不夠用啊,於是就用node文件中的例子試了下:
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died`); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`Worker ${process.pid} started`); } 複製程式碼
在使用cluster的在幾個子程序同時監聽了8000埠後,查看了一下只有主程序監聽了這個埠,其他都沒有。這個時候,我猜測node還是使用在父程序中建立sever的io但是這個父程序應該就是通過Unix域套接字的cmsg_data將父程序中收到客戶端套接字描述符傳遞給子程序然後讓子程序來處理具體的資料與邏輯,但是node到底是如何通過在子程序中createServer並且listen但是隻在父程序中真的監聽了該埠來實現這個邏輯的呢?這個問題引起了我的好奇,讓我不得不到原始碼中一探究竟。
從net模組出發
按理說,這個問題我們應該直接通過cluster模組來分析,但是很明顯,在載入http模組的時候並不會像cluster模組啟動時一樣通過去判斷NODE_ENV來載入不同的模組,但是從上面的分析,我可以得出子程序中的createServer執行了跟父程序不同的操作,所以只能說明http模組中通過isMaster這樣的判斷來進行了不同的操作,不過http.js
和_http_server.js
中都沒有這個判斷,但是通過對createServer向上的查詢我在net.js
的listenInCluster
中找到了isMaster的判斷,listenInCluster
會在createServer後的server.listen(8000)
中呼叫,所以我們可以看下他的關鍵邏輯。
if (cluster === null) cluster = require('cluster'); if (cluster.isMaster || exclusive) { server._listen2(address, port, addressType, backlog, fd); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // 獲取父程序的server控制代碼,並監聽它 cluster._getServer(server, serverQuery, listenOnMasterHandle); 複製程式碼
從這段程式碼中我們可以看出,如果是在父程序中,直接通過_listen2的邏輯就能開始正常的監聽了,但是在子程序中,會通過cluster._getServer
的方式獲取父程序的控制代碼,並通過回撥函式listenOnMasterHandle
監聽它。看到這裡我其實比較疑惑,因為在我對於網路程式設計的學習中,只聽說過傳遞描述符的,這個傳遞server的控制代碼實在是太新鮮了,於是趕緊繼續深入研究了起來。
深入cluster的程式碼
首先,來看一下_gerServer
的方法的程式碼。
const message = util._extend({ act: 'queryServer', index: indexes[indexesKey], data: null }, options); send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb);// Shared listen socket. else rr(reply, indexesKey, cb);// Round-robin. }); 複製程式碼
這個方法通過send像主程序傳送一個包,因為在send函式中有這樣一句程式碼:
message = util._extend({ cmd: 'NODE_CLUSTER' }, message); 複製程式碼
通過Node的文件,我們可以知道這種cmd帶了Node字串的包,父程序會通過internalMessage
事件來響應,所以我們可以從internal/cluster/master.js
中看到找到,對應於act: 'queryServer'
的處理函式queryServer的程式碼。
... var constructor = RoundRobinHandle; ... handle = new constructor(key, message.address,message.port,message.addressType,message.fd,message.flags); ... handle.add(worker, (errno, reply, handle) => { reply = util._extend({ errno: errno, key: key, ack: message.seq, data: handles[key].data }, reply); if (errno) delete handles[key];// Gives other workers a chance to retry. send(worker, reply, handle); }); 複製程式碼
這裡建立了一個RoundRobinHandle
例項,在該例項的建構函式中通過程式碼:
this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address);// UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); 複製程式碼
在父程序中生成了一個server,並且通過註冊listen的方法將有心的客戶端連線到達時執行的onconnection改成了使用自身的this.distribute函式,這個函式我們先記下因為他是後來父程序給子程序派發任務的重要函式。說回getServer的程式碼,這裡通過RoundRobinHandle
例項的add方法:
const done = () => { if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); // TODO(bnoordhuis) Check err. send(null, { sockname: out }, null); } else { send(null, null, null);// UNIX socket. } this.handoff(worker);// In case there are connections pending. }; // Still busy binding. this.server.once('listening', done); 複製程式碼
會給子程序的getServer
以回覆。從這裡我們可以看到在給子程序的回覆中handle一直都是null。那這個所謂的去取得父程序的server是怎麼取得的呢?這個地方讓我困惑了一下,不過後來看子程序的程式碼我就明白了,實際上根本不存在什麼取得父程序server的控制代碼,這個地方的註釋迷惑了閱讀者,從之前子程序的回撥中我們可以看到,返回的handle只是決定子程序是用shared方式還是Round-robin的方式來處理父程序派下來的任務。從這個回撥函式我們就可以看出,子程序是沒有任何獲取控制代碼的操作的,那它是如何處理的呢?我們通過該例子中的rr方法可以看到:
const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname;// TCP handles only. } handles[key] = handle; cb(0, handle); 複製程式碼
這個函式中生成了一個自帶listen和close方法的物件,並傳遞給了函式listenOnMasterHandle
,雖然這個名字寫的是在父程序的server控制代碼上監聽,實際上我們這個例子中是子程序自建了一個handle,但是如果是udp的情況下這個函式名字還確實就是這麼回事,原因在於SO_REUSEADDR
選項,裡面有這樣一個解釋:
SO_REUSEADDR允許完全相同的地址和埠的重複繫結。但這隻用於UDP的多播,不用於TCP。 複製程式碼
所以,在udp情況同一個地址和埠是可以重複監聽的(之前網上看到那個哥們兒說的也沒問題,只是一葉障目了),所以可以共享父程序的handle,跟TCP的情況不同。我們繼續來看當前這個TCP的情況,在這個情況下listenOnMasterHandle
會將我們在子程序中自己生成的handle物件傳入子程序中通過createServer建立的server的_handle屬性中並通過
server._listen2(address, port, addressType, backlog, fd); 複製程式碼
做了一個假的監聽操作,實際上因為_handle的存在這裡只會為之前_handle賦值一個onconnection函式,這個函式的觸發則跟父程序中通過真實的客戶端連線觸發的時機不同,而是通過
process.on('internalMessage', (message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } 複製程式碼
中註冊的internalMessage
事件中的對父程序傳入的act為newconn的包觸發。而父程序中就通過我們剛剛說到的改寫了server物件的onconnection函式的distribute函式,這個函式中會呼叫一個叫handoff的函式,通過程式碼:
const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle);// Worker is shutting down. Send to another. this.handoff(worker); }); 複製程式碼
其中send到子程序的handle就是新連線客戶端的控制代碼,Node中父子程序之間的通訊最後是通過src/stream_base.cc
中的StreamBase::WriteString
函式實現的,從這段程式碼我們可以看出:
... //當程序間通訊時 uv_handle_t* send_handle = nullptr; if (!send_handle_obj.IsEmpty()) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); send_handle = wrap->GetHandle(); // Reference LibuvStreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. CHECK_EQ(false, req_wrap->persistent().IsEmpty()); req_wrap_obj->Set(env->handle_string(), send_handle_obj); } err = DoWrite( req_wrap, &buf, 1, reinterpret_cast<uv_stream_t*>(send_handle)); 複製程式碼
可以看到,在呼叫此方式時,如果傳入了一個客戶端的控制代碼則通過Dowrite
方法最後通過輔助資料cmsg_data將客戶端控制代碼的套接字fd傳送到子程序中進行處理。看到這裡我不禁恍然大悟,原來還是走的是我熟悉的那套網路程式設計的邏輯啊。