zookeeper快速入門——應用(兩種分散式鎖)
在《zookeeper快速入門——簡介》一文中,我們介紹了zookeeper的機制。但是還是比較抽象,沒有直觀感受到它在分散式系統中的應用。本文我們使用一個例子,三次迭代演進,來說明Zookeeper Client端和Server端如何配合以實現分散式協作。(轉載請指明出於breaksoftware的csdn部落格)
為了例子足夠簡單明確,我們以實現“分散式鎖”為例。所謂分散式鎖,就是在一個分散式系統中,各個子系統可以共享的同一把“鎖”。這樣大家可以在這把鎖的協調下,進行協作。
我們可以嘗試在Zookeeper Server的節點樹上建立一個特定名稱的節點。如果建立成功了,則認為獲取到了鎖。Client可以執行相應業務邏輯,然後通知Server刪除該節點以釋放鎖。其他Client可能在此時正好去建立該節點,併成功了,那麼它就獲得了鎖。其他建立失敗的Client則被認為沒有獲得鎖,則繼續等待和嘗試。
可能此時你已經意識到一個問題:如果某個獲得鎖的Client和Server斷開了連線,而沒有機會通知Server刪除test_lock檔案。那就導致整個系統處於“死鎖”狀態。
不用擔心,zookeeper設計了“臨時”節點的概念。“臨時”節點由Client向Server端請求建立,一旦Client和Server連線斷開,這個Client建立的“臨時”節點將被刪除。這樣我們就不用擔心因為連線斷開而導致的問題了。和普通節點一樣,“臨時”節點也可以被Client主動刪除。
基本思路理清楚後,我們開始著手編寫這塊邏輯。為了簡單,我們在一個程序內部使用多執行緒技術模擬分佈在不同機器上的Client端。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <zookeeper.h>
#include <zookeeper_log.h>
第一步我們要使用zookeeper_init方法去連線Zookeeper Server。該函式原型如下
ZOOAPI zhandle_t* zookeeper_init ( const char * host, watcher_fn watcher, int recv_timeout, const clientid_t * clientid, void * context, int flags )
該方法建立了一個zhandle_t指標和一個與之繫結的連線session,之後我們將在一直要使用這個指標和Server進行通訊。
但是這個函式有個陷阱:即使返回了一個可用指標,可是與之繫結的session此時不一定可用。我們需要等到ZOO_CONNECTED_STATE訊息到來才能確認。此時我們就要藉助zookeeper中無處不在的監視功能(watcher)。
zookeeper_init方法第二個引數傳遞的是一個回撥函式地址——watcher,第五個引數傳遞的是這個回撥函式可以使用的上下文資訊——context。
為了讓回撥函式可以通知工作執行緒session已經可用,我們可以把上下文資訊設定為一個包含條件變數的結構watchctx_t
typedef struct watchctx_t {
pthread_cond_t cond;
pthread_mutex_t cond_lock;
} watchctx_t;
這樣在回撥函式中,如果我們收到ZOO_CONNECTED_STATE通知,就觸發條件變數
void main_watcher(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx)
{
if (type == ZOO_SESSION_EVENT) {
watchctx_t *ctx = (watchctx_t*)watcherCtx;
if (state == ZOO_CONNECTED_STATE) {
pthread_cond_signal(&ctx->cond);
}
}
}
在呼叫zookeeper_init方法後,工作執行緒一直等待條件變數,如果超過設定的超時時間,就認為連線失敗
int init_watchctx(watchctx_t* ctx) {
if (0 != pthread_cond_init(&ctx->cond, NULL)) {
fprintf(stderr, "condition init error\n");
return -1;
}
if (0 != pthread_mutex_init(&ctx->cond_lock, NULL)) {
fprintf(stderr, "mutex init error\n");
pthread_cond_destroy(&ctx->cond);
return -2;
}
return 0;
}
zhandle_t* init() {
const char* host = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
int timeout = 30000;
zhandle_t* zh = NULL;
watchctx_t ctx;
if (0 != init_watchctx(&ctx)) {
return zh;
}
zh = zookeeper_init(host, main_watcher, timeout, 0, &ctx, 0);
if (zh == NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...\n");
pthread_cond_destroy(&ctx.cond);
pthread_mutex_destroy(&ctx.cond_lock);
return zh;
}
struct timeval now;
struct timespec outtime;
gettimeofday(&now, NULL);
outtime.tv_sec = now.tv_sec + 1;
outtime.tv_nsec = now.tv_usec * 1000;
pthread_mutex_lock(&ctx.cond_lock);
int wait_result = pthread_cond_timedwait(&ctx.cond, &ctx.cond_lock, &outtime);
pthread_mutex_unlock(&ctx.cond_lock);
pthread_cond_destroy(&ctx.cond);
pthread_mutex_destroy(&ctx.cond_lock);
if (0 != wait_result) {
fprintf(stderr, "Connecting to zookeeper servers timeout...\n");
zookeeper_close(zh);
zh = NULL;
return zh;
}
return zh;
}
解決了連線問題,後面的邏輯就簡單了。我們使用zoo_create方法建立一個路徑為/test_lock的臨時節點,然後通過返回結果判斷是否獲得鎖
void thread_routine(void* ptr) {
zhandle_t* zh = init();
if (!zh) {
return;
}
const char* lock_data = "lock";
const char* lock_path = "/test_lock";
int ret = ZNODEEXISTS;
do {
ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
if (ZNODEEXISTS == ret) {
//fprintf(stderr, "lock exist\n");
continue;
}
else if (ZOK == ret) {
pthread_t pid = pthread_self();
fprintf(stdout, "%lu get lock\n", (long long)pid);
zoo_delete(zh, lock_path, -1);
sleep(1);
}
else {
fprintf(stderr, "Error %d for %s\n", ret, "create");
break;
}
} while (1);
zookeeper_close(zh);
}
上述程式碼19行開始的邏輯表示這個執行緒獲取了鎖,它只是簡單的打印出get lock,然後呼叫zoo_delete刪除節點——釋放鎖。
這個函式使用一個while死迴圈來控制業務進行,這種不停呼叫zoo_create去檢測是否獲得鎖的方法非常浪費資源。那我們如何對這個函式進行改造?
如果我們可以基於事件驅動監控/test_lock節點狀態就好了。zookeeper也提供了這種方式——還是watcher。
void thread_routine(void* ptr) {
zhandle_t* zh = init();
if (!zh) {
return;
}
watchctx_t ctx;
if (0 != init_watchctx(&ctx)) {
return;
}
const char* lock_data = "lock";
const char* lock_path = "/test_lock";
int ret = ZNODEEXISTS;
do {
struct Stat stat;
int cur_st = zoo_wexists(zh, lock_path, lock_watcher, &ctx, &stat);
if (ZOK == cur_st) {
//fprintf(stdout, "wait\n");
pthread_mutex_lock(&ctx.cond_lock);
pthread_cond_wait(&ctx.cond, &ctx.cond_lock);
pthread_mutex_unlock(&ctx.cond_lock);
}
ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
if (ZNODEEXISTS == ret) {
//fprintf(stderr, "lock exist\n");
}
else if (ZOK == ret) {
pthread_t pid = pthread_self();
fprintf(stdout, "%lu get lock\n", (long long)pid);
zoo_delete(zh, lock_path, -1);
}
else {
fprintf(stderr, "Error %d for %s\n", ret, "create");
}
} while (1);
zookeeper_close(zh);
pthread_cond_destroy(&ctx.cond);
pthread_mutex_destroy(&ctx.cond_lock);
}
第18行,我們呼叫zoo_wexists方法監控節點狀態。如果監控點設定成功,則等待上文中建立的條件變數。該條件變數在zoo_wexists引數的回撥函式中被設定
void lock_watcher(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx)
{
//sleep(1);
//fprintf(stdout, "lock_watcher: %s %d, %d\n", path, type, state);
if (type == ZOO_DELETED_EVENT) {
//fprintf(stdout, "delete %s\n", path);
watchctx_t* ctx = (watchctx_t*)watcherCtx;
pthread_cond_signal(&ctx->cond);
}
else {
//fprintf(stdout, "add %s\n", path);
struct Stat stat;
zoo_wexists(zh, path, lock_watcher, watcherCtx, &stat);
}
}
zookeeper的監控點是一次性的,即如果一次被觸發則不再觸發。於是在這個回撥函式中,如果我們發現節點不是被刪除——監控到它被其他Client建立,就再次註冊該監控點。
這樣我們就使用了相對高大上的事件通知機制。但是問題隨之而來,這種方式會引起驚群現象。即在一個Client釋放鎖後,其他Client都會嘗試去呼叫zoo_create去獲取鎖,這會造成系統抖動很強烈。
我們繼續改進鎖的設計。現在我們換個思路,讓這些Client排著隊去嘗試獲取鎖。如果做呢?
每個Client在Server上按順序建立一個節點,並監控比自己小的那個節點。如果比自己小的那個節點(最接近自己的)被刪除了,則意味著:
- 可能排在“我”前面的Client和Server斷開了連線,那麼此時應該還沒輪到“我”,於是“我”要找到此時比“我”小的、最鄰近的節點路徑,然後去監控這個節點。
- 可能排在“我”前面的所有Client都獲得過鎖了,並且它們都釋放了,現在輪到“我”來獲得鎖了。
採用這種方式,我們可以最大限度的減少獲取鎖的行為。但是這對zookeeper提出了一個要求,我們可以原子性的建立包含單調遞增數字的路徑的節點。非常幸運的是,zookeeper的確提供了這樣的方式——順序節點。
void thread_routine(void* ptr) {
zhandle_t* zh = init();
if (!zh) {
return;
}
watchctx_t ctx;
if (0 != init_watchctx(&ctx)) {
return;
}
#define ROOT_PATH "/test_seq_lock"
const char* root_path = ROOT_PATH;
const char* lock_data = "lock";
const char* lock_path = ROOT_PATH"/0";
int ret = ZNODEEXISTS;
do {
const int seq_path_lenght = 512;
char sequence_path[seq_path_lenght];
ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
sequence_path, sizeof(sequence_path) - 1);
if (ZNODEEXISTS == ret) {
//fprintf(stderr, "lock exist\n");
}
else if (ZOK == ret) {
ret = wait_for_lock(zh, &ctx, root_path, sequence_path);
if (ZOK == ret) {
pthread_t pid = pthread_self();
fprintf(stdout, "%lu %s get lock\n", (long long)pid, sequence_path);
sleep(0.1);
}
zoo_delete(zh, sequence_path, -1);
}
else if (ZNONODE == ret) {
ret = zoo_create(zh, root_path, lock_data, strlen(lock_data),
&ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
if (ZNODEEXISTS != ret && ZOK != ret) {
fprintf(stderr, "Error %d for %s\n", ret, "create root path");
break;
}
}
else {
fprintf(stderr, "Error %d for %s\n", ret, "create");
}
} while (1);
zookeeper_close(zh);
pthread_cond_destroy(&ctx.cond);
pthread_mutex_destroy(&ctx.cond_lock);
}
第23行,我們給zoo_create方法傳入了一個路徑空間用於接收建立的有序節點路徑。第28行,我們將這個路徑連同條件變數一起傳入自定義函式wait_for_lock去等待獲得鎖的時機。
int search_watch_neighbor(zhandle_t* zh, const char* root_path,
const char* cur_name,
char* neighbor_name, int len)
{
struct String_vector strings;
int rc = zoo_get_children(zh, root_path, 0, &strings);
if (ZOK != rc || 0 == strings.count) {
return ZNOTEMPTY;
}
int neighbor = -1;
for (int i = 0; i < strings.count; i++) {
int cmp = strcmp(cur_name, strings.data[i]);
if (cmp <= 0) {
continue;
}
if (-1 == neighbor) {
neighbor = i;
continue;
}
cmp = strcmp(strings.data[neighbor], strings.data[i]);
if (cmp >= 0) {
continue;
}
neighbor = i;
}
if (-1 == neighbor) {
*neighbor_name = 0;
return ZNONODE;
}
int neighbor_name_len = strlen(strings.data[neighbor]);
if (len < neighbor_name_len - 1) {
*neighbor_name = 0;
return ZBADARGUMENTS;
}
memcpy(neighbor_name, strings.data[neighbor], neighbor_name_len);
*(neighbor_name + neighbor_name_len) = '\0';
fprintf(stdout, "********\n self: %s neighbor:%s\n*********\n", cur_name, neighbor_name);
return ZOK;
}
void neighbor_watcher(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx)
{
if (type == ZOO_DELETED_EVENT) {
watchctx_t* ctx = (watchctx_t*)watcherCtx;
const int path_len_max = 512;
char neighbor_name[path_len_max];
int ret = search_watch_neighbor(zh, ctx->root_path,
ctx->cur_name, neighbor_name, sizeof(neighbor_name));
if (ZNONODE == ret) {
pthread_cond_signal(&ctx->cond);
}
else if (ZOK == ret) {
char neighbor_path[path_len_max];
sprintf(neighbor_path, "%s/%s", ctx->root_path, neighbor_name);
struct Stat stat;
zoo_wexists(zh, neighbor_path, neighbor_watcher, watcherCtx, &stat);
}
}
}
int wait_for_lock(zhandle_t* zh, watchctx_t* ctx, const char* root_path, const char* sequence_path) {
strcpy(ctx->root_path, root_path);
strcpy(ctx->cur_name, sequence_path + strlen(root_path) + 1);
const int path_len_max = 512;
char neighbor_name[path_len_max];
int status = ZOK;
do {
int ret = search_watch_neighbor(zh, ctx->root_path,
ctx->cur_name, neighbor_name, sizeof(neighbor_name));
char neighbor_path[path_len_max];
sprintf(neighbor_path, "%s/%s", root_path, neighbor_name);
pthread_t pid = pthread_self();
fprintf(stdout, "%lu get neighbor info: %d %s\n", (long long)pid, ret, neighbor_path);
if (ZNONODE == ret) {
status = ZOK;
break;
}
else if (ZOK == ret) {
struct Stat stat;
if (ZOK == zoo_wexists(zh, neighbor_path, neighbor_watcher, ctx, &stat)) {
pthread_mutex_lock(&ctx->cond_lock);
pthread_cond_wait(&ctx->cond, &ctx->cond_lock);
pthread_mutex_unlock(&ctx->cond_lock);
}
else {
continue;
}
}
else {
status = ZSYSTEMERROR;
break;
}
} while(1);
return status;
}
再結合main函式的實現,兩種不方式設計的分散式鎖都可以執行起來
#define countof(x) sizeof(x)/sizeof(x[0])
int main(int argc, const char *argv[]) {
const int thread_num = 3;
pthread_t ids[thread_num];
for (int i = 0; i < countof(ids); i++) {
pthread_create(&ids[i], NULL, (void*)thread_routine, NULL);
}
for (int i = 0; i < countof(ids); i++) {
pthread_join(ids[i], NULL);
}
return 0;
}
將上述檔案儲存為lock_test.c,然後呼叫下面的指令編譯
gcc -o lock_test lock_test.c -I/home/work/fangliang/zookeeper-3.4.11/src/c/generated -I/home/work/fangliang/zookeeper-3.4.11/src/c/include -L/home/work/fangliang/zookeeper-3.4.11/src/c/.libs -lzookeeper_mt -DTHREADED -std=c99
關於zookeeper庫的編譯,網上有很多。我編譯起來還算順利,只是在找不到so時候使用下面指令指定下查詢路徑
export LD_LIBRARY_PATH=/home/work/fangliang/zookeeper-3.4.11/src/c/.libs:$LD_LIBRARY_PATH
參考資料
- https://www.cnblogs.com/xybaby/p/6871764.html
- http://lib.csdn.net/article/hadoop/6665
- https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
- http://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html
- http://zookeeper.sourcearchive.com/documentation/3.2.2plus-pdfsg3/zookeeper_8h.html
- 《Zookeeper分散式過程協同技術詳解》