1. 程式人生 > >Redis(四):del/unlink 命令原始碼解析

Redis(四):del/unlink 命令原始碼解析

  上一篇文章從根本上理解了set/get的處理過程,相當於理解了 增、改、查的過程,現在就差一個刪了。本篇我們來看一下刪除過程。

  對於客戶端來說,刪除操作無需區分何種資料型別,只管進行 del 操作即可。

 

零、刪除命令 del 的定義

  主要有兩個: del/unlink, 差別是 unlink 速度會更快, 因為其使用了非同步刪除優化模式, 其定義如下:

    // 標識只有一個 w, 說明就是一個普通的寫操作,沒啥好說的
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}
    // 標識為 wF, 說明它是一個快速寫的操作,其實就是有一個非同步優化的過程,稍後詳解
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}

 

一、delCommand

  delCommand 的作用就是直接刪除某個 key 的資料,釋放記憶體即可。

// db.c, del 命令處理    
void delCommand(client *c) {
    // 同步刪除
    delGenericCommand(c,0);
}

/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;

    for (j = 1; j < c->argc; j++) {
        // 自動過期資料清理
        expireIfNeeded(c->db,c->argv[j]);
        // 此處分同步刪除和非同步刪除, 主要差別在於對於複雜資料型別的刪除方面,如hash,list,set...
        // 針對 string 的刪除是完全一樣的
        int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                              dbSyncDelete(c->db,c->argv[j]);
        // 寫命令的傳播問題
        if (deleted) {
            signalModifiedKey(c->db,c->argv[j]);
            notifyKeyspaceEvent(NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
            server.dirty++;
            numdel++;
        }
    }
    // 響應刪除資料量, 粒度到 key 級別
    addReplyLongLong(c,numdel);
}

  框架程式碼一看即明,只是相比於我們普通的刪除是多了不少事情。否則也不存在設計了。

 

二、unlinkCommand

  如下,其實和del是一毛一樣的,僅是變化了一個 lazy 標識而已。

// db.c, unlink 刪除處理
void unlinkCommand(client *c) {
    // 與 del 一致,只是 lazy 標識不一樣
    delGenericCommand(c,1);
}

 

三、刪除資料過程詳解

  刪除資料分同步和非同步兩種實現方式,道理都差不多,只是一個是後臺刪一個是前臺刪。我們分別來看看。

1. 同步刪除 dbSyncDelete

  同步刪除很簡單,只要把對應的key刪除,val刪除就行了,如果有內層引用,則進行遞迴刪除即可。

// db.c, 同步刪除資料
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // 首先從 expires 佇列刪除,然後再從 db->dict 中刪除
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
// dict.c, 如上, 僅僅是 dictDelete() 就可以了,所以真正的刪除動作是在 dict 中實現的。
int dictDelete(dict *ht, const void *key) {
    // nofree: 0, 即要求釋放記憶體
    return dictGenericDelete(ht,key,0);
}
// dict.c, nofree: 0:要釋放相應的val記憶體, 1:不釋放相應val記憶體只刪除key
/* Search and remove an element */
static int dictGenericDelete(dict *d, const void *key, int nofree)
{
    unsigned int h, idx;
    dictEntry *he, *prevHe;
    int table;

    if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    // ht[0] 和 ht[1] 如有可能都進行掃描
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        prevHe = NULL;
        while(he) {
            if (dictCompareKeys(d, key, he->key)) {
                /* Unlink the element from the list */
                if (prevHe)
                    prevHe->next = he->next;
                else
                    d->ht[table].table[idx] = he->next;
                // no nofree, 就是要 free 記憶體咯
                if (!nofree) {
                    // 看起來 key/value 需要單獨釋放記憶體哦
                    dictFreeKey(d, he);
                    dictFreeVal(d, he);
                }
                zfree(he);
                d->ht[table].used--;
                return DICT_OK;
            }
            prevHe = he;
            he = he->next;
        }
        // 如果沒有進行 rehashing, 只需掃描0就行了
        if (!dictIsRehashing(d)) break;
    }
    return DICT_ERR; /* not found */
}

  其實對於有GC收集器的語言來說,根本不用關注記憶體的釋放問題,自有後臺工具處理,然而對於 c 語言這種級別語言,則是需要自行關注記憶體的。這也是本文存在的意義,不然對於一個 hash 表的元素刪除操作,如上很難嗎?並沒有。

  下面,我們就來看看 redis 是如何具體釋放記憶體的吧。

// dict.h, 釋放key, value 的邏輯也是非常簡單,用一個巨集就定義好了
// 釋放依賴於 keyDestructor, valDestructor
#define dictFreeKey(d, entry) \
    if ((d)->type->keyDestructor) \
        (d)->type->keyDestructor((d)->privdata, (entry)->key)
#define dictFreeVal(d, entry) \
    if ((d)->type->valDestructor) \
        (d)->type->valDestructor((d)->privdata, (entry)->v.val)
// 所以,我們有必要回去看看 key,value 的析構方法
// 而這,又依賴於具體的資料型別,也就是你在 setXXX 的時候用到的資料型別
// 我們看一下這個 keyDestructor,valDestructor 初始化的樣子
// server.c  kv的解構函式定義
/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor   /* val destructor */
};

// 1. 先看看 key destructor, key 的釋放
// server.c, 直接呼叫 sds 提供的服務即可
void dictSdsDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    // sds 直接釋放key就行了
    sdsfree(val);
}
// sds.c, 真正釋放 value 記憶體
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
    if (s == NULL) return;
    // zfree, 確實很簡單嘛, 因為 sds 是連續的記憶體空間,直接使用系統提供的方法即可刪除
    s_free((char*)s-sdsHdrSize(s[-1]));
}

// 2. value destructor 對value的釋放, 如果說 key 一定是string格式的話,value可主不一定了,因為 redis提供豐富的資料型別呢
// server.c
void dictObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    if (val == NULL) return; /* Lazy freeing will set value to NULL. */
    decrRefCount(val);
}
// 減少 value 的引用計數
void decrRefCount(robj *o) {
    if (o->refcount == 1) {
        switch(o->type) {
            // string 型別
            case OBJ_STRING: freeStringObject(o); break;
            // list 型別
            case OBJ_LIST: freeListObject(o); break;
            // set 型別
            case OBJ_SET: freeSetObject(o); break;
            // zset 型別
            case OBJ_ZSET: freeZsetObject(o); break;
            // hash 型別
            case OBJ_HASH: freeHashObject(o); break;
            default: serverPanic("Unknown object type"); break;
        }
        zfree(o);
    } else {
        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
    }
}

  額,可以看出,對key的釋放自然是簡單之極。而對 value 則謹慎許多,首先它表面上只對引用做減操作。只有發只剩下1個引用即只有當前引用的情況下,本次釋放就是最後一次釋放,所以才會回收記憶體。

// 在介紹不同資料型別的記憶體釋放前,我們可以先來看下每個元素的資料結構
// dict.h
typedef struct dictEntry {
    // 儲存 key 欄位內容
    void *key;
    // 用一個聯合體儲存value
    union {
        // 儲存資料時使用 *val 儲存
        void *val;
        uint64_t u64;
        // 儲存過期時間時使用該欄位
        int64_t s64;
        // 儲存 score 時使用
        double d;
    } v;
    // 存在hash衝突時,作連結串列使用
    struct dictEntry *next;
} dictEntry;

// 1. string 型別的釋放
// object.c
void freeStringObject(robj *o) {
    // 直接呼叫 sds服務釋放
    if (o->encoding == OBJ_ENCODING_RAW) {
        sdsfree(o->ptr);
    }
}

// 2. list 型別的釋放
// object.c
void freeListObject(robj *o) {
    switch (o->encoding) {
    case OBJ_ENCODING_QUICKLIST:
        quicklistRelease(o->ptr);
        break;
    default:
        serverPanic("Unknown list encoding type");
    }
}
// quicklist.c
/* Free entire quicklist. */
void quicklistRelease(quicklist *quicklist) {
    unsigned long len;
    quicklistNode *current, *next;

    current = quicklist->head;
    len = quicklist->len;
    // 連結串列依次迭代就可以釋放完成了
    while (len--) {
        next = current->next;
        // 釋放list具體值
        zfree(current->zl);
        quicklist->count -= current->count;
        // 釋放list物件
        zfree(current);

        quicklist->len--;
        current = next;
    }
    zfree(quicklist);
}

// 3. set 型別的釋放
// object.c, set 分兩種型別, ht, intset
void freeSetObject(robj *o) {
    switch (o->encoding) {
    case OBJ_ENCODING_HT:
        // hash 型別則需要刪除每個 hash 的 kv
        dictRelease((dict*) o->ptr);
        break;
    case OBJ_ENCODING_INTSET:
        // intset 直接釋放
        zfree(o->ptr);
        break;
    default:
        serverPanic("Unknown set encoding type");
    }
}
// dict.c, 
/* Clear & Release the hash table */
void dictRelease(dict *d)
{
    // ht[0],ht[1] 依次清理
    _dictClear(d,&d->ht[0],NULL);
    _dictClear(d,&d->ht[1],NULL);
    zfree(d);
}
// dict.c, 
/* Destroy an entire dictionary */
int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
    unsigned long i;

    /* Free all the elements */
    for (i = 0; i < ht->size && ht->used > 0; i++) {
        dictEntry *he, *nextHe;

        if (callback && (i & 65535) == 0) callback(d->privdata);
        // 元素為空,hash未命中,但只要 used > 0, 代表就還有需要刪除的元素存在
        // 其實對於只有少數幾個元素的情況下,這個效率就呵呵了
        if ((he = ht->table[i]) == NULL) continue;
        while(he) {
            nextHe = he->next;
            // 這裡的釋放 kv 邏輯和前面是一致的
            // 看起來像是遞迴,其實不然,因為redis不存在資料型別巢狀問題,比如 hash下儲存hash, 所以不會存在遞迴
            // 具體結構會在後續解讀到
            dictFreeKey(d, he);
            dictFreeVal(d, he);
            zfree(he);
            ht->used--;
            he = nextHe;
        }
    }
    /* Free the table and the allocated cache structure */
    zfree(ht->table);
    /* Re-initialize the table */
    _dictReset(ht);
    return DICT_OK; /* never fails */
}

// 4. hash 型別的釋放
// object.c, hash和set其實是很相似的,程式碼也做了大量的複用
void freeHashObject(robj *o) {
    switch (o->encoding) {
    case OBJ_ENCODING_HT:
        // ht 形式與set一致
        dictRelease((dict*) o->ptr);
        break;
    case OBJ_ENCODING_ZIPLIST:
        // ziplist 直接釋放
        zfree(o->ptr);
        break;
    default:
        serverPanic("Unknown hash encoding type");
        break;
    }
}

// 5. zset 型別的釋放
// object.c, zset 的儲存形式與其他幾個
void freeZsetObject(robj *o) {
    zset *zs;
    switch (o->encoding) {
    case OBJ_ENCODING_SKIPLIST:
        zs = o->ptr;
        // 釋放dict 資料, ht 0,1 的釋放
        dictRelease(zs->dict);
        // 釋放skiplist 資料, 主要看下這個
        zslFree(zs->zsl);
        zfree(zs);
        break;
    case OBJ_ENCODING_ZIPLIST:
        zfree(o->ptr);
        break;
    default:
        serverPanic("Unknown sorted set encoding");
    }
}
// t_zset.c, 釋放跳錶資料
/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {
        // 基於第0層資料釋放,也基於第0層做迭代,直到刪除完成
        // 因為其他層資料都是引用的第0層的資料,所以釋放時無需關注
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}
// t_zset 也很簡單,只是把 node.ele 釋放掉,再把自身釋放到即可
// 這樣的刪除方式依賴於其儲存結構,咱們後續再聊
/* Free the specified skiplist node. The referenced SDS string representation
 * of the element is freed too, unless node->ele is set to NULL before calling
 * this function. */
void zslFreeNode(zskiplistNode *node) {
    sdsfree(node->ele);
    zfree(node);
}

 

2. 非同步刪除過程

  非同步刪除按理說會更復雜,更有意思些。只不過我們前面已經把核心的東西擼了個遍,這剩下的也不多了。

// lazyfree.c, 
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);

        /* If releasing the object is too much work, let's put it into the
         * lazy free list. */
        // 其實非同步方法與同步方法的差別在這,即要求 刪除的元素影響須大於某閥值(64)
        // 否則按照同步方式直接刪除,因為那樣代價更小
        if (free_effort > LAZYFREE_THRESHOLD) {
            // 非同步釋放+1,原子操作
            atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
            // 將 value 的釋放新增到非同步執行緒佇列中去,後臺處理, 任務型別為 非同步釋放記憶體
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            // 設定val為NULL, 以便在外部進行刪除時忽略釋放value相關記憶體
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
// bio.c, 新增非同步任務到執行緒中, 型別由type決定,執行緒安全地新增
// 然後嘛,後臺執行緒就不會停地運行了任務了
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    // 上鎖操作
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    // 喚醒任務執行緒
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
// bio.c, 後臺執行緒任務框架,總之還是有事情可做了。
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
    // 任務一直執行
    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            // 注意此處將會釋放鎖喲,以便外部可以新增任務進來
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } 
        // 也就是這玩意了,會去處理提交過來的任務
        else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            // 本文介紹的刪除value形式,用第一種情況
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        // 喚醒所有相關等待執行緒
        pthread_cond_broadcast(&bio_step_cond[type]);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

// lazyfree.c, 和同步刪除一致了
/* Release objects from the lazyfree thread. It's just decrRefCount()
 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
}

  從此處redis非同步處理過程,我們可以看到,redis並不是每次進入非同步時都進行非同步操作,而是在必要的時候才會進行。這也提示我們,不要為了非同步而非同步,而是應該計算利弊。

 

  如此,整個 del/unlink 的過程就完成了。總體來說,刪除都是基於hash的簡單remove而已,唯一有點難度是對記憶體的回收問題,這其實就是一個簡單的使用引用計數器演算法實現的垃圾回收器應該做的事而已。勿須多言。具體過程需依賴於資料的儲存結構,主要目的自然是遞迴釋放空間,避免記憶體洩漏了