1. 程式人生 > >用go實現的kafka客戶端,基於sarama和sarama-cluster

用go實現的kafka客戶端,基於sarama和sarama-cluster

工作中需要將原先的訊息佇列替換成kafka,於是接觸了基於go實現的sarama,又因為sarama不支援consumer group,於是又使用了sarama cluster,同時又希望儘量保證消費一次的語義,說到這個exactly once,sarama從去年就立了issue要支援exactly once,結果到現在還沒支援(https://github.com/Shopify/sarama/issues/901)。

於是就自己造了個簡單的輪子,把sarama和sarama cluster封裝到一起,同時實現了保證消費一次的語義,我給它起名為kago。

先附上kago的依賴,需要先進行安裝:

go get github.com/Shopify/sarama
go get github.com/bsm/sarama-cluster

然後便可以安裝kago:

go get go get github.com/JeffreyDing11223/kago

先看一下kago的程式碼結構:
這裡寫圖片描述

asyncProducer.go 負責初始化非同步producer單個例項或例項group,以及傳送訊息,接受錯誤資訊等。
syncProducer.go 負責初始化同步producer單個例項或例項group,同步傳送訊息等。
consumer.go 負責初始化consumer group單個成員或多個成員,以及初始化partition consumer,還有標記offset,提交offset,獲取所有topics,以及獲取某個topic下所有分割槽等等。
message.go

各類訊息體的定義,基本都沿用了sarama和sarama cluster的訊息型別。
offsetFile.go 初始化,修改,儲存offset檔案的相關操作。
offsetManager.go offsetManager的初始化,標記offset等,這個主要結合partition consumer來使用。
config.go kafka 生產者和消費者以及其他的各項配置。
util.go 各類功能函式。

這裡有小夥伴一定有疑問了,既然已經有標記offset和提交offset了,為什麼還要offsetFile.go去操作檔案來儲存offset呢,這就是我上面說的儘可能保證消費一次的語義,試想一下,現在我拿到一條訊息,各種加工處理,消費完了,當我要提交offset給kafka的時候,我的客戶端出現網路問題了或者kafka server出了問題,導致offset提交失敗。也就是說,下次繼續消費的時候,就會繼續從這條訊息開始消費,那就相當於是重複消費了這條訊息。於是我在處理訊息和提交offset的中間,加了一步,就是檔案儲存offset,並且供使用者自己選擇,繼續消費是按照kafka server儲存的offset來,還是按照本地檔案來,或者取兩者最大的,這些選項可以在config.go

中看到。

offsetFile.go的實現也很簡單,就是封裝一把鎖到os.file中,並結合sync.map來支援併發讀寫,檔案內部統一使用json格式,以topic為單位來分類檔案。具體可以看原始碼。

還有具體關於exactly once語義的內容,可以參考我之前發表在部落格中的文章 “kafka訊息交付語義的分析https://blog.csdn.net/jeffrey11223/article/details/80775080“ 。

附上使用例子:

//ayncProducer

import (
    ...
    "github.com/JeffreyDing11223/kago"
    ...
)

    config:=kago.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    produ,_:=kago.InitManualRetryAsyncProducer([]string{"127.0.0.1:9092"}, config)
    defer produ.Close()
    go func(p *kago.AsyncProducer) {
        for{
            select {
            case  suc:=<-p.Successes():
                bytes,_:=suc.Value.Encode()
                value:=string(bytes)
                fmt.Println("offsetCfg:", suc.Offset, " partitions:", suc.Partition," metadata:",suc.Metadata," value:",value)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(produ)
    var value string
    for i:=0;;i++ {

        time11:=time.Now()
        value = "this is a message 0805 "+time11.Format("15:04:05")

        //傳送的訊息,主題,key
        msg := &kago.ProducerMessage{
            Topic: "0805_test",
        }

        //將字串轉化為位元組陣列
        msg.Value = sarama.ByteEncoder(value)

        //使用通道傳送
        produ.Send() <- msg

        time.Sleep(500*time.Millisecond)
    }
//consumerGroup

    config:=kago.NewConfig()
    config.Consumer.Return.Errors=true
    config.Group.Return.Notifications =true
    config.Consumer.Offsets.CommitInterval=1*time.Second
    consumer,err:=kago.InitOneConsumerOfGroup([]string{"127.0.0.1:9092"}, "0805_test","cg1",config)
    if err!=nil{
        log.Println(err)
        return
    }
    defer consumer.Close()
    kago.InitOffsetFile() //初始化offset檔案,全域性執行一次即可
    go func() {
        for err := range consumer.Errors() {
            log.Printf("Error: %s\n", err.Error())
        }
    }()

    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("Rebalanced: %+v\n", ntf)
        }
    }()
    for{
        select {
        case msg, ok := <-consumer.Recv():
            if ok {
                fmt.Fprintf(os.Stdout, ": %s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg.Topic,msg.Partition,msg.Offset,"cg1",true) // 提交offset,最後一個引數為true時,會將offset儲存到檔案中
            }
        }
    }
//partition Consumer

    config:=kago.NewConfig()
    config.Consumer.Return.Errors=true
    config.Group.Return.Notifications =true
    config.Consumer.Offsets.CommitInterval=1*time.Second
    config.OffsetLocalOrServer=0 //選項配置為優先讀offset檔案

    kago.InitOffsetFile() //初始化offset檔案,全域性執行一次即可

    pconsumer,err:=kago.InitPartitionConsumer([]string{"127.0.0.1:9092"}, "0805_test",0,"cg1",config) //會根據config.OffsetLocalOrServe來讓pconsumer從指定的offset開始消費
    if err!=nil{
        log.Println(err)
        return
    }
    defer pconsumer.Close()

    pOffsetManager,err2:=kago.InitPartitionOffsetManager([]string{"127.0.0.1:9092"}, "0805_test","cg1",0,config)
    if err2 !=nil{
        fmt.Println(err2)
        return
    }
    defer pOffsetManager.Close()

    go func() {
        for err := range pconsumer.Errors() {
            fmt.Printf("Error: %s\n", err.Error())
        }
    }()

    for{
        msg := <-pconsumer.Recv()
        fmt.Printf("Consumed message offsetCfg %d\n message:%s", msg.Offset,string(msg.Value))
        pOffsetManager.MarkOffset("0805_test",0,msg.Offset,"cg1",true)
    }