1. 程式人生 > >go任務調度9(op實現分布式樂觀鎖)

go任務調度9(op實現分布式樂觀鎖)

eat range roc cee shadow 上鎖 import package urn

package main import ( "go.etcd.io/etcd/clientv3" "time" "fmt" "context" ) func main() { var ( config clientv3.Config client *clientv3.Client err error lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID keepRespChan <-chan *clientv3.LeaseKeepAliveResponse keepResp *clientv3.LeaseKeepAliveResponse ctx context.Context cancelFunc context.CancelFunc kv clientv3.KV txn clientv3.Txn txnResp *clientv3.TxnResponse ) // 客戶端配置 config = clientv3.Config{ Endpoints: []string{"0.0.0.0:2379"}, DialTimeout: 5 * time.Second, } // 建立連接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // lease實現鎖自動過期(上鎖之後,如果節點宕機,鎖會一直占用,所以要過期機制,也要續租機制): // op操作 // txn事務: if else then // 1, 上鎖 (創建租約, 自動續租, 拿著租約去搶占一個key) lease = clientv3.NewLease(client) // 申請一個5秒的租約 if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { fmt.Println(err) return } // 拿到租約的ID leaseId = leaseGrantResp.ID // 準備一個用於取消自動續租的context ctx, cancelFunc = context.WithCancel(context.TODO()) // 確保函數退出後, 自動續租會停止 defer cancelFunc() //終止自動續租協程(goroutine) defer lease.Revoke(context.TODO(), leaseId) //告訴etcd把租約直接釋放掉,更直接,立即刪除,鎖就釋放了 // 5秒後會取消自動續租 if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { fmt.Println(err) return } // 處理續約應答的協程 go func() { for { select { case keepResp = <- keepRespChan: if keepRespChan == nil { fmt.Println("租約已經失效了") goto END } else { // 每秒會續租一次, 所以就會受到一次應答 fmt.Println("收到自動續租應答:", keepResp.ID) } } } END: }() // if 不存在key, then 設置它, else 搶鎖失敗 kv = clientv3.NewKV(client) // 創建事務 txn = kv.Txn(context.TODO()) // 定義事務 // 如果key不存在(創建版本是0說明沒有被創建) txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)). Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("/cron/lock/job9")) // 否則搶鎖失敗 // 提交事務 if txnResp, err = txn.Commit(); err != nil { fmt.Println(err) return // 沒有問題 } // 判斷是否搶到了鎖 if !txnResp.Succeeded { fmt.Println("鎖被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } // 2, 處理業務 fmt.Println("處理任務") time.Sleep(5 * time.Second) // 3, 釋放鎖(取消自動續租, 釋放租約) // 上面的defer 會把租約釋放掉, 關聯的KV就被刪除了 }

技術分享圖片
(右邊的先執行,左邊的後執行,左邊會提示鎖已被占用)

go任務調度9(op實現分布式樂觀鎖)