1. 程式人生 > >Spark學習散點總結

Spark學習散點總結

python spark 人的 bin == count mic alex ext

使用Spark 時,通常會有兩種模式。
一、在交互式編程環境(REPL, a.k.a spark-shell)下實現一些代碼,測試一些功能點。
二、像MapReduce 那樣提前編寫好源代碼並編譯打包(僅限 Java 或 Scala,Python 不需要),然後將程序代碼通過spark-submit 命令提交到 YARN 集群完成計算。

spark-shell

啟動 spark-shell 通常需要指定 master、executor 內存、executor 數量等參數。由於 YARN 集群有審計機制,每個人提交的 spark application 需要指定 name 參數,同時確保 name 是以個人的 LDAP 用戶名為後綴。另外,如果你不確定 driver 是否有足夠的內存能容納一個 RDD 的計算結果,建議不要使用 RDD 的 collect 方法而使用其 take 方法,否則會使 driver 發生 OOM。

  1.scala交互式編程環境

  通過命令啟動sprak-shell

/opt/tige/spark2/bin/spark-shell --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g--conf spark.dynamicAllocation.maxExecutors=10 --name spark_test_{your username} 

啟動spark後系統自動創建sc和sqlContext(HiveContext實例),可以使用它們來創建RDD或者DataFarme

  2.使用Python交互式編程環境

  通過命令pyspark

/opt/tiger/spark_deploy/spark2/bin/ipyspark --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g --num-executors 8 --name spark_test_${your LDAP user name}

spark-submit

首先我們需要使用 Spark 的 API 實現一個擁有入口(main)的程序,然後通過 spark-submit 提交到 YARN 集群。
  1. Scala 版本的 WordCount

    import org.apache.spark.{SparkConf, SparkContext}
      
    object WordCount extends App {
        val sparkConf = new SparkConf()
        sparkConf.setAppName("spark_test_${your LDAP user name}")
        sparkConf.setMaster("yarn-client")
        sparkConf.set("spark.driver.memory", "4g")
        sparkConf.set("spark.executor.memory", "8g")
        sparkConf.set("spark.dynamicAllocation.initialExecutors", "3")
        sparkConf.set("spark.dynamicAllocation.maxExecutors", "10")
        val sc = new SparkContext(sparkConf)
        val words = sc.textFile("/path/to/text/file")
        val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _).collect()
        wordCount.foreach(println)
    }

    完成代碼編寫與編譯打包之後就可以通過 spark-submit 來提交應用了,命令如下:

    /opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client --class WordCount your_spark_test.jar

  2. python版本的WordCount
    from pyspark import SparkContext, SparkConf
    from operator import add
      
    if __name__ == __main__:
        conf = SparkConf()
        conf.setMaster(yarn-client)
        conf.setAppName(spark_test_${your LDAP user name})
        conf.set("spark.driver.memory", "4g")
        conf.set("spark.executor.memory", "8g")
        conf.set("spark.dynamicAllocation.initialExecutors", "3")
        conf.set("spark.dynamicAllocation.maxExecutors", "10")
        sc = SparkContext(conf=conf)
      
        words = sc.textFile("/path/to/text/file")
        wordCount = words.map(lambda word: (word, 1)).reduceByKey(add).collect()
        for key, value in wordCount:
            print key, value
    假設上面這段 Python 代碼的文件名為 your_spark_test.py,那麽提交這段代碼到 YARN 集群的命令如下:
    /opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client your_spark_test.py

Spark學習散點總結