1. 程式人生 > >Flume+Kafka環境構建和實戰

Flume+Kafka環境構建和實戰

1. 準備工作

apache上下載 apache-flume-1.7.0, apache-kafka_2.12-0.11, apache-zookeeper-3.4.9

下載後分別解壓至/home/hadoop/bigdata並重命名目錄為flume, kafka, zk, (便於在.bashrc中export各個HOME變數及後續升級)

2. 配置並啟動zookeeper

zk配置主要有兩個方面

2.1  conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.       // 下面這兩個目錄要提前手工建好


dataDir=/home/hadoop/bigdata/zk/zkdata
dataLogDir=/home/hadoop/bigdata/zk/zkdatalog     

server.1=master:2888:3888
server.2=ndh-slave01:2888:3888
server.3=slave02:2888:3888

# the port at which the clients will connect
clientPort=2181

2.2  zkdata/myid

這裡面主要儲存與conf/zoo.cfg中server.N對應的數N,如master主機上是server.1, myid中即寫1,其它兩臺機器同理。

2.3  啟動並驗證zk

把zookeeper目錄及剛修改的配置檔案一併拷貝到另兩臺slave機器上,並分別執行下面兩個命令,以確保正常啟動。

./zkServer.sh start

./zkServer.sh status

如遇到./zkServer.sh status報錯問題,通常都是配置和操作的問題,請參考先前文章:http://blog.csdn.net/wqhlmark64/article/details/73250662。

3. flume配置和啟動

Flume是一個分散式、可靠、和高可用的海量日誌聚合的系統,支援在系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。


簡要來說,其設計目標和優勢如下:

(1) 可靠性
當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新發送。),Store on failure(這也是scribe採用的策略,當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Best effort(資料傳送到接收方後,不會進行確認)。
(2) 可擴充套件性
Flume採用了三層架構,分別為agent,collector和storage,每一層均可以水平擴充套件。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。
(3) 可管理性
所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置資料的一致性。使用者可以在master上檢視各個資料來源或者資料流執行情況,且可以對各個資料來源配置和動態載入。Flume提供了web 和shell script command兩種形式對資料流進行管理。
(4) 功能可擴充套件性
使用者可以根據需要新增自己的agent,collector或者storage。此外,Flume自帶了很多元件,包括各種agent(file, syslog等),collector和storage(file,HDFS等)。
很明顯,使用flume,主要就是確定好資料來源和目標接收地,以及flume agent相關的source, channel, sink. 3.1  常用配置模式 主要修改flume/conf/flume-conf.properties檔案。安裝包中預設的檔名有後綴.template,要copy一份並重命名為flume-conf.properties。 3.1.1 監控特定檔案 //宣告名字為agent的Agent的source, sink, channel agent.sources=s1
agent.sinks=k1
agent.channels=c1

// 定義各source,sink,channel的屬性
agent.sources.s1.type=exec
agent.sources.s1.command = tail -f -n+1 /home/hadoop/bigdata/spark/logs/spark-hadoop-org.apache.spark.deploy.master.Master-1-master.out  // 這裡取了spark的任務日誌檔案作為監控源,指定任何有流式輸入的檔案均可
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink     // lib/flume-ng-kafka-sink-1.7.0.jar, lib/flume-ng-core-1.7.0.jar中有,注意最後的KafkaSink首字母大寫,執行過程中曾因把開關的K小寫了,導致啟動報錯找不到類
agent.sinks.k1.brokerList=master:9092
agent.sinks.k1.topic=test0   // 與第4節中Kafka定義的topic相一致
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1
3.1.2  偵聽網路埠資料 Agent名稱定義為agent.   Source:可以理解為輸入端,定義名稱為s1 channel: 傳輸頻道,定義為c1,設定為記憶體模式   sinks:可以理解為輸出端,定義為sk1,   agent.sources = s1   agent.channels = c1   agent.sinks = sk1   #設定Source的netcat 埠為5678,使用的channel為c1   agent.sources.s1.type = netcat   agent.sources.s1.bind = localhost   agent.sources.s1.port = 5678  agent.sources.s1.channels = c1   #設定Sink為logger模式,使用的channel為c1   agent.sinks.sk1.type = logger   agent.sinks.sk1.channel = c1   #設定channel資訊   agent.channels.c1.type = memory   #記憶體模式   agent.channels.c1.capacity = 1000   agent.channels.c1.transactionCapacity = 100   #傳輸引數設定。

3.1.2  特定目錄下新增檔案 agent.sources = s1   agent.channels = c1   agent.sinks = sk1   #設定spooldir   agent.sources.s1.type = spooldir   agent.sources.s1.spoolDir =/var/logs/xxx    agent.sources.s1.fileHeader = true   agent.sources.s1.channels = c1   agent.sinks.sk1.type = logger   agent.sinks.sk1.channel = c1 #In Memory !!!   agent.channels.c1.type = memory   agent.channels.c1.capacity = 10004   agent.channels.c1.transactionCapacity = 100

3.2 啟動flume 因後面要和Kafka配合,這裡啟動第一種模式的配置。 ./bin/flume-ng agent --name agent --conf conf --conf-file conf/flume-conf.properties  -Dflume.root.logger=INFO,console &  
命令說明:指定Agent name為agent, 與flume-conf.properties中配置的agent名字一致,使用conf/flume-conf.properties配置檔案 4. Kafka配置和啟動 4.1  kafka配置和server啟動 因為以叢集方式啟動,在conf下server.properties分別拷貝兩份且命名為server-1.properties, server-2.properties. 以server.properties為例,修改其中的屬性欄位如下 broker.id=0 log.dirs=/tmp/kafka-logs-0 server-1,server-2中分別把對應的0修改為1和2,以便和名字對應方便識別。 然後分別執行以啟動3個server程序,與後面使用過程中指定副本對應:  ./bin/kafka-server-start.shconfig/server.properties &
./bin/kafka-server-start.shconfig/server-1.properties &
./bin/kafka-server-start.shconfig/server-2.properties &
4.2  建立topic,讀取資料並消費

./bin/kafka-topics.sh --create  --zookeeper master:2181  --partitions 1  --replication-factor 3 --topic test0  //建立名字為test0的topic, 1個分割槽,3份副本

./bin/kafka-topics.sh --list --zookeeper master:2181   // 檢視建立的topic

test
test-replica-3
test0
test1 

./bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test0  // 檢視topic test0的狀態資訊
Topic:test0PartitionCount:1ReplicationFactor:3Configs:
Topic: test0Partition: 0Leader: 0    Replicas: 0,2,1Isr: 0,2,1
  • leader:負責處理訊息的讀和寫,leader是從所有節點中隨機選擇的.
  • replicas:列出了所有的副本節點,不管節點是否在服務中.
  • isr:是正在服務中的節點.

./bin/kafka-console-producer.sh --broker-list master:9092 --topic test0   // 命令列中通過生產者向test0的topic中寫資料,即除了flume中向test0寫外,命令列中也在寫

iko nknxlr mwqoi  

./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test0 --from-beginning   //消費test0的topic中的資料


17/09/14 19:15:55 INFO master.Master: Registering app Spark Pi
17/09/14 19:15:55 INFO master.Master: Registered app Spark Pi with ID app-20170914191555-0025
17/09/14 19:15:55 INFO master.Master: Launching executor app-20170914191555-0025/0 on worker worker-20170825172933-10.76.9.198-40179
17/09/14 19:37:33 INFO master.Master: Received unregister request from application app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: Removing app app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:50653 got disassociated, removing it.
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:59059 got disassociated, removing it.
17/09/14 19:37:34 WARN master.Master: Got status update for unknown executor app-20170914191555-0025/0



iko nknxlr mwqoi  
值得提及的是,flume對資料作有儲存,建立多個consumer程序同時消費test0 topic中的資料時,每個程序獲取的資料也都是相同的。 到此,Flume+Kafka完成,後續將還有至少兩方面需要嘗試: a. flume其它兩種模式和更多的組合 b. flume+kafka與storm的整合。 5. 期間遇到的問題 5.1    "Unable to load sink type: org.apache.flume.sink.kafka.kafkaSink"
 如3.1.1中所說,沒有大寫kafkaSink中的首字母,導致類找不到。 5.2   "Agent configuration for 'agent' does not contain any channels. Marking it as invalid" 在3.2中啟動 flume時沒指定 --name引數且名字與flume-conf.properties中的agent不一致。如下
./bin/flume-ng agent --name agent --conf conf --conf-file conf/flume-conf.properties  -Dflume.root.logger=INFO,console & 17/09/14 19:15:55 INFO master.Master: Registering app Spark Pi
17/09/14 19:15:55 INFO master.Master: Registered app Spark Pi with ID app-20170914191555-0025
17/09/14 19:15:55 INFO master.Master: Launching executor app-20170914191555-0025/0 on worker worker-20170825172933-10.76.9.198-40179
17/09/14 19:37:33 INFO master.Master: Received unregister request from application app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: Removing app app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:50653 got disassociated, removing it.
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:59059 got disassociated, removing it.
17/09/14 19:37:34 WARN master.Master: Got status update for unknown executor app-20170914191555-0025/0








iko nknxlr mwqoi   17/09/14 19:15:55 INFO master.Master: Registering app Spark Pi
17/09/14 19:15:55 INFO master.Master: Registered app Spark Pi with ID app-20170914191555-0025
17/09/14 19:15:55 INFO master.Master: Launching executor app-20170914191555-0025/0 on worker worker-20170825172933-10.76.9.198-40179
17/09/14 19:37:33 INFO master.Master: Received unregister request from application app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: Removing app app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:50653 got disassociated, removing it.
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:59059 got disassociated, removing it.
17/09/14 19:37:34 WARN master.Master: Got status update for unknown executor app-20170914191555-0025/0








iko nknxlr mwqoi