WordCount程序【Spark Streaming版本】
阿新 • • 發佈:2019-02-27
context 電腦 更多 ring 需要 -s 文件系統 cas key
~~
前置
~~
Spark Streaming 常常對接 :本地文件、HDFS、端口、flume、kafka
package february.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * ==== Spark Streaming可以監聽本地文件、HDFS、端口、flume、kafka ===== * * * Description: 使用Spark Streaming處理文件系統(local/hdfs)的數據 * 通過 SparkStreaming 來實現WordCount * Spark Streaming * * 提交代碼 * spark-submit --master spark://spark001:7077 --deploy-mode client --class february.streaming.SparkStreamingWordCount /home/liuge36/jars/SparkDayDemo.jar * * @Author: 留歌36 * @Date: 2019/2/21 17:27 */ object SparkStreamingWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() .setMaster("local[2]") //local[2] 或local也都可以 .setAppName(this.getClass.getSimpleName) //拿到StreamingContext 這個上下文對象 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 讀取Windows 10電腦上的文件目錄,沒有成功,可能是因為moving的原因吧 // val input = ssc.textFileStream("file:///f:\\tmp") // 讀取centos local 從源碼中也可以看出,需要移動文件到指定目錄下,才能檢測到 val input = ssc.textFileStream("file:///home/liuge36/feb/") // 讀取Linux服務器的文件目錄 // val lines = ssc.textFileStream("file:///") val lines = input.flatMap(line => line.split(",")) val count = lines.map(word => (word, 1)).reduceByKey{case (x, y)=> x+y } // 輸出結果 println("==================華麗分割線開始============================") count.print() //啟動主程序, ssc.start() //阻塞 等待主程序被關閉 ssc.awaitTermination() } }
更多相關小demo:每天一個程序:https://blog.csdn.net/liuge36/column/info/34094
WordCount程序【Spark Streaming版本】