spark學習五 DStream(spark流式資料處理)
流資料的特點
與一般的檔案(即內容已經固定)型資料來源相比,所謂的流資料擁有如下的特點
1. 資料一直處在變化中
2. 資料無法回退
3. 資料一直源源不斷的湧進
DStream
如果要用一句話來概括SparkStreaming的處理思路的話,那就是"將連續的資料持久化,離散化,然後進行批量處理"。
讓我們來仔細分析一下這麼作的原因。
· 資料持久化將從網路上接收到的資料先暫時儲存下來,為事件處理出錯時的事件重演提供可能,
· 離散化資料來源源不斷的湧進,永遠沒有一個盡頭,就像周星馳的喜劇中所說“崇拜之情如黃河之水綿綿不絕,一發而不可收拾”。既然不能窮盡,那麼就將其按時間分片。比如採用一分鐘為時間間隔,那麼在連續的一分鐘內收集到的資料集中儲存在一起。
· 批量處理將持久化下來的資料分批進行處理,處理機制套用之前的RDD模式
DStream可以說是對RDD的又一層封裝。如果開啟DStream.scala和RDD.scala,可以發現幾乎RDD上的所有operation在DStream中都有相應的定義。
作用於DStream上的operation分成兩類
1. Transformation
2. Output 表示將輸出結果,目前支援的有print,saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles
DStreamGraph
有輸入就要有輸出,如果沒有輸出,則前面所做的所有動作全部沒有意義,那麼如何將這些輸入和輸出繫結起來呢?這個問題的解決就依賴於
privateval inputStreams = new ArrayBuffer[InputDStream[_]]()
privateval outputStreams = new ArrayBuffer[DStream[_]]()
var rememberDuration: Duration = null
var checkpointInProgress = false
outputStreams中的元素是在有Output型別的Operation作用於DStream上時自動新增到
outputStream區別於inputStream一個重要的地方就是會過載generateJob.
初始化流程
StreamingContext
StreamingContext是Spark Streaming初始化的入口點,主要的功能是根據入參來生成JobScheduler
設定InputStream
如果流資料來源來自於socket,則使用socketStream。如果資料來源來自於不斷變化著的檔案,則可使用fileStream
提交執行
StreamingContext.start()
資料處理
以socketStream為例,資料來自於socket。
SocketInputDstream啟動一個執行緒,該執行緒使用receive函式來接收資料
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
restart("Error receiving data", t)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
}
接收到的資料會被先儲存起來,儲存最終會呼叫到BlockManager.scala中的函式,那麼BlockManager是如何被傳遞到StreamingContext的呢?利用SparkEnv傳入的,注意StreamingContext建構函式的入參。
處理定時器
資料的儲存有是被socket觸發的。那麼已經儲存的資料被真正的處理又是被什麼觸發的呢?
記得在初始化StreamingContext的時候,我們指定了一個時間引數,那麼用這個引數會構造相應的重複定時器,一旦定時器超時,呼叫generateJobs函式。
privateval timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
事件處理函式
/** Processes all events */
privatedef processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
generteJobs
privatedef generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
generateJobs->generateJob一路下去會呼叫到Job.run,在job.run中呼叫sc.runJob,在具體呼叫路徑就不一一列出。
privateclassJobHandler(job: Job)extendsRunnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}
DStream.generateJob函式中定義了jobFunc,也就是在job.run()中使用到的jobFunc
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
在這個流程中,DStreamGraph起到非常關鍵的作用,非常類似於TridentStorm中的graph.
在generateJob過程中,DStream會通過呼叫compute函式生成相應的RDD,SparkContext則是將基於RDD的抽象轉換成為多個stage來執行。在StreamingContext中一個重要的轉換是DStream到RDD的轉換,另一個重要的轉換是RDD到Stage及Task的轉換。
相關推薦
spark學習五 DStream(spark流式資料處理)
流資料的特點 與一般的檔案(即內容已經固定)型資料來源相比,所謂的流資料擁有如下的特點 1. 資料一直處在變化中 2. 資料無法回退 3. 資料一直源源不斷的湧進 DStream 如果要用一句話來概括SparkStreaming的處理思路的話,那就是"將連續的資
Spark學習——Spark Streaming:大規模流式資料處理
提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。 複雜的批量資料處理(batch data processing),通常的時間跨度
Spark Streaming:大規模流式資料處理
提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。 複雜的批量資料處理(batch data processing),通常的時間跨度
《Java8實戰》-第六章讀書筆記(用流收集資料-01)
用流收集資料 我們在前一章中學到,流可以用類似於資料庫的操作幫助你處理集合。你可以把Java 8的流看作花哨又懶惰的資料集迭代器。它們支援兩種型別的操作:中間操作(如 filter 或 map )和終端操作(如 count 、 findFirst 、 forEach
kubernetes log 流式資料處理
PS: 最近在重構公司的業務容器化平臺,記錄一塊。關於容器日誌的, kubernetes python API本身提供了日誌流式資料,在以前的版本是不會輸出新資料的,後續版本進行了改進。 直接上程式碼 Flask 前端路由塊 # Router """獲取專案pod的日誌""" @api_cluster
Spring Cloud學習--容錯機制(Hystrix DashBoard之資料監控)
本文目錄: 一、使用Actuator監控 二 、使用Hystrix DashBoard監控 Actuator 能看到的是一大堆資料,而使用Hystrix DashBoard(儀表盤),使得監控資料圖形化、視覺化。Hystrix儀表板可以顯示每個斷
流式資料處理
1、直接登陸伺服器:ssh 2014210***@thumedia.org -p 6349 建立streaming.py: touch streaming.py,並且如下編輯: <span style="font-size:14px;">#! /usr/
JDK8 新特性流式資料處理
在學習JDK8新特性Optional類的時候,提到對於Optional的兩個操作對映和過濾設計到JDK提供的流式出來。這篇文章便詳細的介紹流式處理: 一. 流式處理簡介 流式處理給開發者的第一感覺就是讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完
【java8】持續精進-之流式資料處理
流式處理簡介 在我接觸到java8流式處理的時候,我的第一感覺是流式處理讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完成的操作,藉助於流式處理可以在一行中實現。比如我們希望對一個包含整數的集合中篩選出所有的偶數,並將其封裝成為一個新的List返回,那麼
Redis和nosql簡介,api呼叫;Redis資料功能(String型別的資料處理);List資料結構(及Java呼叫處理);Hash資料結構;Set資料結構功能;sortedSet(有序集合)數
1、Redis和nosql簡介,api呼叫14.1/ nosql介紹NoSQL:一類新出現的資料庫(not only sql),它的特點:1、 不支援SQL語法2、 儲存結構跟傳統關係型資料庫中的那種關係表完全不同,nosql中儲存的資料都是KV形式3、 NoSQL的世界中沒有一種通用的語言,每種no
Python實現讀取多個/批量txt檔案合併成一個txt(示例為tcga資料處理)
本程式 功能:將tcga資料的批量txt檔案合併成一個txt原始的第一個txt(代表一個病人)的資料內容 合併之後的txt資料,基因名不變,只是把病人的表達量收集到一起 操作說明:本人測試通過的執行環境:Python 2.7 Windows 7 64bit cmd命令執行
Spark學習之路 (二)Spark2.3 HA集群的分布式安裝
serve html 元數據 不安裝 rec ive cut 再次 apps 一、下載Spark安裝包 1、從官網下載 http://spark.apache.org/downloads.html 2、從微軟的鏡像站下載 http://mirrors.hust.
Spark學習之路 (十五)SparkCore的源碼解讀(一)啟動腳本
-o 啟動服務 binary dirname ppi std 參數 exp 情況 一、啟動腳本分析 獨立部署模式下,主要由master和slaves組成,master可以利用zk實現高可用性,其driver,work,app等信息可以持久化到zk上;slaves由一臺至多
Spark學習之路 (二十八)分布式圖計算系統
尺度 內存 底層 mapr 分區 ces 兩個 傳遞方式 cat 一、引言 在了解GraphX之前,需要先了解關於通用的分布式圖計算框架的兩個常見問題:圖存儲模式和圖計算模式。 二、圖存儲模式 巨型圖的存儲總體上有邊分割和點分割兩種存儲方式。2013年,Gra
Spark學習之路 (四)Spark的廣播變量和累加器
img 還原 變量定義 如果 style 調優 學習之路 park 系統 一、概述 在spark程序中,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制到每臺機器
Spark學習之路 (十一)SparkCore的調優之Spark內存模型
精準 規模 memory 此外 結構定義 申請 管理方式 存儲 內部 摘抄自:https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index
Spark學習之路 (十二)SparkCore的調優之資源調優JVM的基本架構
程序員 存儲 src ron 指示器 引用 double strong 功能 一、JVM的結構圖 1.1 Java內存結構 JVM內存結構主要有三大塊:堆內存、方法區和棧。 堆內存是JVM中最大的一塊由年輕代和老年代組成,而年輕代內存又被分成三部分,Eden空間、
Spark學習之路 (十二)SparkCore的調優之資源調優
限制 無法 數據 block 可能 executors 頻繁 通過 操作 摘抄自:https://tech.meituan.com/spark-tuning-basic.html 一、概述 在開發完Spark作業之後,就該為作業配置合適的資源了。Spark的資源參數,基
Spark學習之路 (十四)SparkCore的調優之資源調優JVM的GC垃圾收集器
當前 復制 event 只需要 引用 應用 之前 相互 分享 一、概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的 Lisp 語言,經過半個多世紀,目前已經十分成熟了。 jvm 中,程序計數
java實現spark streaming與kafka整合進行流式計算
背景:網上關於spark streaming的文章還是比較多的,可是大多數用scala實現,因我們的電商實時推薦專案以java為主,就踩了些坑,寫了java版的實現,程式碼比較意識流,輕噴,歡迎討論。流程:spark streaming從kafka讀使用者實時點選資料,過濾資