Spark Streaming WordCount Example and Source Code Analysis (Part 1)
1. Code Example
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    // Set the batchDuration interval to control how often jobs are generated,
    // and create the entry point for Spark Streaming execution
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
    // server address, port, and storage level for the received data
    val lines = ssc.socketTextStream("127.0.0.1", 6666, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // the actual scheduling starts here
    ssc.start()
    ssc.awaitTermination()
  }
}
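To actually run this, something must be listening on 127.0.0.1:6666 and sending comma-separated lines for the receiver to consume; the usual quick test is nc -lk 6666. As an alternative, a throwaway feeder in Scala (not part of the original example; the port and the sample line are just placeholders matching the code above) could look like this:

import java.io.PrintWriter
import java.net.ServerSocket

// Tiny stand-in for `nc -lk 6666`: accept one connection from the Spark
// receiver and write a comma-separated line every second.
object TestDataServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(6666)
    val socket = server.accept()                      // wait for the receiver to connect
    val out = new PrintWriter(socket.getOutputStream, true)
    while (true) {
      out.println("spark,streaming,spark,wordcount")  // sample input for the word count
      Thread.sleep(1000)
    }
  }
}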
2. Source Code Analysis
Creating the StreamingContext. The primary constructor of StreamingContext is declared as follows:

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )

The (conf, batchDuration) constructor used in the example delegates to it, creating a new SparkContext and passing a null checkpoint:

/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
The data source is obtained through ssc.socketTextStream(...), which returns a ReceiverInputDStream:

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
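SocketInputDStream itself only supplies getReceiver(); the actual network reading happens inside the Receiver, which Spark runs on an executor. The real SocketReceiver opens the TCP connection and pushes decoded lines into Spark via store(). As a rough sketch of that contract (the class name and the data below are made up for illustration; this is not the real SocketReceiver), a minimal custom receiver built on Spark's public Receiver API might look like this:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal illustration of the Receiver contract that SocketReceiver follows:
// start a background thread in onStart(), push records to Spark with store(),
// and shut the thread down in onStop().
class DummyReceiver(storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  @volatile private var running = true

  override def onStart(): Unit = {
    val worker = new Thread("dummy-receiver") {
      override def run(): Unit = {
        while (running && !isStopped()) {
          store("hello,world")          // hand one record to the receiver supervisor
          Thread.sleep(1000)
        }
      }
    }
    worker.setDaemon(true)
    worker.start()
  }

  override def onStop(): Unit = {
    running = false
  }
}

Such a receiver can be plugged into a stream with ssc.receiverStream(new DummyReceiver(StorageLevel.MEMORY_AND_DISK_SER)), which achieves the same effect that socketTextStream() sets up for us via SocketInputDStream.getReceiver().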
ReceiverInputDStream is a subclass of InputDStream. In InputDStream:

abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

  private[streaming] var lastValidTime: Time = null

  ssc.graph.addInputStream(this)
  ...
As this shows, an InputDStream registers itself with ssc.graph when it is instantiated; ssc.graph is the DStreamGraph:

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

The DStreamGraph member inputStreams is where the registered InputDStreams are kept. Next comes a series of DStream operations:
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
flatMap(), map(), and so on are transformation operations; each returns a corresponding DStream. Taking map() as an example:

/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
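The returned MappedDStream is worth a quick look: it stores the parent DStream passed as the first argument and applies mapFunc only when a batch is actually computed. The following is an abridged rendering of the class (based on the Spark source; minor details may differ between versions):

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  // the parent is the only dependency of this DStream
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // when a batch is due, fetch (or compute) the parent's RDD and map over it
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}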
So each transformation returns a new DStream that holds a reference to its parent, which is how the dependency chain between DStreams is preserved. print() is an output operation: it creates a ForEachDStream and registers it with the DStreamGraph:

new ForEachDStream(this,
  context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
addOutputStream() simply appends this ForEachDStream to the DStreamGraph member outputStreams. Then ssc.start() is called; in StreamingContext:
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
start() first checks the current state of the StreamingContext. If it is already ACTIVE, starting another StreamingContext is not allowed, because Spark does not yet support running multiple SparkContexts at the same time. If the state is INITIALIZED, a streaming-start thread is launched which calls scheduler.start(). The scheduler is defined as:

private[streaming] val scheduler = new JobScheduler(this)
In Spark Streaming, the class responsible for overall dynamic job scheduling is JobScheduler. Every StreamingContext instance owns a JobScheduler instance, and when ssc.start() runs it calls that JobScheduler's start() method:
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
JobScheduler is the overall job scheduler of Spark Streaming. In its start() method it first creates an EventLoop[JobSchedulerEvent] to process the various JobSchedulerEvents. JobScheduler has two very important members: JobGenerator and ReceiverTracker. JobScheduler delegates the concrete work of generating each batch's RDD DAG to JobGenerator, and delegates the bookkeeping of the data received from the input sources to ReceiverTracker.
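To make the EventLoop's role concrete, the following is a stripped-down sketch of the pattern it implements: events are posted from any thread and handled one at a time on a dedicated thread. This is a conceptual analogue written for this article, not Spark's own EventLoop class:

import java.util.concurrent.LinkedBlockingQueue
import scala.util.control.NonFatal

// Conceptual event loop: a single daemon thread drains a queue and dispatches
// each event to onReceive, mirroring how JobScheduler handles JobSchedulerEvents.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = queue.take()                    // block until an event arrives
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }    // handler errors go to onError
        }
      } catch {
        case _: InterruptedException => // stop() was called, exit quietly
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

In JobScheduler the concrete handler is processEvent(), which pattern-matches on the incoming JobSchedulerEvents, as seen in the onReceive override quoted above.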