Spark Learning, Part 15: Spark Streaming Execution Flow (1)
By 阿新 • Published 2019-02-05
1. Receiver Startup
The flow is as follows: Receiver startup has two parts. First, the Driver side initializes the ReceiverTracker and related components; second, all Receivers are wrapped into an RDD and shipped to the Executors for execution.
1.1. StreamingContext.start
This method is the entry point of the whole flow: (1) check the StreamingContext state; (2) start the JobScheduler.

def start(): Unit = synchronized {
  if (state == Started) {
    throw new SparkException("StreamingContext has already been started")
  }
  if (state == Stopped) {
    throw new SparkException("StreamingContext has already been stopped")
  }
  validate()
  sparkContext.setCallSite(DStream.getCreationSite())
  scheduler.start()
  state = Started
}
1.2. JobScheduler.start
(1) Create an anonymous Actor to handle JobSchedulerEvent messages; (2) create and start the ReceiverTracker; (3) start the JobGenerator.

def start(): Unit = synchronized {
  if (eventActor != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobSchedulerEvent => processEvent(event)
    }
  }), "JobScheduler")
  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
1.3. ReceiverTracker
1.3.1 ReceiverTracker初始化
(1) Read all ReceiverInputDStreams from the DStreamGraph; the DStreamGraph holds every InputDStream and output DStream;
(2) create a ReceiverLauncher object (an inner class of ReceiverTracker) used to start the Receivers;
(3) create a ReceivedBlockTracker object, which stores the metadata describing the Blocks the Receivers produce; the Block data itself is stored in the Executors' BlockManagers. Its role is similar to that of the Driver-side BlockManager.

private[streaming] class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receiverExecutor = new ReceiverLauncher()
  private val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus

  // actor is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  private var actor: ActorRef = null
1.3.2. ReceiverTracker.start
def start() = synchronized {
  if (actor != null) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
      "ReceiverTracker")
    if (!skipReceiverLaunch) receiverExecutor.start()
    logInfo("ReceiverTracker started")
  }
}
(1) Create a ReceiverTrackerActor object (an inner class) that handles Receiver registration and the Block metadata that Receivers report;
(2) start the ReceiverLauncher; the ReceiverLauncher creates a thread and, inside that thread, calls its own startReceivers method to start all the Receivers.
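The launcher-plus-worker-thread pattern used by ReceiverLauncher can be sketched in isolation. TinyLauncher and its members are hypothetical names for illustration, not Spark classes:

```scala
// Minimal launcher sketch: start() spawns a thread that runs the supplied
// work function, mirroring how ReceiverLauncher wraps startReceivers.
class TinyLauncher(work: () => Unit) {
  @volatile var done = false
  private val thread = new Thread("TinyLauncher") {
    override def run(): Unit = { work(); done = true }
  }
  def start(): Unit = thread.start()
  def awaitDone(): Unit = thread.join()
}
```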
1.3.3. ReceiverLauncher.startReceivers
private def startReceivers() {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  // Right now, we only honor preferences if all receivers have them
  val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

  // Create the parallel collection of receivers to distributed them on the worker nodes
  val tempRDD =
    if (hasLocationPreferences) {
      val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
      ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
    } else {
      ssc.sc.makeRDD(receivers, receivers.size)
    }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiver = (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    val receiver = iterator.next()
    val supervisor = new ReceiverSupervisorImpl(
      receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
    supervisor.start()
    supervisor.awaitTermination()
  }

  // Run the dummy Spark job to ensure that all slaves have registered.
  // This avoids all the receivers to be scheduled on the same node.
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }

  // Distribute the receivers and start them
  logInfo("Starting " + receivers.length + " receivers")
  running = true
  ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
  running = false
  logInfo("All of the receivers have been terminated")
}
(1) Iterate over the ReceiverInputDStreams in receiverInputStreams and create the corresponding Receivers; a SocketInputDStream, for example, yields a SocketReceiver;
(2) wrap all the Receivers into an RDD via makeRDD, with the partition count equal to the number of Receivers; each Receiver therefore occupies one Task, i.e. one core;
(3) create the startReceiver function object, which will run on the Executor side;
(4) submit a Job over the Receiver RDD; each Receiver runs as one Task on an Executor.
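The scheduling idea in steps (2) and (4) can be illustrated without Spark: placing each receiver in its own partition means the start function is applied exactly once per receiver. A plain-Scala sketch with hypothetical names, not Spark's API:

```scala
// Emulate makeRDD(receivers, receivers.size): one element per "partition".
val receivers = Seq("receiver-0", "receiver-1", "receiver-2")
val partitions: Seq[Seq[String]] = receivers.map(Seq(_))

// Emulate the startReceiver function that runJob applies to each partition.
val startReceiver = (iterator: Iterator[String]) => {
  require(iterator.hasNext, "Could not start receiver as object not found.")
  "started " + iterator.next()  // stand-in for supervisor.start()
}

// Emulate runJob: one invocation per partition, i.e. one per receiver.
val results = partitions.map(p => startReceiver(p.iterator))
```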
Receiver class diagram (image not reproduced here).
At this point, the Driver-side startup work is complete; next comes execution on the Executor side.
1.4. Receiver Task
Each Receiver runs as a Task on an Executor; the function executed is the startReceiver function object created above:

// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
  if (!iterator.hasNext) {
    throw new SparkException(
      "Could not start receiver as object not found.")
  }
  val receiver = iterator.next()
  val supervisor = new ReceiverSupervisorImpl(
    receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
  supervisor.start()
  supervisor.awaitTermination()
}
Each Receiver is paired with a ReceiverSupervisorImpl, which supervises it.
1.4.1. ReceiverSupervisor.start
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
(1) The onStart method (in ReceiverSupervisorImpl) starts the BlockGenerator by calling its start method;
(2) startReceiver starts the corresponding Receiver.
Although the BlockGenerator is started first, the data comes from startReceiver, so the startReceiver method is covered first.
1.5. ReceiverSupervisor.startReceiver
def startReceiver(): Unit = synchronized {
  try {
    logInfo("Starting receiver")
    receiver.onStart()
    logInfo("Called receiver onStart")
    onReceiverStart()
    receiverState = Started
  } catch {
    case t: Throwable =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
It calls the Receiver's onStart method, which starts the Receiver's data-reception flow.
Below, SocketReceiver serves as the example for describing the reception process.
1.5.1. SocketReceiver.onStart
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
Creates and starts a daemon thread that runs the receive method.
1.5.2. SocketReceiver.receive
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    logInfo("Stopped receiving")
    restart("Retrying connecting to " + host + ":" + port)
  } catch {
    ...
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
(1) Establish the TCP connection;
(2) read data in a loop, calling store on each item.
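The receive loop can be exercised against an in-memory stream instead of a real TCP socket; here a line iterator stands in for bytesToObjects (everything in this sketch is a stand-in, not Spark code):

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}

// Fake "socket" input: three newline-delimited records.
val in = new ByteArrayInputStream("a\nb\nc\n".getBytes("UTF-8"))
val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

// Stand-in for bytesToObjects(socket.getInputStream()).
val iterator = Iterator.continually(reader.readLine()).takeWhile(_ != null)

// The while(!isStopped && iterator.hasNext) { store(...) } loop,
// minus the stop-flag handling.
var stored = List.empty[String]
while (iterator.hasNext) {
  stored = stored :+ iterator.next()  // stand-in for store(iterator.next)
}
```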
1.5.3. Receiver.store
def store(dataItem: T) {
  executor.pushSingle(dataItem)
}
executor is the ReceiverSupervisorImpl object, assigned in the ReceiverSupervisor constructor.
ReceiverSupervisorImpl.pushSingle is as follows:
def pushSingle(data: Any) {
  blockGenerator.addData(data)
}
BlockGenerator.addData is as follows:
def addData(data: Any): Unit = synchronized {
  waitToPush()
  currentBuffer += data
}
addData places the data into the currentBuffer array; a timer thread inside BlockGenerator periodically packages the buffer's contents into a Block.
1.6 BlockGenerator
1.6.1. BlockGenerator Initialization and Startup
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  private val clock = new SystemClock()
  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  /** Start block generating and pushing threads. */
  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  }
(1) A RecurringTimer object is created; it contains a thread that, once started, periodically calls updateCurrentBuffer;
(2) the blockPushingThread thread is created; once started, it runs keepPushingBlocks;
(3) the start method launches both of these threads.
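A recurring timer of this shape is easy to sketch: a daemon thread that invokes a callback roughly every period until stopped. TinyRecurringTimer below is a hypothetical simplification, not Spark's RecurringTimer (which schedules against a Clock to avoid drift):

```scala
// A thread calls back about every periodMs until stop() is requested.
class TinyRecurringTimer(periodMs: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  private val thread = new Thread("TinyRecurringTimer") {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        callback(System.currentTimeMillis())
        Thread.sleep(periodMs)
      }
    }
  }
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.join() }
}
```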
1.6.2. BlockGenerator.updateCurrentBuffer
private def updateCurrentBuffer(time: Long): Unit = synchronized {
  try {
    val newBlockBuffer = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    if (newBlockBuffer.size > 0) {
      val blockId = StreamBlockId(receiverId, time - blockInterval)
      val newBlock = new Block(blockId, newBlockBuffer)
      listener.onGenerateBlock(blockId)
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
      logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
(1) This is a synchronized method;
(2) a new buffer is created and swapped with currentBuffer, which holds the data the Receiver has received so far;
(3) all data in the swapped-out buffer is packaged into a Block object; the blockId has the form input-${streamId}-${uniqueId}, where streamId is the Receiver's id and uniqueId is a timestamp;
(4) the Block object is placed in the blocksForPushing queue.
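The swap in step (2) is the key trick: the receiver thread keeps appending to a fresh buffer while the timer thread packages the old one. A stand-alone sketch (TinyBlockBuffer is a hypothetical name, and the waitToPush rate limiting is omitted):

```scala
import scala.collection.mutable.ArrayBuffer

class TinyBlockBuffer {
  @volatile private var currentBuffer = new ArrayBuffer[Any]

  // Receiver side: mirrors BlockGenerator.addData (without waitToPush).
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Timer side: mirrors updateCurrentBuffer's swap, returning the old buffer.
  def drainBlock(): Seq[Any] = synchronized {
    val newBlockBuffer = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    newBlockBuffer.toSeq
  }
}
```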
1.6.3 BlockGenerator.keepPushingBlocks
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")
  try {
    while(!stopped) {
      Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }
    ...
  } catch {
    ...
  }
}
It takes a Block from blocksForPushing and then calls BlockGenerator.pushBlock:
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}