1. 程式人生 > >學習筆記--Spark

學習筆記--Spark

lin 4.0 sequence output dds ava hashmap rdd www

參考來源:http://www.yiibai.com/spark/

概述
Apache Spark是一個集群計算設計的快速計算。它是建立在Hadoop MapReduce之上,它擴展了 MapReduce 模式,有效地使用更多類型的計算,其中包括交互式查詢和流處理。Spark的主要特征是其內存集群計算,增加的應用程序的處理速度。

三種部署方法:

  • 單機版 ? Spark獨立部署是指Spark占據在HDFS之上(Hadoop分布式文件系統)並將空間分配給HDFS。在這裏,Spark和MapReduce將並列覆蓋所有Spark的作業集群。
  • Hadoop Yarn ? Hadoop Yarn部署方式,簡單地說,spark運行在Yarn沒有任何必要預安裝或使用root訪問權限。它有助於Spark融入Hadoop生態系統和Hadoop堆棧。它允許在其它部件疊上層的頂部上運行。
  • Spark 在MapReduce (SIMR) ? Spark在MapReduce的用於啟動spark作業,除了獨立部署。通過SIMR,用戶可以啟動Spark和使用Shell,而不需要任何管理權限。

Spark RDD
彈性分布式數據集(RDD)是Spark的基本數據結構。它是對象的不可變的分布式集合。在RDD中每個數據集被劃分成邏輯分區,這可能是在群集中的不同節點上計算的。RDDS可以包含任何類型,如:Python,Java,或者Scala的對象,包括用戶定義的類。

安裝
按順序安裝Java、Scala、Spark

Spark核心編程
創建簡單RDD
Spark容器會自動創建Spark 上下文對象名為sc

$ spark-shell
scala> val inputfile = sc.textFile(“input.txt”)

RDD轉換
S.No | 轉換&含義
--------|----------------
1 | map(func) 返回一個新的分布式數據集,傳遞源的每個元素形成通過一個函數 func
2 | filter(func) 返回由選擇在func返回true,源元素組成了一個新的數據集
3 | flatMap(func) 類似映射,但每個輸入項目可以被映射到0以上輸出項目(所以func應返回seq而不是單一的項目)
4 | mapPartitions(func) 類似映射,只不過是單獨的每個分區(塊)上運行RDD,因此 func 的類型必須是Iterator

動作
S.No | 操作 & 含義
--------|---------------------
1 | reduce(func) 合計數據集的元素,使用函數 func (其中有兩個參數和返回一行). 該函數應該是可交換和可結合,以便它可以正確地在並行計算。
2 | collect() 返回數據集的所有作為數組在驅動程序的元素。這是一個過濾器或其它操作之後返回數據的一個足夠小的子集,通常是有用的
3 | count() 返回該數據集的元素數
4 | first() 返回的數據集的第一個元素(類似於使用(1))
5 | take(n) 返回與該數據集的前n個元素的陣列。
6 | takeSample (withReplacement,num, [seed]) 返回數組的數據集num個元素,有或沒有更換隨機抽樣,預指定的隨機數發生器的種子可選
7 | takeOrdered(n, [ordering]) 返回RDD使用或者按其自然順序或自定義比較的前第n個元素
8 | saveAsTextFile(path) 寫入數據集是一個文本文件中的元素(或一組文本文件),在給定的目錄的本地文件系統,HDFS或任何其他的Hadoop支持的文件系統。Spark調用每個元素的 toString,將其轉換為文件中的文本行
9 | saveAsSequenceFile(path) (Java and Scala) 寫入數據集,為Hadoop SequenceFile元素在給定的路徑寫入在本地文件系統,HDFS或任何其他Hadoop支持的文件系統。 這是適用於實現Hadoop可寫接口RDDS的鍵 - 值對。在Scala中,它也可以在屬於隱式轉換為可寫(Spark包括轉換為基本類型,如 Int, Double, String 等等)類型。
10 | saveAsObjectFile(path) (Java and Scala) 寫入數據集的內容使用Java序列化為一個簡單的格式,然後可以使用SparkContext.objectFile()加載。
11 | countByKey() 僅適用於RDDS的類型 (K, V). 返回(K, Int)對與每個鍵的次數的一個HashMap。
12 | foreach(func) 數據集的每個元素上運行函數func。這通常對於不良反應,例如更新累加器或與外部存儲系統進行交互進行。

示例程序

//打開Spark-Shell
$ spark-shell 
//創建一個RDD
scala> val inputfile = sc.textFile("input.txt")
//執行字數轉換
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
//當前RDD
scala> counts.toDebugString
//緩存轉換
scala> counts.cache()
//應用動作
scala> counts.saveAsTextFile("output")

Spark部署
Spark應用程序使用spark-submit(shell命令)來部署在集群中的Spark應用程序
示例:
SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
  def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
        
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
        
      valcount = input.flatMap(line ? line.split(" ")) 
      .map(word ? (word, 1)) 
      .reduceByKey(_ + _) 
      
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
  } 
}  

步驟:
1、下載Spark Ja
下載spark-core_2.10-1.3.0.jar
2、編譯程序

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala 

3、創建JAR

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

4、提交spark應用

spark-submit --class SparkWordCount --master local wordcount.jar 

學習筆記--Spark