兄déi,libuv瞭解一下
前序:說說為啥要研究libuv,其實在很久之前(大概2年前吧)玩nodejs的時候就對這個核心庫非常感興趣,不過由於當年水平確實比較菜,大概看了看之後實在沒能靜下心來看下去。18年初的時候,360直播雲官網做了React同構,那個時候我問自己如果真有百萬併發,每天億級的訪問量有沒有信心保證中間node層一次不掛(或者不出任何事故),其實我到今天仍然是沒有足夠底氣的。原因有兩個吧:一是對nodejs和它底層的內容還遠遠不夠了解,二是對監控層面做的不夠好。我們大概也都知道alinode,他們早在3 4年前就在nodejs上做了很多工作,比如v8記憶體監控等,但是比較遺憾的是alinode至今沒有開源。於是乎有了我的第一篇關於libuv的文章,後面爭取還會更新nodejs、v8等相關的內容。 本文從下面幾個方面來介紹libuv,通過fs、net兩方面介紹libuv的思想。
如何安裝、使用libuv這個框架
首先我們可以在 ofollow,noindex">libuv 上找到libuv這個框架,在README.md裡,我們就可以在Build Instructions找到安裝方法,作者電腦作業系統是macos(所以後面的例項也是以linux、unix為主,不會講windows)。我們首先把專案clone到我們的電腦上,在專案根目錄執行一下的命令,在執行過程中可能會出現各種底層庫沒有安裝的情況,按照提示自行安裝就可以了,作者在執行 xcodebuild 的時候發現不能加上 -target All 的引數,不加的話可以順利build過去。
$ ./gyp_uv.py -f xcode $ xcodebuild -ARCHS="x86_64" -project uv.xcodeproj \ -configuration Release -target All
build完成後 我們可以在專案目錄裡找到 build/Release/libuv.a 檔案,這個就是編譯後的檔案了,我們稍後會用到。 準備工作做好之後我們就可以建立一個C或者C++的工程了,在Mac上我一般使用xcode來編寫oc、c、c++的專案。 首先建立一個C專案,這個時候我們需要把我們之前編譯的libuv.a的檔案加入到專案的依賴中,我們在Build Phases中的 Link Binary with Libraries中新增libuv.a的路徑,同時我們需要在專案根目錄引入uv.h等檔案頭。準備工作做好之後,我們就開始學習怎麼寫標準的hello world了 哈哈哈哈。
#include <stdio.h> #include <stdlib.h> #include <uv.h> int main() { uv_loop_t *loop = malloc(sizeof(uv_loop_t)); uv_loop_init(loop); printf("Now quitting.\n"); uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); free(loop); return 0; }
上述程式碼僅僅初始化了一個loop迴圈,並沒有執行任何內容,然後就close且退出了。雖然上述程式碼並沒有利用libuv的async功能,但是給我們展示了 uv_loop_init uv_run 兩個核心函式。我們稍後會介紹他們做了什麼。
先從一個數據結構開始
在開始介紹整個整個libuv之前,我不得不首先介紹一個數據結構,因為這個資料結構在libuv裡無處不在,這個資料結構就是--迴圈雙向連結串列。 我們在專案根目錄下的src目錄可以找到queue.h的標頭檔案。不錯,這個資料結構就是用巨集實現的,那我讓我們一起來學習一下什麼是連結串列。
連結串列的定義:
連結串列是一種物理儲存單元上非連續、非順序的儲存結構
那什麼是雙向連結串列呢?

雙向連結串列其實就是頭尾相連
那什麼是雙向迴圈連結串列呢?

看圖我們就明白了,所謂的迴圈連結串列就是把頭尾相連。
來看一下 queue.h 是怎麼實現的
#define QUEUE_NEXT(q)(*(QUEUE **) &((*(q))[0])) #define QUEUE_PREV(q)(*(QUEUE **) &((*(q))[1])) #define QUEUE_PREV_NEXT(q)(QUEUE_NEXT(QUEUE_PREV(q))) #define QUEUE_NEXT_PREV(q)(QUEUE_PREV(QUEUE_NEXT(q))) /* Public macros. */ #define QUEUE_DATA(ptr, type, field)\ ((type *) ((char *) (ptr) - offsetof(type, field))) #define QUEUE_INIT(q)\ do {\ QUEUE_NEXT(q) = (q);\ QUEUE_PREV(q) = (q);\ }\ while (0)
上述程式碼我只截取了部分的實現 其實這裡我只想講兩個點 1:QUEUE_NEXT 的實現
(*(QUEUE **) &((*(q))[0]))
在這個巨集裡,他為什麼用這個複雜的方式來實現呢? 其實他有兩個目的:強制型別轉換、成為左值
*(q))[0]
這個步驟是取到陣列的第一個元素
(QUEUE **)
這個步驟進行強制型別轉換
(*(nnn) &(xxx))
這個步驟目的就是為了使xxx成為左值
2:QUEUE_DATA 獲取連結串列的值 巧妙的使用了地址的偏移量來完成
來看一個使用queue.h的demo吧
#include "queue.h" #include <stdio.h> static QUEUE* q; static QUEUE queue; struct user_s { int age; char* name; QUEUE node; }; int main() { struct user_s* user; struct user_s john; struct user_s henry; john.name = "john"; john.age = 44; henry.name = "henry"; henry.age = 32; QUEUE_INIT(&queue); QUEUE_INIT(&john.node); QUEUE_INIT(&henry.node); QUEUE_INIT(&willy.node); QUEUE_INIT(&sgy.node); ((*(&queue))[0]) = john.node; (*(QUEUE **) &((*(&queue))[0])) = &john.node; QUEUE_INSERT_TAIL(&queue, &john.node); QUEUE_INSERT_TAIL(&queue, &henry.node); q = QUEUE_HEAD(&queue); user = QUEUE_DATA(q, struct user_s, node); printf("Received first inserted user: %s who is %d.\n", user->name, user->age); QUEUE_REMOVE(q); QUEUE_FOREACH(q, &queue) { user = QUEUE_DATA(q, struct user_s, node); printf("Received rest inserted users: %s who is %d.\n", user->name, user->age); } return 0; }
從上面程式碼可以總結出5個方法 QUEUE_INIT 佇列初始化 QUEUE_INSERT_TAIL 插入到隊尾 QUEUE_HEAD 頭部第一個元素 QUEUE_DATA 獲得元素的內容 QUEUE_REMOVE 從佇列中移除元素
那雙向迴圈連結串列就先簡單介紹到這。
libuv的核心
libuv為什麼可以這麼高效呢?實際他使用了作業系統提供的高併發非同步模型
linux: epoll
freebsd: kqueue
windows: iocp
每個我們常見的作業系統都為我們封裝了類似的高併發非同步模型,那libuv其實就是對各個作業系統進行封裝,最後暴露出統一的api供開發者呼叫,開發者不需要關係底層是什麼作業系統,什麼API了。 我們來看一下同步模型和非同步模型的區別
阻塞模型

我們在一個執行緒中呼叫網路請求,之後執行緒就會被阻塞,直到返回結果才能繼續執行執行緒
非同步模型

在非同步模型中 我們呼叫網路請求後不在去直接呼叫accept阻塞執行緒,而是輪詢fd是否發生變化,在返回內容後我們在呼叫cb執行我們的程式碼,這個過程是非阻塞的。 說了這麼多我們通過2個例子瞭解一下其中的原理。
學習如何建立一個socket
我們首先了解一下 C是如何建立socket的,之後我們在看一下如果通過高併發非同步模型來建立socket,最後我們在瞭解一下 libuv下怎麼建立socket。
C如何建立一個socket呢?
#include <sys/types.h> #include <sys/socket.h> #include <stdio.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <fcntl.h> #include <sys/shm.h> #define MYPORT8887 #define QUEUE20 #define BUFFER_SIZE 1024 int main() { //定義sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX協議) AF_ROUTE(路由套接字) AF_KEY(祕鑰套接字) // SOCK_STREAM(位元組流套接字) SOCK_DGRAM int server_sockfd = socket(AF_INET, SOCK_STREAM, 0); ///定義sockaddr_in struct sockaddr_in server_sockaddr; server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_port = htons(MYPORT); server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY); ///bind,成功返回0,出錯返回-1 if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1) { perror("bind"); exit(1); } printf("監聽%d埠\n", MYPORT); ///listen,成功返回0,出錯返回-1 if(listen(server_sockfd, QUEUE) == -1) { perror("listen"); exit(1); } ///客戶端套接字 char buffer[BUFFER_SIZE]; struct sockaddr_in client_addr; socklen_t length = sizeof(client_addr); printf("等待客戶端連線\n"); ///成功返回非負描述字,出錯返回-1 int conn = accept(server_sockfd, (struct sockaddr*)&client_addr, &length); if(conn<0) { perror("connect"); exit(1); } printf("客戶端成功連線\n"); while(1) { memset(buffer,0,sizeof(buffer)); long len = recv(conn, buffer, sizeof(buffer), 0); //客戶端傳送exit或者異常結束時,退出 ; if(strcmp(buffer,"exit\n")==0 || len<=0) { printf("出現異常"); break; } printf("來自客戶端資料:\n"); fwrite(buffer, len, 1, stdout); send(conn, buffer, len, 0); printf("傳送給客戶端資料:\n"); fwrite(buffer, len, 1, stdout); } close(conn); close(server_sockfd); return 0; }
程式碼一大坨,其實上我們簡單拆分一下
第一步:建立socket 檔案描述符 第二步:定義socket addr 第三步:繫結檔案描述符和地址bind 第四步:監聽檔案描述符 listen 第五步:等待socket返回內容 accept 第六步:接收資訊 recv
那我們如何使用kqueue來建立socket呢?
由於作者電腦是macos,所以只能使用kqueue,不能使用epoll。
#include <sys/types.h> #include <sys/socket.h> #include <stdio.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <fcntl.h> #include <sys/shm.h> #define MYPORT8887 #define QUEUE20 #define BUFFER_SIZE 1024 int main() { // 定義sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX協議) AF_ROUTE(路由套接字) AF_KEY(祕鑰套接字) // SOCK_STREAM(位元組流套接字) SOCK_DGRAM int server_sockfd = socket(AF_INET, SOCK_STREAM, 0); // 定義sockaddr_in struct sockaddr_in server_sockaddr; server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_port = htons(MYPORT); server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY); // bind,成功返回0,出錯返回-1 if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1) { perror("bind"); exit(1); } printf("監聽%d埠\n", MYPORT); // listen,成功返回0,出錯返回-1 if(listen(server_sockfd, QUEUE) == -1) { perror("listen"); exit(1); } //建立一個訊息佇列並返回kqueue描述符 int kq =kqueue(); struct kevent change_list;//想要監控的事件 struct kevent event_list[10000];//用於kevent返回 char buffer[1024]; int nevents; // 監聽sock的讀事件 EV_SET(&change_list, server_sockfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); while(1) { printf("new loop...\n"); // 等待監聽事件的發生 nevents = kevent(kq, &change_list, 1, event_list, 2, NULL); if (nevents < 0) { printf("kevent error.\n");// 監聽出錯 } else if (nevents > 0) { printf("get events number: %d\n", nevents); for (int i = 0; i < nevents; ++i) { printf("loop index: %d\n", i); struct kevent event = event_list[i]; //監聽事件的event資料結構 int clientfd = (int) event.ident;// 監聽描述符 // 表示該監聽描述符出錯 if (event.flags & EV_ERROR) { close(clientfd); printf("EV_ERROR: %s\n", strerror(event_list[i].data)); } // 表示sock有新的連線 if (clientfd == server_sockfd) { printf("new connection\n"); struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int new_fd = accept(server_sockfd, (struct sockaddr *) &client_addr, &client_addr_len); long len = recv(new_fd, buffer, sizeof(buffer), 0); char remote[INET_ADDRSTRLEN]; printf("connected with ip: %s, port: %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, remote, INET_ADDRSTRLEN), ntohs(client_addr.sin_port)); send(new_fd, buffer, len, 0); } } } } return 0; }
我們可以看到,listen之前都是一樣的,不在贅述,簡化一下後面的步驟
第一步:建立 kqueue描述符 第二部:監聽socket讀事件 EV_SET 第三步:繫結kq 和 change_list kevent
一直while迴圈直到 kevent 返回可以的檔案描述符數量 那到這裡其實我們就完全弄懂了 如何直接用C寫出高併發非同步是怎麼執行的。那麼我們就看看使用libuv的例子吧
使用libuv的scoket
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <uv.h> #define DEFAULT_PORT 7000 #define DEFAULT_BACKLOG 128 uv_loop_t *loop; struct sockaddr_in addr; typedef struct { uv_write_t req; uv_buf_t buf; } write_req_t; void free_write_req(uv_write_t *req) { write_req_t *wr = (write_req_t*) req; free(wr->buf.base); free(wr); } void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->base = (char*) malloc(suggested_size); buf->len = suggested_size; } void on_close(uv_handle_t* handle) { free(handle); } void echo_write(uv_write_t *req, int status) { if (status) { fprintf(stderr, "Write error %s\n", uv_strerror(status)); } free_write_req(req); } void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { if (nread > 0) { write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t)); req->buf = uv_buf_init(buf->base, nread); fwrite(buf->base, 30, 1, stdout); uv_write((uv_write_t*) req, client, &req->buf, 1, echo_write); return; } if (nread < 0) { if (nread != UV_EOF) fprintf(stderr, "Read error %s\n", uv_err_name(nread)); uv_close((uv_handle_t*) client, on_close); } free(buf->base); } void on_new_connection(uv_stream_t *server, int status) { if (status < 0) { fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // error! return; } uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); uv_tcp_init(loop, client); if (uv_accept(server, (uv_stream_t*) client) == 0) { uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read); } else { uv_close((uv_handle_t*) client, on_close); } } int main() { loop = uv_default_loop(); uv_tcp_t server; uv_tcp_init(loop, &server); uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr); uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection); if (r) { fprintf(stderr, "Listen error %s\n", uv_strerror(r)); return 1; } return uv_run(loop, UV_RUN_DEFAULT); }
實際上整體我們都可以把libuv和我們原生的c kqueue進行一一對應,發現相差不多,唯一不同是我們需要定義 uv_loop 這個內部迴圈,後面我們在來講套迴圈機制。
學習如何進行檔案讀寫
我們學習完了網路,那麼我們再來看看檔案i/o是怎麼處理的。

剛剛我們玩轉了socket來看這張圖是不是很熟悉?但是發現右側有了很大的不同。檔案操作、DNS、使用者程式碼不是基於epoll這種模型嗎? 顯而易見我們有了答案,這是為什麼呢?其實很簡單檔案的很多操作就是同步的,但是libuv為了統一非同步,利用開闢執行緒進行檔案等操作模擬了非同步的過程!!原來我們用了這麼久才發現他是個騙子。哈哈!其實是我們學藝不精。 那其實講到這裡檔案讀寫其實講的差不多了,我們還是來看看例子吧!
#include <stdio.h> #include <uv.h> uv_fs_t open_req; uv_fs_t _read; static char buffer[1024]; static uv_buf_t iov; void on_read(uv_fs_t *req) { printf("%s\n",iov.base); } void on_open(uv_fs_t *req) { printf("%zd\n",req->result); iov = uv_buf_init(buffer, sizeof(buffer)); uv_fs_read(uv_default_loop(), &_read, (int)req->result, &iov, 1, -1, on_read); } int main() { const char* path = "/Users/sgy/koa/package.json"; // O_RDONLY 、 O_WRONLY 、 O_RDWR 、 O_CREAT uv_fs_open(uv_default_loop(), &open_req, path, O_RDONLY, 0, on_open); uv_run(uv_default_loop(), UV_RUN_DEFAULT); uv_fs_req_cleanup(&open_req); return 0; }
其實libuv底層對檔案open和read的操作是分開的。 看到這裡檔案api沒啥講的了,我們來簡單講講執行緒池。
執行緒池
執行緒池就是對執行緒的統一管理,預先創建出執行緒,如果有任務就把任務放到執行緒池裡去執行。

通過上圖我們可以看到有任務進來首先會插入到連結串列中進行排隊等待, 直到執行緒空餘就會去連結串列中去取。 通過閱讀 src/threadpool.c檔案我們可以瞭解 MAX_THREADPOOL_SIZE 128 最大執行緒為128個 default_threads[4] 預設只會開闢4個執行緒 如果你對底層不瞭解 那當你在進行大量的檔案i/o時 執行緒池數量就是阻礙你的最大障礙。 為啥最大隻能建立128個執行緒呢?因為大多數作業系統建立一個執行緒大概花費1M的記憶體空間,外加使用者本身程式碼也要佔用大量的記憶體,所以這裡設定了最大128的限制。
瞭解libuv的迴圈機制
我們通過網路和檔案瞭解了libuv,那麼我們來看看libuv的迴圈機制
uv_loop_t *loop; loop = uv_default_loop() uv_run(loop, UV_RUN_DEFAULT);
首先我們會建立 loop 然後一系列的騷操作之後 最後我們執行了uv_run 嗯嗯 那uv_run 肯定是突破口了 在src/unix/core.c 檔案裡 我們找到了 uv_run的定義
int uv_run(uv_loop_t* loop, uv_run_mode mode) { int timeout; int r; int ran_pending; r = uv__loop_alive(loop); if (!r) uv__update_time(loop); while (r != 0 && loop->stop_flag == 0) { uv__update_time(loop); uv__run_timers(loop); ran_pending = uv__run_pending(loop); uv__run_idle(loop); uv__run_prepare(loop); timeout = 0; if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT) timeout = uv_backend_timeout(loop); uv__io_poll(loop, timeout); uv__run_check(loop); uv__run_closing_handles(loop); if (mode == UV_RUN_ONCE) { /* UV_RUN_ONCE implies forward progress: at least one callback must have * been invoked when it returns. uv__io_poll() can return without doing * I/O (meaning: no callbacks) when its timeout expires - which means we * have pending timers that satisfy the forward progress constraint. * * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from * the check. */ uv__update_time(loop); uv__run_timers(loop); } r = uv__loop_alive(loop); if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break; }
從程式碼中 我們就可以總結出libuv的執行週期 通過while迴圈不斷的查詢 loop中是否有停止符 如果有則退出 否則就不停的進行迴圈。

上面的圖已經清楚的描述我們uv_run的流程了 那其中的核心 就在*uv io_poll* 中 例如在 src/unix/linux-core.c 中的uv io_poll函式 我們就可以找到 我們 epoll 熟悉的身影了。實現邏輯也和我們之前使用過的差不多。
總結
洋洋灑灑寫了這麼多,最後總結一下也提出自己的思考。 其實libuv底層的 actor模型是非常高效的,很多遊戲伺服器核心也使用actor模型,那相對於火的不行的go(協程模型) nodejs一直沒有在服務端發揮它的高效呢? 我覺得其實原因很簡單,因為nodejs他並不高效,我覺得nodejs能夠快速的被開發出來並且js執行如此高效 v8功不可沒。但是成也v8敗也v8,JIT優化的在好 依然和編譯型語言相差甚遠。 但是一點的效能是阻礙大資料等框架使用go而不是用nodejs的原因嗎?我覺得其實並不是,最大的原因我覺得是生態!非常多的Apache開源框架使用java編寫,很多大資料使用go來承載,nodejs有什麼頂級生態嗎?我覺得並沒有,他大多數面向的是前端這個群體導致他的生態的發展。 謝謝大家能看到這裡,上述的心得都是近期整理的,如果有不對的地方歡迎大家多多批評。上述內容如果轉載請附帶原文連結,感謝。