1. 程式人生 > >Spark元件之Spark Streaming學習4--HdfsWordCount 學習

Spark元件之Spark Streaming學習4--HdfsWordCount 學習

1.理解:HdfsWordCount 是從hdfs的檔案讀入流檔案,即制定檔案目錄,每個一段時間掃描該路徑下的檔案,不掃描子目錄下的檔案。

如果有新增加的檔案,則進行流計算

 val ssc = new StreamingContext(sparkConf, Seconds(2))

處理跟前面差不多

2.執行:

輸入:

[email protected]:~/cloud/testByXubo/spark/Streaming/data$ hadoop fs -put 2.txt /xubo/spark/data/Streaming/hdfsWordCount/
[email protected]:~/cloud/testByXubo/spark/Streaming/data$ hadoop fs -put 3.txt /xubo/spark/data/Streaming/hdfsWordCount/

[email protected]:~/cloud/testByXubo/spark/Streaming/data$ cat 3.txt 
hello world
hello world
hello world
hello world
hello world
hello world
hello world
a
a
a
a
a
a
a b b b 


輸出:

16/04/26 21:26:06 INFO scheduler.DAGScheduler: Job 19 finished: print at HdfsWordCount.scala:52, took 0.023056 s
-------------------------------------------
Time: 1461677166000 ms
-------------------------------------------
(hello,1)
(world,1)

加入檔案後:
-------------------------------------------
Time: 1461677550000 ms
-------------------------------------------
(b,3)
(hello,7)
(world,7)
(a,7)

3.原始碼:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println


相關推薦

Spark元件Spark Streaming學習4--HdfsWordCount 學習

1.理解:HdfsWordCount 是從hdfs的檔案讀入流檔案,即制定檔案目錄,每個一段時間掃描該路徑下的檔案,不掃描子目錄下的檔案。 如果有新增加的檔案,則進行流計算  val ssc =

Spark元件Spark Streaming學習6--如何呼叫Dstream裡面的getOrCompute方法?

1解釋 下圖中有getOrCompute在 在Dstream中有對getOrCompute的定義,但是是 private[streaming] 的,所以需要在streaming包下才能呼叫

Spark元件GraphX學習16--最短路徑ShortestPaths

1解釋 求圖中的最短路徑,更多的請見參考【3】,這篇寫的很詳細 2.程式碼: /** * @author xubo * ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html *

Spark元件GraphX學習7--隨機圖生成和reduce最大或最小出度/入度/度

1解釋 通過自定義函式 reduce最大或最小出度/入度/度 2.程式碼: /** * @author xubo * ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html *

Spark元件GraphX學習14--TriangleCount例項和分析

1解釋 統計圖中的Triangle,並返回 原始碼: /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.

spark筆記Spark Streaming整合flume實戰

a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.

spark筆記Spark Streaming整合kafka實戰

kafka作為一個實時的分散式訊息佇列,實時的生產和消費訊息,這裡我們可以利用SparkStreaming實時地讀取kafka中的資料,然後進行相關計算。 在Spark1.3版本後,KafkaUtils裡面提供了兩個建立dstream的方法,一種為KafkaUtils.cr

《深入理解Sparkspark Streaming概念的再理解

1、spark Streaming是一個微批處理的框架 2、批處理時間間隔 batchInterval       >> 表示在batchInterval時間內Spark 所接收的資料被當做一個批次做處理 3、批處理時間間隔(batchInterval)、視窗長

python學習路7 前端學習4 jQuery 學習

前端學習 ast first 對象 獲取 前端 索引 簡寫 [0 轉換:   jquery 對象[0] => Dom對象 $(Dom 對象 ) => jquery 對象 1.id $("#id") 2.class $(".c

spark筆記Spark任務調度

fda 調度 water 周期 taskset 完成 構建 任務 shadow 9.1 任務調度流程圖各個RDD之間存在著依賴關系,這些依賴關系就形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG進行Stage劃分,劃分的規則很簡單,從後往前回溯,遇到

spark筆記Spark運行架構

示意圖 exe 使用 sta yarn 釋放 構建 遠程 work Spark運行基本流程Spark運行基本流程參見下面示意圖:1) 構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Stand

《深入理解SparkSpark常用運算元詳解(java版+spark1.6.1)

最近公司要用Java開發Spark專案,以前用的是Scala語言,今天就把Spark常用的運算元使用java語言實現了一遍 XML Code  1 2 3 4 5 6 7 8 9 10 11 12

《深入理解SparkSpark-Stream概述1(官方文件翻譯版)

最近在學英語,學以致用,就嘗試著看Spark的官方文件,並試著翻譯了部分,由於水平有限如果有所疏漏的地方歡迎指正 * Spark Stream Overview * Spark Streaming

《深入理解SparkSpark與Kafka整合原理

spark和kafka整合有2中方式 1、receiver 顧名思義:就是有一個執行緒負責獲取資料,這個執行緒叫receiver執行緒 解釋: 1、Spark叢集中的某個executor中有一個receiver執行緒,這個執行緒負責從kafka中獲取資料  注意

Spark元件Mllib的學習4examples中的MovieLensALS修改本地執行

環境: spark-1.5.2 在idea中自己定義的project中local模式執行,需要注意幾個地方: 1.檔案匯入: MovieLensALS.scala 和AbstractParams.scala 其中繼承了MovieLensALSAbst

大資料學習筆記sparkspark streaming----快速通用計算引擎

導語 spark 已經成為廣告、報表以及推薦系統等大資料計算場景中首選系統,因效率高,易用以及通用性越來越得到大家的青睞,我自己最近半年在接觸spark以及spark streaming之後,對spark技術的使用有一些自己的經驗積累以及心得體會,在此分享給大家。 本文依

Spark-Avro學習4使用AvroWritePartitioned儲存AVRO檔案時進行劃分

1.主要是partition儲存avro檔案 2.程式碼: /** * @author xubo * @time 20160502 * ref https://github.com/datab

Spark SQL and DataFrame Guide(1.4.1)——DataFrames

ati been -m displays txt -a 版本 ava form Spark SQL是處理結構化數據的Spark模塊。它提供了DataFrames這樣的編程抽象。同一時候也能夠作為分布式SQL查詢引擎使用。 DataFrames D

Spark學習路 (十一)SparkCore的調優Spark內存模型

精準 規模 memory 此外 結構定義 申請 管理方式 存儲 內部 摘抄自:https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index

大資料學習SPARK計算天下

學習大資料技術,SPARK無疑是繞不過去的技術之一,它的重要性不言而喻,本文將通過提問的形式圍繞著SPARK進行介紹,希望對大家有幫助,與此同時,感謝為本文提供素材的科多大資料的武老師。 為了輔助大家更好去了解大資料技術,本文集中討論Spark的一系列技術問題,大家在學習過程中如果遇到困難,可以