1. 程式人生 > >Scala和Java二種方式實戰Spark Streaming開發

Scala和Java二種方式實戰Spark Streaming開發

在這裡我主要借鑑課上老師講的以及官網的API來進行簡單的Spark Streaming的開發:

一:java形式:

1.我們可以總結一下步驟:

第一步:建立SparkConf物件
第二步:建立SparkStreamingContext
第三步:建立愛你SparkStreaming輸入資料來源(我們將資料來源配置為本地埠9999(要求埠沒有被佔用))
第四步:我們就像對RDD程式設計一樣,基於DStream進行程式設計,原因是DStream是RDD產生的模板,在Spark Streaming發生計算前,其實質是把每個Batch的DStream的操作翻譯成了RDD操作
補充說明:
除了print()方法將處理後的資料輸出之外,還有其他的方法也非常重要,在開發中需要重點掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最為重要的是foreachRDD方法,這個方法可以將資料寫入Redis,DB,DashBoard等,甚至可以隨意的定義資料放在哪裡,功能非常強大。

public class WordCountOnline {

    public static void main(String[] args) {
/*  第一步:配置SparkConf:
    1,至少兩條執行緒因為Spark Streaming應用程式在執行的時候至少有一條執行緒用於
        不斷地迴圈接受程式,並且至少有一條執行緒用於處理接受的資料(否則的話有執行緒用於處理資料,隨著時間的推移記憶體和磁碟都會
    不堪重負)
    2,對於叢集而言,每個Executor一般肯定不止一個執行緒,那對於處理SparkStreaming
        應用程式而言,每個Executor一般分配多少Core比較合適?根據我們過去的經驗,5個左右的Core是最佳的
    (一個段子分配為奇數個Core表現最佳,例如3個,5個,7個Core等)
*/
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnline"); SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("WordCountOnline"); /* 第二步:建立SparkStreamingContext, 1,這個是SparkStreaming應用春香所有功能的起始點和程式排程的核心 SparkStreamingContext的構建可以基於SparkConf引數也可以基於持久化的SparkStreamingContext的內容 來恢復過來(典型的場景是Driver崩潰後重新啟動,由於SparkStreaming具有連續7*24 小時不間斷執行的特徵,所以需要Driver重新啟動後繼續上一次的狀態,此時的狀態恢復需要基於曾經的Checkpoint)) 2,在一個Sparkstreaming 應用程式中可以建立若干個SparkStreaming物件,使用下一個SparkStreaming 之前需要把前面正在執行的SparkStreamingContext物件關閉掉,由此,我們獲取一個重大的啟發 我們獲得一個重大的啟發SparkStreaming也只是SparkCore上的一個應用程式而已,只不過SparkStreaming框架想執行的話需要 spark工程師寫業務邏輯 */
@SuppressWarnings("resource") JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); /* 第三步:建立SparkStreaming輸入資料來源input Stream 1,資料輸入來源可以基於File,HDFS,Flume,Kafka-socket等 2,在這裡我們指定資料來源於網路Socket埠,SparkStreaming連線上該埠並在執行時候一直監聽 該埠的資料(當然該埠服務首先必須存在,並且在後續會根據業務需要不斷地資料產生當然對於SparkStreaming 應用程式的而言,有無資料其處理流程都是一樣的); 3,如果經常在每個5秒鐘沒有資料的話不斷地啟動空的Job其實會造成排程資源的浪費,因為並沒有資料發生計算 所以實際的企業級生成環境的程式碼在具體提交Job前會判斷是否有資料,如果沒有的話就不再提交資料 */ JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999); /* * 第四步:接下來就像對於RDD程式設計一樣,基於DStream進行程式設計!!!原因是Dstream是RDD產生的模板(或者說類 * ),在SparkStreaming發生計算前,其實質是把每個Batch的Dstream的操作翻譯成RDD的操作 * 對初始的DTStream進行Transformation級別處理 * */ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>(){ //如果是Scala,由於SAM裝換,可以寫成val words = lines.flatMap{line => line.split(" ")} @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" "));//將其變成Iterable的子類 } }); // 第四步:對初始DStream進行Transformation級別操作 //在單詞拆分的基礎上對每個單詞進行例項計數為1,也就是word => (word ,1 ) JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word,1); } }); //對每個單詞事例技術為1的基礎上對每個單詞在檔案中出現的總次數 JavaPairDStream<String,Integer> wordsCount = pairs.reduceByKey(new Function2<Integer,Integer,Integer>(){ /** * */ private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { // TODO Auto-generated method stub return v1 + v2; } }); /* * 此處的print並不會直接出發Job的支援,因為現在一切都是在SparkStreaming的框架控制之下的 * 對於spark而言具體是否觸發真正的JOb執行是基於設定的Duration時間間隔的 * 諸位一定要注意的是Spark Streaming應用程式要想執行具體的Job,對DStream就必須有output Stream操作 * output Stream有很多型別的函式觸發,類print,savaAsTextFile,scaAsHadoopFiles等 * 其實最為重要的一個方法是foreachRDD,因為SparkStreaming處理的結果一般都會放在Redis,DB * DashBoard等上面,foreach主要就是用來完成這些功能的,而且可以自定義具體的資料放在哪裡!!! * */ wordsCount.print(); // SparkStreaming 執行引擎也就是Driver開始執行,Driver啟動的時候位於一條新執行緒中的,當然 // 其內部有訊息接受應用程式本身或者Executor中的訊息 jsc.start(); jsc.close(); } }

2:接下來向你展示Scala的程式碼形式,步驟思維是一樣的基本步驟可以總結為:
(1)接收資料來源:
(2)flatMap操作
(3)map操作:
(4)reduce操作:
(5)print()等操作:
(6)awaitTermination操作
程式碼形式如下:

object WordCountOnline {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WorldCountOnline").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(1))
    //可以設定資料的來源,比如源於socket
//    設定埠9999,資料儲存級別為MEMORY_AND_DISK
    val receiverInputStream = ssc.socketTextStream("localhost",9999,StorageLevel.MEMORY_AND_DISK)
     /*
     * 他的來源可以是File
     * 他還可以來自db,hdfs等等資料來源,甚至可以自定義任何資料來源,給開發者帶來巨大想象空間*/
    val directory = ssc.textFileStream("")
    /*
    * 接下來,我們開始對接收進來的資料進行操作
    * 第一步:通過“,”將每一行的單詞分割開來,得到一個個的單詞*/
    val words = receiverInputStream.flatMap{_.split(",")}
    /*將每個單詞計數為1*/
    val pairs = words.map(word => (word,1))
    /*reduceByKey通過將區域性樣本相同單詞的個數相加,從而得到從提的單詞個數*/
    val word_count = pairs.reduceByKey(_+_)
    /*與java程式設計一樣,除了print()方法將處理後的資料輸出之外,
    * 還有其他的方法也非常重啊喲,在開發中需要重點掌握比如
    * SaveAsTextFile,SaveAshadoopFile等,最為重要的是foreachRDD方法,這個方法可以將資料寫入Redis,
    * DB,DashBoard等,甚至可以隨意的定義資料放在那裡。功能非常強大。*/
    word_count.print()
    /*此處的print方法並不會觸發job執行,因為目前程式碼還處在Spark Streaming框架的控制之下,具體
    * 是否觸發job是取決於設定的Duration時間間隔*/
    ssc.start()
    /*等待程式結束*/
    ssc.awaitTermination()

  }
}

補充說明:使用Spark Streaming可以處理各種資料來源型別,如:資料庫、HDFS,伺服器log日誌、網路流,其強大超越了你想象不到的場景,只是很多時候大家不會用,其真正原因是對Spark、spark streaming本身不瞭解。

博文內容源自DT大資料夢工廠Spark課程。相關課程內容視訊可以參考:
百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯絡QQ460507491或者微訊號:DT1219477246 獲取上述資料)。