etcd Source Code Reading (4): lease
A lease is, as the name says, a lease on a key, similar to TTL (Time To Live) in Redis. Let's first look at how a lease is used:
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: dialTimeout,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // minimum lease TTL is 5-second
    resp, err := cli.Grant(context.TODO(), 5)
    if err != nil {
        log.Fatal(err)
    }

    // after 5 seconds, the key 'foo' will be removed
    _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
    if err != nil {
        log.Fatal(err)
    }
As you can see, the client simply uses the lease's ID as a credential when writing the key. So how is a lease implemented?
    type Lease struct {
        ID           LeaseID
        ttl          int64 // time to live of the lease in seconds
        remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
        // expiryMu protects concurrent accesses to expiry
        expiryMu sync.RWMutex
        // expiry is time when lease should expire. no expiration when expiry.IsZero() is true
        expiry time.Time

        // mu protects concurrent accesses to itemSet
        mu      sync.RWMutex
        itemSet map[LeaseItem]struct{}
        revokec chan struct{}
    }
As you can see, a Lease is assigned an ID and a TTL when it is created.
Next, let's look at the definition of the Lessor interface:
    // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
    type Lessor interface {
        // SetRangeDeleter lets the lessor create TxnDeletes to the store.
        // Lessor deletes the items in the revoked or expired lease by creating
        // new TxnDeletes.
        SetRangeDeleter(rd RangeDeleter)

        SetCheckpointer(cp Checkpointer)

        // Grant grants a lease that expires at least after TTL seconds.
        Grant(id LeaseID, ttl int64) (*Lease, error)

        // Revoke revokes a lease with given ID. The item attached to the
        // given lease will be removed. If the ID does not exist, an error
        // will be returned.
        Revoke(id LeaseID) error

        // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
        // the expiry of leases to less than the full TTL when possible.
        Checkpoint(id LeaseID, remainingTTL int64) error

        // Attach attaches given leaseItem to the lease with given LeaseID.
        // If the lease does not exist, an error will be returned.
        Attach(id LeaseID, items []LeaseItem) error

        // GetLease returns LeaseID for given item.
        // If no lease found, NoLease value will be returned.
        GetLease(item LeaseItem) LeaseID

        // Detach detaches given leaseItem from the lease with given LeaseID.
        // If the lease does not exist, an error will be returned.
        Detach(id LeaseID, items []LeaseItem) error

        // Promote promotes the lessor to be the primary lessor. Primary lessor manages
        // the expiration and renew of leases.
        // Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
        Promote(extend time.Duration)

        // Demote demotes the lessor from being the primary lessor.
        Demote()

        // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
        // an error will be returned.
        Renew(id LeaseID) (int64, error)

        // Lookup gives the lease at a given lease id, if any
        Lookup(id LeaseID) *Lease

        // Leases lists all leases.
        Leases() []*Lease

        // ExpiredLeasesC returns a chan that is used to receive expired leases.
        ExpiredLeasesC() <-chan []*Lease

        // Recover recovers the lessor state from the given backend and RangeDeleter.
        Recover(b backend.Backend, rd RangeDeleter)

        // Stop stops the lessor for managing leases. The behavior of calling Stop multiple
        // times is undefined.
        Stop()
    }
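As you can see, a lessor, that is, the thing that manages leases, has to implement all of these methods. Here we focus on two questions: how Grant is carried out, and how expiration works. So let's find the concrete implementation: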
    // lessor implements Lessor interface.
    // TODO: use clockwork for testability.
    type lessor struct {
        mu sync.RWMutex

        // demotec is set when the lessor is the primary.
        // demotec will be closed if the lessor is demoted.
        demotec chan struct{}

        leaseMap            map[LeaseID]*Lease
        leaseHeap           LeaseQueue
        leaseCheckpointHeap LeaseQueue
        itemMap             map[LeaseItem]LeaseID

        // When a lease expires, the lessor will delete the
        // leased range (or key) by the RangeDeleter.
        rd RangeDeleter

        // When a lease's deadline should be persisted to preserve the remaining TTL across leader
        // elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
        cp Checkpointer

        // backend to persist leases. We only persist lease ID and expiry for now.
        // The leased items can be recovered by iterating all the keys in kv.
        b backend.Backend

        // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
        // requests for shorter TTLs are extended to the minimum TTL.
        minLeaseTTL int64

        expiredC chan []*Lease
        // stopC is a channel whose closure indicates that the lessor should be stopped.
        stopC chan struct{}
        // doneC is a channel whose closure indicates that the lessor is stopped.
        doneC chan struct{}

        lg *zap.Logger

        // Wait duration between lease checkpoints.
        checkpointInterval time.Duration
    }
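As you can see, it contains both a leaseMap and a leaseHeap. Why keep two structures? Recall how a heap behaves: leaseHeap here is implemented as a min-heap, and the comparison key is the time at which each Lease expires: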
    type LeaseQueue []*LeaseWithTime

    func (pq LeaseQueue) Len() int { return len(pq) }

    func (pq LeaseQueue) Less(i, j int) bool {
        return pq[i].time < pq[j].time
    }

    func (pq LeaseQueue) Swap(i, j int) {
        pq[i], pq[j] = pq[j], pq[i]
        pq[i].index = i
        pq[j].index = j
    }

    func (pq *LeaseQueue) Push(x interface{}) {
        n := len(*pq)
        item := x.(*LeaseWithTime)
        item.index = n
        *pq = append(*pq, item)
    }

    func (pq *LeaseQueue) Pop() interface{} {
        old := *pq
        n := len(old)
        item := old[n-1]
        item.index = -1 // for safety
        *pq = old[0 : n-1]
        return item
    }
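So how do leases get expired? Each time we check whether the element at the top of the min-heap has expired, and if it has, we Pop it. Because the top is always the lease with the earliest expiry, we can stop as soon as the top has not expired yet. Then why is leaseMap needed as well? Because it speeds up lookups by ID: a hash table lookup is O(1) on average, whereas finding an arbitrary lease in the heap would require a scan.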
So when does lease expiration management actually run? Let's look at where a new lessor is created:
    func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
        return newLessor(lg, b, cfg)
    }

    func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
        checkpointInterval := cfg.CheckpointInterval
        if checkpointInterval == 0 {
            checkpointInterval = 5 * time.Minute
        }
        l := &lessor{
            leaseMap:            make(map[LeaseID]*Lease),
            itemMap:             make(map[LeaseItem]LeaseID),
            leaseHeap:           make(LeaseQueue, 0),
            leaseCheckpointHeap: make(LeaseQueue, 0),
            b:                   b,
            minLeaseTTL:         cfg.MinLeaseTTL,
            checkpointInterval:  checkpointInterval,
            // expiredC is a small buffered chan to avoid unnecessary blocking.
            expiredC: make(chan []*Lease, 16),
            stopC:    make(chan struct{}),
            doneC:    make(chan struct{}),
            lg:       lg,
        }
        l.initAndRecover()

        go l.runLoop()

        return l
    }
Note the second-to-last statement, go l.runLoop(). Following it:
    func (le *lessor) runLoop() {
        defer close(le.doneC)

        for {
            le.revokeExpiredLeases()
            le.checkpointScheduledLeases()

            select {
            case <-time.After(500 * time.Millisecond):
            case <-le.stopC:
                return
            }
        }
    }
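The loop runs once every 500 milliseconds, checks for expired leases, and passes them into the expiredC channel. The lessor struct itself never consumes from that channel, so presumably the caller is responsible for handling it. I searched to see whether anything does: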
    $ ack -Q 'ExpiredLeasesC()'
    lease/lessor.go
    126:ExpiredLeasesC() <-chan []*Lease
    559:func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
    906:func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

    lease/lessor_test.go
    380:case el := <-le.ExpiredLeasesC():
    433:case el := <-le.ExpiredLeasesC():

    etcdserver/server.go
    1013:expiredLeaseC = s.lessor.ExpiredLeasesC()
Sure enough, etcdserver/server.go is the caller that handles it.
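Finally, it is worth mentioning that leases are also persisted: when a new lessor is created, it first tries to recover its state from the existing persisted backend (the l.initAndRecover() call in newLessor).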