1. 程式人生 > >在Apache Spark上跑Logistic Regression演算法及其中的一些錯誤問題

在Apache Spark上跑Logistic Regression演算法及其中的一些錯誤問題

本文旨在介紹使用機器學習演算法,來介紹Apache Spark資料處理引擎。我們一開始會先簡單介紹一下Spark,然後我們將開始實踐一個機器學習的例子。我們將使用Qualitative Bankruptcy資料集,來自UCI機器學習資料倉庫。雖然Spark支援同時Java,Scala,Python和R,在本教程中我們將使用Scala作為程式語言。不用擔心你沒有使用Scala的經驗。練習中的每個程式碼段,我們都會詳細解釋一遍。

APACHE SPARK

Apache Spark是一個開源的叢集計算框架,用Spark編寫的應用程式可以比Hadoop MapReduce正規化的速度高100倍以上。Spark的一個主要的特點,基於記憶體,執行速度快,不僅如此,複雜應用在Spark系統上執行,也比基於磁碟的MapReduce更有效。Spark還旨在更通用,因此它提供了以下庫:

  • Spark SQL,處理結構化資料的模組

  • MLlib,可擴充套件的機器學習庫

  • GraphX,圖和圖的平行計算API

  • Spark Streaming,可擴充套件的,可容錯的流式計算程式

正如已經提到的,Spark支援Java,Scala,Python和R程式語言。它還集成了其他大資料工具。特別是,Spark可以執行在Hadoop叢集,可以訪問任何資料來源,包括Hadoop Cassandra。

Spark核心概念

在一個高的抽象層面,一個Spark的應用程式由一個驅動程式作為入口,在一個叢集上執行各種並行操作。驅動程式包含了你的應用程式的main函式,然後將這些應用程式分配給叢集成員執行。驅動程式通過SparkContext物件來訪問計算叢集。對於互動式的shell應用,SparkContext預設可通過sc變數訪問。

Spark的一個非常重要的概念是RDD–彈性分散式資料集。這是一個不可改變的物件集合。每個RDD會分成多個分割槽,每個分割槽可能在不同的群集節點上參與計算。RDD可以包含任何型別的Java,Scala物件,Python或R,包括使用者自定義的類。RDDS的產生有兩種基本方式:通過載入外部資料集或分配物件的集合如,list或set。

在建立了RDDs之後,我們可以對RDDs做2種不同型別的操作:

  • Transformations - 轉換操作,從一個RDD轉換成另外一個RDD

  • Actions - 動作操作,通過RDD計算結果

RDDs通過lazy的方式計算 - 即當RDDs碰到Action操作時,才會開始計算。Spark的Transformations操作,都會積累成一條鏈,只有當需要資料的時候,才會執行這些Transformations操作。每一次RDD進行Action操作時,RDD都會重新生成。如果你希望某些中間的計算結果能被其他的Action操作複用,那麼你需要呼叫Spark的RDD.persist()來儲存中間資料。

Spark支援多種執行模式,你可以使用互動式的Shell,或者單獨執行一個standalone的Spark程式。不管哪一種方式,你都會有如下的工作流:

  • 輸入資料,用於生成RDD

  • 使用Transformations操作轉換資料集

  • 讓Spark儲存一些中間計算結果,用於複用計算

  • 使用Action操作,讓Spark平行計算。Spark內部會自動優化和執行計算任務。

安裝Apache Spark

為了開始使用Spark,需要先從官網下載。選擇“Pre-built for Hadoop 2.4 and later”版本然後點選“Direct Download”。如果是Windows使用者,建議將Spark放進名字沒有空格的資料夾中。比如說,將檔案解壓到:C:\spark。

正如上面所說的,我們將會使用Scala程式語言。進入Spark的安裝路徑,執行如下命令:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">// Linux and Mac users

bin/spark-shell

// Windows users

bin\spark shell</span>

然後你可以在控制檯中看到Scala:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">scala></span>

QUALITATIVE破產分類

現實生活中的問題是可以用機器學習演算法來預測的。我們將試圖解決的,通過一個公司的定性資訊,預測該公司是否會破產。資料集可以從UCI機器學習庫https://archive.ics.uci.edu/ml/datasets/qualitative_bankruptcy下載。在Spark的安裝資料夾中,建立一個新的資料夾命名為playground。複製qualitative_bankruptcy.data.txt檔案到這裡面。這將是我們的訓練資料。

這兒需要將檔案上傳到hdfs上,命令為:

hadoop dfs -copyFromLocal qualitative_bankruptcy.data.txt ./

資料集包含250個例項,其中143個例項為非破產,107個破產例項。

每一個例項資料格式如下:

  • 工業風險

  • 管理風險

  • 財務靈活性

  • 信譽

  • 競爭力

  • 經營風險

這些被稱為定性引數,因為它們不能被表示為一個數字。每一個引數可以取下以下值:

  • P positive

  • A average

  • N negative

資料集的最後一個列是每個例項的分類:B為破產或NB非破產。

鑑於此資料集,我們必須訓練一個模型,它可以用來分類新的資料例項,這是一個典型的分類問題。

解決問題的步驟如下:

  • 從qualitative_bankruptcy.data.txt檔案中讀取資料

  • 解析每一個qualitative值,並將其轉換為double型數值。這是我們的分類演算法所需要的

  • 將資料集劃分為訓練和測試資料集

  • 使用訓練資料訓練模型

  • 計算測試資料的訓練誤差

SPARK LOGISTIC REGRESSION

我們將用Spark的邏輯迴歸演算法訓練分類模型。如果你想知道更多邏輯迴歸演算法的原理,你可以閱讀以下教程http://technobium.com/logistic-regression-using-apache-mahout。

在Spark的Scala Shell中貼上以下import語句:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#ff0000;">import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}</span><span style="color:#3e3e3e;">
</span><span style="color:#000099;">這找了半天沒有找到<span style="line-height: 25.6000003814697px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;">LogisticRegressionWithLBFGS,所以一直報錯,最後改用LinearRegressionWithSGD,因此命令變為:</span></span></span>
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="font-family:Helvetica Neue, Helvetica, Hiragino Sans GB, Microsoft YaHei, Arial, sans-serif;"></span></span><pre name="code" style="margin-top: 0px; margin-bottom: 0px; font-size: 16px; line-height: 25.6000003814697px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif; padding: 0px; max-width: 100%; box-sizing: border-box !important; word-wrap: break-word !important; background-color: rgb(255, 255, 255);"><span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#000099;">import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}</span></span>

import org.apache.spark.mllib.regression.LabeledPointimport org.apache.spark.mllib.linalg.{Vector, Vectors}

這將匯入所需的庫。

接下來我們將建立一個Scala函式,將資料集中的qualitative資料轉換為Double型數值。鍵入或貼上以下程式碼並回車,在Spark Scala Shell。

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">def getDoubleValue( input:String ) : Double = {

    var result:Double = 0.0

    if (input == "P")  result = 3.0 

    if (input == "A")  result = 2.0

    if (input == "N")  result = 1.0

    if (input == "NB") result = 1.0

    if (input == "B")  result = 0.0

    return result

   }</span>

如果所有的執行都沒有問題,你應該看到這樣的輸出:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">getDoubleValue: (input: String)Double</span>

現在,我們可以讀取到qualitative_bankruptcy.data.txt檔案中的資料。從Spark的角度來看,這是一個Transformation操作。在這個階段,資料實際上不被讀入記憶體。如前所述,這是一個lazy的方式執行。實際的讀取操作是由count()引發,這是一個Action操作。

我把檔案放到了hdfs的根目錄下,所以下面的命令改為:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#000099;">val data = sc.textFile("Qualitative_Bankruptcy.data.txt")</span></span>
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#cc0000;">val data = sc.textFile("playground/Qualitative_Bankruptcy.data.txt")</span><span style="color:#3e3e3e;">

data.count()</span></span>

用我們val關鍵字宣告一個常量data。它是一個包含輸入資料所有行的RDD。讀操作被SC或sparkcontext上下文變數監聽。count操作應返回以下結果:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">res0: Long = 250</span>

現在是時候為邏輯迴歸演算法準備資料,將字串轉換為數值型。

下面命令有錯誤的地方,下面已經更正,注意標紅的地方。

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#3e3e3e;">val parsedData = data.map{line</span><span style="color:#ff0000;"> =></span><span style="color:#3e3e3e;">

    val parts = line.split(",")

    LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x </span><span style="color:#cc0000;">=></span><span style="color:#3e3e3e;">getDoubleValue(x))))

}</span></span>

在這裡,我們聲明瞭另外一個常量,命名為parsedData。對於data變數中的每一行資料,我們將做以下操作:

  • 使用“,”拆分字串,並獲得一個向量,命名為parts

  • 建立並返回一個LabeledPoint物件。每個LabeledPoint包含標籤和值的向量。在我們的訓練資料,標籤或類別(破產或非破產)放在最後一列,陣列下標0到6。這是我們使用的parts(6)。在儲存標籤之前,我們將用getDoubleValue()函式將字串轉換為Double型。其餘的值也被轉換為Double型數值,並儲存在一個名為稠密向量的資料結構。這也是Spark的邏輯迴歸演算法所需要的資料結構。

Spark支援map()轉換操作,Action動作執行時,第一個執行的就是map()。

我們來看看我們準備好的資料,使用take():

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">parsedData.take(10)</span>

上面的程式碼,告訴Spark從parsedData陣列中取出10個樣本,並列印到控制檯。一樣的,take()操作之前,會先執行map()。輸出結果如下:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">res5: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((1.0,[3.0,3.0,2.0,2.0,2.0,3.0]), (1.0,[1.0,1.0,2.0,2.0,2.0,1.0]), (1.0,[2.0,2.0,2.0,2.0,2.0,2.0]), (1.0,[3.0,3.0,3.0,3.0,3.0,3.0]), (1.0,[1.0,1.0,3.0,3.0,3.0,1.0]), (1.0,[2.0,2.0,3.0,3.0,3.0,2.0]), (1.0,[3.0,3.0,2.0,3.0,3.0,3.0]), (1.0,[3.0,3.0,3.0,2.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,3.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,2.0,3.0,3.0]))</span>

接著我們劃分一下訓練資料和測試資料,將parsedData的60%分為訓練資料,40%分為測試資料。

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)

val trainingData = splits(0)

val testData = splits(1)</span>

訓練資料和測試資料也可以像上面一樣,使用take()者count()檢視。

激動人心的時刻,我們現在開始使用Spark的LogisticRegressioinWithLBFGS()來訓練模型。設定好分類個數,這裡是2個(破產和非破產):

由於改用LogisticRegressionWithSGD,所以下面的語句就會變化:

LogisticRegressionWithSGD的使用方法,主要參考

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#ff0000;">val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)(這句要去掉,改為下面兩句)</span></span>

val numIterations = 20
val model =
LogisticRegressionWithSGD.train(parsedData, numIterations)

當模型訓練完,我們可以使用testData來檢驗一下模型的出錯率。

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#3e3e3e;">val labelAndPreds = testData.map { point =

  val prediction = model.predict(point.features)

  (point.label, prediction)

}
</span><span style="color:#000099;">下面這句話有誤,注意標紅的部分,原來為=,更正為=>:</span><span style="color:#3e3e3e;">
val trainErr = labelAndPreds.filter(r </span><span style="color:#cc0000;">=></span><span style="color:#3e3e3e;"> r._1 != r._2).count.toDouble / testData.count</span></span>

變數labelAndPreds儲存了map()轉換操作,map()將每一個行轉換成二元組。二元組包含了testData的標籤資料(point.label,分類資料)和預測出來的分類資料(prediction)。模型使用point.features作為輸入資料。

最後一行程式碼,我們使用filter()轉換操作和count()動作操作來計算模型出錯率。filter()中,保留預測分類和所屬分類不一致的元組。在Scala中_1和_2可以用來訪問元組的第一個元素和第二個元素。最後用預測出錯的數量除以testData訓練集的數量,我們可以得到模型出錯率:

<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">trainErr: Double = 0.20430107526881722</span>

總結

在這個教程中,你已經看到了Apache Spark可以用於機器學習的任務,如logistic regression。雖然這只是非分散式的單機環境的Scala shell demo,但是Spark的真正強大在於分散式下的記憶體並行處理能力。

在大資料領域,Spark是目前最活躍的開源專案,在過去幾年已迅速獲得關注和發展。在過去的幾年裡。採訪了超過2100受訪者,各種各樣的使用情況和環境。

[參考資料]

“Learning Spark” by HoldenKarau, Andy Konwinski, Patrick Wendell and Matei Zaharia, O’Reilly Media 2015

Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science

https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

https://spark.apache.org/docs/1.1.0/mllib-data-types.html

https://archive.ics.uci.edu/ml/datasets/Qualitative_Bankruptcy

https://databricks.com/blog/2015/01/27/big-data-projects-are-hungry-for-simpler-and-more-powerful-tools-survey-validates-apache-spark-is-gaining-developer-traction.html

附:

執行的程式碼:




import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
def getDoubleValue( input:String ) : Double = {
    var result:Double = 0.0
    if (input == "P")  result = 3.0 
    if (input == "A")  result = 2.0
    if (input == "N")  result = 1.0
    if (input == "NB") result = 1.0
    if (input == "B")  result = 0.0
    return result
   }


val data = sc.textFile("Qualitative_Bankruptcy.data.txt")
data.count()


val parsedData = data.map{line =>
    val parts = line.split(",")
    LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x =>getDoubleValue(x))))
}
parsedData.take(10)
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val trainingData = splits(0)
val testData = splits(1)
val numIterations = 20
val model = LogisticRegressionWithSGD.train(parsedData, numIterations)
val labelAndPreds = testData.map { point =>
 val prediction = model.predict(point.features)
 (point.label, prediction)
}
val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count