《Go語言四十二章經》第四十章 LevelDB與BoltDB
《Go語言四十二章經》第四十章 LevelDB與BoltDB
作者:李驍
LevelDB 和 BoltDB 都是k/v資料庫。
但LevelDB沒有事務,LevelDB實現了一個日誌結構化的merge tree。它將有序的key/value儲存在不同檔案的之中,通過db, _ := leveldb.OpenFile("db", nil),在db目錄下有很多資料檔案,並通過“層級”把它們分開,並且週期性地將小的檔案merge為更大的檔案。這讓其在隨機寫的時候會很快,但是讀的時候卻很慢。
這也讓LevelDB的效能不可預知:但資料量很小的時候,它可能效能很好,但是當隨著資料量的增加,效能只會越來越糟糕。而且做merge的執行緒也會在伺服器上出現問題。
LSM樹而且通過批量儲存技術規避磁碟隨機寫入問題。 LSM樹的設計思想非常樸素,它的原理是把一顆大樹拆分成N棵小樹, 它首先寫入到記憶體中(記憶體沒有尋道速度的問題,隨機寫的效能得到大幅提升),在記憶體中構建一顆有序小樹,隨著小樹越來越大,記憶體的小樹會flush到磁碟上。磁碟中的樹定期可以做merge操作,合併成一棵大樹,以優化讀效能。
BoltDB會在資料檔案上獲得一個檔案鎖,所以多個程序不能同時開啟同一個資料庫。BoltDB使用一個單獨的記憶體對映的檔案(.db),實現一個寫入時拷貝的B+樹,這能讓讀取更快。而且,BoltDB的載入時間很快,特別是在從crash恢復的時候,因為它不需要去通過讀log去找到上次成功的事務,它僅僅從兩個B+樹的根節點讀取ID。
BoltDB支援完全可序列化的ACID事務,讓應用程式可以更簡單的處理複雜操作。
BoltDB設計源於LMDB,具有以下特點:
- 直接使用API存取資料,沒有查詢語句;
- 支援完全可序列化的ACID事務,這個特性比LevelDB強;
- 資料儲存在記憶體對映的檔案裡。沒有wal、執行緒壓縮和垃圾回收;
- 通過COW技術,可實現無鎖的讀寫併發,但是無法實現無鎖的寫寫併發,這就註定了讀效能超高,但寫效能一般,適合與讀多寫少的場景。
- 最後,BoltDB使用Golang開發,而且被應用於influxDB專案作為底層儲存。
LMDB的全稱是Lightning Memory-Mapped Database(快如閃電的記憶體對映資料庫),它的檔案結構簡單,包含一個數據檔案和一個鎖檔案.
LMDB檔案可以同時由多個程序開啟,具有極高的資料存取速度,訪問簡單,不需要執行單獨的資料庫管理程序,只要在訪問資料的程式碼裡引用LMDB庫,訪問時給檔案路徑即可。
讓系統訪問大量小檔案的開銷很大,而LMDB使用記憶體對映的方式訪問檔案,使得檔案內定址的開銷非常小,使用指標運算就能實現。資料庫單檔案還能減少資料集複製/傳輸過程的開銷。
40.1 LevelDB
package kvdb import ( "fmt" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" ) func Leveldb() { db, _ := leveldb.OpenFile("db", nil) defer db.Close() //讀寫資料庫: _ = db.Put([]byte("key1"), []byte("好好檢查"), nil) _ = db.Put([]byte("key2"), []byte("天天向上"), nil) _ = db.Put([]byte("key:3"), []byte("就會一個本事"), nil) data, _ := db.Get([]byte("key1"), nil) fmt.Println(string(data)) //迭代資料庫內容: iter := db.NewIterator(nil, nil) for iter.Next() { key := iter.Key() value := iter.Value() fmt.Println(string(key), string(value)) } iter.Release() iter.Error() //Seek-then-Iterate: iter = db.NewIterator(nil, nil) for ok := iter.Seek([]byte("key:")); ok; ok = iter.Next() { // Use key/value. fmt.Println("Seek-then-Iterate:") fmt.Println(string(iter.Value())) } iter.Release() //Iterate over subset of database content: iter = db.NewIterator(&util.Range{Start: []byte("key:"), Limit: []byte("xoo")}, nil) for iter.Next() { // Use key/value. fmt.Println("Iterate over subset of database content:") fmt.Println(string(iter.Value())) } iter.Release() //Iterate over subset of database content with a particular prefix: iter = db.NewIterator(util.BytesPrefix([]byte("key")), nil) for iter.Next() { // Use key/value. fmt.Println("Iterate over subset of database content with a particular prefix:") fmt.Println(string(iter.Value())) } iter.Release() _ = iter.Error() //批量寫: batch := new(leveldb.Batch) batch.Put([]byte("foo"), []byte("value")) batch.Put([]byte("bar"), []byte("another value")) batch.Delete([]byte("baz")) _ = db.Write(batch, nil) _ = db.Delete([]byte("key"), nil) }
40.2 BoltDB
package kvdb import ( "fmt" "log" "time" "github.com/boltdb/bolt" ) func Boltdb() error { // Bolt 會在資料檔案上獲得一個檔案鎖,所以多個程序不能同時開啟同一個資料庫。 // 開啟一個已經開啟的 Bolt 資料庫將導致它掛起,直到另一個程序關閉它。 // 為防止無限期等待,您可以將超時選項傳遞給Open()函式: db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second}) defer db.Close() if err != nil { log.Fatal(err) } //兩種處理方式:讀-寫和只讀操作,讀-寫方式開始於db.Update方法: //Bolt 一次只允許一個讀寫事務,但是一次允許多個只讀事務。 // 每個事務處理都有一個始終如一的資料檢視 err = db.Update(func(tx *bolt.Tx) error { // 這裡還有另外一層:k-v儲存在bucket中, // 可以將bucket當做一個key的集合或者是資料庫中的表。 //(順便提一句,buckets中可以包含其他的buckets,這將會相當有用) // Buckets 是鍵值對在資料庫中的集合.所有在bucket中的key必須唯一, // 使用DB.CreateBucket() 函式建立buket //Tx.DeleteBucket() 刪除bucket //b := tx.Bucket([]byte("MyBucket")) b, err := tx.CreateBucketIfNotExists([]byte("MyBucket")) //要將 key/value 對儲存到 bucket,請使用 Bucket.Put() 函式: //這將在 MyBucket 的 bucket 中將 "answer" key的值設定為"42"。 err = b.Put([]byte("answer"),[]byte("42")) return err }) // 可以看到,傳入db.update函式一個引數,在函式內部你可以get/set資料和處理error。 // 如返回為nil,事務就會從資料庫得到一個commit,但如果返回一個實際的錯誤,則會做回滾, // 你在函式中做的事情都不會commit。這很自然,因為你不需要人為地去關心事務的回滾, // 只需要返回一個錯誤,其他的由Bolt去幫你完成。 // 只讀事務 只讀事務和讀寫事務不應該相互依賴,一般不應該在同一個例程中同時開啟。 // 這可能會導致死鎖,因為讀寫事務需要定期重新對映資料檔案, // 但只有在只讀事務處於開啟狀態時才能這樣做。 // 批量讀寫事務.每一次新的事物都需要等待上一次事物的結束, // 可以通過DB.Batch()批處理來完 err = db.Batch(func(tx *bolt.Tx) error { return nil }) //只讀事務在db.View函式之中:在函式中可以讀取,但是不能做修改。 db.View(func(tx *bolt.Tx) error { //要檢索這個value,我們可以使用 Bucket.Get() 函式: //由於Get是有安全保障的,所有不會返回錯誤,不存在的key返回nil b := tx.Bucket([]byte("MyBucket")) //tx.Bucket([]byte("MyBucket")).Cursor() 可這樣寫 v := b.Get([]byte("answer")) id, _ := b.NextSequence() fmt.Printf("The answer is: %s %d \n", v, id) //遊標遍歷key c := b.Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { fmt.Printf("key=%s, value=%s\n", k, v) } //遊標上有以下函式: //First()移動到第一個健. //Last()移動到最後一個健. //Seek()移動到特定的一個健. //Next()移動到下一個健. //Prev()移動到上一個健. //Prefix 字首掃描 prefix := []byte("1234") for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() { fmt.Printf("key=%s, value=%s\n", k, v) } return nil }) //範圍查詢 //另一個常見的用例是掃描範圍,例如時間範圍。如果你使用一個合適的時間編碼,如rfc3339然後可以查詢特定日期範圍的資料: db.View(func(tx *bolt.Tx) error { // Assume our events bucket exists and has RFC3339 encoded time keys. c := tx.Bucket([]byte("Events")).Cursor() // Our time range spans the 90's decade. min := []byte("1990-01-01T00:00:00Z") max := []byte("2000-01-01T00:00:00Z") // Iterate over the 90's. for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { fmt.Printf("%s: %s\n", k, v) } return nil }) //如果你知道所在桶中擁有鍵,你也可以使用ForEach()來迭代: db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte("MyBucket")) b.ForEach(func(k, v []byte) error { fmt.Printf("key=%s, value=%s\n", k, v) return nil }) return nil }) //事務處理 // 開始事務 tx, err := db.Begin(true) if err != nil { return err } defer tx.Rollback() // 使用事務... _, err = tx.CreateBucket([]byte("MyBucket")) if err != nil { return err } // 事務提交 if err = tx.Commit(); err != nil { return err } return err //還可以在一個鍵中儲存一個桶,以建立巢狀的桶: //func (*Bucket) CreateBucket(key []byte) (*Bucket, error) //func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) //func (*Bucket) DeleteBucket(key []byte) error } //備份 curl http://localhost/backup > my.db func BackupHandleFunc(w http.ResponseWriter, req *http.Request) { err := db.View(func(tx *bolt.Tx) error { w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Disposition", `attachment; filename="my.db"`) w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size()))) _, err := tx.WriteTo(w) return err }) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } } //桶的自增 //利用nextsequence()功能,你可以讓boltdb生成序列作為你鍵值對的唯一標識。見下面的示例。 func (s *Store) CreateUser(u *User) error { return s.db.Update(func(tx *bolt.Tx) error { // 建立users桶 b := tx.Bucket([]byte("users")) // 生成自增序列 id, _ = b.NextSequence() u.ID = int(id) // Marshal user data into bytes. buf, err := Json.Marshal(u) if err != nil { return err } // Persist bytes to users bucket. return b.Put(itob(u.ID), buf) }) } // itob returns an 8-byte big endian representation of v. func itob(v int) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(v)) return b } type User struct { ID int }
本書《Go語言四十二章經》內容在github上同步地址:ofollow,noindex">https://github.com/ffhelicopter/Go42
本書《Go語言四十二章經》內容在簡書同步地址:https://www.jianshu.com/nb/29056963
雖然本書中例子都經過實際執行,但難免出現錯誤和不足之處,煩請您指出;如有建議也歡迎交流。
聯絡郵箱:[email protected]