1. 程式人生 > >SparkStreaming(5):例項-SparkStreaming處理本地或者HDFS檔案

SparkStreaming(5):例項-SparkStreaming處理本地或者HDFS檔案

1.實現功能:

SparkStreaming處理本地或者HDFS檔案,並進行wordcount的統計。

2.前提開啟:

(1)hdfs

(2)metastore

3.scala程式碼:

(1)本地目錄寫法:

file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\

(2)hdfs目錄寫法:

/spark/

(3)程式碼(以本地為例)

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用spark Streaming處理檔案系統(local/hdfs)的資料
  */
object FileWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setMaster("local[2]").setAppName("FileWordCount")

    val ssc=new StreamingContext(sparkConf,Seconds(5))




    //    file:///opt/modules/spark-2.1.0-bin-2.7.3/README.md
    val lines=ssc.textFileStream("file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\")

    val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

4.測試:

(1)將內容寫入test.log

(2)將檔案test.log採用cp方式,放到對應datas檔案下面

cp .\test.log .\datas\

(注意:(2)非常重要,一定要通過cp或者mv的方式移動進去,否者streaming讀取不到增加的流資訊!)