Node.js非同步處理CPU密集型任務
Node.js非同步處理CPU密集型任務
Node.js擅長資料密集型實時(data-intensive real-time)互動的應用場景。然而資料密集型實時應用程式並不是只有I/O密集型任務,當碰到CPU密集型任務時,比如要對資料加解密(node.bcrypt.js),資料壓縮和解壓(node-tar),或者要根據使用者的身份對圖片做些個性化處理,在這些場景下,主執行緒致力於做複雜的CPU計算,IO請求佇列中的任務就被阻塞。
Node.js主執行緒的event loop在處理所有的任務/事件時,都是沿著事件佇列順序執行的,所以在其中任何一個任務/事件本身沒有完成之前,其它的回撥、監聽器、超時、
一個可行的解決方案是新開程序,通過ipc通訊,將CPU密集型任務交給子程序,子程序計算完畢後,再通過ipc訊息通知主程序,並將結果返回給主程序[1]。
和建立執行緒相比,開闢新程序的系統資源佔用率大,程序間通訊效率也不高。如果能不開新程序而是新開執行緒,將CPU耗時任務交給一個工作執行緒去做,然後主執行緒立即返回,處理其他的IO請求,等到工作執行緒計算完畢後,通知主執行緒並將結果返回給主執行緒。那麼在同時面對IO密集型和CPU密集型服務的場景下,
因此,和開程序相比,一個更加優秀的解決方案是:
1 不開程序,而是將CPU耗時操作交給程序內的一個工作執行緒完成。
2 CPU耗時操作的具體邏輯支援通過C++和Js實現。
3 js使用這個機制與使用IO庫類似,方便高效。
4 在新執行緒中執行一個獨立的V8 VM,與主執行緒的VM併發執行,並且這個執行緒必須 由我們自己託管。
為了實現以上四個目標,我們在Node中增加了一個backgroundthread執行緒,文章稍候會詳細解釋這個概念。在具體實現上,為Node增加了一個’pt_c’的內建C++模組。這個模組負責吧CPU耗時操作封裝成一個
BackgroundThread
Node提供了一種機制可以將CPU耗時操作交給其他執行緒去做,等到執行完畢後設置結果通知主執行緒執行callback函式。以下是一段程式碼,用來演示這個過程:
intmain() {
loop = uv_default_loop();
int data[FIB_UNTIL];
uv_work_t req[FIB_UNTIL];
int i;
for (i =0; i < FIB_UNTIL; i++) {
data[i] = i;
req[i].data = (void*) &data[i];
uv_queue_work(loop, &req[i], fib, after_fib);
}
return uv_run(loop, UV_RUN_DEFAULT);
}
其中函式uv_queue_work的定義如下:
UV_EXTERN int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb);
引數 work_cb 是在另外執行緒執行的函式指標,after_work_cb相當於給主執行緒執行的回撥函式。
在windows平臺上,uv_queue_work最終呼叫API函式QueueUserWorkItem來派發這個task,最終執行task 的執行緒是由作業系統託管的,每次可能都不一樣。這不滿足上述第四條。
因為我們要支援線上程中執行js程式碼,這就需要開一個V8 VM,所以需要把這個執行緒固定下來,特定任務,只交給這個執行緒處理。並且一旦建立,不管有沒有task,都不能隨便退出。這就需要我們自己維護一個執行緒物件,並且提供介面,使得使用者可以方便的生成一個物件並且提交給這個執行緒的任務佇列。
在node程序啟動初始化過程中,加入一個建立background thread物件的過程。這個執行緒擁有一個taskloop,有任務就處理,沒有任務就等待在一個訊號量上。多執行緒要考慮執行緒間同步的問題。執行緒同步只發生在讀寫此執行緒的incomming queue 的時候。Node的主執行緒生成task後,提交到這個執行緒的incomming queue中,並激活訊號量然後立即返回。在下一次迴圈中,backgroundthread從incomming queue中取出所有的task,放入working queue,然後依次執行working queue中的task。主執行緒不訪問working queue因此不需要加鎖。這樣做可以降低衝突。
這個執行緒在進入taskloop迴圈之前會建立一個獨立的v8 VM,專門用來執行backgroundjs的程式碼。主執行緒的v8引擎和這個執行緒的可以並行執行。它的生命週期與Node程序的生命週期一致。
BackgroundJs
可以把所有CPU耗時邏輯放入backgroundJs中,主執行緒通過生成一個task,指定好執行的函式和引數,拋給工作執行緒。工作執行緒在執行task的過程中呼叫在backgroundJs中的函式。BackgroundJs是一個.js檔案,在裡面新增CPU耗時函式。
background.js程式碼示例:
var globalFunction = function(v){
var flag;
try
{
flag = true;
JSON.parse(v);
}
catch(e)
{
flag = false;
}
if(!flag)
{
var err = 'err';
return err;
}
var obj = JSON.parse(v);
var a = obj.param1;
var b = obj.param2;
var i;
// simulate CPU intensive process...
for(i = 0; i < 95550000; ++i)
{
i += 100;
i -= 100;
}
return (a+b).toString();
}
執行node.js,在控制檯輸入:
var bind = process.binding('pt_c');
var obj = {param1: 123,param2: 456};
bind.jstask('globalFunction', JSON.stringify(obj), function(err, data){if(err) console.log("err"); else console.log(data);});
呼叫的方法是bind.jstask,稍後會解釋這個函式的用法。
以下是測試結果:
上面這個實驗操作步驟如下:
1 首先繫結’pt_c’內建模組
2 快速多次呼叫backgroundjs中的CPU耗時函式,上面的實驗中連續呼叫了三次。
當backgroundjs中的函式完成後,主執行緒接到通知,在新一輪的evenloop中,呼叫回撥函式,打印出結果。這個實驗說明了CPU耗時操作非同步執行。
方法jstask總共三個引數,前兩個引數為字串,分別是background.js中的全域性函式名稱,傳給函式的引數。最後一個引數是一個callback函式,非同步留給主執行緒執行。
為什麼用字串做引數?
為了適應各種不同的引數型別,就需要為C++函式提供各種不同的函式實現,這是非常受限制的。C++根據函式名獲取backgroundjs中的函式然後將引數傳遞給js。在js中,處理json字串是非常容易的,因此採用字串,簡化了C++的邏輯,js又能夠方便的生成和解析引數。同樣的理由,backgroundjs中函式的返回值也為json串。
對C++的支援
在苛求效能的場景,’pt_c’允許載入一個.dll 檔案到node程序,這個dll檔案包含CPU耗時操作。js載入’pt_c’的時候,指定檔名即可完成載入。
程式碼示例:
var bind = process.binding('pt_c');
bind.registermodule('node_pt_c.dll', 'DllInit', 'Json to Init');
bind.posttask('Func_example', 'Json_Param', function(err, data){if(err) console.log("err"); else console.log(data);});
與backgroundjs相比,載入C++模組多了一個步驟,這個步驟是呼叫bind.registermodule。這個函式負責將載入dll並負責對其初始化。一旦成功後,不能再載入其他模組。所有的CPU耗時操作函式都應該在這個dll檔案中實現。
總結
這篇文章提出了backgroundjs這個新的概念,擴充套件了Node.js的能力,解決了Node在處理CPU密集任務時的短板。這個解決方案使得使用Node的開發人員只需要關注backgroundjs中的函式。比起多開程序或者新新增模組的解決方案更高效,通用和一致。
我們的程式碼已經開源,您可以在 https://github.com/classfellow/node/tree/Ansy-CPU-intensive-work--in-one-process
下載。
支援backgroundjs一個穩定Node版本您可以在
下載。
參考文獻:
1 Node.js軟肋之CPU密集型任務
2 Why you should use Node.js for CPU-bound tasks,Neil Kandalgaonkar,2013.4.30;
3 http://nikhilm.github.io/uvbook/threads.html#inter-thread-communication