Spark Streaming notes (integration with Flume + integration with Kafka)
Setting the log level in Spark 2.1 is simple; the few lines of code below are all it takes, the key call being sc.setLogLevel("WARN"):
val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
Spark Streaming does real-time (streaming) computation.
Spark Core and other RDD-based processing is offline (batch) computation.
So Spark handles both real-time and offline computation.
The first Spark Streaming example
Run nc -lk 8888 on 192.168.235.128 to open a simple message source on port 8888.
Then create a Spark Streaming program in IDEA:
package com.wxa.spark.four

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // a DStream is a special (time-ordered) sequence of RDDs
    val ds = ssc.socketTextStream("192.168.235.128", 8888)
    val result = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print() // do not write print(result); print() here is the output operation (action)
    ssc.start()
    ssc.awaitTermination()
  }
}
Spark integrated with Flume
Writing the flume-poll.conf file + the FlumePollWordCount code on the IDEA side
First download the three jar packages below and place them in Flume's lib directory.
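(The original screenshot listing the jars is not reproduced here. Per the Spark Streaming + Flume integration guide, the poll-based SparkSink typically needs spark-streaming-flume-sink_2.11-<version>.jar, scala-library-<version>.jar and commons-lang3-<version>.jar on Flume's classpath; treat this list as an assumption and match the versions to your Spark build.)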
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
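The agent can then be started with flume-ng; a sketch, assuming the config above is saved as conf/flume-poll.conf:
flume-ng agent --name a1 --conf conf --conf-file conf/flume-poll.conf -Dflume.root.logger=INFO,console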
IDEA side:
package com.wxa.spark.five
import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // pull data from Flume (this is the Flume agent's address)
    val address = Seq(new InetSocketAddress("192.168.235.128", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
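On the IDEA side, the Flume integration classes come from a separate artifact. A sketch of the Maven dependency, assuming the Spark 1.6.x / Scala 2.11 line used later in these notes for Kafka (match the version to your own build):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>1.6.3</version>
</dependency>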
Writing the flume-push.conf file + the FlumePushWordCount code on the IDEA side
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = avro
# the receiving side: the Windows machine's address
a1.sinks.k1.hostname = 192.168.235.1
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
IDEA side:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushWordCount {
def main(args: Array[String]) {
// val host = args(0)
// val port = args(1).toInt
val conf = new SparkConf().setAppName("FlumeWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// push mode: Flume sends data to Spark
val flumeStream = FlumeUtils.createStream(ssc, "192.168.235.1", 8888) // the Windows-side address where this receiver runs
// the actual payload of a Flume event is obtained via event.getBody()
val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
val results = words.reduceByKey(_ + _)
results.print()
ssc.start()
ssc.awaitTermination()
}
}
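Note on running push mode: because Flume pushes to an Avro receiver inside the Spark application, start the FlumePushWordCount job (listening on 192.168.235.1:8888) first, and only then start the Flume agent (with flume-ng as in the poll example above, but pointing --conf-file at flume-push.conf).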
Overview diagram of the Spark Streaming + Flume integration
Using Spark with Flume alone has a drawback: Spark starts the receiver inside an executor on a worker, not in the driver, so when running on a cluster Flume has to be pointed at that specific worker's address. And because all the data flows through that single receiver, it may not keep up when the data volume becomes large.
Spark Streaming integrated with Kafka
The producer connects to the broker list, while the consumer connects to ZooKeeper. The point of consumer groups is that several machines in the same group act as one consumer: if the producer emits 5000 records and two machines share a group, machine 1 might consume 3000 records and machine 2 the other 2000, consuming 5000 in total between them.
Add the Maven dependency to the POM:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
Enough talk, here is the code:
package com.wxa.spark.five
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by root on 2016/5/21.
*/
object KafkaWordCount {
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
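// for each key x: sum this batch's new counts y and add the previous running total z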
iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
}
def main(args: Array[String]) {
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("f://ck2") //等同於sc的setCheckpointDir
//"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
//"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val words = data.map(_._2).flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
The arguments to pass in are shown in the figure below.
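(The screenshot is not reproduced here. A hypothetical set of arguments, in the order zkQuorum group topics numThreads, might look like: hadoop01:2181,hadoop02:2181 group1 ssckfk 1; adjust the ZooKeeper quorum, group and topic to your own cluster.)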
After upgrading Spark to 2.0+, testing the streaming-kafka code throws java.lang.NoClassDefFoundError: org/apache/spark/Logging.
Solution:
Download link for the spark-core_2.11-1.5.2.logging.jar package
Fix in detail: download the jar above and put it into the jars directory of Spark (I am using 2.1.0), as shown in the figure below.
In IDEA, open the module's Dependencies settings and import the jar there; this resolves the java.lang.NoClassDefFoundError: org/apache/spark/Logging error.
Next we need a producer.
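If the topic does not exist yet, it can be created first; a sketch, assuming an older Kafka release that still manages topics through ZooKeeper (hosts, partition count and replication factor are placeholders):
kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic ssckfk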
kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092 --topic ssckfk
Type data into this producer console and the Spark Streaming program on the IDEA side will receive it.
Window operations (the key function is reduceByKeyAndWindow)
Example diagram: the two windows do not double-count the data point at 10:15.
Each window computes over the data that falls inside its time span.
For example: the span from 10:10:00 to 10:15 sums to 30, and the span from 10:15 to 10:25 sums to 70.
package com.wxa.spark.sixday
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
/**
* Created by ZX on 2016/4/19.
*/
object WindowOpts {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WindowOpts").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Milliseconds(5000))
val lines = ssc.socketTextStream("192.168.235.128", 9999)
val pairs = lines.flatMap(_.split(" ")).map((_, 1))
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(15), Seconds(10)) // the window length and the slide interval must both be multiples of the batch interval
//Map((hello, 5), (jerry, 2), (kitty, 3))
windowedWordCounts.print()
// val a = windowedWordCounts.map(_._2).reduce(_+_)
// a.foreachRDD(rdd => {
// println(rdd.take(0))
// })
// a.print()
// //windowedWordCounts.map(t => (t._1, t._2.toDouble / a.toD))
// windowedWordCounts.print()
// //result.print()
ssc.start()
ssc.awaitTermination()
}
}
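The call above recomputes every key over the full window on each slide. A more efficient variant of reduceByKeyAndWindow also takes an inverse function: Spark adds the batches entering the window and subtracts the ones leaving it, at the cost of requiring checkpointing. A minimal sketch, dropping into the WindowOpts example above (the checkpoint path is hypothetical):
ssc.checkpoint("f://ck-window") // required by the inverse-function variant; hypothetical path
val windowedWordCounts2 = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // values entering the window
  (a: Int, b: Int) => a - b, // values leaving the window
  Seconds(15),
  Seconds(10))
windowedWordCounts2.print()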
Two ways to connect Spark and Kafka
Approach 1 (receiver-based): connect through ZooKeeper
Approach 2 (receiver-less, direct): connect through the broker list
Advantages over the first approach:
1. More efficient.
2. You manage the offsets yourself (the receiver-based approach leaves that to ZooKeeper), which is more flexible; a sketch follows below.
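To make point 2 concrete, here is a minimal sketch of the receiver-less connection and of reading the offset ranges yourself. The object name, broker addresses and topic are placeholders, and it only prints the ranges that a real job would persist somewhere (for example to ZooKeeper, as the KafkaManager below does):
package com.wxa.spark.five

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectOffsetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectOffsetSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "hadoop01:9092,hadoop02:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("ssckfk"))
    stream.foreachRDD { rdd =>
      // each RDD partition maps to one Kafka partition's (fromOffset, untilOffset) range
      for (or <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
        println(s"${or.topic}-${or.partition}: ${or.fromOffset} -> ${or.untilOffset}")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}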
You must put your own KafkaManager class into a package named org.apache.spark.streaming.kafka, because the KafkaCluster class it relies on is private to Spark's own packages.
The KafkaManager code:
package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import scala.reflect.ClassTag
/**
* Manage the Kafka offsets yourself
*/
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
private val kc = new KafkaCluster(kafkaParams)
/**
* Create the data stream
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
// before reading offsets from ZooKeeper, bring them up to date based on the actual consumption state
setOrUpdateOffsets(topics, groupId)
// read the offsets from ZooKeeper and start consuming messages from there
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
messages
}
/**
* Before creating the data stream, update the consumer offsets based on the actual consumption state
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
if (hasConsumed) { // has been consumed before
/**
* If the streaming job throws kafka.common.OffsetOutOfRangeException at runtime,
* the offsets saved in ZooKeeper are stale: Kafka's retention policy has already
* deleted the log segments that contained them. In that case, compare the
* consumerOffsets stored in ZooKeeper with the earliestLeaderOffsets; if the
* consumerOffsets are smaller than the earliestLeaderOffsets, they are out of
* date and should be reset to the earliestLeaderOffsets.
*/
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
// possibly only some partitions have stale consumerOffsets, so reset only those partitions to the earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case(tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已經過時,更新為" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else { // never consumed before
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else {
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}
/**
* Update the consumer offsets stored in ZooKeeper
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
val groupId = kafkaParams.get("group.id").get
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}
The DirectKafkaWordCount code:
package com.wxa.spark.five
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectKafkaWordCount {
/* def dealLine(line: String): String = {
val list = line.split(',').toList
// val list = AnalysisUtil.dealString(line, ',', '"') // treat dealString like split here
list.get(0).substring(0, 10) + "-" + list.get(26)
}*/
def processRdd(rdd: RDD[(String, String)]): Unit = {
val lines = rdd.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.foreach(println)
}
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
s"""
|Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
| <groupid> is a consumer group
|
""".stripMargin)
System.exit(1)
}
Logger.getLogger("org").setLevel(Level.WARN)
val Array(brokers, topics, groupId) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
sparkConf.setMaster("local[*]")
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"group.id" -> groupId,
"auto.offset.reset" -> "smallest"
)
val km = new KafkaManager(kafkaParams) // used in essentially the same way as KafkaUtils.createDirectStream()
val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // process the messages first
    processRdd(rdd)
    // then update the offsets
    km.updateZKOffsets(rdd)
  }
})
ssc.start()
ssc.awaitTermination()
}
}
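A hedged example of submitting this job (the jar name is hypothetical, and the three arguments map to <brokers> <topics> <groupid>):
spark-submit --class com.wxa.spark.five.DirectKafkaWordCount sparkstreaming-demo.jar hadoop01:9092,hadoop02:9092 ssckfk group1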