1. 程式人生 > >go任務調度6(etcd租約機制/自動過期)

go任務調度6(etcd租約機制/自動過期)

mod dial 配置 out 程序 調度 string grant endpoint

對於實現分布式樂觀鎖非常重要。如果鎖了,突然宕機了,鎖是需要自動釋放的。所以這鎖在etcd裏是需要生命期的。
過期演示:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    var (
        config  clientv3.Config
        client  *clientv3.Client
        err     error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        putResp *clientv3.PutResponse
        kv clientv3.KV
        getResp *clientv3.GetResponse
    )

    //客戶端配置
    config = clientv3.Config{
        Endpoints:   []string{"0.0.0.0:2379"}, //集群列表
        DialTimeout: 5 * time.Second,
    }

    //建立客戶端
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    //申請一個lease(租約)
    lease = clientv3.NewLease(client)

    //申請一個5秒的租約
    if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
        fmt.Println(err)
        return
    }

    //拿到租約的id
    leaseId = leaseGrantResp.ID

    //獲得kv api子集
    kv = clientv3.NewKV(client)

    //put一個kv,讓它與租約關聯起來,從而實現10秒後自動過期
    if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("寫入成功:", putResp.Header.Revision)

    //定時看key過期沒
    for {
        if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
            fmt.Println(err)
            return
        }
        if getResp.Count == 0 {
            fmt.Println("kv過期了")
            break
        }
        fmt.Println("還沒過期:", getResp.Kvs)
        time.Sleep(time.Second)
    }
}

[[email protected] etcd]# go run demo6.go
寫入成功: 27
還沒過期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
還沒過期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
還沒過期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]

還沒過期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
還沒過期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
還沒過期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
kv過期了
[[email protected] etcd]#

申請一把分布式鎖的時候,是誰搶到了key就是搶到了鎖,如果不主動釋放這鎖,按道理講不應該讓租約過期,租約過期主要是為了程序宕掉之後,鎖自動釋放,防止程序異常退出。如果程序搶到了這個鎖,我們希望鎖一直不失效,知道我們主動釋放它:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    var (
        config  clientv3.Config
        client  *clientv3.Client
        err     error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        putResp *clientv3.PutResponse
        kv clientv3.KV
        getResp *clientv3.GetResponse
        keepResp *clientv3.LeaseKeepAliveResponse
        keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //只讀channel
    )

    //客戶端配置
    config = clientv3.Config{
        Endpoints:   []string{"0.0.0.0:2379"}, //集群列表
        DialTimeout: 5 * time.Second,
    }

    //建立客戶端
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    //申請一個lease(租約)
    lease = clientv3.NewLease(client)

    //申請一個5秒的租約
    if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
        fmt.Println(err)
        return
    }

    //拿到租約的id
    leaseId = leaseGrantResp.ID

    //(自動續租)當我們申請了租約之後,我們就可以啟動一個續租
    if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
        fmt.Println(err)
        return
    }

    //處理續租應答的協程
    go func() {
        for {
            select {
            case keepResp = <-keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("租約已經失效")
                    goto END
                } else { //每秒會續租一次,所以就會受到一次應答
                    fmt.Println("收到自動續租應答:", keepResp.ID)
                }
            }
        }
        END:
    }()

    //獲得kv api子集
    kv = clientv3.NewKV(client)

    //put一個kv,讓它與租約關聯起來,從而實現10秒後自動過期
    if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("寫入成功:", putResp.Header.Revision)

    //定時看key過期沒
    for {
        if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
            fmt.Println(err)
            return
        }
        if getResp.Count == 0 {
            fmt.Println("kv過期了")
            break
        }
        fmt.Println("還沒過期:", getResp.Kvs)
        time.Sleep(time.Second)
    }
}

[[email protected] etcd]# go run demo7.go
寫入成功: 30
收到自動續租應答: 7587837741646622039
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
收到自動續租應答: 7587837741646622039
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
收到自動續租應答: 7587837741646622039
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
還沒過期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
收到自動續租應答: 7587837741646622039
......

go任務調度6(etcd租約機制/自動過期)