
Spark Tutorial, Part 2 (Introduction to Operators and WordCount Basics)

Start the Spark cluster:

[root@master ~]# cd /usr/local/apps/spark-2.3.2-bin-hadoop2.7/
[root@master spark-2.3.2-bin-hadoop2.7]# ./sbin/start-all.sh

The startup log is as follows:

starting org.apache.spark.deploy.master.Master, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-master.out
192.168.153.131: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slvae3.out
192.168.153.130: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out
192.168.153.129: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out
192.168.153.131: failed to launch: nice -n 0 /usr/local/apps/spark-2.3.2-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master:7077
192.168.153.130: failed to launch: nice -n 0 /usr/local/apps/spark-2.3.2-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master:7077
192.168.153.130: full log in /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out
192.168.153.131: full log in /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slvae3.out
192.168.153.129: failed to launch: nice -n 0 /usr/local/apps/spark-2.3.2-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master:7077
192.168.153.129: full log in /usr/local/apps/spark-2.3.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out

Start spark-shell:

spark-shell --master spark://master:7077 --total-executor-cores 2 --executor-memory 513m

The startup log is as follows:

18/10/03 10:30:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://master:4040
Spark context available as 'sc' (master = spark://master:7077, app id = app-20181003103051-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

Get the SparkContext:

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...

Read a local file:

scala> val file = sc.textFile("licenses")
scala> file.count
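
Here `textFile` only records which data to read; `count` is the action that actually triggers the job. As a small sketch for the same spark-shell session (assuming the shell was started from the Spark installation directory, where the `licenses` folder lives), two more actions can be used to peek at the data:

// take pulls only the first few elements to the driver
file.take(3).foreach(println)
// first returns just the first line
file.first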

The WordCount code is as follows:

scala> sc.textFile("licenses").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect

The result is as follows:

res2: Array[(String, Int)] = Array(("",1995), (the,366), (OR,303), (#,279), (OF,262), (of,214), (THE,204), (and,180), (to,160), (ANY,136), (in,130), (IN,121), (this,113), (AND,110), (or,104), (following,85), (FOR,82), (conditions,76), (without,74), (copyright,71), (*,69), (NOT,66), (above,61), (BUT,60), (LIMITED,57), (LIABILITY,,56), (is,54), (SOFTWARE,54), (provided,53), (with,53), (COPYRIGHT,50), (source,50), (THIS,49), (binary,49), (are,48), (IMPLIED,47), (Redistributions,46), (be,46), (list,46), (must,46), (notice,,46), (software,45), (TO,,45), (Copyright,44), (NO,44), (CONTRIBUTORS,43), ((c),43), (any,42), (that,42), (DAMAGES,41), (USE,40), (SHALL,40), (LIABLE,40), (BE,40), (rights,40), (WARRANTIES,39), (FITNESS,38), (PSF,38), (A,38), (and/or,38), (PARTICULAR,38), (documentation,38...
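
For readers who have not seen these operators before, the same WordCount pipeline can be written out step by step. This is only an illustrative sketch for the same spark-shell session; the intermediate value names are not from the original example:

val lines  = sc.textFile("licenses")                 // RDD[String], one element per line
val words  = lines.flatMap(_.split(" "))             // split each line into words and flatten
val pairs  = words.map((_, 1))                       // pair every word with an initial count of 1
val counts = pairs.reduceByKey(_ + _)                // sum the counts of identical words
val sorted = counts.sortBy(_._2, ascending = false)  // order by count, descending
sorted.collect                                       // action: bring the result to the driver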

Read a file from HDFS and save the result back to HDFS:

scala> sc.textFile("hdfs://master:9000/wc/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://master:9000/wc/out1")

Check the output:

[root@master apps]# hadoop fs -ls /wc/out1
Found 3 items
-rw-r--r--   3 root supergroup          0 2018-10-03 17:27 /wc/out1/_SUCCESS
-rw-r--r--   3 root supergroup         76 2018-10-03 17:27 /wc/out1/part-00000
-rw-r--r--   3 root supergroup        417 2018-10-03 17:27 /wc/out1/part-00001

View the file contents:

[root@master apps]# hadoop fs -cat /wc/out1/part-*

Output omitted...

Save the output into a single file:

scala> sc.textFile("hdfs://master:9000/wc/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://master:9000/wc/out2")

The result is as follows:

[root@master apps]# hadoop fs -ls /wc/out2
Found 2 items
-rw-r--r--   3 root supergroup          0 2018-10-03 17:31 /wc/out2/_SUCCESS
-rw-r--r--   3 root supergroup        493 2018-10-03 17:31 /wc/out2/part-00000
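
Passing 1 as the second argument of reduceByKey forces the aggregation into a single partition, which is why only one part-00000 file is produced. An alternative sketch (same idea, not from the original post, and with a hypothetical output path out3) keeps the default parallelism during the aggregation and only collapses partitions right before writing:

sc.textFile("hdfs://master:9000/wc/wc.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)        // aggregate with the default number of partitions
  .sortBy(_._2, false)
  .coalesce(1)               // collapse to one partition only for the final write
  .saveAsTextFile("hdfs://master:9000/wc/out3")   // hypothetical output path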

Operator analysis:
Categories:

  • Transformation
  • Action

Example: calling sc.textFile("hdfs://master:9000/wc/wc.txt") produces an RDD. At this point the RDD holds no data: textFile is a transformation, not an action, so no actual computation is performed.

scala> val file = sc.textFile("hdfs://master:9000/wc/wc.txt")
file: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/wc/wc.txt MapPartitionsRDD[57] at textFile at <console>:24

Executing an action:

scala> sc.textFile("hdfs://master:9000/wc/wc.txt").flatMap(_.split(" ")).map((_,1)).collect
res8: Array[(String, Int)] = Array((Apache,1), (Spark,1), (is,1), (a,1), (fast,1), (and,1), (general-purpose,1), (cluster,1), (computing,1), (system.,1), (It,1), (provides,1), (high-level,1), (APIs,1), (in,1), (Java,,1), (Scala,,1), (Python,1), (and,1), (R,,1), (and,1), (an,1), (optimized,1), (engine,1), (that,1), (supports,1), (general,1), (execution,1), (graphs.,1), (It,1), (also,1), (supports,1), (a,1), (rich,1), (set,1), (of,1), (higher-level,1), (tools,1), (including,1), (Spark,1), (SQL,1), (for,1), (SQL,1), (and,1), (structured,1), (data,1), (processing,,1), (MLlib,1), (for,1), (machine,1), (learning,,1), (GraphX,1), (for,1), (graph,1), (processing,,1), (and,1), (Spark,1), (Streaming.,1))
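
To make the laziness visible, the lineage of the chained transformations can be inspected before any action runs; toDebugString prints the RDD dependency chain without launching a job, while count does launch one. A minimal sketch for the same session:

val rdd = sc.textFile("hdfs://master:9000/wc/wc.txt")
            .flatMap(_.split(" "))
            .map((_, 1))
// Nothing has been read yet; this only prints the recorded lineage
println(rdd.toDebugString)
// count is an action: only now is the file read and the job executed
rdd.count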

Introduction to common operators:

Create an RDD by parallelizing a collection:

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[62] at parallelize at <console>:24

Check the number of partitions of this RDD:

scala> rdd1.partitions.length
res10: Int = 2
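
By default parallelize splits the collection according to the cluster's default parallelism (2 here). The number of partitions can also be requested explicitly as a second argument; a small sketch for the same session (rdd4 is just an illustrative name):

val rdd4 = sc.parallelize(1 to 8, 4)   // request 4 partitions explicitly
rdd4.partitions.length                 // 4
// glom gathers each partition into an array, making the split visible
rdd4.glom.collect                      // e.g. Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8))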

Multiply each element by 10:

scala> val rdd2 = rdd1.map(_*10).collect
rdd2: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80)

Take the elements of rdd2 that are smaller than 50:

scala> rdd1.map(_*10).filter(_ < 50).collect
res16: Array[Int] = Array(10, 20, 30, 40)
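
Note that the example above recomputes from rdd1 rather than reusing rdd2; since rdd2 was already collected to the driver as a plain Array[Int], the same filter can also run locally. A small sketch:

// rdd2 is an Array[Int] on the driver (collect already ran), so this is ordinary Scala
rdd2.filter(_ < 50)    // Array(10, 20, 30, 40)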