1. 程式人生 > >Kafka與.net core(三)kafka操作

Kafka與.net core(三)kafka操作

1.Kafka相關知識

  • Broker:即Kafka的伺服器,使用者儲存訊息,Kafa叢集中的一臺或多臺伺服器統稱為broker。
  • Message訊息:是通訊的基本單位,每個 producer 可以向一個 topic(主題)釋出一些訊息。
    • Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在建立topic時指定的),每個partition儲存一部分Message。
    • partition中的每條Message包含了以下三個屬性:Kafka基於檔案儲存.通過分割槽,可以將日誌內容分散到多個server上,來避免檔案尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka例項)儲存可以將一個topic切分多任意多個partitions,來訊息儲存/消費的效率。
      • offset:訊息唯一標識:對應型別:long
      • MessageSize 對應型別:int32
      • data 是message的具體內容。
    • 越多的partitions意味著可以容納更多的consumer,有效提升併發消費的能力。
  • Message:在Broker中通Log追加的方式進行持久化儲存。並進行分割槽(patitions)。
    • 一個Topic可以認為是一類訊息,每個topic將被分成多partition(區),每個partition在儲存層面是append log檔案。任何釋出到此partition的訊息都會被直接追加到log檔案的尾部,每條訊息在檔案中的位置稱為offset(偏移量),partition是以檔案的形式儲存在檔案系統中。
    • Logs檔案根據broker中的配置要求,保留一定時間後刪除來釋放磁碟空間。

      

    • Topic物理上的分組,一個 topic可以分為多個 partition,每個 partition 是一個有序的佇列。partition中的每條訊息都會被分配一個有序的 id(offset)。
    • 為實現稀疏儲存,我們通過給檔案建索引,每隔一定位元組的資料建立一條索引

       

  • 為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數。
  • Broker沒有副本機制,一旦broker宕機,該broker的訊息將都不可用。Message訊息是有多份的。
  • consumer:訊息和資料消費者,訂閱topics並處理其釋出的訊息的過程叫做consumers。
    • 在 kafka中,我們可以認為一個group是一個訂閱者,一個Topic中的每個partions,只會被一個訂閱者中的一個consumer消費,不過一個 consumer可以消費多個partitions中的訊息(消費者資料小於Partions  的數量時)。注意:kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到訊息。
    • 一個partition中的訊息只會被group中的一個consumer訊息。每個group中consumer訊息消費互相獨立。
  • 無狀態導致訊息的刪除成為難題(可能刪除的訊息正在被訂閱),kafka採用基於時間的SLA(服務水平保證),訊息儲存一定時間(通常為7天)後會被刪除。
  • 訊息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset(id)進行重新讀取消費訊息。

2.kafka操作

2.1.檢視有哪些主題:

kafka-topics.sh --list --zookeeper 192.168.0.201:12181

2.2.檢視topic的詳細資訊

kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1

2.3.為topic增加副本

kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute

2.4.建立topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1

2.5為topic增加partition

bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1

2.6kafka生產者客戶端命令

kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1

2.7kafka消費者客戶端命令

kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1

2.8kafka服務啟動

kafka-server-start.sh -daemon ../config/server.properties

3..net core操作

producer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace KafkaTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Test().Wait();
        }
        static async Task Test()
        {
           var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" };

            Action<DeliveryReportResult<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new Producer<Null, string>(conf))
            {
                for (int i = 0; i < 100000; ++i)
                {
                    p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}

consumer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Linq;
using System.Text;

namespace KafkaClient
{
    class Program
    {
        static void Main(string[] args)
        {
            

            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group4",
                BootstrapServers = "39.**.**.**:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetResetType.Earliest
            };

            using (var c = new Consumer<Ignore, string>(conf))
            {
                c.Subscribe("my-topic");

                bool consuming = true;
                // The client will automatically recover from non-fatal errors. You typically
                // don't need to take any action unless an error is marked as fatal.
                c.OnError += (_, e) => consuming = !e.IsFatal;

                while (consuming)
                {
                    try
                    {
                        var cr = c.Consume();
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }

                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}