1. 程式人生 > >Memcached原始碼分析之基於Libevent的網路模型(1)

Memcached原始碼分析之基於Libevent的網路模型(1)

文章列表:

《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》

關於Memcached:

memcached是一款非常普及的伺服器端快取軟體,memcached主要是基於Libevent庫進行開發的。

如果你還不瞭解libevent相關知識,請先看我的libevent這篇文章《Linux c 開發 - libevent 》

memcached也是使用autotools的進行程式碼編譯管理的,如果你還不瞭解autotools,你可以先看下文章:《Linux c 開發 - Autotools使用詳細解讀 》

memcached你可以去官網上獲取原始碼。

官網地址

Memcached分析

1.  網路模型流程分析

Memcached主要是基於Libevent的事件庫來實現網路執行緒模型的。我們先需要下載memcached的原始碼包,上面我們已經給出了原始碼包下載地址。

Memcached的網路執行緒模型主要涉及兩個主要檔案:memcached.cthread.c檔案。

我們這邊主要分析tcp的模型。memcached也支援udp。

流程

1. memcached首先在主執行緒中會建立main_base,memcached的主執行緒的主要工作就是監聽和接收listen和accpet新進入的連線。

2. 當memcached啟動的時候會初始化N個worker thread工作執行緒,每個工作執行緒都會有自己的LIBEVENT_THREAD

資料結構來儲存執行緒的資訊(執行緒基本資訊、執行緒佇列、pipe資訊)。worker thread工作執行緒和main thread主執行緒之間主要通過pipe來進行通訊。

3. 當用戶有連線進來的時候,main thread主執行緒會通過求餘的方式選擇一個worker thread工作執行緒。

4. main thread會將當前使用者的連線資訊放入一個CQ_ITEM,並且將CQ_ITEM放入這個執行緒的conn_queue處理佇列,然後主執行緒會通過寫入pipe的方式來通知worker thread工作執行緒。

5. 當工作執行緒得到主執行緒main thread的通知後,就會去自己的conn_queue佇列中取得一條連線資訊進行處理,建立libevent的socket讀寫事件。

6. 工作執行緒會監聽使用者的socket,如果使用者有訊息傳遞過來,則會進行訊息解析和處理,返回相應的結果。

流程圖


資料結構:

1. CQ_ITEM:主要用於儲存使用者socket連線的基本資訊。

主執行緒會將使用者的socket連線資訊封裝成CQ_ITEM,並放入工作執行緒的處理佇列中。工作執行緒得到主執行緒的pipe通知後,就會將佇列中的ITEM取出來,建立libevent的socket讀事件。

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;   //socket的fd
    enum conn_states  init_state; //事件型別
    int               event_flags; //libevent的flags
    int               read_buffer_size; //讀取的buffer的size
    enum network_transport     transport; 
    CQ_ITEM          *next; //下一個item的地址
};

2. CQ:每個執行緒的處理佇列結構。

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

3. LIBEVENT_THREAD:每個工作執行緒的資料結構。

每一個工作執行緒都有有這麼一個自己的資料結構,主要儲存執行緒資訊、處理佇列、pipe資訊等。

typedef struct {
    //執行緒ID
    pthread_t thread_id;        /* unique ID of this thread */
    //執行緒的 event_base,每個執行緒都有自己的event_base
    struct event_base *base;    /* libevent handle this thread uses */
    //非同步event事件
    struct event notify_event;  /* listen event for notify pipe */
    //管道接收端
    int notify_receive_fd;      /* receiving end of notify pipe */
    //管道傳送端
    int notify_send_fd;         /* sending end of notify pipe */
    //執行緒狀態
    struct thread_stats stats;  /* Stats generated by this thread */
    //新連線佇列結構
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    uint8_t item_lock_type;     /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

2. main啟動入口

我們需要找到memcached.c中的main()方法。下面的程式碼中只列出了我們需要的重要部分。

int main (int argc, char **argv) {
    //...省去一部分程式碼
    /* initialize main thread libevent instance */
    //初始化一個event_base
    main_base = event_init();

    /* initialize other stuff */
    stats_init();
    assoc_init(settings.hashpower_init);
    conn_init();
    slabs_init(settings.maxbytes, settings.factor, preallocate);

    /*
     * ignore SIGPIPE signals; we can use errno == EPIPE if we
     * need that information
     */
    if (sigignore(SIGPIPE) == -1) {
        perror("failed to ignore SIGPIPE; sigaction");
        exit(EX_OSERR);
    }
    /* start up worker threads if MT mode */
    //這邊非常重要,這個方法主要用來建立工作執行緒,預設會建立4個工作執行緒
    thread_init(settings.num_threads, main_base);

    if (start_assoc_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    if (settings.slab_reassign &&
        start_slab_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    /* Run regardless of initializing it later */
    init_lru_crawler();

    /* initialise clock event */
    clock_handler(0, 0, 0);

    /* create unix mode sockets after dropping privileges */
    if (settings.socketpath != NULL) {
        errno = 0;
        if (server_socket_unix(settings.socketpath,settings.access)) {
            vperror("failed to listen on UNIX socket: %s", settings.socketpath);
            exit(EX_OSERR);
        }
    }

    /* create the listening socket, bind it, and init */
    if (settings.socketpath == NULL) {
        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
        char temp_portnumber_filename[PATH_MAX];
        FILE *portnumber_file = NULL;

        if (portnumber_filename != NULL) {
            snprintf(temp_portnumber_filename,
                     sizeof(temp_portnumber_filename),
                     "%s.lck", portnumber_filename);

            portnumber_file = fopen(temp_portnumber_filename, "a");
            if (portnumber_file == NULL) {
                fprintf(stderr, "Failed to open \"%s\": %s\n",
                        temp_portnumber_filename, strerror(errno));
            }
        }

        errno = 0;
        //這邊的server_sockets方法主要是socket的bind、listen、accept等操作
        //主執行緒主要用於接收客戶端的socket連線,並且將連線交給工作執行緒接管。
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }
    }
    /* enter the event loop */
    //這邊開始進行主執行緒的事件迴圈
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }
 //...省去一部分程式碼
}

主執行緒中主要是通過thread_init方法去建立N個工作執行緒:

thread_init(settings.num_threads, main_base); 

通過server_sockets方法去建立socket server:

        errno = 0;
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }

3. worker thread工作執行緒原始碼分析

我們在thread.c檔案中找到thread_init這個方法:

void thread_init(int nthreads, struct event_base *main_base) {
    //...省了一部分程式碼
    //這邊通過迴圈的方式建立nthreads個執行緒
    //nthreads應該是可以設定的
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        //這邊會建立pipe,主要用於主執行緒和工作執行緒之間的通訊
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
        //threads是工作執行緒的基本結構:LIBEVENT_THREAD
        //將pipe接收端和寫入端都放到工作執行緒的結構體中
        threads[i].notify_receive_fd = fds[0]; //接收端
        threads[i].notify_send_fd = fds[1]; //寫入端

        //這個方法非常重要,主要是建立每個執行緒自己的libevent的event_base
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    //這裡是迴圈建立執行緒
    //執行緒建立的回撥函式是worker_libevent
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
setup_thread方法:
/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    //建立一個event_base
    //根據libevent的使用文件,我們可以知道一般情況下每個獨立的執行緒都應該有自己獨立的event_base
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    //這邊非常重要,這邊主要建立pipe的讀事件EV_READ的監聽
    //當pipe中有寫入事件的時候,libevent就會回撥thread_libevent_process方法
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    //新增事件操作
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    //初始化一個工作佇列
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    //初始化執行緒鎖
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
create_worker方法:
/*
 * Creates a worker thread.
 */
//這個方法是真正的建立工作執行緒
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);
    //這邊真正的建立執行緒
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

worker_libevent方法:

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    register_thread_initialized();
    //這個方法主要是開啟事件的迴圈
    //每個執行緒中都會有自己獨立的event_base和事件的迴圈機制
    //memcache的每個工作執行緒都會獨立處理自己接管的連線
    event_base_loop(me->base, 0);
    return NULL;
}

thread_libevent_process方法:

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //回撥函式中回去讀取pipe中的資訊
    //主執行緒中如果有新的連線,會向其中一個執行緒的pipe中寫入1
    //這邊讀取pipe中的資料,如果為1,則說明從pipe中獲取的資料是正確的
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':
    //從工作執行緒的佇列中獲取一個CQ_ITEM連線資訊
    item = cq_pop(me->new_conn_queue);
    //如果item不為空,則需要進行連線的接管
    if (NULL != item) {
    	//conn_new這個方法非常重要,主要是建立socket的讀寫等監聽事件。
    	//init_state 為初始化的型別,主要在drive_machine中通過這個狀態類判斷處理型別
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to flip the lock type and report in */
    case 'l':
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    register_thread_initialized();
        break;
    case 'g':
    me->item_lock_type = ITEM_LOCK_GLOBAL;
    register_thread_initialized();
        break;
    }
}
conn_new方法(主要看兩行):
    //我們發現這個方法中又在建立event了,這邊實際上是監聽socket的讀寫等事件
    //主執行緒主要是監聽使用者的socket連線事件;工作執行緒主要監聽socket的讀寫事件
    //當用戶socket的連線有資料傳遞過來的時候,就會呼叫event_handler這個回撥函式
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event); 
    c->ev_flags = event_flags;
    //將事件新增到libevent的loop迴圈中
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }
event_handler方法:
void event_handler(const int fd, const short which, void *arg) {
    conn *c;
    //組裝conn結構
    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }
    //最終轉交給了drive_machine這個方法
    //memcache的大部分的網路事件都是由drive_machine這個方法來處理的
    //drive_machine這個方法主要通過c->state這個事件的型別來處理不同型別的事件   
    drive_machine(c);

    /* wait for next event */
    return;
}
然後繼續看最重要的,也是核心的處理事件的方法drive_machine(狀態機),監聽socket連線、監聽socket的讀寫、斷開連線等操作都是在drive_machine這個方法中實現的。而這些操作都是通過c->state這個狀態來判斷不同的操作型別。
static void drive_machine(conn *c) {
     //....................

    assert(c != NULL);

    while (!stop) {
        //這邊通過state來處理不同型別的事件
        switch(c->state) {
        //這邊主要處理tcp連線,只有在主執行緒的下,才會執行listening監聽操作。
        case conn_listening:
            addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
            if (use_accept4) {
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
            } else {
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            }
#else
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
            //......................

            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

        //連線等待
        case conn_waiting:
            //.........
            break;

        //讀取事件
        //例如有使用者提交資料過來的時候,工作執行緒監聽到事件後,最終會呼叫這塊程式碼
        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;

        case conn_parse_cmd :
            if (try_read_command(c) == 0) {
                /* wee need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;

        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */

            --nreqs;
            if (nreqs >= 0) {
                reset_cmd_handler(c);
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                        break;
                    }
                }
                stop = true;
            }
            break;

        case conn_nread:
           //....................
            break;

        case conn_swallow:
          //....................
            break;

        case conn_write:
           //.................
            break;
         
        //連線關閉
        case conn_closing:
            if (IS_UDP(c->transport))
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;

        case conn_closed:
            /* This only happens if dormando is an idiot. */
            abort();
            break;

        case conn_max_state:
            assert(false);
            break;
        }
    }

    return;
}

4. main thread主執行緒原始碼分析

主執行緒的socket server主要通過server_sockets這個方法建立。而server_sockets中主要呼叫了server_socket這個方法,我們可以看下server_socket這個方法:

/**
 * Create a socket and bind it to a specific port number
 * @param interface the interface to bind to
 * @param port the port number to bind to
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
            //建立一個新的事件
            //我們發現上面的工作執行緒也是呼叫這個方法,但是區別是這個方法指定了state的型別為:conn_listening
            //注意這邊有一個conn_listening,這個引數主要是指定呼叫drive_machine這個方法中的conn_listen程式碼塊。
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
}
conn_new方法:
    //我們發現這個方法中又在建立event了,這邊實際上是監聽socket的讀寫等事件
    //主執行緒主要是監聽使用者的socket連線事件;工作執行緒主要監聽socket的讀寫事件
    //當用戶socket的連線有資料傳遞過來的時候,就會呼叫event_handler這個回撥函式
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event); 
    c->ev_flags = event_flags;
    //將事件新增到libevent的loop迴圈中
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }
然後我們跟蹤進入event_handler這個方法,並且進入drive_machine這個方法,我們上面說過server_socket這個方法中傳遞的state引數為預設寫死的conn_listening這個狀態,所以我們詳細看drive_machine中關於conn_listening這塊邏輯的程式碼。
 case conn_listening:
            addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
            //我們可以看到下面的程式碼是accept,接受客戶端的socket連線的程式碼
            if (use_accept4) {
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
            } else {
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            }
#else
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
            if (sfd == -1) {
                if (use_accept4 && errno == ENOSYS) {
                    use_accept4 = 0;
                    continue;
                }
                perror(use_accept4 ? "accept4()" : "accept()");
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if (!use_accept4) {
                if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                    perror("setting O_NONBLOCK");
                    close(sfd);
                    break;
                }
            }

            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                //如果客戶端用socket連線上來,則會呼叫這個分發邏輯的函式
                //這個函式會將連線資訊分發到某一個工作執行緒中,然後工作執行緒接管具體的讀寫操作
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

dispatch_conn_new方法:

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    //每個連線連上來的時候,都會申請一塊CQ_ITEM的記憶體塊,用於儲存連線的基本資訊
	CQ_ITEM *item = cqi_new();
    char buf[1];
    //如果item建立失敗,則關閉連線
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }
    //這個方法非常重要。主要是通過求餘數的方法來得到當前的連線需要哪個執行緒來接管
    //而且last_thread會記錄每次最後一次使用的工作執行緒,每次記錄之後就可以讓工作執行緒進入一個輪詢,保證了每個工作執行緒處理的連線數的平衡
    int tid = (last_thread + 1) % settings.num_threads;

    //獲取執行緒的基本結構
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    //向工作執行緒的佇列中放入CQ_ITEM
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    //向工作執行緒的pipe中寫入1
    //工作執行緒監聽到pipe中有寫入資料,工作執行緒接收到通知後,就會向thread->new_conn_queue佇列中pop出一個item,然後進行連線的接管操作
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

相關推薦

Memcached原始碼分析基於Libevent網路模型1

文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 關於Memcached: memcached是一款非常普及的伺服器端快取軟體,memcached主要是基於Libevent庫進行開發的。 如果你還不瞭解libev

Memcached原始碼分析增刪改查操作5

文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 在看Memcached的增刪改查操作前,我們先來看一下process_command方法。Memcached解析命令之後,就通過process_comman

Memcached原始碼分析-網路模型1

1 網路模型 Memcached採用了,單程序多執行緒的工作方式,同時採用了libevent事件驅動進行網路請求的處理。 2 工作原理 2.1 libevent介紹 2.2 網路請求流程 2.2.1 流程圖 2.2.2 主執行緒工作流程分析 主執行緒工作流

muduo原始碼分析實現TCP網路連線的接收和關閉

在EventLoop、Channel、Poller三個類中完成對一般描述符、事件迴圈(poll)的封裝。實現了Reactor的基本功能,接下來則需要將網路套接字描述符、I/O函式、等進行封裝。 1.傳統的TcpServer 在進行封裝之前需要明確我們需要

Spark core原始碼分析spark叢集的啟動

2.2 Worker的啟動 org.apache.spark.deploy.worker 1 從Worker的伴生物件的main方法進入 在main方法中首先是得到一個SparkConf例項conf,然後將conf和啟動Worker傳入的引數封裝得到Wor

spark mllib原始碼分析隨機森林(Random Forest)

4. 特徵處理 這部分主要在DecisionTree.scala的findSplitsBins函式,將所有特徵封裝成Split,然後裝箱Bin。首先對split和bin的結構進行說明 4.1. 資料結構 4.1.1. Split cl

spark mllib原始碼分析隨機森林(Random Forest)

6. 隨機森林訓練 6.1. 資料結構 6.1.1. Node 樹中的每個節點是一個Node結構 class Node @Since("1.2.0") ( @Since("1.0.0") val id: Int, @S

android原始碼分析View的事件分發

1、View的繼承關係圖 View的繼承關係圖如下: 其中最重要的子類為ViewGroup,View是所有UI元件的基類,而ViewGroup是容納這些元件的容器,同時它也是繼承於View類。而UI元件的繼承關係如上圖,比較常用的元件類用紅色字型標出

原始碼分析基於LinkedList手寫HahMap(二)

package com.mayikt.extLinkedListHashMap; import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; /** * 基於linkedList實現hashMap *

原始碼分析基於ArrayList手寫HahMap(一)

import java.util.ArrayList; import java.util.List; /** * 基於arraylist實現hashmap集合(簡版:效率低) * @author zjmiec * */ public class ExtArrayListHashMap&

jvm原始碼分析oop-klass物件模型

概述 HotSpot是基於c++實現,而c++是一門面向物件的語言,本身具備面向物件基本特徵,所以Java中的物件表示,最簡單的做法是為每個Java類生成一個c++類與之對應。 但HotSpot JVM並沒有這麼做,而是設計了一個OOP-Klass Model。這裡的 O

Netty原始碼分析Reactor執行緒模型

一、背景  最近在研究netty的原始碼,今天發表一篇關於netty的執行緒框架--Reactor執行緒模型,作為最近研究成果。如果有還不瞭解Reactor模型請自行百度,網上有很多關於Reactor模式。  研究netty的時候,先看了下《netty權威指南》,裡面講解不

Memcached原始碼分析Hash表操作

Memcached的Hash表用來提高資料訪問效能,通過連結法來解決Hash衝突,當Hash表中資料多餘Hash表容量的1.5倍時,Hash表就會擴容,Memcached的Hash表操作沒什麼特別的,我們這裡簡單介紹下Memcached裡面的Hash表操作。 //hash表

Memcached原始碼分析記憶體管理篇item結構圖及slab結構圖

.Memcached原始碼分析之記憶體管理篇 部落格分類: linuxc  . 使用命令 set(key, value) 向 memcached 插入一條資料, memcached 內部是如何組織資料呢 一 把資料組裝成 item memcached 接受到客戶端的資料後

memcached原始碼分析

還是從Memcached.c檔案的main函式開始,逐步分析Memcached的實現 if (!sanitycheck()) { return EX_OSERR; }static bool sanitycheck(void) { /*

Memcached原始碼分析訊息迴應3

文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 上一章《Memcached原始碼分析 - Memcached原始碼分析之命令解析(2)》,我們花了很大的力氣去講解Memcached如何從客戶端讀取命令,並且

Memcached原始碼分析儲存機制Slabs7

文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》前言 這一章我們重點講解Memcached的儲存機制Slabs。Memcached儲存Item的程式碼都是在slabs.c中來實現的。 在解讀這一章前,我們必須先了

從壹開始微服務 [ DDD ] 十一 ║ 基於原始碼分析,命令分發的過程

緣起 哈嘍小夥伴週三好,老張又來啦,DDD領域驅動設計的第二個D也快說完了,下一個系列我也在考慮之中,是 Id4 還是 Dockers 還沒有想好,甚至昨天我還想,下一步是不是可以寫一個簡單的Angular 入門教程,本來是想來個前後端分離的教學視訊的,簡單試了試,發現自己的聲音不好聽,真心不好聽那種,就作

netty原始碼分析-EventLoop與執行緒模型1

執行緒模型確定來程式碼的執行方式,我們總是必須規避併發執行可能會帶來的副作用,所以理解netty所採用的併發模型的影響很重要。netty使用了被稱為事件迴圈的EventLoop來執行任務來處理在連線的生命週期內發生的事件 執行緒模型 對於Even

Mybatis深入原始碼分析基於裝飾模式純手寫一級,二級,三級快取

寫在前面:設計模式源於生活,而又高於生活! 什麼是裝飾者模式 在不改變原有物件的基礎上附加功能,相比生成子類更靈活。