1. 程式人生 > >Kafka體系架構、命令、Go案例

Kafka體系架構、命令、Go案例

原文地址:https://github.com/WilburXu/blog/blob/master/kafka/Kafka基本架構和命令.md ## Kafka體系架構 ![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232502355-1349949813.png) ### Broker服務代理節點 服務代理節點。對於Kafka而言,Broker可以簡單地看作一個獨立的Kafka服務節點或Kafka服務例項。大多數情況下也可以將Broker看作一臺Kafka伺服器,前提是這臺伺服器上只部署了一個Kafka例項,一個或多個Broker組成了一個Kafka叢集。 ### Producer和Consumer ![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232515385-1099033831.png) #### Producer生產者 生產者,也就是傳送訊息的一方。生產者負責建立訊息,然後將其投遞到Kafka中。 一個正常的生產邏輯需要具備以下幾個步驟: 1. 建立生產者例項 2. 構建待發送的訊息 3. 傳送訊息到指定的`Topic`、`Partition`、`Key` 4. 關閉生產者例項 #### Consumer消費者 消費者,也就是接收訊息的一方。消費者連線到Kafka上並接收訊息,從而進行相應的業務邏輯處理。 消費一般有三種消費模式: ##### 單執行緒模式 ![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232526167-1110298177.png) 單個執行緒消費多個`Partition` 問題: - 效率低,併發上不去 - 可用性差,單個執行緒掛了,將無法消費 #### 多執行緒模式 ##### 獨立消費者模式 ![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232535260-8525770.png) 和單執行緒模式類似,區別就是為每一個`Partition`單獨起一個執行緒進行消費。 問題: - 執行緒和併發增加了,但是單執行緒掛了,該執行緒的分割槽還是無法消費。 ##### 消費組模式 ![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232544259-2054852396.png) 也是目前最常用的消費模式,我們可以建立多個消費例項並設定同一個`group-id`來區分消費組,同一個消費組可以指定一個或多個`Topic`進行消費: - 消費組自平衡(Rebalance),kafka會根據消費組例項數量和分割槽數量自平衡分配 - 不會重複消費,同個組內kafka確保一個分割槽只會發往一個消費例項,避免重複消費 - 高可用,當一個消費例項掛了,kafka會自動調整消費例項和分割槽的關係 ### Topic主題 Kafka中的訊息以主題為單位進行歸類(邏輯概念,生產者負責將訊息傳送到特定的主題(傳送到Kafka叢集中的每一條訊息都要指定一個主題),而消費者負責訂閱主題並進行消費。 ### Partition分割槽 物理分割槽,主題細分為了1或多個分割槽,一個分割槽只能屬於單個主題,一般也會把分割槽稱為主題分割槽(Topic-Partition)。 ### Segment 實際儲存資料的地方,`Segment`包含一個數據檔案和一個索引檔案。一個`Partition`有多個大小相同的`Segment`,可以理解為`Partition`是在`Segment`之上進行的邏輯抽象。 ## Kafka基本命令 ### zookeeper broker節點儲存在zookeeper,所有需要: 1. 進入zookeeper,然後 `./bin/zkCli.sh` 2. 執行`ls /brokers/ids` #### 檢視broker詳情 `kafka-log-dirs.sh --describe --bootstrap-server kafka:9092 --broker-list 1` ### topic #### 檢視列表 `kafka-topics.sh --list --zookeeper zookeeper:2181` #### 建立 ` kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic [topic_name]` #### 檢視詳情 ` kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic [topic_name]` #### 刪除 `kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic [topic_name]` #### topic消費情況 ##### topic offset 最小 `kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -2` ##### topic offset最大 `kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -1` ### 生產 ##### 新增資料 `kafka-console-producer.sh --broker-list localhost:9092 --topic [topic_name]` ### 消費 ##### 從頭部開始消費 `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --from-beginning` ##### 從尾部開始消費,必需要指定分割槽 `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0` ##### 從某個位置開始消費(--offset [n]) `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset 100 --partition 0` ##### 消費指定個數(--max-messages [n]) `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0 --max-messages 2` ### 消費組 ##### 檢視消費組列表 `kafka-consumer-groups.sh --list --bootstrap-server localhost:9092` ##### 檢視消費組情況 `kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group [group_id]` ##### offset 偏移設定為最早 `kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-earliest --all-topics --execute` ##### offset 偏移設定為新 `kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-latest --all-topics --execute` ##### offset 偏移設定為指定位置 `kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-offset 2000 --all-topics --execute` ##### offset 偏移設定某個時間之後最早位移 `kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-datetime 2020-12-28T00:00:00.000 --all-topics --execute` ## Go案例 基於`https://github.com/Shopify/sarama`的生產和消費案例 ###生產者 `InitKafka.go` ```go package kafka var ( kafkaClient *Client ) func InitKafka() { var err error var config = Config{ Host: []string{"kafka:9092"}, } kafkaClient, err = NewClient(config) if err != nil { panic(err) } } func GetClient() *Client { return kafkaClient } ``` `Producer.go` ```go package kafka import ( "errors" "github.com/Shopify/sarama" ) type Client struct { sarama.AsyncProducer msgPool chan *sarama.ProducerMessage } type Config struct { Host []string `json:"host"` ReturnSuccess bool `json:"return_success"` ReturnErrors bool `json:"return_errors"` } func NewClient(cfg Config) (*Client, error) { // create client var err error c := &Client{ msgPool: make(chan *sarama.ProducerMessage, 2000), } config := sarama.NewConfig() config.Producer.Return.Errors = cfg.ReturnErrors config.Producer.Return.Successes = cfg.ReturnSuccess config.Version = sarama.V2_0_0_0 c.AsyncProducer, err = sarama.NewAsyncProducer(cfg.Host, config) if err != nil { return nil, err } return c, nil } // run func (c *Client) Run() { for { select { case msg := <-c.msgPool: c.Input() <- msg logger.Info("%+v", msg) } } } // send msg func (c *Client) Send(topic string, msg []byte) error { if topic == "" { return errors.New("kafka producer send msg topic empty") } kafkaMsg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(msg), } c.msgPool <- kafkaMsg return nil } ``` **生產者初始化**: ```go // kafka init kafka.InitKafka() go kafka.GetClient().Run() ``` ### 消費者 consumer.go ```go package kafka_consumer import ( "context" "github.com/Shopify/sarama" "os" "os/signal" "sync" "syscall" ) // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool } func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error { //panic("implement me") return nil } func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error { //panic("implement me") return nil } func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { logger.Info("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) session.MarkMessage(message, "") c.Handler(message.Topic, message.Value) } return nil } func (c *Consumer) Handler(topic string, msg []byte) { switch topic { case conscom.KafkaTopicGiftOrder: GiftOrder(topic, msg) case conscom.KafkaTopicFollow: UserFollow(topic, msg) } } func ConsumeInit(topics []string, groupID string) { consumer := Consumer{ ready: make(chan bool), } brokerList := []string{"kafka:9092"} config := sarama.NewConfig() config.Version = sarama.V1_0_0_0 ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(brokerList, groupID, config) if err != nil { log.Printf("kafka consumer err %v", err) return } wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() for { // server-side rebalance happens, the consumer session will need to be if err := client.Consume(ctx, topics, &consumer); err != nil { log.Printf("kafka consumer: %v", err) } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { return } consumer.ready = make(chan bool) } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-ctx.Done(): log.Printf("kafka consume gift terminating: context cancelled") case <-sigterm: log.Printf("kafka consume gift terminating: via signal") } cancel() wg.Wait() if err = client.Close(); err != nil { log.Printf("kafka consume gift Error closing client: %v", err) } } ``` **消費者初始化**: ```go // kafka consumer go kafka_consumer.ConsumeInit([]string{"topicA", "topicB", "group-name") ``` ## 參考 《深入理解Kafka:核心設計與實踐原理》作者:朱忠華 https://github.com/Shopify/sarama http://kafka.apache.org/documentation/ https://crossoverjie.top/2018/11/20/kafka/kafka-c