1. 程式人生 > >.net core kafka 入門例項 一篇看懂

.net core kafka 入門例項 一篇看懂

  kafka 相信都有聽說過,不管有沒有用過,在江湖上可以說是大名鼎鼎,就像天龍八部裡的喬峰。國際慣例,先介紹生平事蹟   簡介

Kafka 是由 Apache軟體基金會 開發的一個開源流處理平臺,由 Scala 和 Java 編寫。Kafka是一種高吞吐量的 分散式 ,支援分割槽(partition),多副本(replica)的 釋出訂閱訊息系統 。與其他MQ最大不同是Topic 具有分割槽(Partition)的概念,訊息出隊的速度也比其他MQ快。

 

特性及適用場景
  • 高吞吐量、低延遲
  • 可擴充套件性:叢集支援熱擴充套件
  • 永續性、可靠性
  • 容錯性:允許叢集中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高併發:支援數千個客戶端同時讀寫
常用場景
  • 日誌收集
  • 訊息系統:生產者和消費者、快取訊息等。
  • 使用者活動跟蹤:流網頁、搜尋、點選等活動
  • 運營指標
  • 工作流處理
  • 對實時性要求不高的資料處理

 

 

 

 

Kafka基礎概念

 

Topic

Kafka 中可將訊息分類,每一類的訊息稱為一個 Topic(主題),消費者可以對不同的 Topic 進行不同的處理。Topic相當於傳統訊息系統MQ中的一個佇列queue,producer端傳送的message必須指定是傳送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不同的partition上

Broker

每個 Broker(代理) 即一個 Kafka 服務例項,多個 Broker 構成一個 Kafka 叢集,生產者釋出的訊息將儲存在 Broker 中,消費者將從 Broker 中拉取訊息進行消費。

 

producer

生產者

consumer

消費者

Partition

分割槽,Kafka 中比較特色的部分,一個 Topic 可以分為多個 Partition,每個 Partition 是一個有序的佇列,Partition 中的每條訊息都存在一個有序的偏移量(Offest) ,同一個 Consumer Group 中,只有一個 Consumer 例項可消費某個 Partition 的訊息。

 

持久化

Kafka會把訊息持久化到本地檔案系統中,每個 Topic 將訊息分成多 Partition,每個 Partition 在儲存層面是 append log 檔案。任何釋出到此 Partition 的訊息都會被直接追加到 log 檔案的尾部,每條訊息在檔案中的位置稱為 Offest(偏移量),Partition 是以檔案的形式儲存在檔案系統中,log 檔案根據 Broker 中的配置保留一定時間後刪除來釋放磁碟空間。

由於message的寫入持久化是順序寫入的,因此message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。

 

 

看到上面的一堆特性,巴拉巴拉,一頓吹,道理我都懂,怎麼操作,還是沒看到效果。

別急,接下來就上程式碼,這個是不能少的。保證你們拿去就能用

 

 

上程式碼,demo測試 先建立兩個介面,寫好基礎類庫,後面直接應用就行了,我這裡就直接放一起了
 /// <summary>
    /// 消費者
    /// </summary>
    public interface IKafkaConsumer : IDisposable
    {
        /// <summary>
        /// 消費資料
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <returns></returns>
        T Consume<T>() where T : class;
    }

  public interface IKafkaProducer : IDisposable
    {
        /// <summary>
        /// 釋出訊息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="data"></param>
        /// <param name="operateType"></param>
        /// <returns></returns>
        bool Produce<T>(string key, T data, int operateType) where T : class;
    }

  實現方法

using Confluent.Kafka;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace Kafka
{
    public class KafkaConsumer : IKafkaConsumer
    {
        private bool disposeHasBeenCalled = false;
        private readonly object disposeHasBeenCalledLockObj = new object();

        private readonly IConsumer<string, string> _consumer;

        /// <summary>
        /// 建構函式,初始化配置
        /// </summary>
        /// <param name="config">配置引數</param>
        /// <param name="topic">主題名稱</param>
        public KafkaConsumer(ConsumerConfig config, string topic)
        {
            _consumer = new ConsumerBuilder<string, string>(config).Build();

            _consumer.Subscribe(topic);
        }

        /// <summary>
        /// 消費
        /// </summary>
        /// <returns></returns>
        public T Consume<T>() where T : class
        {
            try
            {
                var result = _consumer.Consume(TimeSpan.FromSeconds(1));
                if (result != null)
                {
                    if (typeof(T) == typeof(string))
                        return (T)Convert.ChangeType(result.Value, typeof(T));

                    return JsonConvert.DeserializeObject<T>(result.Value);
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"consume error: {e.Error.Reason}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"consume error: {e.Message}");
            }

            return default;
        }

        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Dispose
        /// </summary>
        /// <param name="disposing"></param>
        protected virtual void Dispose(bool disposing)
        {
            lock (disposeHasBeenCalledLockObj)
            {
                if (disposeHasBeenCalled) { return; }
                disposeHasBeenCalled = true;
            }

            if (disposing)
            {
                _consumer?.Close();
            }
        }
    }

}

  

 public class KafkaProducer : IKafkaProducer
    {
        private bool disposeHasBeenCalled = false;
        private readonly object disposeHasBeenCalledLockObj = new object();

        private readonly IProducer<string, string> _producer;
        private readonly string _topic;

        /// <summary>
        /// 建構函式,初始化配置
        /// </summary>
        /// <param name="config">配置引數</param>
        /// <param name="topic">主題名稱</param>
        public KafkaProducer(ProducerConfig config, string topic)
        {
            _producer = new ProducerBuilder<string, string>(config).Build();
            _topic = topic;
        }

        /// <summary>
        /// 釋出訊息
        /// </summary>
        /// <typeparam name="T">資料實體</typeparam>
        /// <param name="key">資料key,partition分割槽會根據key</param>
        /// <param name="data">資料</param>
        /// <param name="operateType">操作型別[增、刪、改等不同型別]</param>
        /// <returns></returns>
        public bool Produce<T>(string key, T data, int operateType) where T : class
        {
            var obj = JsonConvert.SerializeObject(new
            {
                Type = operateType,
                Data = data
            });

            try
            {
                var result = _producer.ProduceAsync(_topic, new Message<string, string>
                {
                    Key = key,
                    Value = obj
                }).ConfigureAwait(false).GetAwaiter().GetResult();

#if DEBUG

                Console.WriteLine($"Topic: {result.Topic} Partition: {result.Partition} Offset: {result.Offset}");
#endif
                return true;

            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"Delivery failed: {e.Message}");
            }

            return false;
        }

        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Dispose
        /// </summary>
        /// <param name="disposing"></param>
        protected virtual void Dispose(bool disposing)
        {
            lock (disposeHasBeenCalledLockObj)
            {
                if (disposeHasBeenCalled) { return; }
                disposeHasBeenCalled = true;
            }

            if (disposing)
            {
                _producer?.Dispose();
            }
        }
    }

  

再寫兩個測試方法,一個傳送訊息,一個接收訊息,控制檯就好 注意  kafka 通過 topic 來接收訊息 new KafkaProducer(config, "topic-c"))  傳送方和接收方的topic要一致
 static void Main(string[] args)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                Acks = Acks.All
            };
             //傳送訊息
        
            using (var kafkaProducer = new KafkaProducer(config, "topic-d"))
            {
                var result = kafkaProducer.Produce<object>("a", new { name = "豬八戒3" }, 1);

            }
            Console.WriteLine("訊息傳送成功");
        }        








static void Main(string[] args)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "test",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            string text;
            Console.WriteLine("接受中......");
            while ((text = Console.ReadLine()) != "q")
            {
               //接受訊息
                using (var kafkaProducer = new KafkaConsumer(config, "topic-d"))
                {
                    var result = kafkaProducer.Consume<object>();
                    if (result != null)
                    {
                        Console.WriteLine(result.ToString());
                    }

                }
            }

        }

 上結果、

 

 

可以看到,訊息已經收到了。這個demo裡,消費端要一直處於正常狀態才行,才能消費生產者得資訊

    

本文版權歸作者和部落格園共有,來源網址:歡迎各位轉載,但是未經作者本人同意,轉載文章之後必須在文章頁面明顯位置給出作者和原文連線,否則保留追究法律責任的權利。