
Big Data with Spark (1) --- Spark Overview, Modules, Installation, Usage, WordCount in One Line, the API, Scala Programming, Submitting Jobs to a Spark Cluster, Script Analysis

I. Spark Overview
----------------------------------------------------------
    1.Lightning-fast cluster computing
    2.A fast, general-purpose engine for large-scale computation
    3.Speed: up to 100x faster than Hadoop MapReduce in memory, 10x faster for on-disk computation
    4.Languages: Java / Scala / R / Python
    5.Provides 80+ operators, making it easy to build parallel applications
    6.General-purpose: combines SQL, stream processing and complex analytics
    7.Runs on: Hadoop, Mesos, standalone, local, or in the cloud
    8.DAG    //directed acyclic graph (see the sketch after this list)
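
    Points 5 and 8 go together: the 80+ operators are mostly lazy transformations that only extend the DAG, and nothing is computed until an action runs. A minimal sketch in spark-shell (where sc is predefined); the numbers are made up for illustration:

        //each transformation only adds a node to the DAG; the action at the end triggers the actual computation
        scala> val nums    = sc.parallelize(1 to 10)      //RDD built from a local collection
        scala> val evens   = nums.filter(_ % 2 == 0)      //transformation: lazy, nothing runs yet
        scala> val squares = evens.map(n => n * n)        //transformation: still lazy
        scala> squares.count()                            //action: the DAG executes now and returns 5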


II. Spark Modules
--------------------------------------------------------
    Spark Core         //core module
    Spark SQL          //SQL
    Spark Streaming    //stream processing
    Spark MLlib        //machine learning
    Spark GraphX       //graph computation
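
    A minimal sketch of how the modules combine (point 6 in section I), assuming a Spark 2.x spark-shell where both sc and the SparkSession spark are predefined; the sample data and table name are made up:

        scala> import spark.implicits._
        //Spark Core builds the RDD; Spark SQL turns it into a DataFrame and queries it with SQL
        scala> val df = sc.parallelize(Seq(("tom1", 3), ("tom2", 5))).toDF("name", "cnt")
        scala> df.createOrReplaceTempView("wc")
        scala> spark.sql("select name, cnt from wc where cnt > 3").show()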


III. Installing Spark
--------------------------------------------------------
    1.Download spark-2.1.0-bin-hadoop2.7.tgz
        ..
    2.Extract it
        ..
    3.Environment variables
        [/etc/profile]
        SPARK_HOME=/soft/spark
        PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
        /soft/spark/bin
        /soft/spark/sbin
        [source]
        $>source /etc/profile

    4.Verify Spark

        $>cd /soft/spark/bin
        $>./spark-shell

    5.Web UI (port 4040 serves the UI of the running application, e.g. the spark-shell started above)
        http://s100:4040/


IV. Using Spark
--------------------------------------------------------
    1.Open the shell
        $>

    2.sc ==> the entry point of a Spark program; it encapsulates the information about the whole Spark execution environment
        scala> sc
        res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...
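
    Besides loading files, sc can be poked at directly to inspect the environment it wraps and to build RDDs from in-memory collections. A small sketch (the values in the comments are only illustrative):

        scala> sc.version                                  //e.g. 2.1.0
        scala> sc.master                                   //e.g. local[*]
        scala> val rdd = sc.parallelize(Array(1, 2, 3, 4)) //RDD from a local collection
        scala> rdd.sum()                                   //returns 10.0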
V. WordCount in One Line
----------------------------------------------------------
    //load the file, returning an RDD (one element per line)
    scala> val rdd1 = sc.textFile("/home/ubuntu/downloads/1.txt")
    rdd1: org.apache.spark.rdd.RDD[String] = /home/ubuntu/downloads/1.txt MapPartitionsRDD[1] at textFile at <console>:24

    //split every line on ',' and flatten the result into one collection of single words,
    //map each word to a (word, 1) pair, then reduce (aggregate) by key
    scala> val rdd2 = rdd1.flatMap(line => line.split(","))
                          .map(word => (word, 1))
                          .reduceByKey(_ + _)

    //inspect the word-count result
    scala> rdd2.collect
    res2: Array[(String, Int)] = Array((tom1,1), (4,1), (14,1), (7,1), (15,1), (5,1), (tom2,1), (6,1), (tom6,1), (2,1), (16,1), (3,1), (tom3,1), (tom4,1), (17,1), (12,1), (13,1), (tom5,1), (1,1), (11,1), (tom7,1))

    //WordCount in one line
    scala> sc.textFile("/home/ubuntu/downloads/1.txt").flatMap(line => line.split(",")).map(word => (word,1)).reduceByKey(_ + _).collect

    //add a word filter that drops everything containing "tom"
    scala> sc.textFile("/home/ubuntu/downloads/1.txt")
             .flatMap(line => line.split(","))
             .filter(!_.contains("tom"))
             .map(word => (word,1))
             .reduceByKey(_ + _)
             .collect
    res6: Array[(String, Int)] = Array((4,1), (14,1), (7,1), (15,1), (5,1), (6,1), (2,1), (16,1), (3,1), (17,1), (12,1), (13,1), (1,1), (11,1))


VI. The API
------------------------------------------------------
    1.[SparkContext]
        The main entry point for Spark functionality. Represents the connection to a Spark cluster and is used to create RDDs, accumulators and broadcast variables.
        Only one SparkContext can be active per JVM; stop() the active one before creating a new sc.

    2.[RDD]
        Resilient distributed dataset, Spark's equivalent of a collection. Text files are split into records on newlines.

    3.[SparkConf]
        Spark configuration object; sets the various parameters of a Spark application as key-value pairs.


VII. Scala Programming -- pull the Spark libraries into IDEA and implement WordCount
-------------------------------------------------------------
    1.Create a spark module
    2.Add Maven support

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
        </dependencies>

    3.Add Scala support
    4.Write the object -- WorldCountDemo

        import org.apache.spark.{SparkConf, SparkContext}

        object WorldCountDemo {
            def main(args: Array[String]): Unit = {
                //create the Spark configuration object
                val conf = new SparkConf()
                //set the app name
                conf.setAppName("sparkwc")
                //set local mode
                conf.setMaster("local")
                //create the core object -- the context
                val sc = new SparkContext(conf)

                val rdd1 = sc.textFile("d:\\calllog.log")
                val rdd2 = rdd1.flatMap(line => line.split(","))
                val rdd3 = rdd2.map(word => (word, 1))
                val rdd4 = rdd3.reduceByKey(_ + _)
                val r = rdd4.collect()
                r.foreach(e => println(e))
            }
        }

    5.Run the app and check the result
    6.Java implementation

        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.api.java.function.FlatMapFunction;
        import org.apache.spark.api.java.function.PairFunction;
        import scala.Tuple2;

        import java.util.*;

        /**
         * Word count, Java version
         */
        public class WorldCountDemoJava {
            public static void main(String[] args) {
                SparkConf conf = new SparkConf();
                conf.setMaster("local");
                conf.setAppName("wcjava");
                JavaSparkContext jsc = new JavaSparkContext(conf);
                JavaRDD<String> jrdd1 = jsc.textFile("d:\\calllog.log");

                //flatten each line into individual words
                JavaRDD<String> jrdd2 = jrdd1.flatMap(new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String s) throws Exception {
                        List<String> list = new ArrayList<String>();
                        String[] strs = s.split(",");
                        for (String ss : strs) {
                            list.add(ss);
                        }
                        return list.iterator();
                    }
                });

                //map each word to a pair (word -> (word, 1))
                JavaPairRDD<String, Integer> jrdd3 = jrdd2.mapToPair(new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

                //count by key and print the result
                Map<String, Long> map = jrdd3.countByKey();
                Set<String> set = map.keySet();
                for (String s : set) {
                    System.out.println(s + ":" + map.get(s));
                }
            }
        }
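
    One difference between the two demos above is worth pointing out: the Scala version aggregates with reduceByKey, a transformation whose result is still a distributed RDD, while the Java version ends with countByKey, an action that brings a local Map back to the driver. A small shell sketch of the two styles (the sample words are made up for illustration):

        scala> val pairs = sc.parallelize(Seq("tom", "cat", "tom")).map(w => (w, 1))

        //reduceByKey: transformation -- the result is another RDD and stays on the cluster
        scala> pairs.reduceByKey(_ + _).collect()          //e.g. Array((tom,2), (cat,1))

        //countByKey: action -- the counts come back to the driver as a local Map
        scala> pairs.countByKey()                          //e.g. Map(tom -> 2, cat -> 1)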
VIII. Submitting a Job to Run on a Spark Cluster
------------------------------------------------
    1.Export the jar
    2.Copy it to a shared directory
    3.Submit and run the jar with spark-submit (a cluster-friendly variant of the demo that takes its input path from the command line is sketched after section XI)

        $> spark-submit --master local --name wc --class com.spark.demo.java.WorldCountDemoJava TestSpark-1.0-SNAPSHOT.jar /home/ubuntu/downloads/1.txt
        $> spark-submit --master local --name wc --class com.spark.demo.scala.WorldCountDemoScala TestSpark-1.0-SNAPSHOT.jar /home/ubuntu/downloads/1.txt


IX. Deploying a Spark Cluster
-----------------------------------------------
    1.local
        nothing to do!
        spark-shell --master local        //the default
    2.standalone (independent cluster mode)
        a)copy the spark directory to the other hosts
        b)configure all the environment variables on the other hosts
            [/etc/profile]
            SPARK_HOME
            PATH
        c)configure slaves on the master node s100 and distribute the file to all nodes
            [/soft/spark/conf/slaves]
            s202
            s203
            s204
        d)start the spark cluster on s100
            /soft/spark/sbin/start-all.sh     [run the .sh file from inside the sbin directory to avoid picking up the hadoop script of the same name]
        e)check the processes
            $>xcall.jps jps
                master        //s100
                worker        //s200
                worker        //s300
                worker        //s400
        f)webui
            http://s100:8080/


X. Submitting a jar Job to the Fully Distributed Spark Cluster
--------------------------------------------------------
    1.Start the hadoop hdfs cluster
        $> start-dfs.sh
    2.put the file to be word-counted into hdfs
    3.Run spark-submit

        $> spark-submit --master spark://s100:7077 --name wc --class com.spark.demo.java.WorldCountDemoJava TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt
        $> spark-submit --master spark://s100:7077 --name wc --class com.spark.demo.scala.WorldCountDemoScala TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt


XI. Script Analysis
--------------------------------------------------------
    [start-all.sh]
        sbin/spark-config.sh
        sbin/start-master.sh        //starts the master process
        sbin/start-slaves.sh        //starts the worker processes

    [start-master.sh]
        sbin/spark-config.sh
        CLASS="org.apache.spark.deploy.master.Master"
        spark-daemon.sh start org.apache.spark.deploy.master.Master --host --port --webui-port ...

    [start-slaves.sh]
        sbin/spark-config.sh
        slaves.sh                   //iterates over conf/slaves

    [slaves.sh]
        for each host in conf/slaves {
            ssh host start-slave.sh ...
        }

    [start-slave.sh]
        CLASS="org.apache.spark.deploy.worker.Worker"
        sbin/spark-config.sh
        for (( .. )) ; do
            start_instance $(( 1 + $i )) "$@"
" done $>cd /soft/spark/sbin $>./stop-all.sh //停掉整個spark叢集. $>./start-all.sh //啟動整個spark叢集. $>./start-master.sh //啟動master節點 $>./start-slaves.sh //啟動所有worker節點 $s400>./start-slave.sh spark://s100:7077 //在s400上啟動單個worker節點