程式人生 > Memcached原始碼分析-網路模型(1)

Memcached原始碼分析-網路模型(1)

1 網路模型

Memcached採用了單程序多執行緒的工作方式,同時採用了libevent事件驅動進行網路請求的處理。

2 工作原理

2.1 libevent介紹

2.2 網路請求流程

2.2.1 流程圖

(原文此處為網路請求流程圖,圖片未能隨文字一併擷取)

2.2.2 主執行緒工作流程分析

主執行緒工作流程:
1 主執行緒主要在memcached.c的main()函式中。
2 socket_sdf = new_socket:建立一個新的socket。
3 setsockopt:配置socket資訊。
4 bind:繫結ip地址資訊。
5 listen:監聽埠。
6 向核心註冊事件
事件監聽物件:socket_sdf
事件監聽事件:read
事件處理函式:event_handle
7 阻塞等待核心事件觸發。這裡主要使用的是libevent庫中,網路I/O模型中的epoll模型,主要是呼叫epoll_wait()等待作業系統返回準備好的事件。每當socket_sdf 上面有read動作的時候,就會回撥event_handle。
8 事件準備好了之後,回撥函式event_handle工作流程:
呼叫drive_machine;接收連線請求con_accept_sfd = accept();呼叫dispatch_conn_new分發請求
9 dispatch_conn_new工作流程:
a) 選擇工作執行緒:tid = (上次分配執行緒id + 1) % 工作執行緒數量
b)將連線請求放到工作執行緒的連線佇列中
c) 向工作執行緒所監聽管道的寫入端寫入字元'c',驅動工作執行緒事件觸發
10 進入事件迴圈。

2.2.3 工作執行緒工作流程分析

工作執行緒工作流程圖分析:
1 初始化自己監聽的管道pipe(fds)
2 初始化自己的連線佇列new_conn_queue
3 向核心註冊事件
事件監聽物件:pipe_receive
事件監聽事件:read
事件處理函式:thread_libevent_process
4 阻塞等待核心事件的發生
5 如果是管道事件觸發
a) 呼叫thread_libevent_process回撥處理函式:從佇列中取出一個連線請求cq_pop。呼叫conn_new函式
b)向核心註冊監聽事件,conn_accept_sfd是主執行緒分配的連線請求。 監聽物件:conn_accept_sfd。 監聽事件:read;監聽回撥函式:event_handler
6 如果是連線請求事件觸發
a)呼叫event_handler回撥處理函式:呼叫drive_machine
b) drive_machine:狀態機處理:read()資料;解析命令;根據命令處理請求;write()資料;close()請求
7 進入事件迴圈。

3 原始碼分析

3.1 原始碼地址

3.2 主執行緒原始碼分析

1 memcached.c的main函式中。主要通過server_socket建立一個socket。

// Program entry point (abridged excerpt: the article omits option parsing,
// daemonization, thread_init() and the declarations of retval / main_base /
// settings that appear earlier in the real memcached.c main()).
// NOTE(review): reformatted from a single collapsed line; in the original
// paste the "//建立tcp socket" line comment swallowed the rest of the line.
int main (int argc, char **argv) 
{/*{{{*/
    

    /* create the listening socket, bind it, and init */
    if (settings.socketpath == NULL) {
        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
        char temp_portnumber_filename[PATH_MAX];
        FILE *portnumber_file = NULL;
        if (portnumber_filename != NULL) {
            /* write the bound port numbers to a lock file for test harnesses */
            snprintf(temp_portnumber_filename,
                     sizeof(temp_portnumber_filename),
                     "%s.lck", portnumber_filename);
            portnumber_file = fopen(temp_portnumber_filename, "a");
            if (portnumber_file == NULL) {
                fprintf(stderr, "Failed to open \"%s\": %s\n",
                        temp_portnumber_filename, strerror(errno));
            }
        }
        errno = 0;
        // create the TCP listening socket(s)
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                            portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }
        // create the UDP socket(s)
        errno = 0;
        if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                               portnumber_file)) {
            vperror("failed to listen on UDP port %d", settings.udpport);
            exit(EX_OSERR);
        }
    }
    /* enter the event loop: the main thread blocks here dispatching libevent
     * events (accepting connections) until shutdown */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }
}/*}}}*/

2 server_socket主要是建立socket,bind,listen

/*
 * server_socket -- create, configure, bind and (for TCP) listen on a socket
 * for every address resolved for the given interface/port, then hand each
 * socket to the event system (conn_new for TCP listeners, dispatch_conn_new
 * for UDP worker sockets).
 *
 * Returns 0 iff at least one address was bound successfully.
 *
 * NOTE(review): the article truncates the parameter list and omits the lines
 * that call getaddrinfo() and declare sfd/error/flags/ling/ai/success — the
 * body below resumes at the per-address loop; verify against the real
 * memcached.c.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
  

    /* try every resolved address; at least one must succeed */
    for (next= ai; next; next= next->ai_next) 
    {/*{{{*/
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        /* keep the IPv6 socket IPv6-only so a separate IPv4 socket can bind */
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }

        // bind the socket to this resolved address; EADDRINUSE just means
        // another resolved address already claimed it, so keep trying
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;
            // start listening (TCP only; UDP sockets are connectionless)
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            /* record the actually-bound port (useful when port 0 was given) */
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) 
        {/*{{{*/
            int c;

            /* UDP: give every worker thread its own dup'd fd so each can
             * read datagrams independently (first thread keeps sfd itself) */
            for (c = 0; c < settings.num_threads_per_udp; c++) {
              
                int per_thread_fd = c ? dup(sfd) : sfd;
                dispatch_conn_new(per_thread_fd, conn_read,
                                  EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } 
        else 
        {
            // TCP: create the listening conn and register its accept event
            // on the main thread's event base
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }/*}}}*/

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}

3 conn_new主要是建立一個連線結構體conn,並且註冊監聽事件。

/*
 * conn_new -- allocate (or reuse) the connection object for descriptor sfd,
 * reset its per-connection state, and register its libevent read event on
 * the given event base.
 *
 * sfd              socket descriptor; also the index into the global conns[]
 *                  cache of connection structs
 * init_state       initial state-machine state (conn_listening for the main
 *                  thread's listener; workers pass the state chosen by
 *                  dispatch_conn_new)
 * event_flags      libevent flags for the registered event
 *                  (typically EV_READ | EV_PERSIST)
 * read_buffer_size initial read-buffer size used when a fresh conn is built
 * transport        tcp_transport / udp_transport / local (unix) transport
 * base             event base the event is attached to -- the main thread's
 *                  or a worker thread's
 *
 * Returns the initialized conn, or NULL on allocation or event_add failure.
 */
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    /* conn structs are cached per-fd and reused across connections */
    c = conns[sfd];

    if (NULL == c) {
        /* first time this fd is seen: build the conn and its buffers */
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);

        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;

        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

        /* if any buffer failed, conn_free releases whatever did allocate */
        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            conn_free(c);
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate buffers for connection\n");
            return NULL;
        }

        STATS_LOCK();
        stats.conn_structs++;
        STATS_UNLOCK();

        c->sfd = sfd;
        conns[sfd] = c;
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out.  but why
     * is this done for every command?  presumably for UDP
     * mode.  */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    if (transport == tcp_transport && init_state == conn_new_cmd) {
        if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
                        &c->request_addr_size)) {
            perror("getpeername");
            memset(&c->request_addr, 0, sizeof(c->request_addr));
        }
    }

    /* verbose-mode trace of what kind of connection this is */
    if (settings.verbose > 1) {
        if (init_state == conn_listening) {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                prot_text(c->protocol));
        } else if (IS_UDP(transport)) {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        } else if (c->protocol == negotiating_prot) {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                    sfd);
        } else if (c->protocol == ascii_prot) {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        } else if (c->protocol == binary_prot) {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        } else {
            fprintf(stderr, "<%d new unknown (%d) client connection\n",
                sfd, c->protocol);
            assert(false);
        }
    }

    /* reset per-connection state-machine fields (the struct may be reused) */
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;
    c->authenticated = false;

    c->write_and_go = init_state;
    c->write_and_free = 0;
    c->item = 0;

    c->noreply = false;

    // bind sfd to the event_handler callback; the conn itself is passed
    // back as the callback argument
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);


    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    // register the event with libevent (and thus the kernel poller)
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}

3.3 工作執行緒原始碼分析

1 在main函式中初始化工作執行緒:memcached.c的main()函式中呼叫thread_init初始化工作執行緒。thread_init函式主要在thread.c中。

/*
 * Initializes the thread subsystem, creating various worker threads.
 * 初始化執行緒系統
 *
 * nthreads  Number of worker event handler threads to spawn 工作執行緒的數量
 * main_base Event base for main thread 主執行緒的基礎事件結構(event base)
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;

    pthread_mutex_init(&cache_lock, NULL); //初始化cache鎖
    pthread_mutex_init(&stats_lock, NULL); //初始化全域性統鎖

    pthread_mutex_init(&init_lock, NULL); //初始化init鎖
    pthread_cond_init(&init_cond, NULL); //該函式按引數attr指定的屬性建立一個條件變數。呼叫成功返回,   

    pthread_mutex_init(&cqi_freelist_lock, NULL); //初始化cqi_freelist鎖
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (