
Getting Started with Spark: Word Count

2017-07-01

Introduction

Apache Spark is a fast, general-purpose computing engine designed for large-scale data processing. Development began in 2009 at UC Berkeley's AMP Lab, where it was open-sourced; it is now a top-level Apache project.

Spark supports Scala, Java, Python, and R. Version 2.1.1 was released in May 2017.

Scala is the recommended development language: the Java and Python APIs often lag behind Spark's progress, and Java and Python also require various data conversions.

Spark Components

(Figure: the Spark stack)

Spark Core

The foundation of Spark, covering task scheduling, memory management, fault recovery, and interaction with storage systems. It is also where resilient distributed datasets (RDDs) are defined; an RDD represents a dataset that Spark can process in a distributed fashion across many machines.
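As a rough sketch of what working with an RDD looks like (this assumes an existing SparkContext named sc, e.g. the one created in the initialization code later in this article, or the one spark-shell provides):

// Distribute a local collection as an RDD with 4 partitions,
// transform it lazily, then run an action that triggers the job.
val nums = sc.parallelize(1 to 100, 4)
val squares = nums.map(n => n * n)   // transformation: not executed yet
val total = squares.reduce(_ + _)    // action: computed across partitions
println(total)                       // 338350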

Spark SQL

Spark SQL is Spark's package for working with structured data. It provides a SQL query interface and is compatible with the Apache Hive SQL dialect (HQL). It supports many data sources, such as Hive tables, Parquet, and JSON, and it lets SQL query results be mixed with ordinary programmatic RDD operations.

Spark SQL is the successor to Shark, also from UC Berkeley.
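A minimal Spark SQL sketch using the Spark 2.x SparkSession API (people.json is a placeholder path; any JSON file with name and age fields would do):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("sql-demo").getOrCreate()
val df = spark.read.json("people.json")   // load a JSON data source as a DataFrame
df.createOrReplaceTempView("people")      // register it so it can be queried with SQL
spark.sql("SELECT name FROM people WHERE age >= 18").show()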

Spark Streaming

Spark Streaming is Spark's component for processing live data streams. It provides an API for operating on streaming data that closely mirrors the RDD API.
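A minimal Spark Streaming sketch with the DStream API (localhost:9999 is a placeholder source; nc -lk 9999 is a common way to feed it during testing):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-demo")
val ssc = new StreamingContext(conf, Seconds(5))      // 5-second micro-batches
val lines = ssc.socketTextStream("localhost", 9999)   // text stream from a socket
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()                                        // print each batch's word counts
ssc.start()
ssc.awaitTermination()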

MLlib

Provides common machine learning algorithms, including classification, regression, clustering, and collaborative filtering, plus model evaluation and data import utilities, as well as lower-level primitives such as gradient descent optimization.

All of the algorithms are designed to scale out across a cluster.
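A minimal MLlib sketch using k-means clustering with the RDD-based API (kmeans_data.txt is a placeholder file with one space-separated point per line; sc is an existing SparkContext):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("kmeans_data.txt")
val points = data.map(line => Vectors.dense(line.split(' ').map(_.toDouble))).cache()
val model = KMeans.train(points, 2, 20)   // k = 2 clusters, at most 20 iterations
model.clusterCenters.foreach(println)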

GraphX

GraphX is the component for graph processing, for example the friend-relationship graphs of a social network, and it supports parallel graph computation. It extends the RDD API with graphs whose vertices and edges can each carry arbitrary properties. GraphX also provides graph operators (such as subgraph and mapVertices) and a library of common graph algorithms (such as PageRank and triangle counting).
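A minimal GraphX sketch (the vertices and edges here are made-up sample data; sc is an existing SparkContext):

import org.apache.spark.graphx.{Edge, Graph}

val vertices = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "friend"), Edge(2L, 3L, "friend")))
val graph = Graph(vertices, edges)
val ranks = graph.pageRank(0.001).vertices   // run PageRank to tolerance 0.001
ranks.collect().foreach(println)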

Cluster Managers

Spark scales from a single node to clusters of thousands of machines. It ships with its own Standalone scheduler for simple deployments, and it can also run on clusters managed by Hadoop YARN or Apache Mesos.
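The choice of cluster manager shows up as the master URL passed to setMaster (or to spark-submit --master). A rough sketch of the usual forms (host names and ports below are placeholders):

val conf = new org.apache.spark.SparkConf().setAppName("cluster-demo")
conf.setMaster("local[4]")              // single machine, 4 worker threads
// conf.setMaster("spark://host:7077")  // Spark's standalone cluster manager
// conf.setMaster("yarn")               // Hadoop YARN
// conf.setMaster("mesos://host:5050")  // Apache Mesos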

Downloading and Installing Spark

Standalone Spark Programs

A standalone Spark program must initialize a SparkContext itself. For Scala and Java, the Spark libraries can be managed with Maven or similar tools; the coordinates of the dependencies can be looked up on mvnrepository.

Maven

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>


Gradle

provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.1.1'
provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.1'
provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.1.1'
provided group: 'org.apache.spark', name: 'spark-mllib_2.11', version: '2.1.1'

sbt

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.1" % "provided"

Initialization Code

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
  • setMaster tells Spark how to connect to a cluster; the example uses "local", i.e. local mode.
  • setAppName identifies the application on the cluster; the name shows up in the monitoring UI.

Stopping a Program

Call stop() on the SparkContext, or exit the JVM with System.exit(0), sys.exit(0), etc.
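A minimal sketch of the clean shutdown path, reusing the sc created in the initialization code above:

try {
  // ... run jobs on sc ...
} finally {
  sc.stop()   // release the application's resources on the cluster
}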

Testing

The project can be built with Maven or sbt. The example below is a word count.

Word Count Code

/* wordcount.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    val logFile = "/Users/zhouhh/spark/README.md"
    val outputFile = "/Users/zhouhh/wc.txt"
    val conf = new SparkConf().setAppName("Word count")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()          // read the file as an RDD with 2 partitions
    val words = logData.flatMap(line => line.split(" "))   // split each line into words
    val wordsmap = words.map(w => (w, 1))                  // pair each word with a count of 1
    val wordcount = wordsmap.reduceByKey(_ + _)            // sum counts per word; same as reduceByKey { case (x, y) => x + y }
    wordcount.saveAsTextFile(outputFile)                   // write the (word, count) pairs to the output directory
  }
}
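For a quick interactive check, the same pipeline can be typed into spark-shell, where sc is already created:

val counts = sc.textFile("/Users/zhouhh/spark/README.md")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
counts.take(10).foreach(println)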

The sbt Build File

name := "wordcount spark"

version := "0.0.1"

scalaVersion := "2.12.2"

// additional libraries
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.1" % "provided"


Setting Up an sbt Mirror for China

The central Maven repository http://repo1.maven.org/maven2/ is very slow to reach from China, and connections drop so often that it is almost unusable. The Aliyun mirror is a real blessing for developers.

/Users/zhouhh/.sbt $ vi repositories
[repositories]
    local
    aliyun: http://maven.aliyun.com/nexus/content/groups/public/
    central: http://repo1.maven.org/maven2/

The repositories listed in this file are tried in order: local → Aliyun mirror → Maven Central.

Building

/Users/zhouhh/test/spark/wordcount $ sbt package
[info] Set current project to wordcount spark (in build file:/Users/zhouhh/test/spark/wordcount/)
[info] Compiling 1 Scala source to /Users/zhouhh/test/spark/wordcount/target/scala-2.12/classes...
[info] Packaging /Users/zhouhh/test/spark/wordcount/target/scala-2.12/wordcount-spark_2.12-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed 2017-7-1 23:43:35


Submitting

/Users/zhouhh/test/spark/wordcount $ spark-submit --class WordCount --master local target/scala-2.12/wordcount-spark_2.12-0.0.1.jar
...
Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction2$mcIII$sp
	at WordCount$.main(wordcount.scala:15)
	at WordCount.main(wordcount.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction2$mcIII$sp

This happens because the jar was compiled with the locally installed Scala (2.12.2), while the Scala library bundled with Spark is older (2.11.8), so the 2.12-compiled classes cannot be loaded.


/Users/zhouhh/test/spark/wordcount $ ls $SPARK_HOME/jars

scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
scala-reflect-2.11.8.jar
scala-xml_2.11-1.0.2.jar
scalap-2.11.8.jar
scala-parser-combinators_2.11-1.0.4.jar

/Users/zhouhh/test/spark/wordcount $ scala -version
Scala code runner version 2.12.2 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

Fixing build.sbt

/Users/zhouhh/test/spark/wordcount $ vi build.sbt
scalaVersion := "2.11.8"

Rebuilding and Resubmitting to Spark

/Users/zhouhh/test/spark/wordcount $ sbt clean package
/Users/zhouhh/test/spark/wordcount $ spark-submit --class WordCount --master local target/scala-2.11/wordcount-spark_2.11-0.0.1.jar

Results

/Users/zhouhh/test/spark/wordcount $ ls ~/wc.txt
_SUCCESS  part-00000  part-00001
/Users/zhouhh/test/spark/wordcount $ head -10 ~/wc.txt/part-00000

(package,1)
(this,1)
(Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1)
(Because,1)
(Python,2)
(page](http://spark.apache.org/documentation.html).,1)
(cluster.,1)
(its,1)
([run,1)
(general,3)

References

Learning Spark
