1. 程式人生 > >在.NET中使用Apache Kafka(一)

在.NET中使用Apache Kafka(一)

​曾經在你的應用程式中使用過非同步處理嗎?在處理不需要立即執行的任務時,非同步程式碼似乎是不可避免的。Apache Kafka是最常用和最健壯的開源事件流平臺之一。許多公司和開發者利用它的強大功能來建立高效能的非同步操作,用於微服務的資料整合,以及用於應用程式健康指標的監控工具。這篇文章解釋了在.NET應用程式中使用Kafka的細節,還展示瞭如何在Windows作業系統上安裝及使用。

它是如何工作的

當今世界,資料正在以指數形式增長。為了容納不斷增長的資料,Kafka這樣的工具應運而生,提供了健壯而令人印象深刻的架構。

但是Kafka是如何在幕後工作的呢?

Kafka在生產者和消費者之間交換資訊。生產者和消費者是這一線性過程的兩個主要角色。

Kafka也可以在一個或多個伺服器的叢集中工作。這些伺服器被稱為Kafka代理,通過代理你可以受益於多種特性,例如資料複製、容錯和高可用。

這些代理由另一個叫做Zookeeper的工具管理。總之,它是一種旨在保持分散式系統中同步和組織配置資料的服務。

Kafka Topics

Kafka只是一個代理,所有的行為都發生在這。生產者向世界傳送訊息,而消費者讀取特定的資料塊。如何區分資料的一個特定部分與其他部分?消費者如何知道要使用哪些資料?要理解這一點,你需要一個新的內容:topic。

Kafka topics是傳遞訊息的載體。由生產者產生的Kafka記錄被組織並存儲到topic中。

假設你正在處理一個用於記載植物目錄的API專案。你要確保公司中的每個人都能夠訪問每一個新註冊的植物。所以你選了Kafka。

在系統中註冊的每一個新植物都將通過Kafka進行廣播。topic的名稱是tree_catalog。

在這種情況下,topic像堆疊一樣工作。它將資訊儲存在到達時的相同位置,並保證資料不會丟失。

到達的每個資料記錄被儲存在一個slot中,並用一個稱為offset的唯一位置號註冊。

例如,當一個消費者消費了儲存在offset是0的訊息時,它提交訊息,宣告一切正常,然後移動到下一個offset,依此類推。這個過程通常是線性的。然而,由於許多消費者可以同時將記錄“插入”到同一個topic中,所以確定哪些資料位置已經被佔用的責任留給了消費者。這意味著消費者可以決定使用訊息的順序,甚至決定是否從頭開始重新開始處理(offset為0)。

分割槽

分散式系統的一個關鍵特性是資料複製。它允許一個更安全的體系結構,資料可以被複制到其他地方,以防不好的事情發生。Kafka通過分割槽處理複製。Kafka topics被配置為分散在幾個分割槽(可配置的)。每個分割槽通過唯一的offset儲存資料記錄。

為了實現冗餘,Kafka在分割槽(一個或多個)建立副本,並在叢集中傳播資料。

這個過程遵循leader-follower模型,其中一個leader副本總是處理給定分割槽的請求,而follower複製該分割槽。每次製作人將訊息推送到某個主題時,它都會直接傳遞給該主題的領導者。

消費組

在Kafka中,消費來自topic的訊息最合適的方式是通過消費組。

顧名思義,這些組由一個或多個消費者組成,目的是獲取來自特定主題的所有訊息。

為此,組必須始終具有唯一的id(由屬性group.id設定)。無論何時消費者想要加入那個組,它都將通過組id來完成。

每次你新增或刪除一個組的消費者,Kafka會重新平衡它們之間的負載,這樣就不會過載。

設定

現在,你已經瞭解了Kafka的通用工作原理,是時候開始環境設定了。為了簡化,這個例子將使用Docker來儲存Kafka和Zookeeper映像,而不是將它們安裝到你的機器上。這樣可以節省一些空間和複雜性。

對於Windows使用者,Docker提供了一種安裝和管理Docker容器的簡單方式:Docker桌面。進入它的下載頁面並下載安裝程式。執行它,並在不更改預設設定選項的情況下繼續到最後。

確保在此過程完成後重新啟動計算機。重啟後,Docker可能會要求你安裝其他依賴項,所以請確保接受每一個依賴項。在Docker上安裝一個有效的Kafka本地環境最快的路徑之一是通過Docker Compose。通過這種方式,可以通過一個YAML檔案建立應用程式服務,並快速地讓它們執行。

建立一個名為docker-compose的新檔案,並將以下的內容儲存到其中:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "simpletalk_topic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

注意,程式碼從Docker Hub的wurstmeister帳戶中匯入了兩個服務映象(kafka和zookeeper)。這是在Docker上使用Kafka最穩定的映象之一。埠也使用它們的推薦值進行設定,因此請注意不要更改它們。

其中最重要的設定之一屬於KAFKA_CREATE_TOPICS。在這裡,你必須定義要建立的topic名稱。還有其他方法可以建立主題,以後你將看到。

通過命令列導航到docker-compose.yml所在的資料夾。然後執行如下命令啟動映象:

docker-compose up

這段程式碼將載入所有依賴項並啟動映象。在此過程中,可能會看到大量的日誌。

如果沒有錯誤日誌顯示,說明啟動成功。

為了檢查Docker映象是否啟動,在另一個cmd視窗中執行以下命令:

docker ps

顯示如下:​

親自動手

你的Kafka環境已經可以使用了。下一步是在Visual Studio中進行專案建立。進入專案建立視窗。搜尋ASP.NET Core Web Application模板,單擊Next。

解決方案新建一個名稱消費者專案和生產者專案將在同一個解決方案中共存。

下一個視窗選擇API模板。取消勾選“配置為HTTPS”選項。

建立專案後,右鍵單擊解決方案,選擇新增新專案,然後,選擇ASP.NET Core Web Application專案型別。

繼續並像前面一樣選擇API模板。

現在,在ST-Kafka-NET解決方案中有兩個專案。

NuGet包

為了讓C#程式碼理解如何產生和消費訊息,你需要一個Kafka的客戶端。現在最常用的客戶端是Confluent’s Kafka .NET Client。

選擇並單擊Install。或者,你可以通過命令列新增它們:

PM> Install-Package Confluent.Kafka

設定消費者

現在來實現消費者專案。雖然它是一個類似rest的應用程式,但消費者不是必需的。任何型別的.net專案都可以監聽topic訊息。

該專案已經包含一個Controllers資料夾。你需要建立一個名為Handlers的新類,並向其新增一個KafkaConsumerHandler.cs的檔案。內容如下:

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ST_KafkaConsumer.Handlers
{
    public class KafkaConsumerHandler : IHostedService
    {
        private readonly string topic = "simpletalk_topic";
        public Task StartAsync(CancellationToken cancellationToken)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "st_consumer_group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            using (var builder = new ConsumerBuilder<Ignore, 
                string>(conf).Build())
            {
                builder.Subscribe(topic);
                var cancelToken = new CancellationTokenSource();
                try
                {
                    while (true)
                    {
                        var consumer = builder.Consume(cancelToken.Token);
                        Console.WriteLine($"Message: {consumer.Message.Value} received from {consumer.TopicPartitionOffset}");
                    }
                }
                catch (Exception)
                {
                    builder.Close();
                }
            }
            return Task.CompletedTask;
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}

這個處理程式必須在一個單獨的執行緒中執行,因為它將永遠在while迴圈中監視傳入訊息。因此,需要在這個類中使用非同步任務。

請注意topic名稱和消費者配置。它們與docker-compose.yml中的設定完全匹配。一定要反覆檢查你的輸入,否則可能會導致一些莫名其妙的錯誤。

消費者組id可以是任何你想要的。通常,它們都有直觀的名稱,以幫助進行維護和故障排除。

每當新訊息被髮布到simpletalk_topic時,該消費者將使用它並將其記錄到控制檯。當然,在現實應用程式中,你會更好地利用這些資料。

你還需要將這個託管服務類新增到Startup中,因此,開啟它,並在ConfigureServices方法中新增以下程式碼行:

services.AddSingleton<IHostedService, KafkaConsumerHandler>();

並確保引入了以下名稱空間:

using ST_KafkaConsumer.Handlers;

設定生產者

至於生產者,這裡的處理方式會有所不同。由於不需要無限迴圈來監聽到達的訊息,生產者可以簡單地從任何地方釋出訊息,甚至是從控制器。在實際的應用程式中,最好將這類程式碼與MVC層分開,但本例堅持使用控制器,以保持簡單。

在Controllers資料夾中建立一個名為KafkaProducerController.cs的檔案,並向其新增一下內容:

using System;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
namespace Kafka.Producer.API.Controllers
{
    [Route("api/kafka")]
    [ApiController]
    public class KafkaProducerController : ControllerBase
    {
        private readonly ProducerConfig config = new ProducerConfig 
                             { BootstrapServers = "localhost:9092" };
        private readonly string topic = "simpletalk_topic";
        [HttpPost]
        public IActionResult Post([FromQuery] string message)
        {
            return Created(string.Empty, SendToKafka(topic, message));
        }
        private Object SendToKafka(string topic, string message)
        {
            using (var producer = 
                 new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    return producer.ProduceAsync(topic, new Message<Null, string> { Value = message })
                        .GetAwaiter()
                        .GetResult();
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Oops, something went wrong: {e}");
                }
            }
            return null;
        }
    }
}

生產者程式碼比消費者程式碼簡單得多。ProducerBuilder類負責根據提供的配置選項、Kafka伺服器和topic名稱建立一個功能齊全的Kafka生產者。

重要的是要記住整個過程是非同步的。但是,你可以使用Confluent的API來檢索awaiter物件,然後從API方法返回結果。

測試

要測試這個示例,你需要分別執行生產者和使用者應用程式。在工具欄中,找到Startup Projects組合框並選擇ST-KafkaConsumer選項:

點選按鈕IIS Express來執行消費者應用程式。這將啟動一個新的瀏覽器視窗,我們將忽略並最小化它,因為消費者API不是重點。

開啟一個新的cmd視窗,跳轉到producer資料夾。執行命令dotnet run來啟動它。

請注意它所執行的URL和埠。

現在是時候通過producer API傳送一些訊息了。為此,你可以使用任何API測試工具,例如Postman。

為了讓下面的命令正常工作,必須確保Docker映象正常工作。因此,請確保再次執行docker ps來檢查。有時,重新啟動計算機會停止這些程序。

如果命令沒有任何日誌資訊,那麼再執行一次docker-compose。

要測試釋出-訂閱訊息,開啟另一個cmd視窗併發出以下命令:

curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!"

這個請求傳送到生產者API並向Kafka釋出一個新訊息。

要檢查消費者是否收到了它,你可以找到輸出視窗並選擇ST-KafkaConsumer – ASP.NET Core Web Server,如圖所示:

cmd視窗也可以顯示JSON結果。但是,它沒有格式化。要解決這個問題,如果你安裝了Python,你可以執行以下命令:

curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!" | python -m json.tool

輸出如下:

這是目前可以獲得的關於topic message物件的所有資訊。第二個測試將顯示當消費者專案關閉併發布訊息時發生了什麼。

停止Visual Studio中的consumer專案,但這一次有一個不同的訊息:

curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Is%20anybody%20there?" | python -m json.tool

接著啟動消費者專案,觀察日誌記錄,內容如下:

總結

Kafka是一個靈活和健壯的工具,它允許在許多型別的專案中進行強大的實現,這是它被廣泛採用的第一個原因。

這篇文章只是對它的世界的一個簡要介紹,但是還有更多的東西可以看到。在下一篇文章中,我將探討Kafka的功能。

原文連結:https://www.red-gate.com/simple-talk/dotnet/net-development/using-apache-kafka-with-net/