1. 程式人生 > >spark學習五 DStream(spark流式資料處理)

spark學習五 DStream(spark流式資料處理)

流資料的特點

與一般的檔案(即內容已經固定)型資料來源相比,所謂的流資料擁有如下的特點

1.   資料一直處在變化中

2.   資料無法回退

3.   資料一直源源不斷的湧進

DStream

如果要用一句話來概括SparkStreaming的處理思路的話,那就是"將連續的資料持久化,離散化,然後進行批量處理"

讓我們來仔細分析一下這麼作的原因。

·        資料持久化將從網路上接收到的資料先暫時儲存下來,為事件處理出錯時的事件重演提供可能,

·        離散化資料來源源不斷的湧進,永遠沒有一個盡頭,就像周星馳的喜劇中所說崇拜之情如黃河之水綿綿不絕,一發而不可收拾。既然不能窮盡,那麼就將其按時間分片。比如採用一分鐘為時間間隔,那麼在連續的一分鐘內收集到的資料集中儲存在一起。

·        批量處理將持久化下來的資料分批進行處理,處理機制套用之前的RDD模式

DStream可以說是對RDD的又一層封裝。如果開啟DStream.scalaRDD.scala,可以發現幾乎RDD上的所有operationDStream中都有相應的定義。

作用於DStream上的operation分成兩類

1.   Transformation

2.   Output 表示將輸出結果,目前支援的有print,saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles

DStreamGraph

有輸入就要有輸出,如果沒有輸出,則前面所做的所有動作全部沒有意義,那麼如何將這些輸入和輸出繫結起來呢?這個問題的解決就依賴於

DStreamGraphDStreamGraph記錄輸入的Stream和輸出的Stream

  privateval inputStreams = new ArrayBuffer[InputDStream[_]]()
  privateval outputStreams = new ArrayBuffer[DStream[_]]()
  var rememberDuration: Duration = null
  var checkpointInProgress = false

outputStreams中的元素是在有Output型別的Operation作用於DStream上時自動新增到

DStreamGraph中的。

outputStream區別於inputStream一個重要的地方就是會過載generateJob.

初始化流程

StreamingContext

StreamingContextSpark Streaming初始化的入口點,主要的功能是根據入參來生成JobScheduler

設定InputStream

如果流資料來源來自於socket,則使用socketStream。如果資料來源來自於不斷變化著的檔案,則可使用fileStream

提交執行

StreamingContext.start()

資料處理

socketStream為例,資料來自於socket

SocketInputDstream啟動一個執行緒,該執行緒使用receive函式來接收資料

def receive() {                                                                                                          
    var socket: Socket = null                                                                                              
    try {                                                                                                                  
      logInfo("Connecting to " + host + ":" + port)                                                                        
      socket = new Socket(host, port)                                                                                      
      logInfo("Connected to " + host + ":" + port)                                                                         
      val iterator = bytesToObjects(socket.getInputStream())                                                               
      while(!isStopped && iterator.hasNext) {                                                                              
        store(iterator.next)                                                                                               
      }                                                                                                                    
      logInfo("Stopped receiving")                                                                                         
      restart("Retrying connecting to " + host + ":" + port)                                                               
    } catch {                                                                                                              
      case e: java.net.ConnectException =>                                                                                 
        restart("Error connecting to " + host + ":" + port, e)                                                             
      case t: Throwable =>                                                                                                 
        restart("Error receiving data", t)                                                                                 
    } finally {       
 
   if (socket != null) {                                                                                                
        socket.close()                                                                                                     
        logInfo("Closed socket to " + host + ":" + port)                                                                   
      }                                                                                                                    
    }                                                                                                                      
  }                                                                                                                        
}        


接收到的資料會被先儲存起來,儲存最終會呼叫到BlockManager.scala中的函式,那麼BlockManager是如何被傳遞到StreamingContext的呢?利用SparkEnv傳入的,注意StreamingContext建構函式的入參。

處理定時器

資料的儲存有是被socket觸發的。那麼已經儲存的資料被真正的處理又是被什麼觸發的呢?

記得在初始化StreamingContext的時候,我們指定了一個時間引數,那麼用這個引數會構造相應的重複定時器,一旦定時器超時,呼叫generateJobs函式。

privateval timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

事件處理函式

 /** Processes all events */                                                                                              
  privatedef processEvent(event: JobGeneratorEvent) {                                                                     
    logDebug("Got event " + event)                                                                                         
    event match {                                                                                                          
      case GenerateJobs(time) => generateJobs(time)                                                                        
      case ClearMetadata(time) => clearMetadata(time)                                                                      
      case DoCheckpoint(time) => doCheckpoint(time)                                                                        
      case ClearCheckpointData(time) => clearCheckpointData(time)                                                          
    }                                                                                                                      
  }     

generteJobs

 privatedef generateJobs(time: Time) {                                                                                   
    SparkEnv.set(ssc.env)                                                                                                  
    Try(graph.generateJobs(time)) match {                                                                                  
      case Success(jobs) =>                                                                                                
        val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>                                              
          val streamId = stream.id                                                                                         
          val receivedBlockInfo = stream.getReceivedBlockInfo(time)                                                        
          (streamId, receivedBlockInfo)                                                                                    
        }.toMap                                                                                                            
        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))                                                   
      case Failure(e) =>                                                                                                   
        jobScheduler.reportError("Error generating jobs for time " + time, e)                                              
    }                                                                                                                      
    eventActor ! DoCheckpoint(time)                                                                                        
  }          

 generateJobs->generateJob一路下去會呼叫到Job.run,job.run中呼叫sc.runJob,在具體呼叫路徑就不一一列出。

 privateclassJobHandler(job: Job)extendsRunnable {
    def run() {
      eventActor ! JobStarted(job)
      job.run()
      eventActor ! JobCompleted(job)
    }
  }

DStream.generateJob函式中定義了jobFunc,也就是在job.run()中使用到的jobFunc

  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }


在這個流程中,DStreamGraph起到非常關鍵的作用,非常類似於TridentStorm中的graph.

generateJob過程中,DStream會通過呼叫compute函式生成相應的RDDSparkContext則是將基於RDD的抽象轉換成為多個stage來執行。在StreamingContext中一個重要的轉換是DStreamRDD的轉換,另一個重要的轉換RDDStageTask的轉換。

相關推薦

spark學習 DStreamspark資料處理

流資料的特點 與一般的檔案(即內容已經固定)型資料來源相比,所謂的流資料擁有如下的特點 1.   資料一直處在變化中 2.   資料無法回退 3.   資料一直源源不斷的湧進 DStream 如果要用一句話來概括SparkStreaming的處理思路的話,那就是"將連續的資

Spark學習——Spark Streaming:大規模資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。  複雜的批量資料處理(batch data processing),通常的時間跨度

Spark Streaming:大規模資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。  複雜的批量資料處理(batch data processing),通常的時間跨度

《Java8實戰》-第六章讀書筆記收集資料-01

用流收集資料 我們在前一章中學到,流可以用類似於資料庫的操作幫助你處理集合。你可以把Java 8的流看作花哨又懶惰的資料集迭代器。它們支援兩種型別的操作:中間操作(如 filter 或 map )和終端操作(如 count 、 findFirst 、 forEach

kubernetes log 資料處理

PS: 最近在重構公司的業務容器化平臺,記錄一塊。關於容器日誌的, kubernetes python API本身提供了日誌流式資料,在以前的版本是不會輸出新資料的,後續版本進行了改進。 直接上程式碼 Flask 前端路由塊 # Router """獲取專案pod的日誌""" @api_cluster

Spring Cloud學習--容錯機制Hystrix DashBoard之資料監控

本文目錄: 一、使用Actuator監控 二 、使用Hystrix DashBoard監控 Actuator 能看到的是一大堆資料,而使用Hystrix DashBoard(儀表盤),使得監控資料圖形化、視覺化。Hystrix儀表板可以顯示每個斷

資料處理

1、直接登陸伺服器:ssh 2014210***@thumedia.org -p 6349 建立streaming.py:   touch streaming.py,並且如下編輯: <span style="font-size:14px;">#! /usr/

JDK8 新特性資料處理

在學習JDK8新特性Optional類的時候,提到對於Optional的兩個操作對映和過濾設計到JDK提供的流式出來。這篇文章便詳細的介紹流式處理: 一. 流式處理簡介 流式處理給開發者的第一感覺就是讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完

【java8】持續精進-之資料處理

流式處理簡介 在我接觸到java8流式處理的時候,我的第一感覺是流式處理讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完成的操作,藉助於流式處理可以在一行中實現。比如我們希望對一個包含整數的集合中篩選出所有的偶數,並將其封裝成為一個新的List返回,那麼

Redis和nosql簡介,api呼叫;Redis資料功能String型別的資料處理;List資料結構及Java呼叫處理;Hash資料結構;Set資料結構功能;sortedSet有序集合

1、Redis和nosql簡介,api呼叫14.1/ nosql介紹NoSQL:一類新出現的資料庫(not only sql),它的特點:1、  不支援SQL語法2、  儲存結構跟傳統關係型資料庫中的那種關係表完全不同,nosql中儲存的資料都是KV形式3、  NoSQL的世界中沒有一種通用的語言,每種no

Python實現讀取多個/批量txt檔案合併成一個txt示例為tcga資料處理

本程式 功能:將tcga資料的批量txt檔案合併成一個txt原始的第一個txt(代表一個病人)的資料內容 合併之後的txt資料,基因名不變,只是把病人的表達量收集到一起 操作說明:本人測試通過的執行環境:Python 2.7  Windows 7 64bit  cmd命令執行

Spark學習之路 Spark2.3 HA集群的分布安裝

serve html 元數據 不安裝 rec ive cut 再次 apps 一、下載Spark安裝包 1、從官網下載 http://spark.apache.org/downloads.html 2、從微軟的鏡像站下載 http://mirrors.hust.

Spark學習之路 SparkCore的源碼解讀啟動腳本

-o 啟動服務 binary dirname ppi std 參數 exp 情況 一、啟動腳本分析 獨立部署模式下,主要由master和slaves組成,master可以利用zk實現高可用性,其driver,work,app等信息可以持久化到zk上;slaves由一臺至多

Spark學習之路 二十八分布圖計算系統

尺度 內存 底層 mapr 分區 ces 兩個 傳遞方式 cat 一、引言   在了解GraphX之前,需要先了解關於通用的分布式圖計算框架的兩個常見問題:圖存儲模式和圖計算模式。 二、圖存儲模式   巨型圖的存儲總體上有邊分割和點分割兩種存儲方式。2013年,Gra

Spark學習之路 Spark的廣播變量和累加器

img 還原 變量定義 如果 style 調優 學習之路 park 系統 一、概述 在spark程序中,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制到每臺機器

Spark學習之路 十一SparkCore的調優之Spark內存模型

精準 規模 memory 此外 結構定義 申請 管理方式 存儲 內部 摘抄自:https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index

Spark學習之路 十二SparkCore的調優之資源調優JVM的基本架構

程序員 存儲 src ron 指示器 引用 double strong 功能 一、JVM的結構圖 1.1 Java內存結構 JVM內存結構主要有三大塊:堆內存、方法區和棧。 堆內存是JVM中最大的一塊由年輕代和老年代組成,而年輕代內存又被分成三部分,Eden空間、

Spark學習之路 十二SparkCore的調優之資源調優

限制 無法 數據 block 可能 executors 頻繁 通過 操作 摘抄自:https://tech.meituan.com/spark-tuning-basic.html 一、概述 在開發完Spark作業之後,就該為作業配置合適的資源了。Spark的資源參數,基

Spark學習之路 十四SparkCore的調優之資源調優JVM的GC垃圾收集器

當前 復制 event 只需要 引用 應用 之前 相互 分享 一、概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的 Lisp 語言,經過半個多世紀,目前已經十分成熟了。 jvm 中,程序計數

java實現spark streaming與kafka整合進行計算

背景:網上關於spark streaming的文章還是比較多的,可是大多數用scala實現,因我們的電商實時推薦專案以java為主,就踩了些坑,寫了java版的實現,程式碼比較意識流,輕噴,歡迎討論。流程:spark streaming從kafka讀使用者實時點選資料,過濾資