1. 程式人生 > >spark | 手把手教你用spark進行資料預處理

spark | 手把手教你用spark進行資料預處理

本文始發於個人公眾號:**TechFlow**,原創不易,求個關注

今天是spark專題的第七篇文章,我們一起看看spark的資料分析和處理。

過濾去重

在機器學習和資料分析當中,對於資料的瞭解和熟悉都是最基礎的。所謂巧婦難為無米之炊,如果說把用資料構建一個模型或者是支撐一個複雜的上層業務比喻成做飯的話。那麼資料並不是“米”,充其量最多隻能算是未脫殼的稻。要想把它做成好吃的料理,必須要對原生的稻穀進行處理。

但是處理也並不能亂處理,很多人做資料處理就是悶頭一套三板斧。去空值、標準化還有one-hot,這一套流程非常熟悉。以至於在做的時候都不會想,做這些處理的意義是什麼。我們做資料處理也是有的放矢的,針對不同的情況採取不同的策略。所以說到這裡,你應該已經明白了,首要任務還是需要先對資料有個基本的瞭解,做到心中有數。

那麼怎麼做到心中有數呢?我們先來看一個具體的例子,假設現在我們有了這麼一批資料:

df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

這批資料粗略看起來沒什麼問題,但實際上藏著好幾個坑。

首先,id為3的資料有兩條,不僅如此,這兩條資料的特徵也完全一樣。其次,id為1和4的資料特徵也完全相同,只是id不同。除此之外,id為5的資料也有兩條,但是它們的特徵都不同。顯然這不是同一條資料,應該是記錄的時候出現的錯誤。

那麼對於這樣一份資料,我們怎麼發現它們當中的問題,又怎麼修正呢?

我們先從最簡單開始,先來找找完全一樣的資料。我們通過count方法可以求出整個資料集當中的條數,通過distinct().count()可以獲得去重之後的資料數量。這兩個結合一起使用,就可以看出是否存在資料完全重複的情況。

可以看出來,直接count是7條,如果加上distinct的話是6條,也就是說出現了資料的完全重複。那麼我們可以知道,我們需要做一下去重,去除掉完全重複的行,要去除也非常簡單,dataframe當中自帶了dropDuplicates方法,我們直接呼叫即可:

很明顯,剛才兩條完全一樣id為3的資料少了一條,被drop掉了。

接下來,我們繼續分析,怎麼判斷是否存在id不同但是其他資料相同的情況呢?

其實也是一樣使用distinct.count,只不過我們需要把count distinct運算的範疇去除掉id。我們可以通過columns獲取dataframe當中的列名,我們遍歷一下列名,過濾掉id即可。

這裡我們依然還是套用的distinct.count只不過我們在使用之前通過select限制了使用範圍,只針對除了id之外的列進行去重的計算。

不僅distinct如此,dropDuplicate同樣可以限制作用的範圍。使用的方法也很簡單,我們通過subset這個變數來進行控制,我們傳入一個list,表示duplicate的範圍。

可以很明顯地看到,我們的資料又減少了一條。說明我們去除掉了id不同但是內容一樣的情況,最後還剩下id相同,但是內容不同的情況。這種情況一般是由於記錄的時候發生了錯誤,比如併發沒有處理好,導致兩條不同的資訊採用了同一個id。

這個很簡單,因為我們已經經過了整體去重了,所以正常是不應該存在id一樣的條目的。所以我們只需要判斷id是否有重複就好了。判斷的方法也很簡單,我們count一下id的數量。

這裡我們可以和之前一樣通過distinct.count來判斷,這裡我們介紹一種新的方法,叫做agg。agg是aggregate的縮寫,直譯過來是聚合的意思。通過agg我們可以對一些列進行聚合計算,比如說sum、min、max這些。在這個問題當中,我們要進行的聚合計算就是count和count distinct,這兩個也有現成的函式,我們匯入就可以直接用了。

也就是說通過agg我們可以同時對不同的列進行聚合操作,我們發現加上了distinct之後,只剩下了4條,說明存在兩條不同的資料id一樣的情況。

接下來我們要做的就是給這些資料生成新的id,從而保證每一條資料的id都是unique的。這個也有專門的函式,我們直接呼叫就好:

monotonically_increasing_id這個方法會生成一個唯一併且遞增的id,這樣我們就生成了新的id,完成了整個資料的去重過濾。

空值處理

當我們完成了資料的過濾和清洗還沒有結束,我們還需要對空值進行處理。因為實際的資料往往不是完美的,可能會存在一些特徵沒有收集到資料的情況。空值一般是不能直接進入模型的,所以需要我們對空值進行處理。

我們再建立一批資料:

df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28, 'M', 100000),
(2, 167.2, 5.4, 45, 'M', None),
(3, None , 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])

這份資料和剛才的相比更加貼近我們真實的情況,比如存在若干行資料大部分列為空,存在一些列大部分行為空。因為現實中的資料往往是分佈不均勻的,存在一些特徵和樣本比較稀疏。比如有些標籤或者是行為非常小眾,很多使用者沒有,或者是有些使用者行為非常稀疏,只是偶爾使用過產品,所以缺失了大部分特徵。

所以我們可能會希望檢視一下有哪些樣本的缺失比較嚴重,我們希望得到一個id和缺失特徵數量對映的一個pair對。這個操作通過dataframe原生的api比較難實現,我們需要先把dataframe轉成rdd然後再通過MapReduce進行:

image-20200525163206376

我們可以看到是3對應的缺失值最多,所以我們可以單獨看下這條資料:

我們可能還會向看下各列缺失值的情況,究竟有多少比例缺失了。由於我們需要對每一列進行聚合,所以這裡又用到了agg這個方法:

這段程式碼可能看起來稍稍有一點複雜,因為用到了*這個操作。因為當agg這個函式傳入一個list之後,可以對多列進行操作。而在這裡,我們要對每一列進行統計。由於列數很多,我們手動列舉顯然是不現實的。所以我們用迴圈實現,*操作符的意思就是將迴圈展開。count('*')等價於SQL語句當中的count(1),也就是計算總條數的意思。

從結果當中我們可以看出來,income這個特徵缺失得最嚴重,足足有71%的資料是空缺的。那麼顯然這個特徵對我們的用處很小,因為缺失太嚴重了,也不存在填充的可能。所以我們把這行去掉:

我們去掉了income之後發現還是存在一些行的缺失非常嚴重,我們希望設定一個閾值,將超過一定數量特徵空缺的行過濾,因為起到的效果也很小。

這個功能不用我們自己開發了,dataframe當中原生的api就支援。

經過這樣的處理之後,剩下的缺失就比較少了。這個時候我們就不希望再進行刪除了,因為只有個別資料空缺,其他資料還是有效果的, 如果刪除了會導致資料量不夠。所以我們通常的方式是對這些特徵進行填充。

缺失值填充是一種非常常見的資料處理方式,填充的方式有好幾種。比如可以填充均值,也可以填充中位數或者是眾數,還可以另外訓練一個模型來根據其他特徵來預測。總之手段還是挺多的,我們這裡就用最簡單的方法,也就是均值來填充。看看spark當中使用均值填充是怎麼操作的。

既然要填充,那麼顯然需要先算出均值。所以我們首先要算出每一個特徵的均值。這裡性別是要排除的,因為性別是類別特徵,不存在均值。所以如果要填充性別的話,就只能填充眾數或者是用模型來預測了,不能直接用均值。

均值的計算本身並不複雜,和剛才的一系列操作差不多。但是有一點需要注意,我們這裡得到了結果但是卻不能直接作為引數傳入。因為dataframe中的fillna方法只支援傳入一個整數、浮點數、字串或者是dict。所以我們要把這份資料轉化成dict才行。這裡的轉化稍稍有些麻煩,因為dataframe不能直接轉化,我們需要先轉成pandas再呼叫pandas當中的to_dict方法。

我們有了dict型別的均值就可以用來填充了:

總結

在實際的工作或者是kaggle比賽當中,涉及的資料處理和分析的流程遠比文章當中介紹到的複雜。但去重、過濾、填充是資料處理當中最基礎也是最重要的部分。甚至可以說無論應用場景如何變化,解決問題的方法怎麼更新,這些都是不可缺失的部分。

如果喜歡本文,可以的話,請點個關注,給我一點鼓勵,也方便獲取更多文章。

本文使用 mdnice 排版

![](https://user-gold-cdn.xitu.io/2020/7/2/1730db75b52634ed?w=258&h=258&f=png&