memcached原始碼分析-網路模組

1.概述
memcached網路模組是基於libevent庫開發的,主要分為兩個模組:連線監聽執行緒,工作執行緒。連線監聽執行緒是用來監聽來自客戶端連線的,工作執行緒主要是用來完成具體業務邏輯處理。
網路模組模型

網路模組時序圖

2.連線監聽執行緒(主執行緒)
1.初始化主執行緒libevent例項,用於監聽來自客戶端的連線
2.work執行緒的資源分配與初始化
int main (int argc, char **argv) { //....... /* initialize main thread libevent instance */ #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101 /* If libevent version is larger/equal to 2.0.2-alpha, use newer version */ struct event_config *ev_config; ev_config = event_config_new(); event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); main_base = event_base_new_with_config(ev_config); event_config_free(ev_config); #else /* Otherwise, use older API */ main_base = event_init(); //main_base 主執行緒的libevent例項,主要用於監聽來自客戶端連線 #endif //....... //work執行緒相關初始化 memcached_thread_init(settings.num_threads, storage); //....... errno = 0; //服務端socket繫結ip與埠進行連線監聽 if (settings.port && server_sockets(settings.port, tcp_transport, portnumber_file)) { vperror("failed to listen on TCP port %d", settings.port); exit(EX_OSERR); } //....... /* enter the event loop */ if (event_base_loop(main_base, 0) != 0) { retval = EXIT_FAILURE; } //....... return retval }
server_sockets根據配置資訊開啟服務端socket的監聽
static int server_sockets(int port, enum network_transport transport, FILE *portnumber_file) { if (settings.inter == NULL) { return server_socket(settings.inter, port, transport, portnumber_file); }else{ // tokenize them and bind to each one of them.. char *b; int ret = 0; char *list = strdup(settings.inter); //....... ret |= server_socket(p, the_port, transport, portnumber_file); //....... return ret; } } static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file) { //....... for (next= ai; next; next= next->ai_next) { //....... conn *listen_conn_add; //服務端監聽fd的生成 if ((sfd = new_socket(next)) == -1) { //....... setsockopt(sfd,...); bind(sfd,...); listen(sfd,...); //至此完成了服務端socket引數設定、bind、listen等工作 //接下來就是將該sfd加入到libevent例項中進行"客戶端連線"事件的監聽 //....... //該函式功能執行相關libevent配置,以及針對該套接字控制代碼作一些相關資源的配置 conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base))); //....... } } //....... }
3.work執行緒
1.監聽來自客戶端連線的可讀事件
2.獲取客戶端的操作請求,完成具體業務邏輯處理功能
//work執行緒初始入口函式 void memcached_thread_init(int nthreads, void *arg) { //....... //nthreads work執行緒數目 //threads 是LIBEVENT_THREAD[nthreads]陣列,每個work執行緒都有個自己對應的LIBEVENT_THREAD //是LIBEVENT_THREAD結構體儲存了當前執行緒相關資源,例如:libevent例項,套接字fd佇列等 threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); for (i = 0; i < nthreads; i++) { //for迴圈為每個執行緒生成一個管道 int fds[2]; //管道初始化 if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } //第i號work執行緒的接收管道 threads[i].notify_receive_fd = fds[0]; //第i號work執行緒的傳送管道 threads[i].notify_send_fd = fds[1]; //....... //執行緒資源初始化,主要是執行緒的libevent例項初始化,套接字fd佇列等資源初始化 setup_thread(&threads[i]); //....... } //....... for (i = 0; i < nthreads; i++) { //建立work執行緒 //worker_libevent是work執行緒函式 create_worker(worker_libevent, &threads[i]); } } static void setup_thread(LIBEVENT_THREAD *me) { #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101 struct event_config *ev_config; ev_config = event_config_new(); event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); me->base = event_base_new_with_config(ev_config); event_config_free(ev_config); #else me->base = event_init(); #endif //me->base 將一個libevent例項儲存到LIBEVENT_THREAD中 //每個執行緒都維護著自己的libevent例項,用於監聽檔案控制代碼事件的發生 //me->base這個libevent例項主要用於監聽"客戶端fd連線"是否有可讀事件 //....... //notify_event也是一個libevent例項,主要用來監聽當前work執行緒是否有"管道事件" //當主執行緒向me->notify_send_fd管道寫'c'的時候 //me->notify_receive_fd接收管道有資料可讀,那麼就會觸發thread_libevent_process回撥函式 //thread_libevent_process管道回撥函式 event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); if (event_add(&me->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } //....... //執行緒套接字fd佇列資源初始化 me->new_conn_queue = malloc(sizeof(struct conn_queue)); if (me->new_conn_queue == NULL) { perror("Failed to allocate memory for connection queue"); exit(EXIT_FAILURE); } cq_init(me->new_conn_queue); //....... } //建立執行緒work執行緒函式 typedef void *(*func)(void *)pfunc; static void create_worker(pfunc func, void *arg) { pthread_attr_tattr; intret; pthread_attr_init(&attr); if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); exit(1); } } /* * Worker thread: main event loop */ static void *worker_libevent(void *arg) { LIBEVENT_THREAD *me = arg; //....... //libevent事件迴圈監聽,如果有事件發生,就會觸發對應的業務回撥函式 event_base_loop(me->base, 0); event_base_free(me->base); return NULL; }
thread_libevent_process管道回撥函式,例如:當主執行緒監聽到有客戶端連線過濾,主執行緒accept之後返回該客戶端的fd控制代碼,主執行緒不會將fd直接加入到work執行緒的libevent中進行可讀事件的監聽,而是分兩步進行,第一步:將fd封裝一個item節點,將item節點放入到一個work執行緒的item佇列中。第二步:通過管道寫入'c'訊息,通知對應的work執行緒到它自己的item佇列中取item節點,然後加入到自己的libevent中對fd進行可讀事件的監聽。而這個thread_libevent_process就是管道可讀事件的回撥觸發函式。
//管道事件回撥函式 static void thread_libevent_process(int fd, short which, void *arg) { //執行緒資源 LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; conn *c; unsigned int timeout_fd; //notify_receive_fd讀管道讀1個位元組資料 if (read(fd, buf, 1) != 1) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } switch (buf[0]) { case 'c': //'c'訊息就是主執行緒將客戶端新連線加到work執行緒new_conn_queue佇列中,並通過notify_send_fd通知work執行緒去取 item = cq_pop(me->new_conn_queue); //item 客戶端fd封裝結構 if (NULL == item) { break; } switch (item->mode) { case queue_new_conn: //item->init_state = conn_new_cmd 新連線 c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { //....... } else { //c->thread儲存work執行緒LIBEVENT_THREAD資源 //這樣fd會話連線就可以訪問該work主執行緒的LIBEVENT_THREAD相關資源 c->thread = me; } break; case queue_redispatch: conn_worker_readd(item->c); break; } cqi_free(item); break; /* we were told to pause and report in */ case 'p': register_thread_initialized(); break; /* a client socket timed out */ case 't': //超時執行緒的通知訊息 //超時執行緒會定時掃描conns連線會話,如果發現某個連線會話超時了, //那麼就會寫't'訊息通知該會話對應的work執行緒關閉該會話 if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) { if (settings.verbose > 0) fprintf(stderr, "Can't read timeout fd from libevent pipe\n"); return; } //關閉會話 conn_close_idle(conns[timeout_fd]); break; } }
1.conn_new主要是針對新連線會話做資源分配與初始化,以及將fd套接字加入到libevent中進行監聽.在主執行緒和work執行緒中都有涉及
2.drive_machine是一個非常重要的函式,它內部會根據 conn
連線的具體狀態,選擇執行相應的業務處理邏輯,我們這裡可以理解為一個狀態機
conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base) { //conn儲存了sfd連線的相關資訊,我在這裡將其理解為一個會話 //conns[MAX]陣列就是維護著當前系統所有的連線會話 conn *c; c = conns[sfd]; if (NULL == c) { c = (conn *)calloc(1, sizeof(conn)); //....... c->sfd = sfd; conns[sfd] = c; } //事件回撥函式event_handler,通過c->state狀態值呼叫相應的業務邏輯處理 c->state = init_state; //....... //將套接字控制代碼sfd加入到libevent例項中進行監聽 //當sfd套接字有可讀事件發生的時候 libevent會回撥event_handler函式 //回撥函式event_handler就是具體的業務邏輯處理 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); event_add(&c->event, 0); //....... return c; } //事件回撥函式 void event_handler(const int fd, const short which, void *arg) { conn *c; c = (conn *)arg; //....... //狀態機 drive_machine(c); //....... } static void drive_machine(conn *c) { bool stop = false; int sfd; //....... while (!stop) { switch(c->state) { case conn_listening: //....... sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); //....... //服務端accept一個新的客戶端連線控制代碼sfd //將該sfd通過輪訓的方式分發到對應work執行緒的fd佇列中,然後通過管道通知 //對應work執行緒到對應fd佇列中去取客戶端連線控制代碼 //work執行緒獲取該sfd後將其加入work執行緒的libevent進行監聽,work執行緒libevent主要監聽sfd是否有可讀資料 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, c->transport); stop = true; break; case conn_waiting: //....... break; case conn_read: //....... break; case conn_parse_cmd: //....... break; case conn_new_cmd: //....... reset_cmd_handler(c); //....... break; case conn_nread: //....... break; //....... case conn_max_state: //....... break; } } //....... } //分發一個新的連線到其他執行緒 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { //將sfd封裝到CQ_ITEM節點中 CQ_ITEM *item = cqi_new(); char buf[1]; //通過輪訓的方式確定一個work執行緒的編號 //settings.num_threads work執行緒數目 int tid = (last_thread + 1) % settings.num_threads; //LIBEVENT_THREAD 該結構儲存了執行緒相關資源 //threads全域性變數 //threads + tid 對應執行緒相關資源 LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; item->sfd = sfd; //init_state = conn_new_cmd //新連線 item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; item->mode = queue_new_conn; //將封裝好sfd的節點push到對應work執行緒fd佇列中 cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); //’c'新連線標誌 buf[0] = 'c'; //將標誌寫入notify_send_fd管道 //work執行緒發現notify_receive_fd管道有資料傳送過來,就會觸發管道事件的回撥函式 if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } } static void reset_cmd_handler(conn *c) { //....... if (c->rbytes > 0) { //已經讀取了客戶端的資料,那麼設定狀態為conn_parse_cmd, //提示狀態機drive_machine下一步執行conn_parse_cmd邏輯處理 conn_set_state(c, conn_parse_cmd); } else { //還未讀取資料,是一個新連線 //將狀態設定為conn_waiting,提示狀態機下一步執行conn_waiting邏輯處理 //conn_waiting中執行了修改update_event(c, EV_READ | EV_PERSIST),用於監聽可讀事件 conn_set_state(c, conn_waiting); } //....... }