1. 程式人生 > >Kafka訊息佇列介紹、環境搭建及應用:C#實現消費者-生產者訂閱

Kafka訊息佇列介紹、環境搭建及應用:C#實現消費者-生產者訂閱

一:kafka介紹

kafka(官網地址:http://kafka.apache.org)是一種高吞吐量的分散式釋出訂閱的訊息佇列系統,具有高效能和高吞吐率。

1.1 術語介紹

  • Broker
    Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
  • Topic
    主題:每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
  • Partition
    分割槽:Partition是物理上的概念,每個Topic包含一個或多個Partition.(一般為kafka節點數cpu的總核數)
  • Producer
    生產者,負責釋出訊息到Kafka broker
  • Consumer
    消費者:從Kafka broker讀取訊息的客戶端。
  • Consumer Group
    消費者組:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。

1.2 基本特性

  • 可擴充套件性
  1. 在不需要下線的情況下進行擴容
  2. 資料流分割槽(partition)儲存在多個機器上
  • 高效能
  1. 單個broker就能服務上千客戶端
  2. 單個broker每秒種讀/寫可達每秒幾百兆位元組
  3. 多個brokers組成的叢集將達到非常強的吞吐能力
  4. 效能穩定,無論資料多大
  5. Kafka在底層摒棄了Java堆快取機制,採用了作業系統級別的頁快取,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO效能。

1.3 訊息格式

  1. 一個topic對應一種訊息格式,因此訊息用topic分類
  2. 一個topic代表的訊息有1個或者多個patition(s)組成
  3. 一個partition應該存放在一到多個server上,如果只有一個server,就沒有冗餘備份,是單機而不是叢集;如果有多個server,一個server為leader(領導者),其他servers為followers(跟隨者),leader需要接受讀寫請求,followers僅作冗餘備份,leader出現故障,會自動選舉一個follower作為leader,保證服務不中斷;每個server都可能扮演一些partitions的leader和其它partitions的follower角色,這樣整個叢集就會達到負載均衡的效果
  4. 訊息按順序存放;訊息順序不可變;只能追加訊息,不能插入;每個訊息都有一個offset,用作訊息ID, 在一個partition中唯一;offset有consumer儲存和管理,因此讀取順序實際上是完全有consumer決定的,不一定是線性的;訊息有超時日期,過期則刪除

1.4 原理解析

producer建立一個topic時,可以指定該topic為幾個partition(預設是1,配置num.partitions),然後會把partition分配到每個broker上,分配的演算法是:a個broker,第b個partition分配到b%a的broker上,可以指定有每個partition有幾分副本Replication,副本的分配策略為:第c個副本儲存在第(b+c)%a的broker上。一個partition在每個broker上是一個資料夾,資料夾中檔案的命名方式為:topic名稱+有序序號。每個partition中檔案是一個個的segment,segment file由.index和.log檔案組成。兩個檔案的命名規則是,上一個segmentfile的最後一個offset。這樣,可以快速的刪除old檔案。

producer往kafka裡push資料,會自動的push到所有的分割槽上,訊息是否push成功有幾種情況:1,接收到partition的ack就算成功,2全部副本都寫成功才算成功;資料可以儲存多久,預設是兩天;producer的資料會先存到快取中,等大小或時間達到閾值時,flush到磁碟,consumer只能讀到磁碟中的資料。

consumer從kafka裡poll資料,poll到一定配置大小的資料放到記憶體中處理。每個group裡的consumer共同消費全部的訊息,不同group裡的資料不能消費同樣的資料,即每個group消費一組資料。

consumer的數量和partition的數量相等時消費的效率最高。這樣,kafka可以橫向的擴充broker數量和partitions;資料順序寫入磁碟;producer和consumer非同步

二:環境搭建(windows)

2.1 安裝zookeeper

kafka需要用到zookeeper,所以需要先安裝zookeeper

  1. 到官網下載最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/
  2. 解壓到指定路徑
  3. 複製conf目錄下zoo_sample.cfg,貼上改名為zoo.cfg,修改zoo.cfg中的dataDir的值為E:/data/zookeeper,並新增一行dataLogDir=E:/log/zookeeper
  4. 修改系統環境變數,在Path後新增 ;E:\zookeeper\zookeeper-3.4.10\bin
  5. 執行cmd命令視窗,輸入zkServer回車,啟動

2.2 安裝kafka

  1. 到官網下載最新版kafka,http://kafka.apache.org/downloads
  2. 解壓到指定路徑,如:E:\kafka_2.12-0.10.2.0
  3. 修改E:\kafka_2.12-0.10.2.0\config目錄下的server.properties中 log.dirs的值為E:/log/kafka
  4. 新增系統環境變數,在Path後新增 ;E:\kafka_2.12-0.10.2.0\bin\windows
  5. 啟動kafka,在cmd命令列用cd命令切換到kafka根目錄E:\kafka_2.12-0.10.2.0,輸入命令
    .\bin\windows\kafka-server-start.bat .\config\server.properties
    出現started (kafka.server.KafkaServer)字樣表示啟動成功
  6. 執行cmd命令列,建立一個topic,命令如下:
    kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  7. 再開啟一個cmd,建立一個Producer,命令如下:
    kafka-console-producer.bat --broker-list localhost:9092 --topic test
  8. 再開啟一個cmd,建立一個Customer,命令如下:
    kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
  9. 在Producer視窗下輸入資訊進行測試 ,每輸入一行回車後訊息馬上就會出現在Customer中,表明kafka已經安裝測試成功

三:基於.net的常用類庫

基於.net實現kafka的訊息佇列應用,常用的類庫有kafka-net,Confluent.Kafka,官網推薦使用Confluent.Kafka,本文也是基於該庫的實現,使用版本預發行版1.0.0-beta,建立控制檯應用程式。

四:應用–生產者

生產者將資料釋出到指定的主題,一般生產環境下的負載均衡,服務代理會有多個,BootstrapServers屬性則為以逗號隔開的多個代理地址

/// <summary>
/// 生產者
/// </summary>
public static void Produce()
{
     var config = new ProducerConfig { BootstrapServers = "localhost:9092" }
     Action<DeliveryReportResult<Null, string>> handler = r =>
     Console.WriteLine(!r.Error.IsError
         ? $"Delivered message to {r.TopicPartitionOffset}"
         : $"Delivery Error: {r.Error.Reason}");

    using (var producer = new Producer<Null, string>(config))
    {
        // 錯誤日誌監視
        producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Erro資訊:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

        for (int i = 0; i < 5; i++)
        {
            // 非同步傳送訊息到主題
            producer.BeginProduce("MyTopic", new Message<Null, string> { Value = i.ToString() }, handler);
        }   
        // 3後 Flush到磁碟
        producer.Flush(TimeSpan.FromSeconds(3));
    }
}

五:應用–消費者

消費者使用消費者組名稱標記自己,並且釋出到主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者例項。消費者例項可以在單獨的程序中,也可以在不同的機器

如果所有消費者例項具有相同的消費者組,則記錄將有效地在消費者例項上進行負載平衡。

如果所有消費者例項具有不同的消費者組,則每個記錄將廣播到所有消費者程序
在這裡插入圖片描述
上圖為兩個伺服器Kafka群集,託管四個分割槽(P0-P3),包含兩個消費者組。消費者組A有兩個消費者例項,B組有四個消費者例項。

預設EnableAutoCommit 是自動提交,只要從佇列取出訊息,偏移量自動移到後一位,無論訊息後續處理成功與否,該條訊息都會消失,所以為免除處理失敗的資料丟失,消費者方可設定該屬性為false,後面進行手動commint()提交偏移

  /// <summary>
  /// 消費者
  /// </summary>
  public static void Consumer()
  {
      var conf = new ConsumerConfig
      {
          GroupId = "test-consumer-group",
          BootstrapServers = "localhost:9092",
          AutoOffsetReset = AutoOffsetResetType.Earliest,
          EnableAutoCommit = false  // 設定非自動偏移,業務邏輯完成後手動處理偏移,防止資料丟失
      };

      using (var consumer = new Consumer<Ignore, string>(conf))
      {
          // 訂閱topic
          consumer.Subscribe("MyTopic");
          // 錯誤日誌監視 
          consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error資訊:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

          while (true)
          {
              try
              {
                  var consume = consumer.Consume();
                  string receiveMsg = consume.Value;
                  Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");
                  // 開始我的業務邏輯
                  ...
                  // 業務結束
                  if(成功)
                  {
                   	 consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); //手動提交偏移
                  }
              }
              catch (ConsumeException e)
              {
                  Console.WriteLine($"Consumer_Error occured: {e.Error.Reason}");
              }
          }
      }
  }

執行結果

在這裡插入圖片描述

常見資料問題處理

  1. 重複消費最常見的原因:re-balance問題,通常會遇到消費的資料,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本預設是30秒),那麼就會re-balance重平衡,此時有一定機率offset沒提交,會導致重平衡後重復消費。
    去重問題:訊息可以使用唯一id標識
  2. 保證不丟失訊息:
    生產者(ack= -1 或 all 代表至少成功傳送一次)
    消費者 (offset手動提交,業務邏輯成功處理後,提交offset)
  3. 保證不重複消費:落表(主鍵或者唯一索引的方式,避免重複資料)
    業務邏輯處理(選擇唯一主鍵儲存到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進行業務邏輯處理)

Kafka 視覺化除錯

藉助視覺化客戶端工具 kafka tool
具體使用可參考:https://www.cnblogs.com/frankdeng/p/9452982.html

END