pyspark之MLlib學習【載入和轉換資料】(1)
阿新 • • 發佈:2019-01-22
pyspark系列文章是本人根據《PySpark實戰指南》學習pyspark中學習筆記,這本書是一本譯文,有些地方感覺有點小問題,不過在本人的這些筆記中都是親自測試跑通後的小例子。僅作參考和學習。
1.概述
MLlib概括了單個核心機器學習功能:
- 資料準備:特徵提取、變換、選擇、分類特徵的雜湊和一些自然語言處理方法。
- 機器學習演算法:實現了一些流行和高階的迴歸,分類和聚類演算法。
- 使用程式:統計方法,如描述性統計、卡方檢驗、線性迴歸(稀疏稠密矩陣和向量)和模型評估方法。
雖然MLlib是著重為RDD和DStream設計的,但是為了方便轉換資料,我們將讀取資料並將其轉換為DataFrame
- 首先指定資料集的schema
from pyspark.sql import SparkSession
import pyspark.sql.types as typ
spark = SparkSession.builder.appName("mllibStudy").getOrCreate()
labels = [ ('INFANT_ALIVE_AT_REPORT', typ.StringType()), ('BIRTH_YEAR', typ.IntegerType()), ('BIRTH_MONTH', typ.IntegerType()), ('BIRTH_PLACE', typ.StringType()),('MOTHER_AGE_YEARS', typ.IntegerType()), ('MOTHER_RACE_6CODE', typ.StringType()), ('MOTHER_EDUCATION', typ.StringType()), ('FATHER_COMBINED_AGE', typ.IntegerType()), ('FATHER_EDUCATION', typ.StringType()), ('MONTH_PRECARE_RECODE', typ.StringType()), ('CIG_BEFORE', typ.IntegerType()), ('CIG_1_TRI', typ.IntegerType()),('CIG_2_TRI', typ.IntegerType()), ('CIG_3_TRI', typ.IntegerType()), ('MOTHER_HEIGHT_IN', typ.IntegerType()), ('MOTHER_BMI_RECODE', typ.IntegerType()), ('MOTHER_PRE_WEIGHT', typ.IntegerType()), ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()), ('MOTHER_WEIGHT_GAIN', typ.IntegerType()), ('DIABETES_PRE', typ.StringType()), ('DIABETES_GEST', typ.StringType()), ('HYP_TENS_PRE', typ.StringType()), ('HYP_TENS_GEST', typ.StringType()), ('PREV_BIRTH_PRETERM', typ.StringType()), ('NO_RISK', typ.StringType()), ('NO_INFECTIONS_REPORTED', typ.StringType()), ('LABOR_IND', typ.StringType()), ('LABOR_AUGM', typ.StringType()), ('STEROIDS', typ.StringType()), ('ANTIBIOTICS', typ.StringType()), ('ANESTHESIA', typ.StringType()), ('DELIV_METHOD_RECODE_COMB', typ.StringType()), ('ATTENDANT_BIRTH', typ.StringType()), ('APGAR_5', typ.IntegerType()), ('APGAR_5_RECODE', typ.StringType()), ('APGAR_10', typ.IntegerType()), ('APGAR_10_RECODE', typ.StringType()), ('INFANT_SEX', typ.StringType()), ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()), ('INFANT_WEIGHT_GRAMS', typ.IntegerType()), ('INFANT_ASSIST_VENTI', typ.StringType()), ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()), ('INFANT_NICU_ADMISSION', typ.StringType()), ('INFANT_SURFACANT', typ.StringType()), ('INFANT_ANTIBIOTICS', typ.StringType()), ('INFANT_SEIZURES', typ.StringType()), ('INFANT_NO_ABNORMALITIES', typ.StringType()), ('INFANT_ANCEPHALY', typ.StringType()), ('INFANT_MENINGOMYELOCELE', typ.StringType()), ('INFANT_LIMB_REDUCTION', typ.StringType()), ('INFANT_DOWN_SYNDROME', typ.StringType()), ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()), ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()), ('INFANT_BREASTFED', typ.StringType())]
- 載入資料
資料下載地址:http://www.tomdrabas.com/data/LearningPySpark/births_train.csv.gz
#載入資料 filePath = "file:../testData/births_train.csv.gz" births =spark.read.csv(filePath,header=True,schema=schema) #檢視資料 births.show()
從結果中可以看出,我們的資料集中很多屬性都是字串型別的,我們可以將其轉換為數字形式的。資料集中有大量的值是Yes/No/Unkonwn;而我們將僅僅把Yes編碼為1,其他值設定為0
- 定義需要等價的編碼:
#定義編碼 recode_dictionary = { 'YNU':{ "Y":1, "N":0, "U":0 } }我們的目標是預測:“INFANT_ALIVE_AT_REPORT”是0還是1.因此:
- 我們將僅僅基於與其父母親和出生地點相關的特徵來預測嬰兒的存活機率,丟棄其他與嬰兒相關的所有特徵
#選取部分特徵 selected_features = [ 'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT', 'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM' ] births_trimmed = births.select(selected_features)
#檢視選取後的資料 births_trimmed.show()
其中還有一個問題是,母親吸菸的數量如何編碼:因為0意味著母親在懷孕前或者懷孕期間是沒有抽菸的,1~97表示母親實際吸菸數量,98代表母親實際吸菸資料量是98或者更多,而99代表母親實際吸菸數量未知,在此我們假設未知狀態為0。
- 指定新的編碼
#重新編碼 import pyspark.sql.functions as func def recode(col,key): return recode_dictionary[key][col] def correct_cig(feat): return func.when(func.col(feat)!= 99,func.col(feat)).otherwise(0) rec_integer = func.udf(recode,typ.IntegerType())recode方法從recorde_dictionary中給出的鍵值中查詢正確的鍵,並返回更正的值.correct_cig方法檢測如下:當feat屬性的值不等於99時,返回屬性的值;如果值為99,則返回0。
我們不可以直接在DataFrame上使用recode函式,它需要轉化為Spark可以理解的UDF。
rec_integer是指:通過傳遞我們制定的recode函式及制定返回值資料型別,從而可以用來實現對Yes/No/Unknown屬性值的編碼
- 更正與吸菸數量相關的屬性。
#更正與吸菸數量相關的特徵: births_transformed = births_trimmed\ .withColumn('CIG_BEFORE',correct_cig('CIG_BEFORE'))\ .withColumn('CIG_1_TRI',correct_cig('CIG_1_TRI'))\ .withColumn('CIG_2_TRI',correct_cig('CIG_2_TRI'))\ .withColumn('CIG_3_TRI',correct_cig('CIG_3_TRI')).withColumn()方法用列名作為其第一個引數,轉換作為第二個引數。
- 選取需要轉換的Yes/No/Unknown屬性(特徵)
#選取需要轉換的特徵 cols = [(col.name,col.dataType) for col in births_trimmed.schema] YNU_cols = [] for i,s in enumerate(cols): if s[1] == typ.StringType(): dis = births.select(s[0]).distinct().rdd.map(lambda row:row[0]).collect() if 'Y' in dis: YNU_cols.append(s[0])
(注意)如果報錯為:Unexpected end of input stream,則說明資料有問題,可以檢視資料是否損壞等。
- 批量轉換特徵(DataFrame可以在選擇特徵時批量轉換特徵)
# #批量轉換特徵 births.select(['INFANT_NICU_ADMISSION', rec_integer('INFANT_NICU_ADMISSION', func.lit('YNU')).alias('INFANT_NICU_ADMISSION_RECODE')])\ .take(5)在上面,我們選擇了“INFANT_NICU_ADMISSION”列,並將其特徵的名稱傳遞給rec_integer方法,並設定新轉換的列的別名設定為“INFANT_NICU_ADMISSION_RECODE”
- 一次性準換所有的YNU_cols(轉換列表)
#一次性轉換所有的YNU_cols exprs_YNU = [ rec_integer(x,func.lit('YNU')).alias(x) if x in YNU_cols else x for x in births_transformed.columns ] births_transformed = births_transformed.select(exprs_YNU)
- 檢查結果是否正確
#檢查結果 births_transformed.select(YNU_cols[-5:]).show(5) spark.stop()
+------------+-------------+------------+-------------+------------------+ |DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM| +------------+-------------+------------+-------------+------------------+ | 0| 0| 0| 0| 0| | 0| 0| 0| 0| 0| | 0| 0| 0| 0| 0| | 0| 0| 0| 0| 1| | 0| 0| 0| 0| 0| +------------+-------------+------------+-------------+------------------+從結果可以看出,轉換正常。
今天就先到這裡吧 ~_~