1. 程式人生 > >《redis設計與實現》-第6章整數集合intset

《redis設計與實現》-第6章整數集合intset

一 序

  intset是Redis集合的底層實現之一,當儲存整數集合並且資料量較小的情況下Redis會使用intset作為set的底層實現。當資料量較大或者集合元素為字串時則會使用dict實現set。這一章看書相對簡單,看一下原始碼對應的api,發現自己對於位元組序不懂,所以先把預備知識貼一下,熟悉的可以跳過無視了。

  計算機硬體有兩種儲存資料的方式:大端位元組序(big endian)和小端位元組序(little endian)。不論大端小端,記憶體中資料按照8位(1位元組)分割。舉例來說,數值0x2211使用兩個位元組儲存:高位位元組是0x22,低位位元組是0x11。

  • 大端位元組序:高位位元組在前,低位位元組在後,這是人類讀寫數值的方法。
  • 小端位元組序:低位位元組在前,高位位元組在後,即以0x1122形式儲存。

對於反人類的設計真是不理解,統一大端位元組序多好,弄這麼複雜幹什麼?
首先,為什麼會有小端位元組序?
答案是,計算機電路先處理低位位元組,效率比較高,因為計算都是從低位開始的。所以,計算機的內部處理都是小端位元組序。
但是,人類還是習慣讀寫大端位元組序。所以,除了計算機的內部處理,其他的場合幾乎都是大端位元組序,比如網路傳輸和檔案儲存。計算機處理位元組序的時候,不知道什麼是高位位元組,什麼是低位位元組。它只知道按順序讀取位元組,先讀第一個位元組,再讀第二個位元組。如果是大端位元組序,先讀到的就是高位位元組,後讀到的就是低位位元組。小端位元組序正好相反。只有讀取的時候,才必須區分位元組序,其他情況都不用考慮.
     處理器讀取外部資料的時候,必須知道資料的位元組序,將其轉成正確的值。然後,就正常使用這個值,完全不用再考慮位元組序。C語言的高效還真是有原因的。作者還舉個了個例子,執行一下:

在80X86平臺上,系統將多位元組中的低位儲存在變數起始地址,使用小端法。htonl將i_num轉換成網路位元組序,可見網路位元組序是大端法。

二 intset資料結構

 intset 資料結構相對簡單,原始碼在intset.c,intset.h

typedef struct intset {
    
    // 編碼方式
    uint32_t encoding;

    // 集合包含的元素數量
    uint32_t length;

    // 儲存元素的陣列
    int8_t contents[];

} intset;

    需要注意contents陣列成員被宣告為int8_t型別並不表示contents裡存的是int8_t型別的成員,這個型別宣告對於contents來說可以認為是毫無意義的,因為intset成員是什麼型別完全取決於encoding變數的值。encoding提供下面三種值:

#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

      雖然每個成員的“實際型別”是int8_t,無法直接通過contents[x]取出索引為x的成員元素,但是intset.c裡提供了些函式,可以按照不同的encoding方式設定/取出contents的成員。(用指標設定,memcpy取出)。這裡就牽扯一開列出來的位元組序。
     如果通過contents[x]的方式賦值取值,我們就不需要考慮這個位元組序的問題,但是intset根據encoding的值指定元素的地址偏移,暴力地對記憶體進行操作。若資料被截斷了,則大端機器和小端機器會表現出不統一的狀況。為了避免這種情況發生,intset不管在什麼機器上都按照同一種位元組序(小端)在記憶體中存intset的成員變數。

具體跟位元組序有關的程式碼在endianconv.c,endianconv.h

#ifndef __ENDIANCONV_H
#define __ENDIANCONV_H

#include "config.h"
#include <stdint.h>

void memrev16(void *p);
void memrev32(void *p);
void memrev64(void *p);
uint16_t intrev16(uint16_t v);
uint32_t intrev32(uint32_t v);
uint64_t intrev64(uint64_t v);

/* variants of the function doing the actual convertion only if the target
 * host is big endian */
#if (BYTE_ORDER == LITTLE_ENDIAN)
#define memrev16ifbe(p)
#define memrev32ifbe(p)
#define memrev64ifbe(p)
#define intrev16ifbe(v) (v)
#define intrev32ifbe(v) (v)
#define intrev64ifbe(v) (v)
#else
#define memrev16ifbe(p) memrev16(p)
#define memrev32ifbe(p) memrev32(p)
#define memrev64ifbe(p) memrev64(p)
#define intrev16ifbe(v) intrev16(v)
#define intrev32ifbe(v) intrev32(v)
#define intrev64ifbe(v) intrev64(v)
#endif

/* The functions htonu64() and ntohu64() convert the specified value to
 * network byte ordering and back. In big endian systems they are no-ops. */
#if (BYTE_ORDER == BIG_ENDIAN)
#define htonu64(v) (v)
#define ntohu64(v) (v)
#else
#define htonu64(v) intrev64(v)
#define ntohu64(v) intrev64(v)
#endif

#ifdef REDIS_TEST
int endianconvTest(int argc, char *argv[]);
#endif

#endif


/* Toggle the 16 bit unsigned integer pointed by *p from little endian to
 * big endian */
void memrev16(void *p) {
    unsigned char *x = p, t;

    t = x[0];
    x[0] = x[1];
    x[1] = t;
}

/* Toggle the 32 bit unsigned integer pointed by *p from little endian to
 * big endian */
void memrev32(void *p) {
    unsigned char *x = p, t;

    t = x[0];
    x[0] = x[3];
    x[3] = t;
    t = x[1];
    x[1] = x[2];
    x[2] = t;
}

/* Toggle the 64 bit unsigned integer pointed by *p from little endian to
 * big endian */
void memrev64(void *p) {
    unsigned char *x = p, t;

    t = x[0];
    x[0] = x[7];
    x[7] = t;
    t = x[1];
    x[1] = x[6];
    x[6] = t;
    t = x[2];
    x[2] = x[5];
    x[5] = t;
    t = x[3];
    x[3] = x[4];
    x[4] = t;
}

uint16_t intrev16(uint16_t v) {
    memrev16(&v);
    return v;
}

uint32_t intrev32(uint32_t v) {
    memrev32(&v);
    return v;
}

uint64_t intrev64(uint64_t v) {
    memrev64(&v);
    return v;
}

看程式碼可知道這些方法是用來低位與高位交換,實現統一的目的。

三API實現

底層賦值/取值操作

通過_intsetSet和_intsetGet這兩個工具函式,可以根據intset的encoding 讀/寫contents裡索引為pos的值。這是後續intset操作的基礎。


/* Return the value at pos, using the configured encoding. 
 *
 * 根據集合的編碼方式,返回底層陣列在 pos 索引上的值
 *
 * T = O(1)
 */
static int64_t _intsetGet(intset *is, int pos) {
    return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}

/* Set the value at pos, using the configured encoding. 
 *
 * 根據集合的編碼方式,將底層陣列在 pos 位置上的值設為 value 。
 *
 * T = O(1)
 */
static void _intsetSet(intset *is, int pos, int64_t value) {

    // 取出集合的編碼方式
    uint32_t encoding = intrev32ifbe(is->encoding);

    // 根據編碼 ((Enc_t*)is->contents) 將陣列轉換回正確的型別
    // 然後 ((Enc_t*)is->contents)[pos] 定位到陣列索引上
    // 接著 ((Enc_t*)is->contents)[pos] = value 將值賦給陣列
    // 最後, ((Enc_t*)is->contents)+pos 定位到剛剛設定的新值上 
    // 如果有需要的話, memrevEncifbe 將對值進行大小端轉換
    if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}

建立一個空intset

空intset的預設encoding是INTSET_ENC_INT16,contents每個成員的邏輯型別是int16_t(雖然還沒有成員)

/* Create an empty intset. 
 *
 * 建立並返回一個新的空整數集合
 *
 * T = O(1)
 */
intset *intsetNew(void) {

    // 為整數集合結構分配空間
    intset *is = zmalloc(sizeof(intset));

    // 設定初始編碼
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);

    // 初始化元素數量
    is->length = 0;

    return is;
}

插入節點

/* Insert an integer in the intset 
 * 
 * 嘗試將元素 value 新增到整數集合中。
 *
 * *success 的值指示新增是否成功:
 * - 如果新增成功,那麼將 *success 的值設為 1 。
 * - 因為元素已存在而造成新增失敗時,將 *success 的值設為 0 。
 *
 * T = O(N)
 */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
   // 計算編碼 value 所需的長度
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;
     //valenc: 新元素適合的儲存編碼
     //is->encoding: 當前集合的編碼
     //當 valenc> is->encoding時,表明當前集合無法儲存新元素,那麼此時需要對集合進行升級。
     //反之,檢查集合是否存在該元素
    if (valenc > intrev32ifbe(is->encoding)) {
        return intsetUpgradeAndAdd(is,value);
    } else {
        //查詢新元素是否在集合中,如果存在,則return 1, 否則返回0,並設定插入的位置
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        } 
          // 為 value 在集合中分配空間       
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        //移動pos之後的元素,為新元素騰出空間 
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }
     // 將新值設定到底層陣列的指定位置中
    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

 通過呼叫 _intsetValueEncoding 得到value合適的編碼valenc。 

/* Return the required encoding for the provided value. 
 *
 * 返回適用於傳入值 v 的編碼方式
 *
 * T = O(1)
 */
static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}

intsetUpgradeAndAdd: 僅供內部使用,其功能是對集合進行升級,並新增新元素。

static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    //當前集合編碼
    uint8_t curenc = intrev32ifbe(is->encoding);
    //新編碼
    uint8_t newenc = _intsetValueEncoding(value);
    //集合大小
    int length = intrev32ifbe(is->length);
    //方向標識
    int prepend = value < 0 ? 1 : 0;
    is->encoding = intrev32ifbe(newenc);
    //擴充空間
    is = intsetResize(is,intrev32ifbe(is->length)+1);
    //移動集合元素
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
    //prepend = 1:表明value是最小的負數,將新元素新增到集合首位。
    //prepend = 0:表明value是最大的整數,將新元素新增至集合尾部。
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    //設定長度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

書上畫圖講了升級的過程,結合程式碼來看升級過程:根據集合原來的編碼方式,從底層陣列中取出集合元素
     然後再將元素以新編碼的方式新增到集合中。    當完成了這個步驟之後,集合中所有原有的元素就完成了從舊編碼到新編碼的轉換。因為新分配的空間都放在陣列的後端,所以程式先從後端向前端移動元素,就是length--的過程。

   然後插入頭或者尾,升級之後新元素的擺放位置

因為引發升級的新元素的長度總是比整數集合現有所有元素的長度都大, 所以這個新元素的值要麼就大於所有現有元素, 要麼就小於所有現有元素:

  • 在新元素小於所有現有元素的情況下, 新元素會被放置在底層陣列的最開頭(索引 0 );
  • 在新元素大於所有現有元素的情況下, 新元素會被放置在底層陣列的最末尾(索引 length-1 )。

intsetSearch : 它的作用是在整數集合裡用二分法找到value的位置,並把位置寫給pos引數,函式返回1;若沒找到,則寫給pos的是能被插入的value的位置(intset按順序儲存),函式返回0。

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */
         // 因為底層陣列是有序的,如果 value 比陣列中最後一個值都要大
        // 那麼 value 肯定不存在於集合中,
        // 並且應該將 value 新增到底層陣列的最末端
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);//value可以被插入的位置
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

    //二分
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;//找到了
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}

movetoTail:移動元素到末尾

static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {

    void *src, *dst;

    // 要移動的元素個數
    uint32_t bytes = intrev32ifbe(is->length)-from;

    // 集合的編碼方式
    uint32_t encoding = intrev32ifbe(is->encoding);

    // 根據不同的編碼
    // src = (Enc_t*)is->contents+from 記錄移動開始的位置
    // dst = (Enc_t*)is_.contents+to 記錄移動結束的位置
    // bytes *= sizeof(Enc_t) 計算一共要移動多少位元組
    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;
        dst = (int64_t*)is->contents+to;
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }

    // 進行移動
    // T = O(N)
    memmove(dst,src,bytes);
}

集合from位置之後的元素 移至 to位置,內部使用c語言memmove函式保證移動過程中資料的完整性。注意複製之後from值不變,但是可以被覆蓋。

intsetRemove:從集合移除某個元素

intset *intsetRemove(intset *is, int64_t value, int *success) {

    // 計算 value 的編碼方式
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;

    // 預設設定標識值為刪除失敗
    if (success) *success = 0;

    // 當 value 的編碼大小小於或等於集合的當前編碼方式(說明 value 有可能存在於集合)
    // 並且 intsetSearch 的結果為真,那麼執行刪除
    // T = O(log N)
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {

        // 取出集合當前的元素數量
        uint32_t len = intrev32ifbe(is->length);

        /* We know we can delete */
        // 設定標識值為刪除成功
        if (success) *success = 1;
        //往前移位
         if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
        // 縮小陣列的大小,移除被刪除元素佔用的空間
        // T = O(N)
        is = intsetResize(is,len-1);
        // 更新集合的元素數量
        is->length = intrev32ifbe(len-1);
    }

    return is;
}

移除一個成員時不會改變intset的encoding,儘管移除這個成員之後所有成員的encoding都小於所在intset的encoding。也就是說intset內部只有”編碼升級”的過程,沒有”降級”的操作。當將唯一一個高位元素從將集合移除時,此時,集合不會轉換為低位編碼集合。 

總結:

  • 整數集合是集合鍵的底層實現之一。
  • 整數集合的底層實現為陣列, 這個陣列以有序、無重複的方式儲存集合元素, 在有需要時, 程式會根據新新增元素的型別, 改變這個陣列的型別。
  • 升級操作為整數集合帶來了操作上的靈活性, 並且儘可能地節約了記憶體。
  • 整數集合只支援升級操作, 不支援降級操作。

看程式碼可以知道,程式碼設計很巧妙,在靈活節約記憶體,因為基於有序陣列的一些操作:SADD、SREM 操作一個成員時,時間複雜度會是O(logn)。所以當整數集合資料量變大的時候,redis會用dict作為集合的底層實現,將SADD、SREM、SISMEMBER這些命令的時間複雜度降至O(1)。

參考: