Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

      <name>Scala-Tools Maven2 Repository</name>

      <name>Scala-Tools Maven2 Repository</name>










package com.sid.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by jy02268879 on 2018/7/16.
  * Spark Streaming processing of HDFS file data.
  */
object HDFSWordCount {

  /**
    * Entry point: counts words in text files that appear in a monitored HDFS
    * directory and prints each batch's counts to the console.
    *
    * Runs with a local master and a 5-second batch interval.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HDFSWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /*
     * textFileStream creates an input stream that monitors a Hadoop-compatible
     * filesystem for new files and reads them as text files (using key as
     * LongWritable, value as Text and input format as TextInputFormat). Files
     * must be written to the monitored directory by "moving" them from another
     * location within the same file system. File names starting with . are
     * ignored. Its single argument is the HDFS directory to monitor for new files.
     */
    val lines = ssc.textFileStream("hdfs://node1:9000/testdata/sparkstreaming/hdfswordcount/")

    // Split each line on single spaces, pair every word with 1, and sum per word within the batch.
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // An output operation is required; without one the streaming job has nothing to execute.
    result.print()

    // Start processing and block the main thread until the context is stopped.
    ssc.start()
    ssc.awaitTermination()
  }
}






# Step 1: enter the Hadoop sbin directory.
# NOTE(review): no command follows this cd in the text — presumably the HDFS
# start script was run from here; confirm against the original article.
cd /app/hadoop/hadoop-2.9.0/sbin



# Step 2: create the HDFS directory used as the target of the upload below.
cd /app/hadoop/hadoop-2.9.0/bin

hdfs dfs -mkdir -p /testdata/sparkstreaming/hdfswordcount


# Step 3: open test.log in vi inside the local test-data directory.
cd /app/spark/test_data/monitor_file

vi test.log



# Step 4: upload test.log into the HDFS directory created in step 2.
cd /app/spark/test_data/monitor_file

hdfs dfs -put test.log /testdata/sparkstreaming/hdfswordcount



