
Big Data Learning [10]: Getting Started with Kafka


Abstract: Study notes on the Kafka documentation, based on reading and translating the official Quickstart.
Source: http://kafka.apache.org/quickstart

Quickstart

This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. Since Kafka console scripts are different for Unix-based and Windows platforms, on Windows platforms use bin\windows\ instead of bin/, and change the script extension to .bat.

Step 1: Download the code

Download the 0.11.0.1 release and un-tar it.

> tar -xzf kafka_2.11-0.11.0.1.tgz
> cd kafka_2.11-0.11.0.1

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Now start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: Create a topic

Let’s create a topic named “test” with a single partition and only one replica:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

We can now see that topic if we run the list topic command:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
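
Auto-creation is governed by broker settings in config/server.properties. As a rough illustration (these are standard broker options, but the values below are examples rather than part of the quickstart):

    # Create missing topics automatically when they are first produced to or fetched,
    # using the broker-wide defaults for partitions and replication.
    auto.create.topics.enable=true
    num.partitions=1
    default.replication.factor=1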

Step 4: Send some messages

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.
Run the producer and then type a few messages into the console to send to the server.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
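
If you would rather produce messages from code than from the console, a minimal Java producer against the same topic looks roughly like this (a sketch using the standard Kafka producer client; the topic name test and localhost:9092 match the commands above):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // broker started in Step 2
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // Each send() is one message, just like one line typed into the console producer.
            producer.send(new ProducerRecord<>("test", "This is a message"));
            producer.send(new ProducerRecord<>("test", "This is another message"));
            producer.close();   // flushes any outstanding records before exiting
        }
    }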

Step 5: Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output.

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
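
The console consumer has a programmatic counterpart as well; here is a minimal sketch with the Java consumer client (the group id quickstart-group is an arbitrary example name, and auto.offset.reset=earliest plays the role of --from-beginning for a brand-new group):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "quickstart-group");          // example group name
            props.put("auto.offset.reset", "earliest");         // read from the beginning for a new group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);  // poll(long) as in the 0.11 client
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }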

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.

Step 6: Setting up a multi-broker cluster

So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
First we make a config file for each of the brokers (on Windows use the copy command instead):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Now edit these new files and set the following properties:

config/server-1.properties:    
    broker.id=1    
    listeners=PLAINTEXT://:9093    
    log.dir=/tmp/kafka-logs-1 
config/server-2.properties:    
    broker.id=2    
    listeners=PLAINTEXT://:9094    
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each other’s data.
We already have Zookeeper and our single node started, so we just need to start the two new nodes:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

Now create a new topic with a replication factor of three:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Okay but now that we have a cluster how can we know which broker is doing what? To see that run the “describe topics” command:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.
● “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
● “replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
● “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Note that in my example node 1 is the leader for the only partition of the topic.
We can run the same command on the original topic we created to see where it is:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.
Let’s publish a few messages to our new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let’s consume these messages:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let’s test out fault-tolerance. Broker 1 was acting as the leader so let’s kill it:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

On Windows use:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.11.0.1.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

But the messages are still available for consumption even though the leader that took the writes originally is down:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Step 7: Use Kafka Connect to import/export data

Writing data from the console and writing it back to the console is a convenient place to start, but you’ll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data.
Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we’ll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.

First, we’ll start by creating some seed data to test with:

> echo -e "foo\nbar" > test.txt

Next, we’ll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
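
The two connector files named above ship with Kafka; their contents are roughly as follows (check your local copies, since exact values can differ between releases):

config/connect-file-source.properties:
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test

config/connect-file-sink.properties:
    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test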

These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file.
During startup you’ll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and write them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:

> cat test.sink.txt
foo
bar

Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}{"schema":{"type":"string","optional":false},"payload":"bar"}
...
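
The schema/payload wrapping in that output comes from the converter settings in config/connect-standalone.properties; the stock file uses the JSON converter with schemas enabled, roughly like this (verify against your local copy):

config/connect-standalone.properties (excerpt):
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true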

The connectors continue to process data, so we can add data to the file and see it move through the pipeline:

 echo "Another line" >> test.txt

You should see the line appear in the console consumer output and in the sink file.

Step 8: Use Kafka Streams to process data

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more. This quickstart example will demonstrate how to run a streaming application coded in this library.

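
As a concrete illustration, a minimal Streams application that simply pipes records from one topic to another might look like the sketch below. It uses the newer StreamsBuilder entry point (the 0.11 release shipped an older KStreamBuilder class with the same idea), and the topic names streams-plaintext-input and streams-pipe-output are placeholders you would create yourself:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class PipeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");      // unique id per application
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // Read every record from the input topic and forward it unchanged to the output topic.
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("streams-plaintext-input").to("streams-pipe-output");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }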

The full streams walkthrough is at http://kafka.apache.org/0110/documentation/streams/quickstart.