1. 程式人生 > >spark scala基礎知識彙總

spark scala基礎知識彙總

前段時間搞了一陣spark scala,處理一個APP大半年的日誌。本意是將日誌格式化,挖掘其中有用的資訊,儘管後來變成了資料統計。但這段時間確實學習了spark scala,知道了這麼一個大資料處理工具。本文將一些基本用法記下來。

個人體會,spark是相對於MapReduce更高層次的抽象。使用MapReduce時,需要將每個任務拆分成Map和Reduce過程,在處理連續任務時,整個流程比較複雜。我在初次使用spark時,產生了如同使用matlab那般的想法,操作物件是資料集,有兩種操作:轉換(transformation)和動作(action)。spark完全封裝了資料的分佈、排程問題,而且充分利用記憶體的特性,通過把中間資料直接儲存在記憶體中來提高效能。程式設計師只需要關注如何將任務拆分成兩種變換即可。

讀寫檔案 

val lines = sc.textFile("file:///path_to_local/file")
val lines = sc.textFile("hdfs:///path_to_hdfs/file")
rdd.saveAsTextFile("hdfs://")
spark可以直接讀寫文字檔案,無論本地或者HDFS,無論是單個文字還是目錄,還可以對目錄進行正則匹配。得到的結果是以行為單位的文字集合,上述lines就是文字的行的集合。 如果是parquet格式檔案,可以用下面的辦法,得到一個DataFrame,同樣可以識別本地及hdfs檔案,也可以識別目錄及正則。
val parquetFile = sqlContext.read.parquet("people.parquet")
df.write.save("temp.parquet")
如果是JSON格式檔案,其讀寫辦法如下:
val df = sqlContext.read.json("path to json file")
val df = sqlContext.read.format("json").load("path to file")
df.write.format("json").save("path to save")

spark操作:轉換和動作

spark裡面一個核心的概念是RDD,簡而言之,就是一個可以操作的資料集合。如果你有matlab程式設計的經驗,你會比較習慣spark的程式設計思維,即對集合進行運算。spark支援兩種RDD操作,轉換和動作。轉換是對集合每個元素進行某種運算,得到一個新的集合。動作是對集合所有元素進行相同運算,最後聚合成一個元素。 下面是一個用spark統計字元數的例子例子:
val lines = sc.textFile("data.txt")            //讀檔案,得到以行字串為單位的RDD
val lineLengths = lines.map(s => s.length)    //轉換,將字串元素對映為其長度 
val totalLength = lineLengths.reduce((a, b) => a + b)        //動作,將所有元素加起來

最常用的轉換操作有兩個:map和filter,map(func)是將func應用到所有元素,得到一個新的RDD。filter是將func返回為true的元素過濾出來,組成一個新的RDD。一些比較常用的轉換如下:
map(func)  返回一個新的分散式資料集,將資料來源的每一個元素傳遞給函式 func 對映組成。
filter(func)  返回一個新的資料集,從資料來源中選中一些元素通過函式 func 返回 true。
flatMap(func)  類似於 map,但是每個輸入項能被對映成多個輸出項(所以 func 必須返回一個 Seq,而不是單個 item)。 union(otherDataset)   兩個RDD求並集
intersection(otherDataset)  兩個RDD求交集 groupByKey()  作用於(K,V)的資料集,依據K對值進行歸併,返回一個(K, Iterable) reduceByKey(func) 作用於(K,V)的資料集,依據K對值使用func進行歸約,返回一個(K,V)資料集 sortByKey([asending])  返回一個依據K進行排序的資料集 最常用的動作就是reduce,將資料集歸約為一個結果。一些比較常用的動作如下: reduce(func)  按照func函式對資料集進行歸約,func接受兩個引數,返回一個結果,須滿足結合律和交換律,以便於分散式計算。 count()返回資料集的元素個數 first()返回第一個元素 take(n)  以陣列形式返回集合的前n個元素 saveAsTextFile(path)將資料集儲存為文字檔案

一個單詞統計的例子

val textFile = sc.textFile("hdfs://...")                //讀取hdfs檔案,轉換為以行為單位的文字集合
val counts = textFile.flatMap(line => line.split(" "))  //轉換,將行字串轉換為單詞,組成新的RDD
                 .map(word => (word, 1))                //轉換,將單詞轉換為詞頻統計
                 .reduceByKey(_ + _)                    //轉換,根據key值進行歸約
counts.saveAsTextFile("hdfs://...")                     //儲存
在上面的程式碼中,下劃線表示臨時的變數。

自定義轉換函式

spark基本語法可以支援很多常用的轉換操作。但是在實際的業務場景中,對轉換的需求是比較複雜的。這就需要自定義轉換函式。舉個例子,後臺日誌中,以tag:123的形式記錄了某個含義的欄位。現在,要把這個數值全部提取出來。寫法如下:
object test {
	def fetch_tag(line : String) : Int = {
		try {
			val regex = "tag:([0-9]+)".r
			val regex(tag) = line
			tag.toInt
		}
		catch {
			case ex: Exception => 0
		}
	}
}
lines.map(test.fetch_tag(_)).reduce(_+_)
通過test.fetch_tag函式將字串中的要求值正則出來,最後返回一個值的資料集,然後進行歸約。 通過以上基本功能的組合,就能寫出處理複雜業務的工具了。