1. 程式人生 > >Spark機器學習之特徵提取、選擇、轉換

Spark機器學習之特徵提取、選擇、轉換

本節介紹了處理特徵的演算法,大致分為以下幾組:
     1、提取:從“原始”資料提取特徵
     2、轉換:縮放,轉換或修改要素
     3、選擇:從一組較大的要素中選擇一個子集
     4、區域性敏感雜湊(LSH):這類演算法將特徵變換的方面與其他演算法相結合。


1、特徵提取
1.1 TF-IDF
(term frequency–inverse document frequency/詞頻-逆文字/文件頻率)
    詞頻-逆文字頻率(TF-IDF)是在文字挖掘中廣泛使用的特徵向量化方法,反映了語料庫中一個詞對文件的重要性(語料庫中的其中一份文件的重要程度)。 
用t表示一個單詞,用d表示文件,用D表示語料庫。詞頻率TF(t,d)是單詞t出現在文件d中的次數,而文件頻率DF(t,D)是單詞t在語料庫D中的頻率(出現單詞t的文件的次數)。 
如果我們只使用詞頻率來測量重要性,那麼很容易過分強調出現得非常頻繁但是攜帶關於文件的資訊很少的單詞,例如, “a”,“the”和“of”。 如果一個單詞在語料庫中經常出現,
則意味著它不攜帶關於特定文件的特殊資訊。 逆文字頻率是一個單詞提供多少資訊的數值度量:
 
                            
    其中| D | 是語料庫中文件的總數。 由於使用對數,如果一個單詞出現在所有文件中,則其IDF值變為0。如果一個詞越常見,那麼分母就越大,逆文件頻率就越小越接近0。
分母之所以要加1,是為了避免分母為0(即所有文件都不包含該單詞的情況下出現0)。TF-IDF度量僅僅是TF和IDF的乘積:
                                                                                                             
    詞頻率和文件頻率的定義有幾個變體。 在MLlib中,我們分離TF和IDF以使它們靈活。
    
    TF:HashingTF和CountVectorizer都可以用於生成詞頻率向量。
    HashingTF是一個變換器,它採用詞集合並將這些集合轉換成固定長度的特徵向量。在文字處理中,“一組詞”可能是一袋詞。 HashingTF使用雜湊技巧。
通過應用雜湊函式將原始要素對映到索引。這裡使用的雜湊函式是MurmurHash 3。然後基於對映的索引來計算項頻率。這種方法避免了計算全域性術語到索引對映的需要,
其對於大語料庫可能是昂貴的,但是它遭受潛在的雜湊衝突,其中不同的原始特徵可能在雜湊後變成相同的術語。為了減少衝突的機會,我們可以增加目標要素維度,即雜湊表的桶數。
由於使用簡單模數將雜湊函式轉換為列索引,因此建議使用2的冪作為要素維度,否則不會將要素均勻對映到列。預設要素尺寸為2的18次方 = 262144。可選的二進位制切換引數控制單詞頻率計數。
設定為true時,所有非零頻率計數都設定為1.這對於模擬二進位制計數而不是整數計數的離散概率模型特別有用。
    CountVectorizer將文字文件轉換為單詞計數的向量。 有關更多詳細資訊,請參閱CountVectorizer。
    IDF:IDF是一個擬合數據集並生成IDFModel的估計器。 IDFModel獲取特徵向量(通常由HashingTF或CountVectorizer建立)並縮放每個列。 直觀地,它降低了語料庫中頻繁出現的列的權重。
    注意:spark.ml不提供文字分段工具。 推薦使用者使用NLP、scalanlp和chalk。
    
    Examples:
    在下面的程式碼段中,我們從一組句子開始。 我們使用Tokenizer將每個句子分成單詞。 對於每個句子(詞袋),我們使用HashingTF將句子雜湊成特徵向量。 我們使用IDF重新縮放特徵向量; 
這通常會提高使用文字作為特徵時的效能。 然後我們的特徵向量可以傳遞到學習演算法中。


from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
    ],["label", "sentence"])


tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)


hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
#CountVectorizer也可以用於獲得項頻率向量


idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


rescaledData.select("label", "features").show()
rescaledData.take(1)
輸出:
[Row(label=0.0, sentence='Hi I heard about Spark', words=['hi', 'i', 'heard', 'about', 'spark'],
rawFeatures=SparseVector(20, {0: 1.0, 5: 1.0, 9: 1.0, 17: 2.0}), features=SparseVector(20, {0: 0.6931, 5: 0.6931, 9: 0.2877, 17: 1.3863}))]

1.2 Word2Vec
Word2Vec是一個估計器,它接受代表文件的單詞序列,並訓練一個Word2VecModel。 該模型將每個詞對映到唯一的固定大小的向量。 Word2VecModel使用文件中所有單詞的平均值將每個文件轉換為向量; 此向量可以用作預測,文件相似性計算等的特徵。有關詳細資訊,請參閱ML2ib使用者指南。

在下面的程式碼段中,我們從一組文件開始,每個文件表示為一個單詞序列。 對於每個文件,我們將其轉換為特徵向量。 然後可以將該特徵向量傳遞到學習演算法

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([ ("Hi I heard about Spark".split(" "), ), ("I wish Java could use case classes".split(" "), ), ("Logistic regression models are neat".split(" "), ) ], ["text"]) # Learn a mapping from words to Vectors. word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result") model = word2Vec.fit(documentDF) result = model.transform(documentDF) for row in result.collect(): text, vector = row print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
1.3 CountVectorizer

CountVectorizer和CountVectorizerModel旨在幫助將文字文件的集合轉換為令牌計數的向量。 當一個先驗字典不可用時,CountVectorizer可以用作估計器來提取詞彙表,並生成CountVectorizerModel。 該模型為詞彙表上的文件生成稀疏表示,然後可以將其傳遞給其他演算法,如LDA。

在擬合過程中,CountVectorizer將選擇通過語料庫的詞頻率排序的頂部vocabSize詞。 可選引數minDF還通過指定詞彙必須出現在詞彙表中的文件的最小數量(或小於1.0)來影響擬合過程。 另一個可選的二進位制切換引數控制輸出向量。 如果設定為true,則所有非零計數都設定為1.這對於模擬二進位制計數而不是整數計數的離散概率模型特別有用。

Examples:

假設我們有以下DataFrame和列id和文字:

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")
文字中的每一行都是Array [String]型別的文件。 呼叫CountVectorizer的匹配產生具有詞彙(a,b,c)的CountVectorizerModel。 然後轉換後的輸出列“向量”包含
id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
2 特徵變換

2.1 Tokenizer /分詞器

標記化是獲取文字(例如句子)並將其分解成單個術語(通常是單詞)的過程。 一個簡單的Tokenizer類提供了這個功能。 下面的例子顯示瞭如何將句子分成單詞序列。

RegexTokenizer允許基於正則表示式(regex)匹配的更高階的標記化。 預設情況下,引數“pattern”(regex,預設:“\\ s +”)用作分隔符以分隔輸入文字。 或者,使用者可以將引數“gap”設定為false,指示正則表示式“模式”表示“令牌”,而不是分割間隙,並且找到所有匹配的出現作為令牌化結果。

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
2.2 StopWordsRemover 

停止詞是應該從輸入中排除的詞,通常是因為詞頻繁出現並且不具有任何意義。
StopWordsRemover接受字串序列(例如Tokenizer的輸出)作為輸入,並從輸入序列中刪除所有停止詞。 停用詞列表由stopWords引數指定。 某些語言的預設停用詞可通過呼叫StopWordsRemover.loadDefaultStopWords(語言)訪問,其中可用的選項是“danish”,“dutch”,“english”,“finnish”,“french”,“german”,“hungarian” “義大利語”,“挪威語”,“葡萄牙語”,“俄語”,“西班牙語”,“瑞典語”和“土耳其語”。 布林引數caseSensitive指示匹配是否區分大小寫(預設為false)。

例子:
假設我們有以下DataFrame和列id和raw:

id | raw
----|----------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]
應用StopScriptRemover,將raw作為輸入列,並過濾為輸出列,我們應該得到以下結果:
id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, baloon]  |  [saw, red, baloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]
在過濾時,停止詞“I”,“the”,“had”和“a”已被過濾掉。
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
2.3 n-gram

n-gram是對於某個整數n的n個令牌(通常為字)的序列。 NGram類可以用於將輸入特徵轉換為n-gram。
NGram將字串序列(例如,Tokenizer的輸出)作為輸入。 引數n用於確定每個n-gram中的項的數量。 輸出將包括一個n-gram序列,其中每個n-gram由n個連續字的空格分隔的字串表示。 如果輸入序列包含少於n個字串,則不會生成輸出。

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

2.4 Binarizer (二值化

二值化是將數字特徵閾值為二進位制(0/1)特徵的過程。
Binarizer接受通用引數inputCol和outputCol以及二進位制閾值。 大於閾值的特徵值被二進位制化為1.0; 等於或小於閾值的值被二值化為0.0。 inputCol支援Vector和Double型別。

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
2.5 PCA(主成分分析

PCA是使用正交變換將可能相關的變數的觀察值的集合轉換成稱為主分量的線性不相關變數的值的集合的統計過程。 PCA類訓練模型使用PCA將向量投影到低維空間。 下面的示例顯示瞭如何將5維特徵向量投影到3維主成分中。

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
2.6 PolynomialExpansion (多項式擴充套件

多項式展開是將特徵擴充套件到多項式空間的過程,該多項式空間由原始尺寸的n度組合表示。 PolynomialExpansion類提供此功能。 下面的示例顯示瞭如何將您的要素擴充套件到3度多項式空間。

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
2.7 Discrete Cosine Transform (DCT-離散餘弦變換)

離散餘弦變換將時域中的長度N實值序列變換為頻域中的另一長度N實值序列。 DCT類提供此功能,實現DCT-II並且通過1/√2/縮放結果,使得用於變換的表示矩陣是統一的。 沒有偏移被應用於變換序列(例如,變換序列的第0個元素是第0個DCT係數而不是第N / 2個)。

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
2.8 StringIndexer 字串索引

StringIndexer將標籤的字串列編碼為標籤索引列。 索引位於[0,numLabels),按標籤頻率排序,因此最常見的標籤獲取索引0.如果輸入列是數字,我們將其轉換為字串並索引字串值。 當下遊管道元件(如Estimator或Transformer)使用此字串索引標籤時,必須將元件的輸入列設定為此字串索引的列名稱。 在許多情況下,可以使用setInputCol設定輸入列。

例子
假設我們有以下DataFrame和列id和category:

id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c
category是具有三個標籤:“a”,“b”和“c”的字串列。 應用StringIndexer,其中category作為輸入列,categoryIndex作為輸出列,我們應該得到以下結果:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0
“a”獲得索引0,因為它是最頻繁的,隨後是具有索引1的“c”和具有索引2的“b”。
此外,有兩個策略,關於StringIndexer如何處理未見的標籤,當你已經適應一個StringIndexer在一個數據集,然後使用它來轉換另一個:
丟擲異常(這是預設值)
請跳過包含未看見標籤的行

例子
讓我們回到前面的例子,但這次重用我們以前定義的StringIndexer在下面的資料集:

id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
如果您沒有設定StringIndexer如何處理未看見的標籤或將其設定為“錯誤”,則會丟擲異常。 但是,如果您呼叫了setHandleInvalid(“skip”),將生成以下資料集:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
請注意,包含“d”的行不會出現。
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
2.9 IndexToString

對稱到StringIndexer,IndexToString將一列標籤索引映射回包含原始標籤的列作為字串。 一個常見的用例是使用StringIndexer從標籤生成索引,使用這些索引訓練模型,並從IndexToString的預測索引列中檢索原始標籤。 但是,您可以自由提供您自己的標籤。
例子
基於StringIndexer示例,讓我們假設我們有以下DataFrame和列id和categoryIndex:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0
應用IndexToString,將categoryIndex作為輸入列,將originalCategory作為輸出列,我們可以檢索我們的原始標籤(它們將從列的元資料中推斷):
id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"<