1. 程式人生 > >LevelDB原始碼分析之六:skiplist(2)

LevelDB原始碼分析之六:skiplist(2)

閱讀本文可參考:

LevelDB中的skiplist實現方式基本上和中的實現方式類似。它向外暴露介面非常簡單,如下:

public:
  // Create a new SkipList object that will use "cmp" for comparing keys,
  // and will allocate memory using "*arena".  Objects allocated in the arena
  // must remain allocated for the lifetime of the skiplist object.
  explicit SkipList(Comparator cmp, Arena* arena);

  // Insert key into the list.
  // REQUIRES: nothing that compares equal to key is currently in the list.
  void Insert(const Key& key);

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const Key& key) const
private成員變數:
private:
  enum { kMaxHeight = 12 };

  // Immutable after construction
  Comparator const compare_;
  Arena* const arena_;    // Arena used for allocations of nodes

  Node* const head_;

  // Modified only by Insert().  Read racily by readers, but stale
  // values are ok.
  port::AtomicPointer max_height_;   

  // Read/written only by Insert().
  Random rnd_;
一.建構函式
template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
    : compare_(cmp),
      arena_(arena),
      head_(NewNode(0 /* any key will do */, kMaxHeight)),
      max_height_(reinterpret_cast<void*>(1)),
      rnd_(0xdeadbeef) {
  for (int i = 0; i < kMaxHeight; i++) {
    head_->SetNext(i, NULL);
  }
}
重點注意下head_和rnd_的初始化,NewNode方法如下。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);
}
這裡為什麼是“height-1”詳見:LevelDb原始碼分析之五:skiplist(1)

new (mem) Node(key)使用了placement new技巧,詳見:C++中使用placement new

rnd_是一個Random型別的變數,使用0xdeadbeef進行初始化,Random詳見LevelDB原始碼分析之七:Random

二.插入函式

template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  // prev記錄的是查詢路徑,下面需要使用prev來修改新生成結點的指標  
  // prev相當於LevelDb原始碼分析之五:skiplist(1)中的update
  // 整個流程與LevelDb原始碼分析之五:skiplist(1)相似,這裡不再詳細解釋
  Node* prev[kMaxHeight];
  // 返回大於等於key的結點或者NULL,原因詳見FindGreaterOrEqual的分析
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  // 不允許插入重複的值  
  assert(x == NULL || !Equal(key, x->key));
  // 產生一個隨機層數height
  int height = RandomHeight();
  // 如果height大於原最大層數,則更新prev,並更新最大層數
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (NULL), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since NULL sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }
  // 建立一個待插入的結點x,從低到高一層層插入
  x = NewNode(key, height);
  // 逐層更新結點的指標,和普通連結串列插入一樣 
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
插入函式裡呼叫了私有函式FindGreaterOrEqual。FindGreaterOrEqual中完成查詢操作,就是向下(level控制)和向右(x控制)移動過程,並不斷將經過路徑儲存到引數prev中。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  // 從最高層往下,每層都查詢插入位置,當遍歷到該層的尾部(x->next[level]==NULL)  
  // 也沒有找到比key大的結點時,將該層的最後一個結點的指標放到prev[level]中。  
  // 如果在某層中找到了比key大或等於key的結點時,將該結點之前的那個比key小的結點的指標  
  // 放到prev[level]中。  
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != NULL) prev[level] = x;
	  // 當查到第一層時,有兩種情況:
	  // 1.第一層中有滿足要求的結點,此時next剛好是不小於key的那個結點
	  // 2.第一層中沒有滿足要求的結點,此時實際上到了尾部,next=NULL
      if (level == 0) {
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}
三.查詢函式

查詢操作基本上就是呼叫函式上面的函式FindGreaterOrEqual實現。

template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
  Node* x = FindGreaterOrEqual(key, NULL);
  if (x != NULL && Equal(key, x->key)) {
    return true;
  } else {
    return false;
  }
}
需要注意的是,LevelDB中沒有提供顯式的刪除節點操作,但實際上是可以刪除的,因為當我們插入資料時,key的形式為key:value,當刪除資料時,則插入key:deleted類似刪除的標記,等到Compaction再刪除。

參考連結:http://blog.csdn.net/xuqianghit/article/details/6948554