1. 程式人生 > >Spark-Programming Guides官網一

Spark-Programming Guides官網一

Quick Start

Spark2.0之前是RDD(彈性分散式資料集)。Spark2.0後RDD被Dataset取代

Interactive Analysis with the Spark Shell

  1. 讀檔案得到資料集 textFile=spaek.read.text(“README.md”) 注意從本地讀使用絕對路徑 file:/wdd/app/spark/README.mdhdfs上讀 用絕對路徑 hdfs:/wdd/…

  2. 計數textFile.count() 顯示第一個textFile.first()

  3. 將這個資料集轉成一個新的資料集 linesWithSpark = textFile.filter(textFile.value.contains(“Spark”)

    ) 將資料集按行讀取每讀取一行取得其值判斷是否包含Spark,包含就留著加入新飛資料集中

More on Dataset Operations

  1. \s表示 空格,回車,換行等空白符,+號表示一個或多個的意思,所以 split("\s+") 這個就能實現你的 多個空格切割的效果
  2. from pyspark.sql.functions import * textFile.select(size(split(textFile.value,"\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
    對每行按空格切分後計數並命名為numWords,然後呼叫agg找到最大計數字
  3. wordCounts = textFile.select(explode(split(textFile.value, “\s+”)).alias(“word”)).groupBy(“word”).count() explode:我們在select中使用爆炸式函式,將行資料集轉換為詞資料集

Caching

  1. 如果某個詞反覆用到我們可以把它放到叢集的記憶體中去 linesWithSpark.cache()

Self-Contained Applications

簡單的應用程式

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

使用SparkSession建立資料集 這個程式只計算文字檔案中包含“a”的行數和包含“b”的行數。注意,您需要將您的_spark_home替換為Spark安裝位置。與Scala和Java示例一樣,我們使用SparkSession建立資料集。對於使用自定義類或第三方庫的應用程式,我們還可以通過將程式碼打包為.zip檔案(請參閱spark-submit—help獲取詳細資訊),通過它的-py-files引數向spark-submit新增程式碼依賴項。SimpleApp非常簡單,我們不需要指定任何程式碼依賴關係。

Where to Go from Here

1.For Scala and Java, use run-example: ./bin/run-example SparkPi

2.For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py

3.For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R

踩坑

worker記憶體不能低於1G local[n] n個執行緒必須與你虛擬機器上設定的邏輯執行緒個數一致