1. 程式人生 > >Flume+Kafka+SparkStreaming整合

Flume+Kafka+SparkStreaming整合

目錄

1. Flume介紹

Flume是Cloudera提供的一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的日誌收集系統,支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。

1.1 Flume資料來源以及輸出方式

Flume提供了從console(控制檯)、RPC(Thrift-RPC)、text(檔案)、tail(UNIX tail)、syslog(syslog日誌系統,支援TCP和UDP等2種模式),exec(命令執行)等資料來源上收集資料的能力,在我們的系統中目前使用exec方式進行日誌採集。

Flume的資料接受方,可以是console(控制檯)、text(檔案)、dfs(HDFS檔案)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日誌系統)等。本測試研究中由kafka來接收資料。

1.2 Flume的核心概念

1.  Agent:使用JVM執行Flume。每臺機器執行一個agent,但是可以在一個agent中包含多個sources和sinks。

2.  Client:生產資料,執行在一個獨立的執行緒。

3.  Source:從Client收集資料,傳遞給Channel。

4.  Sink:從Channel收集資料,執行在一個獨立執行緒。

5.  Channel

:連線 sources和 sinks ,這個有點像一個佇列。

6.  Events :可以是日誌記錄、 avro物件等。

1.3 結構

Flume以agent為最小的獨立執行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大元件構成,如下圖:

 

Flume提供了大量內建的Source、Channel和Sink型別。不同型別的Source,Channel和Sink可以自由組合。組合方式基於使用者設定的配置檔案,非常靈活。比如:Channel可以把事件暫存在記憶體裡,也可以持久化到本地硬碟上。Sink可以把日誌寫入HDFS, HBase,甚至是另外一個Source等等。Flume支援使用者建立多級流,也就是說,多個agent可以協同工作,並且支援Fan-in、Fan-out、ContextualRouting、Backup Routes。如下圖所示:



1.4 安裝測試

解壓apache-flume-1.6.0-bin.tar.gz:tar –zxvf apache-flume-1.6.0-bin.tar.gz

cp conf/flume-conf.properties.template conf/exec.conf

cp conf/flume-env.sh.template conf/flume-env.sh    配置JAVA_HOME

exec.conf配置如下:

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.channels = c2

a2.sources.r2.command=tail -n +0 -F /usr/local/hadoop/flume/test.log

# Describe the sink

a2.sinks.k2.type = logger

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

驗證安裝:flume-ng version



1.5 啟動flume

flume-ng agent --conf ./flume/conf/ -f ./flume/conf/exec.conf-Dflume.root.logger=DEBUG,console -n a2

傳送資料和flume接收資料:



2.Kafka介紹

2.1 產生背景

Kafka 是分散式釋出-訂閱訊息系統。它最初由 LinkedIn 公司開發,使用 Scala語言編寫,之後成為 Apache 專案的一部分。Kafka是一個分散式的,可劃分的,多訂閱者,冗餘備份的永續性的日誌服務。它主要用於處理活躍的流式資料。

在大資料系統中,常常會碰到一個問題,整個大資料是由各個子系統組成,資料需要在各個子系統中高效能,低延遲的不停流轉。傳統的企業訊息系統並不是非常適合大規模的資料處理。為了已在同時搞定線上應用(訊息)和離線應用(資料檔案,日誌)Kafka 就出現了。Kafka 可以起到兩個作用:

降低系統組網複雜度

降低程式設計複雜度,各個子系統不在是相互協商介面,各個子系統類似插口插在插座上,Kafka 承擔高速資料匯流排的作用。

2.2 部署結構

 

2.3 叢集架構


2.4 基本概念

Topic:特指 Kafka 處理的訊息源(feeds of messages)的不同分類。

Partition:Topic 物理上的分組,一個topic 可以分為多個 partition,每個 partition 是一個有序的佇列。partition 中的每條訊息都會被分配一個有序的id(offset)。

Message:訊息,是通訊的基本單位,每個 producer 可以向一個 topic(主題)釋出一些訊息。

Producers:訊息和資料生產者,向 Kafka的一個 topic 釋出訊息的過程叫做 producers。

Consumers:訊息和資料消費者,訂閱 topics 並處理其釋出的訊息的過程叫做 consumers。

Broker:快取代理,Kafa 叢集中的一臺或多臺伺服器統稱為broker。

2.5 安裝測試

解壓Kafka: tar -xzf kafka_2.10-0.8.1.1.tgz

啟動ZK bin/zookeeper-server-start.shconfig/zookeeper.properties

啟動服務bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1&

建立主題 bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test

檢視主題 bin/kafka-topics.sh --list --zookeeperlocalhost:2181

檢視主題詳情 bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test

刪除主題 bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --topic test --zookeeper localhost:2181

建立生產者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

建立消費者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning

3.Flume和Kafka整合

3.1 兩者整合優勢

Flume更傾向於資料傳輸本身,Kakfa是典型的訊息中介軟體用於解耦生產者消費者。

具體架構上,Agent並沒把資料直接傳送到Kafka,在Kafka前面有層由Flume構成的forward。這樣做有兩個原因:

Kafka的API對非JVM系的語言支援很不友好,forward對外提供更加通用的HTTP介面。

forward層可以做路由、Kafka topic和Kafkapartition key等邏輯,進一步減少Agent端的邏輯。

資料有資料來源到flume再到Kafka時,資料一方面可以同步到HDFS做離線計算,另一方面可以做實時計算。本文實時計算採用SparkStreaming做測試。

3.2 Flume和Kafka整合安裝

提取外掛中的flume-conf.properties檔案:修改如下:flume源採用exec

producer.sources.s.type = exec

producer.sources.s.command=tail -f -n+1/usr/local/Hadoop/flume/test.log

producer.sources.s.channels = c

修改producer代理的topic為test

將配置放到flume/cong/producer.conf中

複製外掛包中的jar包到flume/lib中:刪除掉版本不同的相同jar包,這裡需要刪除scala-compiler-z.9.2.jar包,否則flume啟動會出現問題。


複製kafka/libs中的jar包到flume/lib中。

完整producer.conf:

producer.conf:

#agentsection 

producer.sources= s 

producer.channels= c 

producer.sinks= r 

#sourcesection 

producer.sources.s.type= exec

#producer.sources.s.spoolDir= /usr/local/hadoop/flume/logs

#producer.sources.s.fileHeader= true

producer.sources.s.command= tail -f -n+1 /usr/local/hadoop/flume/aaa.log

producer.sources.s.channels= c 

# Eachsink's type must be defined 

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink 

producer.sinks.r.metadata.broker.list=localhost:9092 

producer.sinks.r.partition.key=0 

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition 

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder 

producer.sinks.r.request.required.acks=0 

producer.sinks.r.max.message.size=1000000 

producer.sinks.r.producer.type=sync 

producer.sinks.r.custom.encoding=UTF-8 

producer.sinks.r.custom.topic.name=test 

#Specifythe channel the sink should use 

producer.sinks.r.channel= c 

# Eachchannel's type is defined. 

producer.channels.c.type= memory 

producer.channels.c.capacity= 1000

producer.channels.c.transactionCapacity= 100


3.3 啟動kafka flume相關服務

啟動ZKbin/zookeeper-server-start.sh config/zookeeper.properties

啟動Kafka服務 bin/kafka-server-start.sh config/server.properties

建立消費者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning

啟動flume

flume-ng agent --conf./flume/conf/ -f ./flume/conf/producer.conf -Dflume.root.logger=DEBUG,console-n producer

向flume傳送資料:


Kafka消費者資料:



3.4 Kafka和SparkStreaming整合

核心程式碼:

完整程式碼路徑:

spark-1.4.0\examples\src\main\java\org\apache\spark\examples\streaming



執行引數:


傳送資料:

由於flume採用exec資料來源的方式,因此flume會監聽配置的相應的檔案: tail -f -n+1 /usr/local/Hadoop/flume/aaa.log

當向該檔案追加檔案時,flume就會獲取追加的資料:

writetoflume.py


flume將獲取的增量資料由sink傳送給kafka,以下是kafka comsumer消費的資料


執行結果:

SparkStreaming訂閱kafka的test主題的資料,將訂閱的資料進行單詞計數處理。


相關推薦

Flume+Kafka+SparkStreaming整合

目錄 1. Flume介紹 Flume是Cloudera提供的一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的日誌收集系統,支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受

SparkStreaming(14):log4j日誌-flume-kafka-SparkStreaming整合

一、功能實現 模擬log4j的日誌生產,將日誌輸出到flume伺服器。然後,通過flume將日誌資訊輸出到kafka,進而Streaming可以從kafka獲得日誌,並且進行簡單的處理。 二、步驟 1.目的: 使用log4j將日誌輸按照一定格式輸出,並且傳遞給flume伺服器特定埠接

Flume+Kafka+SparkStreaming+Hbase+可視化(一)

日誌導入 ash channels style 導入 com system ase spark 一、前置準備: Linux命令基礎 Scala、Python其中一門 Hadoop、Spark、Flume、Kafka、Hbase基礎知識 二、分布式日誌收集框架Flume

使用Flume+Kafka+SparkStreaming進行實時日誌分析

每個公司想要進行資料分析或資料探勘,收集日誌、ETL都是第一步的,今天就講一下如何實時地(準實時,每分鐘分析一次)收集日誌,處理日誌,把處理後的記錄存入Hive中,並附上完整實戰程式碼 1. 整體架構 思考一下,正常情況下我們會如何收集並分析日誌呢?

flume +kafka+SparkStreaming日誌監控平臺

流程圖 採集方案#agentsectionproducer.sources= s1producer.channels= c1producer.sinks= k1#配置資料來源producer.sourc

Flume+Kafka+Sparkstreaming日誌分析

  最近要做一個日誌實時分析的應用,採用了flume+kafka+sparkstreaming框架,先搞了一個測試Demo,本文沒有分析其架構原理。   簡介:flume是一個分散式,高可靠,可用的海量日誌聚合系統,kafka是一高吞吐量的分散式釋出訂閱系統,s

flume+kafka+storm整合實現實時計算小案例

    我們做資料分析的時候常常會遇到這樣兩個場景,一個是統計歷史資料,這個就是要分析歷史儲存的日誌。我們會使用hadoop,具體框架可以設計為:1.flume收集日誌;2.HDFS輸入路徑儲存日誌;3.MapReduce計算,將結果輸出到HDFS輸出路徑;4.hive+sq

flume+kafka+storm整合00

一、安裝 flume,kafka, storm 的安裝在下面三篇文章: flume:1.6.0 kafka:注意這裡最好下載scala2.10版本的kafka,因為scala2.10版本的相容性比較好和2.11版本差別太大 二、各個部分除錯

FlumeKafkaSparkStreaming 整合程式設計

Flume 、Kafka 與SparkStreaming 整合程式設計 一、 Kafka 與SparkStreaming 整合程式設計 1、程式 pull方式,可靠Recerver ,工作常

SparkStreaming整合Kafka--Direct方式

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.

大資料學習之路96-SparkStreaming整合Kafka

我們前面SparkStreaming獲取資料的來源是TCP,但是平常是不會這麼用的,我們通常用的是Kafka。 SparkStreamingContext是不直接提供對Kafka的訪問的。 這個時候就有KafkaUtils 這裡有兩個方法 1.createDirectStream

Kafka 學習筆記(5)—— flume + kafka 整合(1)

1 需求分析 採集訂單系統應用列印的日誌檔案。 日誌檔案使用 log4j 生成,滾動生成。 將採集的日誌檔案儲存到 kafka中。 (source) 輸入: tail -F xx.log

SparkStreaming整合kafka直連模式direct方式

org.apache.spark spark-streaming_2.10 1.6.2 org.apache.spark spark-streaming-kafka_2.10 1.

flume+kafka+storm的整合使用

Flume-ng Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。        不過這裡寫寫自己的見解 這個是flume的架構圖  從上圖可以看到幾個名詞: Agent: 一個Agent包含Source、Channel、Sink和其他的元件

SparkStreaming整合Kafka-0.8的官方文件要點翻譯

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher) Note: Kafka 0.8 support is deprecated as of Spark 2.3.0

SparkStreaming整合kafka入門

package kafka import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.seriali

如何將Flumekafka進行整合

自從Flume1.6開始,新增了對Kafka的支援,極大地提升了Flume的採集能力。避免後端因熱點問題導致kafka的channel爆滿而無法採集資料。 本篇介紹使用Flume當前最新版本1.8與Kafka的結合使用。基本環境Kafka (192.168.156.101:9092)Zookeeper(192

kafka&&sparkstreaming整合入門之Wordcount

/**  * @author Mr.lu  * @Title: KafkaStreamingWordCount  * @ProjectName spark-scala  * @Description: TODO  * @date 2018/11/

sparkStreaming整合Kafka

這幾天看了spark整合Kafka,消費Kafka資料並向Kafka傳送資料,仿照官方樣例寫了兩個小例子。在此分享一下。 1.新增Kafka的repository 2.DirectKafkaWordCountDemo程式碼展示 3.kafkaProduc

SparkStreaming整合kafka的補充

clas metrics clu head zookeepe 大量 topic 自動重啟 備份 (1)SparkStreaming 整合 kafka 兩種方式對比 Direct 方式的優缺點分析 : 優點: 簡化並行(Simplified Parallelism)。不現