1. 程式人生 > >golang結合Kafka訊息佇列實踐(一)

golang結合Kafka訊息佇列實踐(一)

公司決定使用kafka來作為新一代的訊息佇列來使用,於是開始對kafka的機制,原理,go客戶端的使用,各種瞭解了一番,過程中也遇到了不少的坑,特地寫出來,和大家分享,也供自己參考,加深印象。

首先,kafka的設計思想,各個角色比如broker,producer,consumer,partition等等還有與它們相關的配置,這裡就先不作介紹了,官方文件都有,文章後面也會提到。

客戶端選擇:
go連線kafka的客戶端不多,綜合對比了下,決定使用sarama
“go get github.com/Shopify/sarama”

生產者:

func SaramaProducer()  {

    config := sarama.NewConfig
() //等待伺服器所有副本都儲存成功後的響應 config.Producer.RequiredAcks = sarama.WaitForAll //隨機向partition傳送訊息 config.Producer.Partitioner = sarama.NewRandomPartitioner //是否等待成功和失敗後的響應,只有上面的RequireAcks設定不是NoReponse這裡才有用. config.Producer.Return.Successes = true config.Producer.Return.Errors = true //設定使用的kafka版本,如果低於V0_10_0_0版本,訊息中的timestrap沒有作用.需要消費和生產同時配置 //注意,版本設定不對的話,kafka會返回很奇怪的錯誤,並且無法成功傳送訊息 config.Version
= sarama.V0_10_0_1 fmt.Println("start make producer") //使用配置,新建一個非同步生產者 producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config) if e != nil { fmt.Println(e) return } defer producer.AsyncClose() //迴圈判斷哪個通道傳送過來資料. fmt.Println
("start goroutine") go func(p sarama.AsyncProducer) { for{ select { case <-p.Successes(): //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition) case fail := <-p.Errors(): fmt.Println("err: ", fail.Err) } } }(producer) var value string for i:=0;;i++ { time.Sleep(500*time.Millisecond) time11:=time.Now() value = "this is a message 0606 "+time11.Format("15:04:05") // 傳送的訊息,主題。 // 注意:這裡的msg必須得是新構建的變數,不然你會發現傳送過去的訊息內容都是一樣的,因為批次傳送訊息的關係。 msg := &sarama.ProducerMessage{ Topic: "0606_test", } //將字串轉化為位元組陣列 msg.Value = sarama.ByteEncoder(value) //fmt.Println(value) //使用通道傳送 producer.Input() <- msg } }

這裡使用的是非同步producer,kafka的producer有個特點是,批次傳送,這麼做的好處就是,可以提高吞吐量,所以我們在看幾個主流的訊息佇列效能測試對比的時候,kafka的吞吐量是遙遙領先的。
producer還有個特性是,就是每傳送一次訊息,都會要求broker返回一個訊息回執,即ack。如果ack沒有收到,producer會進行重發,如果設定了重發次數的話。這個ack有三種模式:

// The level of acknowledgement reliability needed from the broker (defaults
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
// 等同於jvm kafka中的`request.required.acks` 
        RequiredAcks RequiredAcks

type RequiredAcks int16
const (
// 第一個模式,NoResponse doesn't send any response, the TCP ACK is all you get.
    NoResponse RequiredAcks = 0
//第二個模式, WaitForLocal waits for only the local commit to succeed before responding.
    WaitForLocal RequiredAcks = 1
// 第三個模式,WaitForAll waits for all in-sync replicas to commit before responding.
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
    WaitForAll RequiredAcks = -1
)

如果RequiredAcks設定為0,在這種情況下,伺服器是否收到請求是沒法保證的,並且引數retries(重發)也不會生效(因為客戶端無法獲得失敗資訊)。既然提到了重發,可以看一下下面sarama的重發定義:

Retry struct {
            // The total number of times to retry sending a message (default 3).
            // Similar to the `message.send.max.retries` setting of the JVM producer.
            Max int
            // How long to wait for the cluster to settle between retries
            // (default 100ms). Similar to the `retry.backoff.ms` setting of the
            // JVM producer.
            Backoff time.Duration
        }

消費者:

func SaramaConsumer()  {

    fmt.Println("start consume")
    config := sarama.NewConfig()

    //提交offset的間隔時間,每秒提交一次給kafka
    config.Consumer.Offsets.CommitInterval = 1 * time.Second

    //設定使用的kafka版本,如果低於V0_10_0_0版本,訊息中的timestrap沒有作用.需要消費和生產同時配置
    config.Version = sarama.V0_10_0_1

//consumer新建的時候會新建一個client,這個client歸屬於這個consumer,並且這個client不能用作其他的consumer
    consumer, err := sarama.NewConsumer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if err != nil {
        panic(err)
    }

//新建一個client,為了後面offsetManager做準備
    client, err := sarama.NewClient([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if err != nil {
        panic("client create error")
    }
    defer client.Close()

//新建offsetManager,為了能夠手動控制offset
    offsetManager,err:=sarama.NewOffsetManagerFromClient("group111",client)
    if err != nil {
        panic("offsetManager create error")
    }
    defer offsetManager.Close()

//建立一個第2分割槽的offsetManager,每個partition都維護了自己的offset
    partitionOffsetManager,err:=offsetManager.ManagePartition("0606_test",2)
    if err != nil {
        panic("partitionOffsetManager create error")
    }
    defer partitionOffsetManager.Close()


    fmt.Println("consumer init success")

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    //sarama提供了一些額外的方法,以便我們獲取broker那邊的情況
    topics,_:=consumer.Topics()
    fmt.Println(topics)
    partitions,_:=consumer.Partitions("0606_test")
    fmt.Println(partitions)

//第一次的offset從kafka獲取(傳送OffsetFetchRequest),之後從本地獲取,由MarkOffset()得來
    nextOffset,_:=partitionOffsetManager.NextOffset()
    fmt.Println(nextOffset)

//建立一個分割槽consumer,從上次提交的offset開始進行消費
    partitionConsumer, err := consumer.ConsumePartition("0606_test", 2, nextOffset+1)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    fmt.Println("start consume really")

ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n message:%s", msg.Offset,string(msg.Value))
            //拿到下一個offset
            nextOffset,offsetString:=partitionOffsetManager.NextOffset()
            fmt.Println(nextOffset+1,"...",offsetString)
            //提交offset,預設提交到本地快取,每秒鐘往broker提交一次(可以設定)
            partitionOffsetManager.MarkOffset(nextOffset+1,"modified metadata")

        case <-signals:
            break ConsumerLoop
        }
    }
}

至此,一個初步的consumer構建好了,很多關於consumer的內容見上面程式碼的註釋。可以根據consumer.Partitions(“topic”)來獲取這個topic的所有分割槽,然後為每個分割槽構建一個consumer,然後進行消費。

這樣挺麻煩的,也不夠優雅,其實kafka的consumer還有個很重要的機制,就是consumer group,可惜sarama並不支援,不過有另一個開源的庫,叫做”github.com/bsm/sarama-cluster”,它是在sarama上加了一層封裝,支援了consumer group,這個我會在下一篇文章中寫到。