1. 程式人生 > >用C、python手寫redis客戶端,相容redis叢集 (-MOVED和-ASK),快速搭建redis叢集

用C、python手寫redis客戶端,相容redis叢集 (-MOVED和-ASK),快速搭建redis叢集

  

  想沒想過,自己寫一個redis客戶端,是不是很難呢?   其實,並不是特別難。   首先,要知道redis服務端用的通訊協議,建議直接去官網看,部落格啥的其實也是從官網摘抄的,或者從其他部落格抄的(忽略)。   協議說明中文官網地址: http://www.redis.cn/topics/protocol.html  

  redis通訊協議

     列出主要的點,便於對於下面程式的理解。   Redis在TCP埠6379(預設埠,在配置可以修改)上監聽到來的連線,在客戶端與伺服器端之間傳輸的每個Redis命令或者資料都以\r\n結尾。  

  回覆(服務端可客戶端恢復的協議)

  Redis用不同的回覆型別回覆命令。它可能從伺服器傳送的第一個位元組開始校驗回覆型別:   * 用單行回覆(狀態回覆),回覆的第一個位元組將是“+”   * 錯誤訊息,回覆的第一個位元組將是“-”   * 整型數字,回覆的第一個位元組將是“:”   * 批量回復,回覆的第一個位元組將是“$”   * 多個批量回復,回覆的第一個位元組將是“*”  

  Bulk Strings(批量回復)

  批量回覆被伺服器用於返回一個單二進位制安全字串。   C: GET mykey   S: $6\r\nfoobar\r\n   伺服器傳送第一行回覆,該行以“$”開始後面跟隨實際要傳送的位元組數,隨後是CRLF,然後傳送實際資料,隨後是2個位元組的額外資料用於最後的CRLF。伺服器傳送的準確序列如下:   "$6\r\nfoobar\r\n"   如果請求的值不存在,批量回復將使用特殊的值-1來作為資料長度,例如:   C: GET nonexistingkey   S: $-1   當請求的物件不存在時,客戶端庫API不會返回空字串,而會返回空物件。例如:Ruby庫返回‘nil’,而C庫返回NULL(或者在回覆的物件裡設定指定的標誌)等等。    

  二進位制

  簡單說下二進位制,就是會包含\0,所以C語言在處理的時候,就不能用str函式,像strlen、strcpy等,因為它們都是以\0來判斷字串結尾的。  

  redis叢集  

  寫redis客戶端,要考慮到單機和叢集,單機知道上面的協議就可以寫了,叢集還要學習一下。   官網地址: http://www.redis.cn/topics/cluster-tutorial.html  http://www.redis.cn/topics/cluster-spec.html

  超簡單搭建redis叢集

  

  官網也介紹了怎麼搭建redis叢集,試過比較麻煩,因為用的centos6.5,如果用較新的centos,可能會好點。   redis叢集超簡單搭建方法:https://blog.csdn.net/cjfeii/article/details/47320351

  Redis 叢集的資料分片

     Redis 叢集沒有使用一致性hash, 而是引入了 雜湊槽的概念.   Redis 叢集有16384個雜湊槽,每個key通過CRC16校驗後對16384取模來決定放置哪個槽.叢集的每個節點負責一部分hash槽,舉個例子,比如當前叢集有3個節點,那麼:   * 節點 A 包含 0 到 5500號雜湊槽.   * 節點 B 包含5501 到 11000 號雜湊槽.   * 節點 C 包含11001 到 16384號雜湊槽.   這種結構很容易新增或者刪除節點. 比如如果我想新添加個節點D, 我需要從節點 A, B, C中得部分槽到D上. 如果我想移除節點A,需要將A中的槽移到B和C節點上,然後將沒有任何槽的A節點從叢集中移除即可.   由於從一個節點將雜湊槽移動到另一個節點並不會停止服務,所以無論新增刪除或者改變某個節點的雜湊槽的數量都不會造成叢集不可用的狀態.   

  Redis 叢集協議中的客戶端和伺服器端

  在 Redis 叢集中,節點負責儲存資料、記錄叢集的狀態(包括鍵值到正確節點的對映)。叢集節點同樣能自動發現其他節點,檢測出沒正常工作的節點, 並且在需要的時候在從節點中推選出主節點。   為了執行這些任務,所有的叢集節點都通過TCP連線(TCP bus?)和一個二進位制協議(叢集連線,cluster bus)建立通訊。 每一個節點都通過叢集連線(cluster bus)與叢集上的其餘每個節點連線起來。  節點們使用一個 gossip 協議來傳播叢集的資訊,這樣可以:發現新的節點、 傳送ping包(用來確保所有節點都在正常工作中)、在特定情況發生時傳送叢集訊息。叢集連線也用於在叢集中釋出或訂閱訊息。   由於叢集節點不能代理(proxy)請求,所以客戶端在接收到重定向錯誤(redirections errors) -MOVED 和 -ASK 的時候, 將命令重定向到其他節點。理論上來說,客戶端是可以自由地向叢集中的所有節點發送請求,在需要的時候把請求重定向到其他節點,所以客戶端是不需要儲存叢集狀態。 不過客戶端可以快取鍵值和節點之間的對映關係,這樣能明顯提高命令執行的效率。

  -MOVED

     簡單說下返回-MOVED的情況,就是客戶端連節點A請求處理key,但其實key其實在節點B,就返回-MOVED,協議如下:-MOVED 3999 127.0.0.1:6381 不用考慮-ASK的情況。

  C語言實現redis客戶端

  程式碼如下:

  

#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/poll.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <stdio.h>


ssize_t sock_write_loop( int fd, const void *vptr, size_t n )
{
    size_t nleft = 0;
    ssize_t nwritten = 0;
    const char *ptr;

    ptr = (char *) vptr;
    nleft = n;

    while( nleft > 0 )
    {
        if( (nwritten = write(fd, ptr, nleft) ) <= 0 )
        {
            if( errno == EINTR )
            {
                nwritten = 0;  //再次呼叫write
            }
            else
            {
                return -5;
            }
        }
        nleft = nleft - nwritten;
        ptr = ptr + nwritten;
    }
    return(n);
}

int sock_read_wait( int fd, int timeout )
{
    struct pollfd pfd;

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    timeout *= 1000;

    for (;;) 
    {
        switch( poll(&pfd, 1, timeout) ) 
        {
            case -1:
                if( errno != EINTR ) 
                {
                    return (-2);
                }
                continue;

            case 0:
                errno = ETIMEDOUT;
                return (-1);

            default:
                if( pfd.revents & POLLIN )
                    return (0);
                else
                    return (-3);
        }
    }

}

ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout )
{   
    if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )
        return (-1);
    else
        return (read(fd, vptr, len));

}

int sock_connect_nore(const char *IPaddr , int port , int timeout)
{
   // char temp[4096];
    int sock_fd = 0, n = 0, errcode = 0;
    struct sockaddr_in servaddr;

    if( IPaddr == NULL )
    {
        return -1;
    }

    if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
    {
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(port);

    //changed by navy 2003.3.3 for support domain addr
    //if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )
    if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )
    {
        //added by navy 2003.3.31 for support domain addr
        struct hostent* pHost = NULL, host;
        char sBuf[2048], sHostIp[17];
        int h_errnop = 0;

        memset(&host, 0, sizeof(host));
        memset(sBuf, 0, sizeof(sBuf));
        memset(sHostIp, 0 , sizeof(sHostIp));
        pHost = &host;

#ifdef _SOLARIS_PLAT
        //solaris
        if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || 
#else
                //linux
                if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || 
#endif
                    (pHost == NULL) ) 
                {
                close(sock_fd);
                return -1;
                }

                if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )
                {
                close(sock_fd);
                return -1;
                }

                //目前僅取第一個IP地址
                if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
                {
                    close(sock_fd);
                    return -1;
                }

                
                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )
                {
                    close(sock_fd);
                    
                    return -1;
                }
                //end added by navy 2003.3.31
    }

    if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, 
                    sizeof(servaddr), timeout) ) < 0 )
    {
        close(sock_fd);
        
        return -1;
    }

    return sock_fd;
}

int sock_connect(const char *IPaddr , int port , int timeout)
{
    char temp[4096];
    int sock_fd = 0, n = 0, errcode = 0;
    struct sockaddr_in servaddr;

    if( IPaddr == NULL )
    {
        return -1;
    }

    if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
    {
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(port);

    //changed by navy 2003.3.3 for support domain addr
    //if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )
    if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )
    {
        //added by navy 2003.3.31 for support domain addr
        struct hostent* pHost = NULL, host;
        char sBuf[2048], sHostIp[17];
        int h_errnop = 0;

        memset(&host, 0, sizeof(host));
        memset(sBuf, 0, sizeof(sBuf));
        memset(sHostIp, 0 , sizeof(sHostIp));
        pHost = &host;

#ifdef _SOLARIS_PLAT
        //solaris
        if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || 
#else
                //linux
                if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || 
#endif
                    (pHost == NULL) ) 
                {
                close(sock_fd);
                return -1;
                }

                if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )
                {
                close(sock_fd);
                return -1;
                }

                //目前僅取第一個IP地址
                if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
                {
                    close(sock_fd);
                    return -1;
                }
                
                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )
                {
                    close(sock_fd);
                    
                    return -1;
                }
                //end added by navy 2003.3.31
    }

    if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, 
                    sizeof(servaddr), timeout) ) < 0 )
    {
        close(sock_fd);
        
        return -1;
    }

    n = sock_read_tmo(sock_fd, temp, 4096, timeout);

    //一般錯誤
    if( n <= 0 ) 
    {
        close(sock_fd);
        
        sock_fd = -1;
    }

    return sock_fd;
}

int sock_non_blocking(int fd, int on)
{
    int     flags;

    if ((flags = fcntl(fd, F_GETFL, 0)) < 0){
        return -10;
    }
    if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){
        return -10;
    }
    return 0;
}

int sock_write_wait(int fd, int timeout)
{
    struct pollfd pfd;

    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;

    timeout *= 1000;

    for (;;) 
    {
        switch( poll(&pfd, 1, timeout) ) 
        {
        case -1:
            if( errno != EINTR ) 
            {

                return (-2);
            }
            continue;

        case 0:
            errno = ETIMEDOUT;
            return (-1);

        default:
            if( pfd.revents & POLLOUT )
                return (0);
            else
                return (-3);
        }
    }

}

int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout)
{
    int error = 0;
    socklen_t error_len;

    sock_non_blocking(sock, 1);

    if( connect(sock, sa, len) == 0 )
    {
        sock_non_blocking(sock, 0);
        return (0);
    }

    if( errno != EINPROGRESS )
    {
        sock_non_blocking(sock, 0);
        return (-1);
    }

    /*
     * A connection is in progress. Wait for a limited amount of time for
     * something to happen. If nothing happens, report an error.
     */
    if( sock_write_wait(sock, timeout) != 0)
    {
        sock_non_blocking(sock, 0);
        return (-2);
    }

    /*
     * Something happened. Some Solaris 2 versions have getsockopt() itself
     * return the error, instead of returning it via the parameter list.
     */
    error = 0;
    error_len = sizeof(error);

    if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 )
    {
        sock_non_blocking(sock, 0);
        return (-3);
    }
    if( error ) 
    {
        errno = error;
        sock_non_blocking(sock, 0);
        return (-4);
    }

    sock_non_blocking(sock, 0);
    /*
     * No problems.
     */
    return (0);

}

static int check_ip_in_list(const char *ip, char *iplist)
{        
    char *token = NULL;
    char *saveptr = NULL;
    token = strtok_r(iplist, ",", &saveptr);
    while(token != NULL)
    {        
        char *ptmp = NULL;                        
        char *ip_mask = strtok_r(token, "/", &ptmp);
        if(!ip_mask)                    
            return -1;
                     
        char *ip_bit = strtok_r(NULL, "/", &ptmp);
        
        if(ip_bit)
        {
            int mask_bit = atoi(ip_bit);
            if(mask_bit < 0 || mask_bit >32)
                continue;

            unsigned long addr[4] = { 0 };
            sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );
            unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

            sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );
            unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

            vl1 = ( vl1 >> ( 32 - mask_bit ) );
            vl2 = ( vl2 >> ( 32 - mask_bit ) );

            if( vl1 == vl2 )                        
                return 1;                          
        }
        else
        {
            if(strcmp(ip,ip_mask) == 0)            
                return 1;                            
        }                    

        token = strtok_r(NULL, ",", &saveptr);                
    }
        
    return 0;
}

static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro)
{
    char buf[128];
    int loops = 0;    

    strcpy(buf, redis_host);    

    do
    {
        loops ++;
        char *ptmp = NULL;
        char *host = strtok_r(buf, ":", &ptmp);
        if(!host) return -1;
        char *s_port = strtok_r(NULL, ":", &ptmp);
        if(!s_port) return -1;
        int port = atoi(s_port);
        char respone[40] = {0};

        int sock_fd = -1;
        if((sock_fd = sock_connect_nore(host, port, 5))<0)
            return -1;

        if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro))
        {
            close(sock_fd);
            return -1;
        }

        if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0)
        {
            close(sock_fd);
            return -1;
        }        

        if(strncmp(":0", respone, 2) == 0)
        {
            close(sock_fd);
            return 0;
        }            
        else if(strncmp(":1", respone, 2) == 0)
        {
            close(sock_fd);
            return 1;
        }            
        else if(strncmp("$", respone, 1) == 0)
        {                                    
            int data_size = 0;   
            int ret = 0;

            char *data_line = strstr(respone,"\r\n");
            if(!data_line)
            {
                close(sock_fd);
                return -1;
            }
            data_line = data_line+2;

            data_size = atoi(respone+1);
            if(data_size == -1)
            {
                close(sock_fd);
                return 0;
            }
            if(strlen(data_line) == data_size+2)
            {
                printf("line = %d, data_line = %s\n",__LINE__,data_line);
                ret=check_ip_in_list(ip, data_line);
                close(sock_fd);
                return ret;
            }
            char *data = calloc(data_size+3,1);
            if(!data)
            {
                close(sock_fd);
                return -1;
            }
            strcpy(data,data_line);
            int read_size = strlen(data);
            int left_size = data_size + 2 - read_size;
            while(left_size > 0)
            {
                int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);
                if(nread<=0)
                {
                    free(data);
                    close(sock_fd);            
                    return -1;
                }
                read_size += nread;
                left_size -= nread;
            }
            close(sock_fd);
            printf("line = %d, data = %s\n",__LINE__,data);
            ret=check_ip_in_list(ip, data);
            free(data);
            return ret;
        }            
        else if(strncmp("-MOVED", respone, 6) == 0)
        {
            close(sock_fd);
            char *p = strchr(respone, ' ');
            if(p == NULL)
                return -1;

            p = strchr(p+1, ' ');
            if(p == NULL)
                return -1;

            strcpy(buf, p+1);
        }
        else
        {
            close(sock_fd);
            return -1;
        }            
        
    }while(loops < 2);

    return -1;
}

int main(int argc,char *argv[])
{
    if(argc != 2)
    {
        printf("please input ip\n");
        return -1;
    }     
    const char *redis_ip = "127.0.0.1:7002";
    const char *domain = "test.com";

    char exist_pro[128] = {0};
    char get_pro[128] = {0};    
    snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%s\r\n",domain,"127.0.0.1");        
    snprintf(get_pro,sizeof(get_pro),"GET test_%s\r\n",domain);
    int loops = 0;
    int ret = 0;
    do
    {
        loops ++;
        ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);
        if(ret == 0)
            ret = check_ip_in_redis(redis_ip, argv[1],get_pro);
    }while(loops < 3 && ret < 0);

    printf("line = %d, ret = %d\n",__LINE__,ret);

    return ret;
}
c_redis_cli.c

  主要看這個check_ip_in_redis函式就行了,其它都是一些socket的封裝。

  

  python實現redis客戶端

  

#!/usr/bin/python

import sys  
import socket

def main(argv):
    if(len(argv) != 3):
        print "please input domain ip!"
        return
    host = "192.168.188.47"   
    port = 7002
    while 1:
        s = socket.socket()                
        s.connect((host, port))
        
        cmd = 'set %s_white_ip %s\r\n' % (argv[1],argv[2])        
        s.send(cmd)
        res = s.recv(32)
        s.close()        
    
        if res[0] == "+":
            print "set domain white  ip suc!"
            return    
        elif res[0:6] == "-MOVED":
            list = res.split(" ")
            ip_list = list[2].split(":")            
            host = ip_list[0]    
            port = int(ip_list[1])                            
        else:
            print "set domain white  ip error!"
            return                               

if __name__ == "__main__":
    main(sys.argv)

  總結

  多去學一些,想想redis客戶端怎麼實現的,對redis的理解會更加深入,寫了這部分之後,對redis叢集有了更加深入的瞭解了