1. 程式人生 > >zookeeper快速入門——應用(兩種分散式鎖)

zookeeper快速入門——應用(兩種分散式鎖)

        在《zookeeper快速入門——簡介》一文中,我們介紹了zookeeper的機制。但是還是比較抽象,沒有直觀感受到它在分散式系統中的應用。本文我們使用一個例子,三次迭代演進,來說明Zookeeper Client端和Server端如何配合以實現分散式協作。(轉載請指明出於breaksoftware的csdn部落格)

        為了例子足夠簡單明確,我們以實現“分散式鎖”為例。所謂分散式鎖,就是在一個分散式系統中,各個子系統可以共享的同一把“鎖”。這樣大家可以在這把鎖的協調下,進行協作。


        我們可以嘗試在Zookeeper Server的節點樹上建立一個特定名稱的節點。如果建立成功了,則認為獲取到了鎖。Client可以執行相應業務邏輯,然後通知Server刪除該節點以釋放鎖。其他Client可能在此時正好去建立該節點,併成功了,那麼它就獲得了鎖。其他建立失敗的Client則被認為沒有獲得鎖,則繼續等待和嘗試。


        可能此時你已經意識到一個問題:如果某個獲得鎖的Client和Server斷開了連線,而沒有機會通知Server刪除test_lock檔案。那就導致整個系統處於“死鎖”狀態。


        不用擔心,zookeeper設計了“臨時”節點的概念。“臨時”節點由Client向Server端請求建立,一旦Client和Server連線斷開,這個Client建立的“臨時”節點將被刪除。這樣我們就不用擔心因為連線斷開而導致的問題了。和普通節點一樣,“臨時”節點也可以被Client主動刪除。

        基本思路理清楚後,我們開始著手編寫這塊邏輯。為了簡單,我們在一個程序內部使用多執行緒技術模擬分佈在不同機器上的Client端。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <zookeeper.h>
#include <zookeeper_log.h>

        第一步我們要使用zookeeper_init方法去連線Zookeeper Server。該函式原型如下

ZOOAPI zhandle_t* zookeeper_init (	
	const char * 	host,
	watcher_fn 	watcher,
	int 	recv_timeout,
	const clientid_t * 	clientid,
	void * 	context,
	int 	flags	 
)	

        該方法建立了一個zhandle_t指標和一個與之繫結的連線session,之後我們將在一直要使用這個指標和Server進行通訊。

     但是這個函式有個陷阱:即使返回了一個可用指標,可是與之繫結的session此時不一定可用。我們需要等到ZOO_CONNECTED_STATE訊息到來才能確認。此時我們就要藉助zookeeper中無處不在的監視功能(watcher)。

         zookeeper_init方法第二個引數傳遞的是一個回撥函式地址——watcher,第五個引數傳遞的是這個回撥函式可以使用的上下文資訊——context。

        為了讓回撥函式可以通知工作執行緒session已經可用,我們可以把上下文資訊設定為一個包含條件變數的結構watchctx_t

typedef struct watchctx_t {
    pthread_cond_t cond;
    pthread_mutex_t cond_lock;
} watchctx_t;

        這樣在回撥函式中,如果我們收到ZOO_CONNECTED_STATE通知,就觸發條件變數

void main_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) {
        watchctx_t *ctx = (watchctx_t*)watcherCtx;
        if (state == ZOO_CONNECTED_STATE) {
            pthread_cond_signal(&ctx->cond);
        }
    }
}

        在呼叫zookeeper_init方法後,工作執行緒一直等待條件變數,如果超過設定的超時時間,就認為連線失敗

int init_watchctx(watchctx_t* ctx) {
    if (0 != pthread_cond_init(&ctx->cond, NULL)) {
        fprintf(stderr, "condition init error\n");
        return -1;
    }

    if (0 != pthread_mutex_init(&ctx->cond_lock, NULL)) {
        fprintf(stderr, "mutex init error\n");
        pthread_cond_destroy(&ctx->cond);
        return -2;
    }

    return 0;
}
zhandle_t* init() {
    const char* host = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    int timeout = 30000;
    zhandle_t* zh = NULL;

    watchctx_t ctx;
    if (0 != init_watchctx(&ctx)) {
        return zh;
    }

    zh = zookeeper_init(host, main_watcher, timeout, 0, &ctx, 0);
    if (zh == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        pthread_cond_destroy(&ctx.cond);
        pthread_mutex_destroy(&ctx.cond_lock);
        return zh;
    }

    struct timeval now;  
    struct timespec outtime;     
    gettimeofday(&now, NULL);  
    outtime.tv_sec = now.tv_sec + 1;  
    outtime.tv_nsec = now.tv_usec * 1000;

    pthread_mutex_lock(&ctx.cond_lock);
    int wait_result = pthread_cond_timedwait(&ctx.cond, &ctx.cond_lock, &outtime);
    pthread_mutex_unlock(&ctx.cond_lock);
    
    pthread_cond_destroy(&ctx.cond);
    pthread_mutex_destroy(&ctx.cond_lock);

    if (0 != wait_result) {
        fprintf(stderr, "Connecting to zookeeper servers timeout...\n");
        zookeeper_close(zh);
        zh = NULL;
        return zh;
    }

    return zh;
}

        解決了連線問題,後面的邏輯就簡單了。我們使用zoo_create方法建立一個路徑為/test_lock的臨時節點,然後通過返回結果判斷是否獲得鎖

void thread_routine(void* ptr) {

    zhandle_t* zh = init();
    if (!zh) {
        return;
    }

    const char* lock_data = "lock";
    const char* lock_path =  "/test_lock";
    int ret = ZNODEEXISTS;
    do {
        ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
            &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
        if (ZNODEEXISTS == ret) {
            //fprintf(stderr, "lock exist\n");
            continue;
        }
        else if (ZOK == ret) {
            pthread_t pid = pthread_self();
            fprintf(stdout, "%lu get lock\n", (long long)pid);
            zoo_delete(zh, lock_path, -1);
            sleep(1);
        }   
        else {
            fprintf(stderr, "Error %d for %s\n", ret, "create");
            break;
        }
    } while (1);

    zookeeper_close(zh);
}

        上述程式碼19行開始的邏輯表示這個執行緒獲取了鎖,它只是簡單的打印出get lock,然後呼叫zoo_delete刪除節點——釋放鎖。

        這個函式使用一個while死迴圈來控制業務進行,這種不停呼叫zoo_create去檢測是否獲得鎖的方法非常浪費資源。那我們如何對這個函式進行改造?

        如果我們可以基於事件驅動監控/test_lock節點狀態就好了。zookeeper也提供了這種方式——還是watcher。

void thread_routine(void* ptr) {

    zhandle_t* zh = init();
    if (!zh) {
        return;
    }

    watchctx_t ctx;
    if (0 != init_watchctx(&ctx)) {
        return;
    }

    const char* lock_data = "lock";
    const char* lock_path = "/test_lock";
    int ret = ZNODEEXISTS;
    do {
        struct Stat stat;
        int cur_st = zoo_wexists(zh, lock_path, lock_watcher, &ctx, &stat);
        if (ZOK == cur_st) {
            //fprintf(stdout, "wait\n");
            pthread_mutex_lock(&ctx.cond_lock);
            pthread_cond_wait(&ctx.cond, &ctx.cond_lock);
            pthread_mutex_unlock(&ctx.cond_lock);
        }

        ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
            &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
        if (ZNODEEXISTS == ret) {
            //fprintf(stderr, "lock exist\n");
        }
        else if (ZOK == ret) {
            pthread_t pid = pthread_self();
            fprintf(stdout, "%lu get lock\n", (long long)pid);
            zoo_delete(zh, lock_path, -1);
        }   
        else {
            fprintf(stderr, "Error %d for %s\n", ret, "create");
        }

    } while (1);

    zookeeper_close(zh);
    pthread_cond_destroy(&ctx.cond);
    pthread_mutex_destroy(&ctx.cond_lock);
}

        第18行,我們呼叫zoo_wexists方法監控節點狀態。如果監控點設定成功,則等待上文中建立的條件變數。該條件變數在zoo_wexists引數的回撥函式中被設定

void lock_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    //sleep(1);
    //fprintf(stdout, "lock_watcher: %s %d, %d\n", path, type, state); 
    if (type == ZOO_DELETED_EVENT) {
        //fprintf(stdout, "delete %s\n", path);
        watchctx_t* ctx = (watchctx_t*)watcherCtx;
        pthread_cond_signal(&ctx->cond);
    }
    else {
        //fprintf(stdout, "add %s\n", path);
	struct Stat stat;
	zoo_wexists(zh, path, lock_watcher, watcherCtx, &stat);
    }
}

        zookeeper的監控點是一次性的,即如果一次被觸發則不再觸發。於是在這個回撥函式中,如果我們發現節點不是被刪除——監控到它被其他Client建立,就再次註冊該監控點。

        這樣我們就使用了相對高大上的事件通知機制。但是問題隨之而來,這種方式會引起驚群現象。即在一個Client釋放鎖後,其他Client都會嘗試去呼叫zoo_create去獲取鎖,這會造成系統抖動很強烈。

        我們繼續改進鎖的設計。現在我們換個思路,讓這些Client排著隊去嘗試獲取鎖。如果做呢?


        每個Client在Server上按順序建立一個節點,並監控比自己小的那個節點。如果比自己小的那個節點(最接近自己的)被刪除了,則意味著:

  1. 可能排在“我”前面的Client和Server斷開了連線,那麼此時應該還沒輪到“我”,於是“我”要找到此時比“我”小的、最鄰近的節點路徑,然後去監控這個節點。
  2. 可能排在“我”前面的所有Client都獲得過鎖了,並且它們都釋放了,現在輪到“我”來獲得鎖了。

        採用這種方式,我們可以最大限度的減少獲取鎖的行為。但是這對zookeeper提出了一個要求,我們可以原子性的建立包含單調遞增數字的路徑的節點。非常幸運的是,zookeeper的確提供了這樣的方式——順序節點。

void thread_routine(void* ptr) {

    zhandle_t* zh = init();
    if (!zh) {
        return;
    }

    watchctx_t ctx;
    if (0 != init_watchctx(&ctx)) {
        return;
    }

#define ROOT_PATH "/test_seq_lock"
    const char* root_path = ROOT_PATH;
    const char* lock_data = "lock";
    const char* lock_path = ROOT_PATH"/0";
    int ret = ZNODEEXISTS;
    do {
        const int seq_path_lenght = 512;
        char sequence_path[seq_path_lenght];
        ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
            &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
            sequence_path, sizeof(sequence_path) - 1);
        if (ZNODEEXISTS == ret) {
            //fprintf(stderr, "lock exist\n");
        }
        else if (ZOK == ret) {
            ret = wait_for_lock(zh, &ctx, root_path, sequence_path);
            if (ZOK == ret) {
                pthread_t pid = pthread_self();
                fprintf(stdout, "%lu %s get lock\n", (long long)pid, sequence_path);
                sleep(0.1);
            }
            zoo_delete(zh, sequence_path, -1);
        }   
        else if (ZNONODE == ret) {
            ret = zoo_create(zh, root_path, lock_data, strlen(lock_data),
                &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
            if (ZNODEEXISTS != ret && ZOK != ret) {
                fprintf(stderr, "Error %d for %s\n", ret, "create root path");
                break;
            }
        }
        else {
            fprintf(stderr, "Error %d for %s\n", ret, "create");
        }

    } while (1);

    zookeeper_close(zh);
    pthread_cond_destroy(&ctx.cond);
    pthread_mutex_destroy(&ctx.cond_lock);
}

        第23行,我們給zoo_create方法傳入了一個路徑空間用於接收建立的有序節點路徑。第28行,我們將這個路徑連同條件變數一起傳入自定義函式wait_for_lock去等待獲得鎖的時機。

int search_watch_neighbor(zhandle_t* zh, const char* root_path,
                            const char* cur_name,
                            char* neighbor_name, int len)
 {
    struct String_vector strings;
    int rc = zoo_get_children(zh, root_path, 0, &strings);
    if (ZOK != rc || 0 == strings.count) {
        return ZNOTEMPTY;       
    }

    int neighbor = -1;
    for (int i = 0; i < strings.count; i++) {
        int cmp = strcmp(cur_name, strings.data[i]);
        if (cmp <= 0) {
            continue;
        }

        if (-1 == neighbor) {
            neighbor = i;
            continue;
        }

        cmp = strcmp(strings.data[neighbor], strings.data[i]);
        if (cmp >= 0) {
            continue;
        }
        neighbor = i;
    }

    if (-1 == neighbor) {
        *neighbor_name = 0;
        return ZNONODE;
    }

    int neighbor_name_len = strlen(strings.data[neighbor]);
    if (len < neighbor_name_len - 1) {
        *neighbor_name = 0;
        return ZBADARGUMENTS;
    }

    memcpy(neighbor_name, strings.data[neighbor], neighbor_name_len);
    *(neighbor_name + neighbor_name_len) = '\0';
    fprintf(stdout, "********\n self: %s neighbor:%s\n*********\n", cur_name, neighbor_name);
    return ZOK;
}

void neighbor_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    if (type == ZOO_DELETED_EVENT) {
        watchctx_t* ctx = (watchctx_t*)watcherCtx;
        const int path_len_max = 512;
        char neighbor_name[path_len_max];
        int ret = search_watch_neighbor(zh, ctx->root_path, 
            ctx->cur_name, neighbor_name, sizeof(neighbor_name));
        if (ZNONODE == ret) {
            pthread_cond_signal(&ctx->cond);
        }
        else if (ZOK == ret) {
            char neighbor_path[path_len_max];
            sprintf(neighbor_path, "%s/%s", ctx->root_path, neighbor_name);
            struct Stat stat;
            zoo_wexists(zh, neighbor_path, neighbor_watcher, watcherCtx, &stat);
        }
    }
}

int wait_for_lock(zhandle_t* zh, watchctx_t* ctx, const char* root_path, const char* sequence_path) {
    strcpy(ctx->root_path, root_path);
    strcpy(ctx->cur_name, sequence_path + strlen(root_path) + 1);

    const int path_len_max = 512;
    char neighbor_name[path_len_max];
    int status = ZOK;

    do {
        int ret = search_watch_neighbor(zh, ctx->root_path, 
            ctx->cur_name, neighbor_name, sizeof(neighbor_name));

        char neighbor_path[path_len_max];
        sprintf(neighbor_path, "%s/%s", root_path, neighbor_name);

        pthread_t pid = pthread_self();
        fprintf(stdout, "%lu get neighbor info: %d %s\n", (long long)pid, ret, neighbor_path);

        if (ZNONODE == ret) {
            status = ZOK;
            break;
        }
        else if (ZOK == ret) {
            struct Stat stat;
            if (ZOK == zoo_wexists(zh, neighbor_path, neighbor_watcher, ctx, &stat)) {
                pthread_mutex_lock(&ctx->cond_lock);
                pthread_cond_wait(&ctx->cond, &ctx->cond_lock);
                pthread_mutex_unlock(&ctx->cond_lock);
            }
            else {
                continue;
            }
        }
        else {
            status = ZSYSTEMERROR;
            break;
        }
        
    } while(1);
    return status;
}

        再結合main函式的實現,兩種不方式設計的分散式鎖都可以執行起來

#define countof(x) sizeof(x)/sizeof(x[0])

int main(int argc, const char *argv[]) {
    const int thread_num = 3;
    pthread_t ids[thread_num];

    for (int i = 0; i < countof(ids); i++) {
        pthread_create(&ids[i], NULL, (void*)thread_routine, NULL);
    }

    for (int i = 0; i < countof(ids); i++) {
        pthread_join(ids[i], NULL);
    }

    return 0;
}

        將上述檔案儲存為lock_test.c,然後呼叫下面的指令編譯

gcc -o lock_test lock_test.c -I/home/work/fangliang/zookeeper-3.4.11/src/c/generated -I/home/work/fangliang/zookeeper-3.4.11/src/c/include -L/home/work/fangliang/zookeeper-3.4.11/src/c/.libs -lzookeeper_mt -DTHREADED -std=c99

        關於zookeeper庫的編譯,網上有很多。我編譯起來還算順利,只是在找不到so時候使用下面指令指定下查詢路徑

export LD_LIBRARY_PATH=/home/work/fangliang/zookeeper-3.4.11/src/c/.libs:$LD_LIBRARY_PATH

        參考資料

  • https://www.cnblogs.com/xybaby/p/6871764.html
  • http://lib.csdn.net/article/hadoop/6665
  • https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
  • http://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html
  • http://zookeeper.sourcearchive.com/documentation/3.2.2plus-pdfsg3/zookeeper_8h.html
  • 《Zookeeper分散式過程協同技術詳解》