1. 程式人生 > >初識中介軟體Kafka

初識中介軟體Kafka

初識中介軟體Kafka

Author:SimplelWu

什麼是訊息中介軟體?
  • 非底層作業系統軟體,非業務應用軟體,不是直接給終端使用者使用的,不能直接給客戶帶來價值的軟體統稱為中介軟體
  • 關注於資料的傳送和接收,利用高效可靠的非同步訊息傳遞機制整合分散式系統。
什麼是Kafka?

Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,是一個分散式的、分割槽的、可靠的分散式日誌儲存服務。它通過一種獨一無二的設計提供了一個訊息系統的功能。

kafka官方:http://kafka.apache.org/

Kafka作為一個分散式的流平臺,這到底意味著什麼?

我們認為,一個流處理平臺具有三個關鍵能力:

  • 釋出和訂閱訊息(流),在這方面,它類似於一個訊息佇列或企業訊息系統。
  • 容錯的方式儲存訊息(流)。
  • 在訊息流發生時處理它們。
什麼是kakfa的優勢?

它應用於2大類應用:

  • 構建實時的流資料管道,可靠地獲取系統和應用程式之間的資料。
  • 構建實時流的應用程式,對資料流進行轉換或反應。
kafka有四個核心API
  • 應用程式使用 Producer API 釋出訊息到1個或多個topic(主題)。
  • 應用程式使用 Consumer API 來訂閱一個或多個topic,並處理產生的訊息。
  • 應用程式使用 Streams API 充當一個流處理器,從1個或多個topic消費輸入流,並生產一個輸出流到1個或多個輸出topic,有效地將輸入流轉換到輸出流。
  • Connector API允許構建或執行可重複使用的生產者或消費者,將topic連線到現有的應用程式或資料系統。例如,一個關係資料庫的聯結器可捕獲每一個變化。

Client和Server之間的通訊,是通過一條簡單、高效能並且和開發語言無關的TCP協議。並且該協議保持與老版本的相容。Kafka提供了Java Client(客戶端)。除了Java Client外,還有非常多的其它程式語言的Client

主流訊息中介軟體比較
ActiveMQ RabbitMQ Kafka
跨語言 支援(Java優先) 語言無關 支援(Java優先)
支援協議 OpenWire,Stomp, XMPP,AMQP AMQP
優點 遵循JMS規範,安裝部署方便。 繼承Erlang天生的併發性,最初用於金融行業,穩定性,安全性有保障。 依賴zk,可動態擴充套件節點,高效能,高吞吐量,無線擴容訊息可指定追溯。
缺點 根據其他使用者反饋,會莫名丟失訊息,目前重心在下一代的apolle上,目前社群不活躍,對5.X維護較少。 Erlang語言難度較大,不支援動態擴充套件。 嚴格的順序機制,不支援訊息優先順序,不支援標準的訊息協議,不利於平臺遷移。
綜合評價 適合中小企業訊息應用場景,不適合上千個佇列的應用場景。 適合對穩定性要求較高的企業應用。 一般應用在大資料日誌處理或對實時性,可靠性要求稍低的場景。
Kafka好處
  • 可靠性 - Kafka是分散式,分割槽,複製和容錯的。
  • 可擴充套件性 - Kafka訊息傳遞系統輕鬆縮放,無需停機。
  • 耐用性 - Kafka使用分散式提交日誌,這意味著訊息會盡可能快地保留在磁碟上,因此它是持久的。
  • 效能 - Kafka對於釋出和訂閱訊息都具有高吞吐量。 即使儲存了許多TB的訊息,它也保持穩定的效能。

Kafka非常快,並保證零停機和零資料丟失。

應用場景
  • 指標 - Kafka通常用於操作監控資料。 這涉及聚合來自分散式應用程式的統計資訊,以產生操作資料的集中饋送。
  • 日誌聚合解決方案 - Kafka可用於跨組織從多個服務收集日誌,並使它們以標準格式提供給多個伺服器。
  • 流處理 - 流行的框架(如Storm和Spark Streaming)從主題中讀取資料,對其進行處理,並將處理後的資料寫入新主題,供使用者和應用程式使用。 Kafka的強耐久性在流處理的上下文中也非常有用。
Kafka相關術語
序號 元件和說明
1 Topics(主題)屬於特定類別的訊息流稱為主題。 資料儲存在主題中。主題被拆分成分割槽。 對於每個主題,Kafka儲存一個分割槽的資料。 每個這樣的分割槽包含不可變有序序列的訊息。 分割槽被實現為具有相等大小的一組分段檔案。
2 Partition(分割槽)主題可能有許多分割槽,因此它可以處理任意數量的資料。
3 Partition offset(分割槽偏移)每個分割槽訊息具有稱為 offset 的唯一序列標識。
4 Replicas of partition(分割槽備份)副本只是一個分割槽的備份。 副本從不讀取或寫入資料。 它們用於防止資料丟失。
5 Brokers(經紀人)代理是負責維護髮布資料的簡單系統。 每個代理中的每個主題可以具有零個或多個分割槽。 假設,如果在一個主題和N個代理中有N個分割槽,每個代理將有一個分割槽。假設在一個主題中有N個分割槽並且多於N個代理(n + m),則第一個N代理將具有一個分割槽,並且下一個M代理將不具有用於該特定主題的任何分割槽。假設在一個主題中有N個分割槽並且小於N個代理(n-m),每個代理將在它們之間具有一個或多個分割槽共享。 由於代理之間的負載分佈不相等,不推薦使用此方案。
6 Kafka Cluster(Kafka叢集)Kafka有多個代理被稱為Kafka叢集。 可以擴充套件Kafka叢集,無需停機。 這些叢集用於管理訊息資料的永續性和複製。
7 Producers(生產者)生產者是傳送給一個或多個Kafka主題的訊息的釋出者。 生產者向Kafka經紀人傳送資料。 每當生產者將訊息釋出給代理時,代理只需將訊息附加到最後一個段檔案。 實際上,該訊息將被附加到分割槽。 生產者還可以向他們選擇的分割槽傳送訊息。
8 Consumers(消費者)Consumers從經紀人處讀取資料。 消費者訂閱一個或多個主題,並通過從代理中提取資料來使用已釋出的訊息。
9 Leader(領導者) Leader 是負責給定分割槽的所有讀取和寫入的節點。每個分割槽都有一個伺服器充當Leader
10 Follower(追隨者)跟隨領導者指令的節點被稱為Follower。 如果領導失敗,一個追隨者將自動成為新的領導者。 跟隨者作為正常消費者,拉取訊息並更新其自己的資料儲存。
使用Kafka
  1. 安裝jdk
  2. 安裝zookepper 官方:http://zookeeper.apache.org/
  3. 安裝kafka

我這裡jdk是已經安裝好的。

安裝zookepper:

tar -zxvf zookeeper-3.4.13.tar.gz #解壓
cd zookeeper-3.4.13/config #進入配置目錄
#zookeeper執行需要config裡有config檔案。但是解壓後預設只有zoo_sample.cfg,我們將名字修改下即可
mv zoo_sample.cfg zoo.cfg #修改配置檔名字

啟動zookeper,來到bin目錄:

./zkServer.sh start #啟動zookepper

停止zookeper,來到bin目錄:

./zkServer.sh start #停止zookepper

kafka下載:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

使用Kafka

tar -zxvf kafka_2.11-2.1.0.tgz #解壓kafka

啟動zookpper服務,來到kafka的bin目錄:

./zookeeper-server-start.sh config/zookeeper.properties #啟動服務

啟動kafka服務:

./kafka-server-start.sh config/server.properties #啟動kafka服務

建立一個主題:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #topic_name

topic_name:主題的名字'test'。

建立好後檢視主題:

kafka-topics.sh --list --zookeeper localhost:2181

Kafka提供了一個命令列的工具,可以從輸入檔案或者命令列中讀取訊息併發送給Kafka叢集。每一行是一條訊息。
執行producer(生產者),然後在控制檯輸入幾條訊息到伺服器。

傳送訊息:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test #主題為test

進入之後就可傳送訊息!!!

Kafka也提供了一個消費訊息的命令列工具,將儲存的資訊輸出出來。

消費訊息:

#topic主題需要與被消費的主題對應上
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Kafka常用命令
#檢視所有主題列表
kafka-topics.sh --zookeeper localhost:2181 --list
#檢視指定topic資訊
kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_name
#控制檯向topic生產資料
kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name 
#控制檯消費topic的資料
kafka-console-consumer.sh  --zookeeper localhost:2181  --topic topic_name --from-beginning 
#檢視topic某分割槽偏移量最大(小)值
kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable  --time -1 --broker-list localhost:9092 --partitions 0 
#增加topic分割槽數
kafka-topics.sh --zookeeper localhost:2181  --alter --topic topic_name --partitions 10
#刪除topic,慎用,只會刪除zookeeper中的元資料,訊息檔案須手動刪除
kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic topic_name