1. 程式人生 > >Redis4.0原始碼解析--3種線性表

Redis4.0原始碼解析--3種線性表

為了大家看整體原始碼方便,我將加上了完整註釋的程式碼傳到了我的github上供大家直接下載:

上一章講了SDS動態字串,大概講了看的方向,其實更深層次的還是要請讀者自己看原始碼,我將原始碼加上了註釋,這樣大家看起來也更加的方便,本文講Redis中的連結串列。

Redis中的連結串列地位很高,除了Redis對外暴露的list功能用到內部的連結串列之外,其實內部的很多結構和功能都間接使用了連結串列來實現,Redis中連結串列的實現分為3個部分,也使用了3個C檔案來描述:ziplist.cadlist.cquicklist.c,其中quicklist.c是在前兩者的基礎上實現的Redis對外暴露的列表,也就是我們經常使用的lpush

linsertlindex等命令的具體實現,我們稱之為快速列表,既然他是基於前兩者(壓縮列表ziplist和雙向連結串列adlist)來實現的,那想要了解它就必須先了解前兩者。

細心的讀者應該注意到,我稱ziplist為列表,稱adlist為連結串列,個人理解的列表指的是記憶體連續或者大多數記憶體連續的資料結構,也就是平常所說的順序表,而連結串列則用於指僅僅在邏輯上連續而在記憶體上不連續的列表結構。adlist是一個雙向連結串列,各個節點包含了前一個節點和後一個節點指標,在quicklist中使用類似adlist的連結串列作為作為中控器,也就是連線一個又一個ziplist的連結串列巢狀層,quicklist

使用雙向連結串列儲存底層資料結構ziplist,這樣既保留了動態擴充套件連結串列的需求,又儘可能的使用了連續記憶體,提高了記憶體使用率和查詢效率,大家平常所用的LPUSHLRANGE等命令就是使用的quicklist

其實quicklist就是adlist這個通用雙向連結串列的思想加上ziplist的結合體,所以我們先來了解下通用連結串列adlist,它是Redis內部使用最多最廣泛的連結串列,比較簡單,也就是大家平常最常瞭解的連結串列,雖然實現方式沒有太多的特殊點,但我們也大致講下,方便我們後續讀quicklist中的雙向連結串列時做鋪墊。

一、通用雙向連結串列adlist

adlist

a double linked list,和這個間接普通的C原始檔名字以一樣,adlist的實現也是非常簡單明瞭,一個普通的雙向連結串列,我們先看其節點定義

typedef struct listNode {                                                                                            
    // 前一個節點                                                                                                    
    struct listNode *prev;                                                                                           
    struct listNode *next;                                                                                           
    // 節點的具體值指標,由於為void*,所以連結串列中節點的值型別可以完全各不相同                                         
    void *value;                                                                                                     
} listNode;   

節點的定義很簡單,儲存了前一個節點和後一個節點,值可以儲存任意值,按理說直接使用listNode就直接能夠構成連結串列結構,但是使用adlist定義的 list結構體操作會更加的方便,我們來看下使用該結構體更加方便

  typedef struct list {
      // 連結串列的首個元素
      listNode *head;
      // 連結串列的尾部元素
      // 之所以記錄尾部元素是因為可以方便的支援redis能夠通過負數表示從尾部倒數索引
      listNode *tail;
      // 節點拷貝函式,在對連結串列進行復制時會嘗試呼叫該函式
      // 如果沒有設定該函式則僅會對連結串列進行淺拷貝(直接拷貝將值的地址賦給新連結串列節點)
      void *(*dup)(void *ptr);
      // 在釋放連結串列節點元素記憶體前會嘗試呼叫該函式,相當於節點銷燬前的一個監聽
      void (*free)(void *ptr);
      // 在搜尋連結串列中節點時會呼叫該函式來判斷兩個節點是否相等
      // 如果沒有設定該函式則會直接比較兩個節點的記憶體地址
      int (*match)(void *ptr, void *key);
      // 連結串列當前的節點個數,即連結串列長度,方便統計(因為有提供給使用者獲取連結串列長度的命令llen)
      unsigned long len;
  } list;

雖然listNode本身可以表示連結串列,但是list結構體操作更加方便並且記錄了一些關鍵資訊,降低了查詢複雜度,另外由於list的函式指標,使得對於連結串列的複製、節點釋放、節點的搜尋可以更加的靈活,由呼叫者自由定義。特別是match函式,由於連結串列的值是各異的,所以如何比較兩個值是否相等是僅有連結串列的使用者才最清楚。

1.1 adlist連結串列插入值

建立連結串列的過程很簡單,不再單獨列出,僅是建立一個list結構體並設定初始值,我們看下在連結串列中插入值的過程

/*
   * 插入一個節點到指定節點的前面或者後面
   *
   * 引數列表
   *      1. list: 待操作的連結串列                                                                                                                                                                                                               
   *      2. old_node: 要插入到哪一個節點的前面或者後面
   *      3. value: 要插入的值
   *      4. after: 如果為0則插入到old_value的前面,非0則插入到old_value的後面
   *
   * 返回值
   *      返回值即是連結串列本身,僅是為了方便鏈式操作
   */
  list *listInsertNode(list *list, listNode *old_node, void *value, int after) {
      listNode *node;

      // 如果不能插入新節點則返回空告訴上層呼叫者插入失敗
      if ((node = zmalloc(sizeof(*node))) == NULL)
          return NULL;
      node->value = value;
      if (after) {
          // 插入到指定節點的後面
          node->prev = old_node;
          node->next = old_node->next;
          // 如果正好插入到了連結串列的尾部則將新插入的節點設定連結串列尾
          if (list->tail == old_node) {
              list->tail = node;
          }
      } else {
          // 插入到指定節點的前面
          node->next = old_node;
          node->prev = old_node->prev;
          // 如果正好插入到了連結串列頭部則將新的節點設定為連結串列頭
          if (list->head == old_node) {
              list->head = node;
          }
      }
      // 設定前後節點對應的前後指標值
      if (node->prev != NULL) {
          node->prev->next = node;
      }
      if (node->next != NULL) {
          node->next->prev = node;
      }
      list->len++;
      return list;
  }

1.2 adlist連結串列查詢

插入連結串列的過程基本上能夠了解到Redis這個雙向連結串列的內部結構以及設計原理,除了這個還剩下的就是對連結串列的查詢了,其中搜索listSearchKey很好的展示了連結串列查詢的過程

/*                                                                                                                 
   * 搜尋指定與key值匹配的節點, 返回第一個匹配的節點                                                                 
   * 如果有連結串列中有匹配函式則使用匹配函式,否則直接判斷key值地址與節點值地址是否相等                                 
   *                                                                                                                 
   * 引數列表                                                                                                        
   *      1. list: 待搜尋的連結串列                                                                                      
   *      2. key: 用於搜尋的key值                                                                                    
   *                                                                                                                 
   * 返回值                                                                                                          
   *      與key值匹配的節點或者空                                                                                    
   */                                                                                                                
  listNode *listSearchKey(list *list, void *key)
  {
      listIter iter;
      listNode *node;

      // 先重置迭代器的迭代起始位置為連結串列頭
      listRewind(list, &iter);
      // 呼叫連結串列的迭代器逐一遍歷連結串列元素
      while((node = listNext(&iter)) != NULL) {
          // 如果連結串列設定了節點匹配函式則使用否則直接比較記憶體地址
          if (list->match) {                                                                                                                                  
              if (list->match(node->value, key)) {
                  return node;
              }
          } else {
              if (key == node->value) {
                  return node;
              }                                                                                                      
          }                                                                                                          
      }                                                                                                              
      return NULL;                                                                                                   
  }

Redis的通用雙向連結串列實現比較簡單,通過這兩個函式基本上就對整個adlist有了一定的瞭解。

二、壓縮列表ziplist

Redis是非常注意節約記憶體的,極高的記憶體利用率是Redis的一大特點,也是因為目前伺服器的計算能力是大量富餘的,所以拿計算換記憶體是很值得的。
zippiest 的結構體比較複雜,先從最外層看起,結構體如下

<total-bytes><tail-offset><len><entry>...<entry><end-mark>

名稱均是按我自己的理解命名的,也就是

<總的記憶體分配大小> <末尾元素地址> <列表長度> <節點> <節點> ... <結束標記>

這個結構僅僅是根據原始碼邏輯構思出來的,在Redis中沒有宣告任何結構體來表示這個結構,壓縮列表ziplist的表示方法就是一個普通char*指標,再加上一大堆的巨集操作,就構成了這個壓縮列表,具體看下各個值的情況

  1. total-bytes:32位整型,表示ziplist所用的總記憶體數
  2. tail-offset: 表示列表最有一個元素的地址,之所以有它是因為Redis的風格是大量的支援倒序索引的,有了它就很方便在尾端進行操作。
  3. len:列表的長度,16位整型,為了表示更大意義上的長度值甚至無限長,當它小於2^16-1時表示的是節點的個數,但是等於2^16-1時則代表該列表長度不可儲存,必須要遍歷列表才能得出長度
  4. entry:表示真正存放資料的資料項,長度是不固定的,每個entry都有自己的資料結構,用於動態表示節點長度以及編碼方式
  5. end-mark:標記列表結束,固定值255

列表中的具體節點entry則顯得有點複雜了,它的結構是比較典型的TLV格式,前幾位來表示編碼型別,然後是資料長度,接著就是資料,具體的結構如下

<prevrawlen><len><data>

這幾個名稱是在Redis原始碼註釋中有出現的,分別代表著

  1. prevrawlen:前一個節點的總長度,該屬性本身長度也是動態的,當前一個節點的長度小於254時,則為1個char長度,其它情況長度則為5個char,第一位char為標記位(254),後4位char用於表示前一個節長度
  2. len:當前節點的真實資料的長度,和prevrawlen一樣,該屬性本身的長度也是動態的,如前文所說採用TLV形式,不同的型別對應不同的長度和資料儲存方式,稍後單獨講解
  3. data:實際的資料,分為字元或整型兩種形式儲存,具體形式由len中設定編碼決定

    對於len值編碼的設定一共分為9種,我們通過巨集ZIP_DECODE_LENGTH來了解下

    /*
    * 解析指定到entry節點並將編碼型別,儲存長度的元素的長度,列表長度的值設定到對應的變數中
    * 步驟如下
    *  1、先得到編碼型別,一共9種,分別表示使用了幾位字元來表示該節點的總長度
    *  2、編碼小於1100 0000共有3種類型,此型別下資料(data)儲存的都是字元(char)
    *      1. 00xxxxxx: 前兩位作為標誌位,後6位用來記錄長度
    *      2. 01xxxxxx xxxxxxxx 共2位: 使用14位來記錄長度,最大值位2^14 - 1
    *      3. 10xxxxxx xxxxxxxx...共5位: 使用32位來記錄長度(帶標記位的char整個捨棄不用),最大值2^32 - 1
    *  3、編碼大於1100 0000共規定了6種類型,長度均採用1個字元表示,每種型別資料的儲存格式也各不相同
    *      4. 1100 0000: data指標儲存資料格式為16位元組整型
    *      5. 1101 0000: data指標儲存資料格式為32位元組整型
    *      6. 1110 0000: data指標儲存資料格式為64位元組整型
    *      7. 1111 0000: data指標儲存資料格式為3位元組整型
    *      8. 1111 1110: data指標儲存資料格式為1位元組整型
    *      9. 1111 dddd: 特殊情況,後4位表示真實資料,0~12,也就是dddd的值減去1就是真實值
    *                    之所以減1是因為較小的數字肯定是從0開始,但1111 0000又和第6點衝突
    *                    最大隻到1101因為1110又和第8點衝突
    */
    define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {                    \                                                                              
    ZIP_ENTRY_ENCODING((ptr), (encoding));                                     \
    if ((encoding) < ZIP_STR_MASK) {                                           \
        if ((encoding) == ZIP_STR_06B) {                                       \
            (lensize) = 1;                                                     \
            (len) = (ptr)[0] & 0x3f;                                           \
        } else if ((encoding) == ZIP_STR_14B) {                                \
            (lensize) = 2;                                                     \
            (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];                       \
        } else if ((encoding) == ZIP_STR_32B) {                                \
            (lensize) = 5;                                                     \
            (len) = ((ptr)[1] << 24) |                                         \
                    ((ptr)[2] << 16) |                                         \
                    ((ptr)[3] <<  8) |                                         \
                    ((ptr)[4]);                                                \
        } else {                                                               \
            panic("Invalid string encoding 0x%02X", (encoding));               \
        }                                                                      \
    } else {                                                                   \
        (lensize) = 1;                                                         \
        (len) = zipIntSize(encoding);                                          \
    }                                                                          \
    } while(0);

    根據不同的編碼型別,Redis使用盡可能小的記憶體對其進行儲存,瞭解了儲存結構,基本上就對壓縮列表ziplist瞭解了大半了,接下來我們看下它的插入操作

2.1 壓縮列表插入值

/*
 * 在壓縮列表指定位置插入一個字串值
 *
 * 引數列表
 *      1. zl: 待插入的壓縮列表
 *      2. p: 要插入到哪個位置
 *      3. s: 待插入的字串(不以NULL結尾)的起始地址
 *      4. slen: 待插入的字串的長度,由於不是標準的C字串,所以需要指定長度
 *
 * 返回值
 *      壓縮列表地址
 */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    // 先取出當前壓縮列表總記憶體分配長度
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    // 這個初始化值只是為了防止編譯器警告
    long long value = 123456789; 
    zlentry tail;

    // 因為每個節點都會記錄上一個節點資料佔用的記憶體長度(方便倒序索引),所以先查出該值
    // 如果待插入的位置是壓縮列表的尾部, 則相當於尾部追加
    if (p[0] != ZIP_END) {
        // 如果不是插入尾部則根據p正常獲取前一個節點的長度
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        // 如果是尾部追加則先獲取列表中最後一個節點的地址(注意最後一個節點並不一定是列表結束)
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        // 如果最後一個節點也是空的(ptail[0]==列表結束標記)則代表整個壓縮列表都還是空列表
        // 如果不是空列表則正常取出最後一個節點的長度
        if (ptail[0] != ZIP_END) {
            // 取出尾部節點所佔記憶體字元長度
            prevlen = zipRawEntryLength(ptail);
        }
    }

    // 如果可以轉換為整型儲存則使用整型儲存
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        // 計算整型所佔長度
        // 1位: -128~127,2位: -32768~3276...
        reqlen = zipIntSize(encoding);
    } else {
        // 如果不能轉換為整型儲存則直接使用字串(char)方式儲存
        reqlen = slen;
    }
    // 除了儲存資料(V),一個節點還還需要儲存編碼型別(T)和節點長度(L)以及前一個節點的長度
    // 計算出儲存上一個節點長度的值所需要的記憶體大小
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    // 計算處需要儲存自己的編碼型別所需的記憶體大小
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

    // 計算出儲存該節點的長度所需的記憶體大小並嘗試賦值給該節點的下一個節點(每個都節點儲存上一個節點的長度)
    int forcelarge = 0;
    // 如果插入的節點不是列表尾的話,那該節點的下一個節點應該儲存該節點的長度
    // 計算出下一個節點之前已經分配的用於儲存上一個節點長度的記憶體和目前儲存實際所需記憶體的差距
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    // 其實儲存長度值僅有兩種可能,小於254則使用一個char儲存,其它則使用5個char儲存
    if (nextdiff == -4 && reqlen < 4) {
        // 如果所需記憶體減少了(之前一個節點長度比當前節點長)
        // 但是當前節點又已經儲存為較小的整數的情況下(共兩種編碼)則不進行縮小了
        nextdiff = 0;
        forcelarge = 1;
    }

    offset = p-zl;
    // 根據新加入的元素所需擴充套件的記憶體重新申請記憶體
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    // 重新申請之後原來的p有可能失效(因為整塊列表地址都換了),所以根據原先偏移量重新計算出地址
    p = zl+offset;

    // 接下來開始挪動p兩端的位置並把新的節點插入
    if (p[0] != ZIP_END) {
        // 把p位置之後的元素都往後移動reqlen個位置,空出reqlen長度的記憶體給新節點使用
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
        // 將新節點的長度設定到後一個節點之中
        if (forcelarge)
            // 如果滿足我們前面計算nextdiff的所設定的不縮小條件則強行保留5個char來儲存新節點的長度
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        // 設定zl頭部中尾部元素偏移量
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        // 節約變數,直接使用tail作為節點
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        // 如果本身要插到尾部則元素偏移位置就是頭部到插入位置p的
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    // 如果下個節點的長度有所變化(因為儲存當前節點的長度所佔記憶體變化了)
    // 那意味著因為下個節點長度變化,下下個節點儲存下個節點長度的記憶體也發生了變化又導致下下個節點的長度變化
    // 這改變是個蝴蝶效應,所以需要逐一遍歷修改
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    // 將前一個節點的長度存入該節點首部
    p += zipStorePrevEntryLength(p,prevlen);
    // 儲存該節點資料編碼方式和長度
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        // 如果是字元編碼則直接拷貝
        memcpy(p,s,slen);
    } else {
        // 整型編碼則儲存對應整型
        zipSaveInteger(p,value,encoding);
    }
    // 將列表的長度加1
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}      

雖然程式碼中已經有很多的註釋,但還是簡單解釋一下,函式的功能是在指定的位置p插入一個新的entry,起始位置為p,資料的地址指標是s,原來位於p位置的資料項以及後面的所有資料項,需要統一向後偏移。該函式可以將資料插入到列表中的某個節點後,也可以插入到列表尾部。

  1. 首先計算出待插入位置的前一個entry的長度prevlen,稍後要將這個值存入到新節點的prevrawlen屬性中
  2. 計算新的entry總共需要記憶體數,一個entry包含3個部分,所以這個記憶體數是這3部分的總和,當然也可能因為值小於13而變成沒有data部分
  3. 壓縮列表有一個比較麻煩的地方就是每個節點都儲存了前一個節點的長度,而且儲存記憶體本身也是動態的,那麼當新節點插入,它的下一個節點則要儲存它的長度,這有可能引起下一個節點發生長度變化,因為可能原先下一個節點的prevrawlen僅需一個字元儲存,結果新的節點的長度大於254了,那就需要5個字元來儲存了,此時下一個節點的長度發生了變化,更可怕的是,由於下一個節點長度發生了變化,下下一個節點也面臨著同樣的問題,這就像是蝴蝶效應,一個小小的改動卻帶來驚天動地的變化, Redis稱之為瀑布式改變,當然Redis也做了些許優化,當節點嘗試變短時會根據某些條件僅可能避免這種大量改動的發生
  4. 既然長度發生了變化則要申請新的記憶體空間並將原來的值拷貝過去,之後就是生成新的節點,並將其插入到列表中,設定新節點的各個屬性值,當然還有對列表本身的長度和總記憶體等進行設定

2.2 壓縮列表獲取值

ziplist獲取值的方法基本上就是插入的逆序,根據編碼型別和值長度來算出具體值的位置並轉換為相應結果。

/*                                                                                                                   
 * 獲取p節點的實際資料的值並設定到sstr或sval中,如何設定取決於節點的編碼型別                                         
 *                                                                                                                   
 * 引數列表                                                                                                          
 *      1. p: 指定的節點,該節點為列表尾或者指標無效時則告訴呼叫者獲取節點值失敗                                     
 *      2. sstr: 出參字串,如果該節點是以字串形式編碼的話則會設定該出參                                          
 *      3. slen: 出參字串長度                                                                                      
 *      4. sval: 出參整型,如果該節點是以整型編碼(任何一種整型編碼)則會設定該出參為節點實際資料值                    
 *                                                                                                                   
 * 返回值                                                                                                            
 *      返回0代表指定的節點無效,返回1則代表節點有效併成功獲取到其實際資料值                                         
 */                                                                                                                  
unsigned int ziplistGet(unsigned char *p, unsigned char **sstr, unsigned int *slen, long long *sval) {               
    zlentry entry;                                                                                                   
    if (p == NULL || p[0] == ZIP_END) return 0;                                                                      
    // 呼叫者是以sstr有沒有被設定值來判斷該節點是以整型編碼還是字串編碼的                                          
    // 為了防止出現歧義所以強制將sstr先指向空                                                                        
    if (sstr) *sstr = NULL;                                                                                          

    // 將節點p的屬性設定到工具結構體中,這樣處理起來方便的多                                                         
    zipEntry(p, &entry);                                                                                                                                      
    if (ZIP_IS_STR(entry.encoding)) {                                                                                
        // 如果是以字串編碼則設定字串出參                                                                        
        if (sstr) {                                                                                                  
            *slen = entry.len;                                                                                       
            *sstr = p+entry.headersize;                                                                              
        }                                                                                                            
    } else {                                                                                                         
        if (sval) {                                                                                                  
            // 取出實際的整型資料                                                                                    
            *sval = zipLoadInteger(p+entry.headersize,entry.encoding);                                               
        }                                                                                                            
    }                                                                                                                
    return 1;                                                                                                        
}

ziplist沒有明確的定義,大多數操作都是通過巨集定義的,獲取值也不例外

/*
 * 設定壓縮列表節點的屬性值
 *
 * 引數列表
 *      1.p: 新節點記憶體的起始地址
 *      2.e: 一個節點結構體的指標
 */
void zipEntry(unsigned char *p, zlentry *e) {                                                                                                                 
    // 首先設定該節點第一個元素(儲存前一個節點的長度)
    ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
    // 設定該節點的資料編碼型別和資料長度
    ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
    // 記錄節點頭部總長度
    e->headersize = e->prevrawlensize + e->lensize;
    e->p = p;
}

三、快速連結串列quicklist

Redis暴露給使用者使用的list資料型別(即LPUSHLRANGE等系列命令),實現所用的內部資料結構就是quicklistquicklist的實現是一個封裝了ziplist的雙向連結串列,既然和adlist一樣就是個雙向連結串列,那我們在已經瞭解adlist的情況下學習quicklist就會快很多,但是quicklist要比adlist複雜的多,原因在於額外的壓縮和對ziplist的封裝,首先我們來看下它是如何ziplist的,每一個ziplist都會被封裝為一個quicklistNode,它的結構如下

/*
 * 快速列表的具體節點
 */ 
typedef struct quicklistNode {
    // 前一個節點
    struct quicklistNode *prev;
    // 後一個節點
    struct quicklistNode *next;
    // ziplist首部指標,各節點的實際資料項儲存在ziplist中(連續記憶體空間的壓縮列表)
    unsigned char *zl;
    // ziplist佔用的總記憶體大小,不論壓縮與否都是儲存實際的總記憶體大小
    unsigned int sz;            
    // ziplist的資料項的個數
    unsigned int count : 16;    
    // 該節點是否被壓縮過了,1代表沒壓縮,2代表使用LZF演算法壓縮過了
    // 可能以後會有別的壓縮演算法,目前則只有這一種壓縮演算法
    unsigned int encoding : 2;  
    // 該節點使用何種方式來儲存資料,1代表沒儲存資料,2代表使用ziplist儲存資料
    // 這個節點目前看來都是2,即使用ziplist來儲存資料,後續可能會有別的方式
    unsigned int container : 2;  
    // 這個節點是否需要重新壓縮?
    // 某些情況下需要臨時解壓下這個節點,有這個標記則會找機會再重新進行壓縮
    unsigned int recompress : 1; 
    // 節點資料不能壓縮?
    unsigned int attempted_compress : 1; 
    // 只是一個int正好剩下的記憶體,目前還沒使用上,可以認為是擴充套件欄位
    unsigned int extra : 10; 
} quicklistNode;

可以清楚到看到快速連結串列的節點(quicklistNode)主要是對ziplist封裝,複雜的地方在於控制各個ziplist的長度和壓縮情況,從結構設計上可以看到Redis可能還打算使用別的結構代替ziplist作為儲存實際資料的節點,但目前在4.0版本中僅有ziplist這一種,壓縮演算法也只有lzf

3.1 建立快速連結串列

當用戶執行LPUSH命令時,如果指定的列表名稱在Redis不存在則會建立一個新的快速連結串列,程式碼呼叫路徑大致如下

server.c 事件迴圈 --> 呼叫module API --> module.c moduleCreateEmptyKey() --> object.c createQuicklistObject() --> quicklist.c quicklistCreate()

主要判斷邏輯在module.c

/*
 * LPUSH命令的實現
 * 將元素加入到一個Redis List集合中(快速連結串列quicklist),如果該key的List不存在則會建立一個List
 * 當key存在確不是List型別時則會丟擲型別不符合錯誤
 *
 */
int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
    // 如果對應的key是隻讀的則會返回鍵值不可寫錯誤
    if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
    // 如果存在key但是型別不是List則會返回型別不符合錯誤
    if (key->value && key->value->type != OBJ_LIST) return REDISMODULE_ERR;
    // 如果指定key不存在則建立一個quicklist型別的物件
    if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_LIST);                                                                              
    // 將具體的值存入List值
    listTypePush(key->value, ele, 
        (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
    return REDISMODULE_OK;
}

之後就是呼叫quicklist.c中的方法來建立一個快速連結串列

/*
 * 建立一個快速連結串列
 * 當使用LPUSH建立List時會呼叫該函式
 *
 * 返回值
 *      新的快速連結串列的指標
 */
quicklist *quicklistCreate(void) {                                                                                                                            
    struct quicklist *quicklist;

    quicklist = zmalloc(sizeof(*quicklist));
    quicklist->head = quicklist->tail = NULL;
    quicklist->len = 0;
    quicklist->count = 0;
    quicklist->compress = 0;
    // -2代表ziplist的大小不超過8kb
    quicklist->fill = -2;
    return quicklist;
}

3.2 快速連結串列插入值

插入值的方式有很多種,比如從插入到頭部、插入到尾部、插入到某個節點前面、從其他ziplist匯入等等,但原理都差不多,我們這裡僅看插入到頭部即可

/*                                                                                                                 
   * 在連結串列的首部新增一個節點                                                                                        
   *                                                                                                                 
   * 引數列表                                                                                                        
   *      1. quicklist: 待操作的快速連結串列                                                                             
   *      2. value: 待插入的值                                                                                       
   *      3. sz: 值的記憶體長度                                                                                        
   *                                                                                                                 
   * 返回值                                                                                                          
   *      返回1代表建立了一個新的節點,返回0代表使用了既有的節點                                                     
   */                                                                                                                
  int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {                                              
      quicklistNode *orig_head = quicklist->head;                                                                    
      // likely是條件大概率為真時的語法優化寫法                                                                      
      // 首先需要判斷當前快速連結串列節點是否能夠再新增值                                                                
      if (likely(                                                                                                    
              _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {                                    
          // 能的話則將值插入到當前節點對應的ziplist中即可                                                           
          quicklist->head->zl =                                                                                      
              ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);                                             
          quicklistNodeUpdateSz(quicklist->head);                                                                    
      } else {
          // 不能則建立一個新的快速連結串列節點並將值插入
          quicklistNode *node = quicklistCreateNode();                                                                                                        
          node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);                                             

          quicklistNodeUpdateSz(node);                                                                               
          _quicklistInsertNodeBefore(quicklist, quicklist->head, node);                                              
      }                                                                                                              
      quicklist->count++;                                                                                            
      quicklist->head->count++;                                                                                      
      return (orig_head != quicklist->head);                                                                         
  }

3.3 從快速連結串列中獲取值

獲取值最麻煩的地方在於需要解壓ziplist,目前Redis使用的是lzf壓縮演算法(也可以說是個編碼演算法),要注意的是quicklist中的獲取值都是指獲取真實的資料項的值,也就是儲存在各個ziplist中的資料項,而不是指quicklistNode

/*
 * 獲取指定位置的節點
 *
 * 引數列表
 *      1. quicklist: 待操作的連結串列
 *      2. idx: 節點位置序號,大於0表示從連結串列頭開始索引,小於代表從連結串列尾部開始索引
 *              注意這個序號是所有ziplist的所有節點的序號,不是quicklist節點的序號
 *      3. entry: 出參,如果找到節點則將節點的屬性設定到該entry中
 *
 * 返回值
 *      返回1代表成功找到指定位置節點,否則返回0
 */
int quicklistIndex(const quicklist *quicklist, const long long idx,
                   quicklistEntry *entry) {
    quicklistNode *n;
    unsigned long long accum = 0;
    unsigned long long index;
    // 小於0從後往前搜尋
    int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */                                                                                                                                                                                                                                                                                               

    // 這裡會對entry設定一些初始值,所以必須通過該函式返回值判斷獲取成功失敗
    // 而不能通過entry是否設定來判斷
    initEntry(entry);
    entry->quicklist = quicklist;

    if (!forward) {
        // 從尾部開始遍歷-1代表第1個節點(位置0),-2代表第二個節點(位置1)
        index = (-idx) - 1;
        n = quicklist->tail;
    } else {
        index = idx;
        n = quicklist->head;
    }

    // 如果指定位置超出了連結串列本身長度
    if (index >= quicklist->count)
        return 0;

    // 編譯器和linux系統的一種優化語法糖
    // 當條件為真的可能性很大時使用該寫法可以提高執行效率
    while (likely(n)) {
        // 這個迴圈只能算出想要的節點在哪個ziplist中,後續再從ziplist取出真正節點
        if ((accum + n->count) > index) {
            break;
        } else {
            D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
              accum);
            // 每個快速列表的節點都記錄了它附帶的ziplist中的節點個數
            accum += n->count;
            n = forward ? n->next : n->prev;
        }
    }
    // 如果沒有找到指定節點則返回失敗
    if (!n)
        return 0;
    // 除錯日誌
    D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
      accum, index, index - accum, (-index) - 1 + accum);
    entry->node = n;
    // 設定在當前ziplist中還要偏移多少個位置才是真正的資料節點
    if (forward) {
        entry->offset = index - accum;
    } else {
        entry->offset = (-index) - 1 + accum;
    }

    // 解壓當前節點的ziplist,由於是將該節點給呼叫者使用,所以解壓之後不再重新壓縮
    // 由呼叫者根據重壓縮標誌決定是否需要再壓縮
    quicklistDecompressNodeForUse(entry->node);
    // 獲取實際的資料節點首部指標
    entry->zi = ziplistIndex(entry->node->zl, entry->offset);
    // 到此已找到資料節點,現把資料節點中的實際資料取出並根據編碼型別設定不同屬性
    // 值得注意的是呼叫者通過entry的value屬性是否有值來判斷實際資料是否是字串編碼
    ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
    return 1;
}

我們看到最終的出參是quicklistEntry,這是一個工具型結構體,主要用於中間過渡和方便程式呼叫,在ziplist的實現中也有類似的工具型結構體,quicklistEntry的定義如下

// 快速列表節點表示的工具型結構體
// 和ziplist的zlenty類似,一切為了操作方便
typedef struct quicklistEntry {                                                                                                                               
    // 快速連結串列
    const quicklist *quicklist;
    // 對應的節點
    quicklistNode *node;
    // 在ziplist中的實際的資料節點的首部指標
    unsigned char *zi;
    // 如果實際資料是字串編碼型別則值設定在該屬性中
    unsigned char *value;
    // 如果實際資料是整型編碼型別則值設定在該屬性中
    long long longval;
    // 不同使用場景下表示意義稍有不同
    // 獲取指定節點實際資料值時表示字串編碼情況下字串的長度
    unsigned int sz;
    int offset;
} quicklistEntry;

我們經常使用的LRANGE命令則是通過連結串列的迭代器來實現的,其實adlistziplist都是有迭代器的,通過迭代器可以從指定位置開始逐個遍歷連結串列中的值,非常方便且安全。
LRANGE的主要呼叫流程如下

server.c 事件迴圈 --> 命令表 lrangeCommand命令 --> t_list.c lrangeCommand() --> quicklist.c quicklistGetIteratorAtIdx() --> quicklist.c quicklistNext()

初始化迭代器的過程很簡單

/*                                                                                                                   
 * 建立一個從連結串列指定位置開始的迭代器                                                                                
 *                                                                                                                   
 * 引數列表