1. 程式人生 > >kafka叢集的安裝及資料的匯入匯出

kafka叢集的安裝及資料的匯入匯出

快速開始

本教程假設您剛剛開始,並且沒有現有的Kafka或ZooKeeper資料。由於Kafka控制檯指令碼對於基於Unix和Windows的平臺是不同的,因此在Windows平臺上使用bin\windows\而不是bin/將指令碼副檔名更改為.bat

下載 2.0.0版本並解壓縮它。

1

2

> tar -xzf kafka_2.11-2.0.0.tgz

> cd kafka_2.11-2.0.0

Kafka使用ZooKeeper,因此如果您還沒有ZooKeeper伺服器,則需要先啟動它。您可以使用與kafka一起打包的便捷指令碼來獲取快速且髒的單節點ZooKeeper例項。

1

2

3

> bin/zookeeper-server-start.sh config/zookeeper.properties

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

...

現在啟動Kafka伺服器:

1

2

3

4

> bin

/kafka-server-start.sh config/server.properties

[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

...

讓我們建立一個名為“test”的主題,它只包含一個分割槽,只有一個副本:

1

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如果我們執行list topic命令,我們現在可以看到該主題:

1

2

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

或者,您也可以將代理配置為在釋出不存在的主題時自動建立主題,而不是手動建立主題。

Kafka附帶一個命令列客戶端,它將從檔案或標準輸入中獲取輸入,並將其作為訊息傳送到Kafka叢集。預設情況下,每行將作為單獨的訊息傳送。

執行生產者,然後在控制檯中鍵入一些訊息以傳送到伺服器。

1

2

3

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

Kafka還有一個命令列使用者,它會將訊息轉儲到標準輸出。

1

2

3

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

如果您在不同的終端中執行上述每個命令,那麼您現在應該能夠在生產者終端中鍵入訊息並看到它們出現在消費者終端中。

所有命令列工具都有其他選項; 執行不帶引數的命令將顯示更詳細地記錄它們的使用資訊。

到目前為止,我們一直在與一個經紀人競爭,但這並不好玩。對於Kafka,單個代理只是一個大小為1的叢集,因此除了啟動更多代理例項之外沒有太多變化。但是為了感受它,讓我們將我們的叢集擴充套件到三個節點(仍然在我們的本地機器上)。

首先,我們為每個代理程式建立一個配置檔案(在Windows上使用copy命令代替):

1

2

> cp config/server.properties config/server-1.properties

> cp config/server.properties config/server-2.properties

現在編輯這些新檔案並設定以下屬性:

1

2

3

4

6

7

8

9

config/server-1.properties:

broker.id=1

log.dirs=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

log.dirs=/tmp/kafka-logs-2

broker.id屬性是群集中每個節點的唯一且永久的名稱。我們必須覆蓋埠和日誌目錄,因為我們在同一臺機器上執行這些,並且我們希望讓所有代理嘗試在同一埠上註冊或覆蓋彼此的資料。

我們已經啟動了Zookeeper並啟動了我們的單個節點,因此我們只需要啟動兩個新節點:

1

2

3

4

> bin/kafka-server-start.sh config/server-1.properties &

...

> bin/kafka-server-start.sh config/server-2.properties &

...

現在建立一個複製因子為3的新主題:

1

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好的,但現在我們有一個叢集,我們怎麼知道哪個經紀人正在做什麼?要檢視執行“describe topics”命令:

1

2

3

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

這是輸出的解釋。第一行給出了所有分割槽的摘要,每個附加行提供有關一個分割槽的資訊。由於我們只有一個分割槽用於此主題,因此只有一行。

  • “leader”是負責給定分割槽的所有讀取和寫入的節點。每個節點將成為隨機選擇的分割槽部分的領導者。
  • “replicas”是複製此分割槽日誌的節點列表,無論它們是否為領導者,或者即使它們當前處於活動狀態。
  • “isr”是“同步”複製品的集合。這是副本列表的子集,該列表當前處於活躍狀態並且已經被領導者捕獲。

請注意,在我的示例中,節點1是該主題的唯一分割槽的領導者。

我們可以在我們建立的原始主題上執行相同的命令,以檢視它的位置:

1

2

3

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:

Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

所以毫無疑問 - 原始主題沒有副本,位於伺服器0上,是我們建立它時群集中唯一的伺服器。

讓我們向我們的新主題釋出一些訊息:

1

2

3

4

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

^C

現在讓我們使用這些訊息:

1

2

3

4

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

現在讓我們測試一下容錯性。經紀人1充當領導者所以讓我們殺了它:

1

2

3

> ps aux | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564

在Windows上使用:

1

2

3

4

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid

ProcessId

6016

> taskkill /pid 6016 /f

領導已切換到其中一個從屬節點,節點1不再位於同步副本集中:

1

2

3

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但即使最初接受寫入的領導者已經失敗,這些訊息仍可供消費:

1

2

3

4

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

從控制檯寫入資料並將其寫回控制檯是一個方便的起點,但您可能希望使用其他來源的資料或將資料從Kafka匯出到其他系統。對於許多系統,您可以使用Kafka Connect匯入或匯出資料,而不是編寫自定義整合程式碼。

Kafka Connect是Kafka附帶的工具,用於向Kafka匯入和匯出資料。它是一個可擴充套件的工具,執行 聯結器,實現與外部系統互動的自定義​​邏輯。在本快速入門中,我們將瞭解如何使用簡單的聯結器執行Kafka Connect,這些聯結器將資料從檔案匯入Kafka主題並將資料從Kafka主題匯出到檔案。

首先,我們將首先建立一些種子資料進行測試:

1

> echo -e "foo\nbar" > test.txt

或者在Windows上:

1

2

> echo foo> test.txt

> echo bar>> test.txt

接下來,我們將啟動兩個以獨立模式執行的聯結器,這意味著它們在單個本地專用程序中執行。我們提供三個配置檔案作為引數。第一個始終是Kafka Connect流程的配置,包含常見配置,例如要連線的Kafka代理和資料的序列化格式。其餘配置檔案均指定要建立的聯結器。這些檔案包括唯一的聯結器名稱,要例項化的聯結器類以及聯結器所需的任何其他配置。

1

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka附帶的這些示例配置檔案使用您之前啟動的預設本地群集配置並建立兩個聯結器:第一個是源聯結器,它從輸入檔案讀取行並生成每個Kafka主題,第二個是宿聯結器從Kafka主題讀取訊息並將每個訊息生成為輸出檔案中的一行。

在啟動過程中,您將看到許多日誌訊息,包括一些指示正在例項化聯結器的日誌訊息。一旦Kafka Connect程序啟動,源聯結器應該開始從test.txt主題讀取行並生成它們connect-test,並且接收器聯結器應該開始從主題讀取訊息connect-test 並將它們寫入檔案test.sink.txt。我們可以通過檢查輸出檔案的內容來驗證資料是否已通過整個管道傳遞:

1

2

3

> more test.sink.txt

foo

bar

請注意,資料儲存在Kafka主題中connect-test,因此我們還可以執行控制檯使用者來檢視主題中的資料(或使用自定義使用者程式碼來處理它):

1

2

3

4

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"bar"}

...

聯結器繼續處理資料,因此我們可以將資料新增到檔案中並看到它在管道中移動:

1

> echo Another line>> test.txt

您應該看到該行出現在控制檯使用者輸出和接收器檔案中。

Kafka Streams是一個客戶端庫,用於構建任務關鍵型實時應用程式和微服務,其中輸入和/或輸出資料儲存在Kafka叢集中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程式的簡單性以及Kafka伺服器端叢集技術的優勢,使這些應用程式具有高度可擴充套件性,彈性,容錯性,分散式等等。本快速入門示例將演示如何執行在此庫中編碼的流應用程式。

相關推薦

Linux下mongodb安裝資料匯入匯出教程

Linux下mongodb安裝及資料匯入匯出教程 #檢視linux發行版本號 cat /etc/issue #檢視linux核心版本 uname -r 一、Linux下mongodb安裝的一般步驟 1.到mongodb的官網(https://www.mongodb.org/

Docker安裝,oracle安裝資料匯入

Docker安裝 安裝需要的依賴軟體包 yum-util 提供yum-config-manager功能,另外兩個是devicemapper驅動依賴的 sudo yum install -y yum-utils device-mapper-persistent-data lvm2 設定yum源 sudo yu

Solr7.4 Linux 安裝資料匯入

solr 讀音應該是叫 “so lar” 之前寫過一篇介紹solr的文章https://zhangdianlei.github.io/2018/09/11/Apache-Solr/ ,這裡就不再囉嗦solr的事情了。今天搞了一天的solr,現在記錄一下 solr

kafka叢集安裝資料匯入匯出

快速開始 本教程假設您剛剛開始,並且沒有現有的Kafka或ZooKeeper資料。由於Kafka控制檯指令碼對於基於Unix和Windows的平臺是不同的,因此在Windows平臺上使用bin\windows\而不是bin/將指令碼副檔名更改為.bat。 下載 2.0

.ElasticSearch的資料匯入匯出工具-ElasticDump,安裝使用

在linux上 安裝,步驟如下:   1) yum install epel-release      2) yum install nodejs      3) yum install nodejs

資料學習之路94-kafka叢集安裝

解壓 Kafka 安裝包 修改配置檔案 config/server.properties vi server.properties broker.id=0 //為依次增長的:0、1、2、3、4,叢集中唯一id log.dirs=/kafkaData/logs // Kafka

kafka叢集Producer基本資料結構工作流程深入剖析-kafka 商業環境實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡

Elasticsearch 兩個叢集之間資料匯入匯出

直接上程式碼 官網地址 參考地址 package net.abc; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSON

MongoDB學習(三)資料匯入匯出備份恢復

  這幾天想著公司要用MongoDB,自然就要用到資料匯入匯出,就自己學習了一下。   在Mongo學習(二)中就講到了在bin目錄下有一些工具,本篇就是使用這些工具進行資料的匯入匯出及備份恢復。   注意:以下命令均在cmd命令列中輸入,匯出及備份未指

Kafka資料匯入匯出

這個命令執行的時候建立了一個獨立模式的 Kafka 連線程序,程序中建立了兩個連線:一個是源連線(對應 connect-file-source.properties 的配置資訊),它從輸入檔案中逐行讀取資料釋出到 Kafka 主題上;另一個是讀取連線(對應 connect-file-sink.properti

poiExcel表格所有操作以及資料匯入匯出

這個是本人在學習中記錄的筆記以供大家參考 Poi簡介: 1.1什麼是poi Apache POI [1]  是用Java編寫的免費開源的跨平臺的 Java API,Apache POI提供API給Java程式對Microsoft Offi

使用POI將資料匯入匯出資料庫。

POI將資料匯入匯出資料庫。(採用ssm框架) 1.前臺列表的展示 <script type="text/javascript"> //進行資料的全選 function selectAll(){ var ids=document.get

【轉載】oracle 資料匯入匯出

oracle 資料匯入匯出   一、oracle中copy from的用法     1.語法及使用說明   1.1 語法   下面我們來看一下SQL*Copy Command的語法及使用說明。   在解釋SQ

Hive命令之三:hive的資料匯入匯出

Hive 資料的匯入匯出: 一 Hive資料匯出   1、匯出資料到本地檔案系統:      insert overwrite local directory '/software/data/data1' select * f

oracle 11g 資料庫表空間建立與資料匯入匯出

** oracle資料庫中進行資料的匯入匯出時要在cmd中進行而不是在sqlplus中進行操作! (1)向oracle資料庫中匯入完整的資料庫(字尾名:dmp)      首先進入cmd,輸入sqlplus /nolog執行oracle自帶程式,

MySQL安裝資料備份和恢復

MySQL二進位制格式安裝 首先下載mysql二進位制安裝包 //下載地址 [[email protected] src]# wget https://downloads.mysql.com/archives/get/file/mysql-5.7.22-lin

kafka叢集安裝步驟

準備工作: 安裝好zookeeper叢集 一、上傳並解壓 1. cd /usr/kafka (沒有目錄的話自己建立) 2. rz 3. tar -zxvf kafka_2.12-1.1.0.tgz 二、2.修改配置檔案 /usr/kafka/kafka_2.12-1.

Hive DML(資料匯入匯出)

DML DML:Data Manipulation Language(資料管理語言) 載入資料到表 語法 LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (part

Java架構-KafKa叢集安裝詳細步驟

最近在使用Spring Cloud進行分散式微服務搭建,順便對整合KafKa的方案做了一些總結,今天詳細介紹一下KafKa叢集安裝過程: 1.在根目錄建立kafka資料夾(service1、service2、service3都建立) [[email protected]

藍的成長記——追逐DBA(1):奔波於路上,挺進山東 藍的成長記——追逐DBA(3):古董上操作,資料匯入匯出成了問題 藍的成長記——追逐DBA(8):重拾SP報告,回憶oracle的STATSPACK實驗 藍的成長記— —追逐DBA(9):國慶漸去,追逐DBA,新規劃,新啟程

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!