實時分析Flume-Kafka框架搭建最終將資料在mysql中輸出
因為搭建框架比較複雜如果這其中有不足,歡迎提出指正。下面附上實時分析簡化框架圖幫助理解。
把離線分析框架也附上
實時分析搭建過程:
1.在命令提示符中(Windows+R)找到準備好的SocketTest.java路徑,javac SocketTest.java執行後生成SocketTest.class檔案(執行前將SocketTest.java中包名刪除)
將.class檔案拖到linux中指定目錄下我設定的是/home/cat/soft/ST 下
將你事先準備好的access.20120104.log(爬蟲日誌大約300萬條記錄)也放到ST下,再在ST中新建一個data.log儲存即可
2.在一個使用者下執行java SocketTest access.20120104.log data.log ----------->data.log是監控access.20120104.log中資訊並記錄的日誌。
在另起一個機器輸入 tail -F data.log 這時會在data.log中輸出access.20120104.log中的資料
得到結果說明連線成功
3. 開始進行kafka配置檔案修改
1、http://flume.apache.org/FlumeUserGuide.html#exec-source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /soft/ST/data.log
a1.sources.r1.channels = c1
2、flume1.6權威指南中http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.k1.topic = mytopic agent.sinks.k1.brokerList = lion:9092 agent.sinks.k1.batchSize = 20 agent.sinks.k1.requiredAcks = 1
完整版
agent.sources = r1
agent.channels = c1
agent.sinks = k1
# For each one of the sources, the type is defined
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/cat/soft/ST/data.log
# The channel can be defined as follows.
agent.sources.r1.channels = c1
# Each sink's type must be defined
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = mytopic
agent.sinks.k1.brokerList = lion:9092
agent.sinks.k1.batchSize = 20
agent.sinks.k1.requiredAcks = 1
#Specify the channel the sink should use
agent.sinks.k1.channel = c1
# Each channel's type is defined.
agent.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
現在配置檔案更改完畢可以測試了
(1)在一個機器中cd soft/kafka/ 執行 zookeeper-server-start.sh config/zookeeper.properties
(2)在另一臺機器中cd soft/kafka/ 執行 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
(3)在另一臺機器中cd soft/flume/conf 執行 flume-ng agent --conf /home/cat/soft/flume/conf/ --conf-file tokafka.conf --name agent -Dflume.root.logger=INFO,console (檢驗一下kafka能否收到flume資料)
(4) 啟動消費者(用於測試)
flume-ng agent --conf /home/cat/soft/flume/conf/ --conf-file tokafka.conf --name agent -Dflume.root.logger=INFO,console
實際上在 IntelliJ IDEA 2017.2 x64上執行
先把配置檔案補充一些https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11/2.4.0
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
package Test1227
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object StreamingKafka {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("nwc").setMaster("local[2]")
val streamingContext = new StreamingContext(conf, Seconds(30))
streamingContext.sparkContext.setLogLevel("ERROR")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "lion:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => ( record.value().toString)).saveAsTextFiles("D://123//StreamingKafka")
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
(5)最後在linux中輸入 java SocketTest access.20120104.log data.log
會在本地檔案中或 .print 直接在控制檯輸出結果
(6)mysql中得到結果