1. 程式人生 > >【RocksDB】TransactionDB源碼分析

【RocksDB】TransactionDB源碼分析

vpd try 重載 shadow start read_only dbo cas commit

摘要: RocksDB版本:v5.13.4 1. 概述 得益於LSM-Tree結構,RocksDB所有的寫入並非是update in-place,所以他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務所有寫操作在內存緩存打包,然後在commit時一次性將WriteBatch寫入,保證了原子,另外通過Sequence和Key鎖來解決沖突實現隔離。

RocksDB版本:v5.13.4

  1. 概述
    得益於LSM-Tree結構,RocksDB所有的寫入並非是update in-place,所以他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務所有寫操作在內存緩存打包,然後在commit時一次性將WriteBatch寫入,保證了原子,另外通過Sequence和Key鎖來解決沖突實現隔離。

RocksDB的Transaction分為兩類:Pessimistic和Optimistic,類似悲觀鎖和樂觀鎖的區別,PessimisticTransaction的沖突檢測和加鎖是在事務中每次寫操作之前做的(commit後釋放),如果失敗則該操作失敗;OptimisticTransaction不加鎖,沖突檢測是在commit階段做的,commit時發現沖突則失敗。

具體使用時需要結合實際場景來選擇,如果並發事務寫入操作的Key重疊度不高,那麽用Optimistic更合適一些(省掉Pessimistic中額外的鎖操作)

  1. 用法
    介紹實現原理前,先來看一下用法:

【1. 基本用法】

Options options;
TransactionDBOptions txn_db_options;
options.create_if_missing = true;
TransactionDB* txn_db;

// 打開DB(默認Pessimistic)
Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
assert(s.ok());

// 創建一個事務
Transaction* txn = txn_db->BeginTransaction(write_options);
assert(txn);

// 事務txn讀取一個key
s = txn->Get(read_options, "abc", &value);
assert(s.IsNotFound());

// 事務txn寫一個key
s = txn->Put("abc", "def");
assert(s.ok());

// 通過TransactionDB::Get在事務外讀取一個key
s = txn_db->Get(read_options, "abc", &value);

// 通過TrasactionDB::Put在事務外寫一個key
// 這裏並不會有影響,因為寫的不是"abc",不沖突
// 如果是"abc"的話
// 則Put會一直卡住直到超時或等待事務Commit(本例中會超時)
s = txn_db->Put(write_options, "xyz", "zzz");

s = txn->Commit();
assert(s.ok());
// 析構事務
delete txn;
delete txn_db;

通過BeginTransaction打開一個事務,然後調用Put、Get等接口進行事務操作,最後調用Commit進行提交。

【2. 回滾】

...
// 事務txn寫入abc
s = txn->Put("abc", "def");
assert(s.ok());

// 設置回滾點
txn->SetSavePoint();

// 事務txn寫入cba
s = txn->Put("cba", "fed");
assert(s.ok());
// 回滾至回滾點
s = txn->RollbackToSavePoint();

// 提交,此時事務中不包含對cba的寫入
s = txn->Commit();
assert(s.ok());
...

【3. GetForUpdate】

...
// 事務txn讀取abc並獨占該key,確保不被外部事務再修改
s = txn->GetForUpdate(read_options, “abc”, &value);
assert(s.ok());

// 通過TransactionDB::Put接口在事務外寫abc
// 不會成功
s = txn_db->Put(write_options, “abc”, “value0”);

s = txn->Commit();
assert(s.ok());
...

有時候在事務中需要對某一個key進行先讀後寫,此時則不能在寫時才進行該key的獨占及沖突檢測操作,所以使用GetForUpdate接口讀取該key並進行獨占

【4. SetSnapshot】

txn = txn_db->BeginTransaction(write_options);
// 設置事務txn使用的snapshot為當前全局Sequence Number
txn->SetSnapshot();

// 使用TransactionDB::Put接口在事務外部寫abc
// 此時全局Sequence Number會加1
db->Put(write_options, “key1”, “value0”);
assert(s.ok());

// 事務txn寫入abc
s = txn->Put(“abc”, “value1”);
s = txn->Commit();
// 這裏會失敗,因為在事務設置了snapshot之後,事務後來寫的key
// 在事務外部有過其他寫操作,所以這裏不會成功
// Pessimistic會在Put時失敗,Optimistic會在Commit時失敗

前面說過,TransactionDB在事務中需要寫入某個key時才對其進行獨占或沖突檢測,有時希望在事務一開始就對其之後所有要寫入的所有key進行獨占,此時可以通過SetSnapshot來實現,設置了Snapshot後,外部一旦對事務中將要進行寫操作key做過修改,則該事務最終會失敗(失敗點取決於是Pessimistic還是Optimistic,Pessimistic因為在Put時就進行沖突檢測,所以Put時就失敗,而Optimistic則會在Commit是檢測到沖突,失敗)

  1. 實現
    3.1 WriteBatch & WriteBatchWithIndex
    WriteBatch就不展開說了,事務會將所有的寫操作追加進同一個WriteBatch,直到Commit時才向DB原子寫入。

WriteBatchWithIndex在WriteBatch之外,額外搞一個Skiplist來記錄每一個操作在WriteBatch中的offset等信息。在事務沒有commit之前,數據還不在Memtable中,而是存在WriteBatch裏,如果有需要,這時候可以通過WriteBatchWithIndex來拿到自己剛剛寫入的但還沒有提交的數據。

事務的SetSavePoint和RollbackToSavePoint也是通過WriteBatch來實現的,SetSavePoint記錄當前WriteBatch的大小及統計信息,若幹操作之後,若想回滾,則只需要將WriteBatch truncate到之前記錄的大小並恢復統計信息即可。

3.2 PessimisticTransaction
PessimisticTransactionDB通過TransactionLockMgr進行行鎖管理。事務中的每次寫入操作之前都需要TryLock進Key鎖的獨占及沖突檢測,以Put為例:

Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  // 調用TryLock搶鎖及沖突檢測
  Status s =
      TryLock(column_family, key, false /* read_only */, true /* exclusive */);

  if (s.ok()) {
    s = GetBatchForWrite()->Put(column_family, key, value);
    if (s.ok()) {
      num_puts_++;
    }
  }

  return s;
}

可以看到Put接口定義在TransactionBase中,無論Pessimistic還是Optimistic的Put都是這段邏輯,二者的區別是在對TryLock的重載。先看Pessimistic的,TransactionBaseImpl::TryLock通過TransactionBaseImpl::TryLock -> PessimisticTransaction::TryLock -> PessimisticTransactionDB::TryLock -> TransactionLockMgr::TryLock一路調用到TransactionLockMgr的TryLock,在裏面完成對key加鎖,加鎖成功便實現了對key的獨占,此時直到事務commit之前,其他事務是無法修改這個key的。

鎖是加成功了,但這也只能說明從此刻起到事務結束前這個key不會再被外部修改,但如果事務在最開始執行SetSnapshot設置了快照,如果在打快照和Put之間的過程中外部對相同key進行了修改(並commit),此時已經打破了snapshot的保證,所以事務之後的Put也不能成功,這個沖突檢測也是在PessimisticTransaction::TryLock中做的,如下:

Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
                                       const Slice& key, bool read_only,
                                       bool exclusive, bool skip_validate) {
  ...
  // 加鎖
  if (!previously_locked || lock_upgrade) {
    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
  }

  SetSnapshotIfNeeded();

  ...

    // 使用事務一開始拿到的snapshot的sequence1與這個key在DB中最新
    // 的sequence2進行比較,如果sequence2 > sequence1則代表在snapshot
    // 之後,外部有對key進行過寫入,有沖突!
    s = ValidateSnapshot(column_family, key, &tracked_at_seq);

      if (!s.ok()) {
        // 檢測到沖突,解鎖
        // Failed to validate key
        if (!previously_locked) {
          // Unlock key we just locked
          if (lock_upgrade) {
            s = txn_db_impl_->TryLock(this, cfh_id, key_str,
                                      false /* exclusive */);
            assert(s.ok());
          } else {
            txn_db_impl_->UnLock(this, cfh_id, key.ToString());
          }
        }
      }

  if (s.ok()) {
    // 如果加鎖及沖突檢測通過,記錄這個key以便事務結束時釋放掉鎖
    // We must track all the locked keys so that we can unlock them later. If
    // the key is already locked, this func will update some stats on the
    // tracked key. It could also update the tracked_at_seq if it is lower than
    // the existing trackey seq.
    TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
  }
}

其中ValidateSnapshot就是進行沖突檢測,通過將事務設置的snapshot與key最新的sequence進行比較,如果小於key最新的sequence,則代表設置snapshot後,外部事務修改過這個key,有沖突!獲取key最新的sequence也是簡單粗暴,遍歷memtable,immutable memtable,memtable list history及SST文件來拿。總結如下圖:

技術分享圖片

GetForUpdate的邏輯和Put差不多,無非就是以Get之名行Put之事(加鎖及沖突檢測),如下圖:

技術分享圖片

接著介紹下TransactionLockMgr,如下圖:

技術分享圖片

最外層先是一個std::unordered_map,將每個ColumnFamily映射到一個LockMap,每個LockMap默認有16個LockMapStripe,然後每個LockMapStripe裏包含一個std::unordered_map keys,這就是存放每個key對應的鎖信息的。所以每次加鎖過程大致如下:

首先通過ThreadLocal拿到lock_maps指針
通過column family ID 拿到對應的LockMap
對key hash映射到某個LockMapStripe,對該LockMapStripe加鎖(同一LockMapStripe下的所有key會搶同一把鎖,粒度略大)
操作LockMapStripe裏的std::unordered_map完成加鎖
3.3 OptimisticTransaction
OptimisticTransactionDB不使用鎖進行key的獨占,只在commit是進行沖突檢測。所以OptimisticTransaction::TryLock如下:

Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
                                      const Slice& key, bool read_only,
                                      bool exclusive, bool untracked) {
  if (untracked) {
    return Status::OK();
  }
  uint32_t cfh_id = GetColumnFamilyID(column_family);

  SetSnapshotIfNeeded();
  // 如果設置了之前事務snapshot,這裏使用它作為key的seq
  // 如果沒有設置snapshot,則以當前全局的sequence作為key的seq
  SequenceNumber seq;
  if (snapshot_) {
    seq = snapshot_->GetSequenceNumber();
  } else {
    seq = db_->GetLatestSequenceNumber();
  }

  std::string key_str = key.ToString();
  // 記錄這個key及其對應的seq,後期在commit時通過使用這個seq和
  // key當前的最新sequence比較來做沖突檢測
  TrackKey(cfh_id, key_str, seq, read_only, exclusive);

  // Always return OK. Confilct checking will happen at commit time.
  return Status::OK();
}
這裏TryLock實際上就是給key標記一個sequence並記錄,用作commit時的沖突檢測,commit實現如下:

Status OptimisticTransaction::Commit() {
  // Set up callback which will call CheckTransactionForConflicts() to
  // check whether this transaction is safe to be committed.
  OptimisticTransactionCallback callback(this);

  DBImpl* db_impl = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
  // 調用WriteWithCallback進行沖突檢測,如果沒有沖突就寫入DB
  Status s = db_impl->WriteWithCallback(
      write_options_, GetWriteBatch()->GetWriteBatch(), &callback);

  if (s.ok()) {
    Clear();
  }

  return s;
}

沖突檢測的實現在OptimisticTransactionCallback裏,和設置了snapshot的PessimisticTransaction一樣,最終還是會調用TransactionUtil::CheckKeysForConflicts來檢測,也就是比較sequence。整體如下圖:

技術分享圖片

3.4 兩階段提交(Two Phase Commit)
在分布式場景下使用PessimisticTransaction時,我們可能需要使用兩階段提交(2PC)來確保一個事務在多個節點上執行成功,所以PessimisticTransaction也支持2PC。具體做法也不難,就是將之前commit拆分為prepare和commit,prepare階段進行WAL的寫入,commit階段進行Memtable的寫入(寫入後其他事務方可見),所以現在一個事務的操作流程如下:

BeginTransaction
GetForUpdate
Put
...
Prepare
Commit

使用2PC,我們首先要通過SetName為一個事務設置唯一的標識並註冊到全局映射表裏,這裏記錄著所有未完成的2PC事務,當Commit後再從映射表裏刪除。

接下來具體2PC實現無非就是在WriteBatch上做文章,通過特殊的標記來控制寫WAL和Memtable,簡單說一下:

正常的WriteBatch結構如下:

Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
2PC一開始的WriteBatch如下:

Sequence(0);NumRecords(0);Noop;
先使用一個Noop占位,至於為什麽,後面再說。緊接著就是一些操作,操作後,WriteBatch如下:

Sequence(0);NumRecords(3);Noop;Put(a,1);Merge(a,1);Delete(a);
然後執行Prepare,寫WAL,在寫WAL之前,先會隊WriteBatch做一些改動,插入Prepare和EndPrepare記錄,如下:

Sequence(0);NumRecords(3);Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid)
可以看到這裏將之前的Noop占位換成Prepare,然後在結尾插入EndPrepare(xid),構造好WriteBatch後就直接調用WriteImpl寫WAL了。註意,此時往WAL裏寫的這條日誌的sequence雖然比VersionSet的last_sequence大,但寫入WAL之後並不會調用SetLastSequence來更新VersionSet的last_sequence,它只有在最後寫入Memtable之後才更新,具體做法就是給VersionSet除了last_sequence_之外,再加一個last_allocatedsequence,初始相等,寫WAL是加後者,後者對外不可見,commit後再加前者。所以一旦PessimisticTransactionDB使用了2PC,就要求所有都是2PC,不然last_sequence_可能會錯亂(更正:如果使用two_writequeues,不管是Prepare -> Commit還是直接Commit,sequence的增長都是以last_allocated_sequence_為準,最後用它來調整lastsequence;如果不使用two_write_queues_則直接以last_sequence_為準,總之不會出現sequence混錯,所以可以Prepare -> Commit和Commit混用)。

WAL寫完之後,即使沒有commit就宕機也沒事,重啟後Recovery會將事務從WAL恢復記錄到全局recovered_transaction中,等待Commit

最後就是Commit,Commit階段會使用一個新的CommitTime WriteBatch,和之前的WriteBatch合並整理後最終使用CommitTime WriteBatch寫Memtable

整理後的CommitTime WriteBatch如下:

Sequence(0);NumRecords(3);Commit(xid);
Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid);

將CommitTime WriteBatch的WALTerminalPoint設置到Commit(xid)處,告訴Writer寫WAL時寫到這裏就可以停了,其實就是只將Commit記錄寫進WAL(因為其後的記錄在Prepare階段就已經寫到WAL了);

在最後就是MemTableInserter遍歷這個CommitTime WriteBatch向memtable寫入,具體就不說了。寫入成功後,更新VersionSet的lastsequence,至此,事務成功提交。

  1. WritePrepared & WriteUnprepared
    我們可以看到無論是Pessimistic還是Optimistic,都有一個共同缺點,那就是在事務最終Commit之前,所以數據都是緩存在內存(WriteBatch)裏,對於很大的事務來說,這非常耗費內存並且將所有實際寫入壓力都扔給Commit階段來搞,性能有瓶頸,所以RocksDB正在支持WritePolicy為WritePrepared和WriteUnprepared的PessimisticTransaction,主要思想就是將對Memtable的寫入提前,

如果放到Prepare階段那就是WritePrepared

如果再往前,每次操作直接寫Memtable那就是WriteUnprepared

可以看到WriteUnprepared無論內存占用還是寫入壓力點的分散都做的最好,WritePrepared稍遜。

支持這倆新的WritePolicy的難點在於如何保證寫入到Memtable但還未Commit的數據不被其他事物看到,這裏就需要在Sequence上大做文章了,目前Rocksdb支持了WritePrepare、而WriteUnprepared還未支持,期待後續...

  1. 隔離級別
    看了前面的介紹,這裏就不用展開說了

TransactionDB支持ReadCommitted和RepeatableReads級別的隔離
原文鏈接請添加鏈接描述
本文為雲棲社區原創內容,未經允許不得轉載

【RocksDB】TransactionDB源碼分析