1. 程式人生 > >如何基於spark做深度學習:從ML到keras、Elephas

如何基於spark做深度學習:從ML到keras、Elephas

http://blog.csdn.net/Richard_More/article/details/53215142

Elephas的網址:https://github.com/maxpumperla/elephas

分散式深層神經網路的Spark ML模型管線

該筆記本描述瞭如何使用Spark ML為分散式版本的Keras深度學習模型構建機器學習流水線作為資料集,我們使用來自Kaggle的Otto產品分類挑戰。我們選擇這個資料的原因是它很小,結構非常好。這樣,我們可以更多地關注技術元件,而不是進行復雜的處理。此外,具有較慢硬體或沒有完整的Spark群集的使用者應該能夠在本地執行此示例,並且仍然會了解有關分散式模式的許多內容。

通常,模型訓練不需要分配計算,而是建立資料流水線,即攝入,轉換等。在訓練中,深層神經網路往往在一臺機器上的一個或多個GPU上做得相當好。大多數情況下,使用梯度下降方法,您將一個接一個地處理。即使如此,使用像Spark這樣的框架也可能有益於將您的模型與您的周邊基礎架構相整合。除此之外,Spark ML管道提供的便利性非常有價值(在語法上非常接近你所知道的scikit-learn)。

TL; DR:我們將展示如何使用分散式深層神經網路和Spark ML管道來解決分類問題,這個例子基本上是這裡發現的分散式版本

使用這個筆記本

當我們要使用elephas時,您將需要訪問正在執行的Spark上下文才能執行此筆記本。

如果您還沒有,請按照本文提供說明在本地安裝Spark 確保還匯出SPARK_HOME到您的路徑並啟動您的ipython / jupyter筆記本如下:

IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --driver-memory 4G elephas/examples/Spark_ML_Pipeline.ipynb
  • 1
  • 1

要測試您的環境,請嘗試列印Spark上下文(提供sc),即執行以下單元格。

from __future__ import print_function
print(sc)
  • 1
  • 2
  • 1
  • 2
<pyspark.context.SparkContext object at 0x1132d61d0>

奧托產品分類資料

培訓和測試資料在這裡可用繼續下載資料。檢查它,您將看到提供的csv檔案包含一個id列,93個整數特徵列。train.csv有一個額外的標籤欄,test.csv缺少。挑戰是準確預測測試標籤。對於本筆記本的其餘部分,我們將假設儲存資料data_path,您應根據需要修改下面的資料。

data_path = "./" # <-- Make sure to adapt this to where your csv files are.
  • 1
  • 1

載入資料比較簡單,但是我們要照顧幾件事情。首先,雖然你可以洗牌RDD,但通常不是很有效率。但是由於資料train.csv按類別排序,所以我們必須洗牌才能使模型執行良好。這是shuffle_csv下面的功能接下來,我們用明文讀入load_data_rdd,以逗號分割,並將要素轉換為浮點型向量。另外請注意,最後一列train.csv表示具有Class_字首的類別

定義資料幀

Spark有一些核心的資料結構,其中包括data frame,這是一個分散式版本的命名列資料結構,現在很多都是來自R熊貓我們需要一個所謂的SQLContext和可選的列到名稱對映來建立從頭開始的資料框架。

from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors
import numpy as np
import random

sql_context = SQLContext(sc)

def shuffle_csv(csv_file):
    lines = open(csv_file).readlines()
    random.shuffle(lines)
    open(csv_file, 'w').writelines(lines)

def load_data_frame(csv_file, shuffle=True, train=True):
    if shuffle:
        shuffle_csv(csv_file)
    data = sc.textFile(data_path + csv_file) # This is an RDD, which will later be transformed to a data frame
    data = data.filter(lambda x:x.split(',')[0] != 'id').map(lambda line: line.split(','))
    if train:
        data = data.map(
            lambda line: (Vectors.dense(np.asarray(line[1:-1]).astype(np.float32)),
                          str(line[-1])) )
    else:
        # Test data gets dummy labels. We need the same structure as in Train data
        data = data.map( lambda line: (Vectors.dense(np.asarray(line[1:]).astype(np.float32)),"Class_1") ) 
    return sqlContext.createDataFrame(data, ['features', 'category'])
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

我們載入訓練和測試資料,並使用方便的show方法列印幾行資料

train_df = load_data_frame("train.csv")
test_df = load_data_frame("test.csv", shuffle=False, train=False) # No need to shuffle test data

print("Train data frame:")
train_df.show(10)

print("Test data frame (note the dummy category):")
test_df.show(10)
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
Train data frame:
+--------------------+--------+
|            features|category|
+--------------------+--------+
|[0.0,0.0,0.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,1.0,0.0,1.0,...| Class_6|
|[0.0,0.0,0.0,0.0,...| Class_9|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_3|
|[0.0,0.0,4.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_7|
+--------------------+--------+
only showing top 10 rows

Test data frame (note the dummy category):
+--------------------+--------+
|            features|category|
+--------------------+--------+
|[1.0,0.0,0.0,1.0,...| Class_1|
|[0.0,1.0,13.0,1.0...| Class_1|
|[0.0,0.0,1.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[2.0,0.0,5.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
+--------------------+--------+
only showing top 10 rows

預處理:定義變壓器

到目前為止,我們基本上只讀原始資料。幸運的是,Spark ML有很多預處理功能可用,所以我們唯一要做的就是定義資料幀的轉換。

要繼續,我們將首先將類別字串轉換為雙精度值。這是由一個所謂的StringIndexer請注意,我們已經在這裡進行了實際的轉型,但這只是為了演示的目的。我們真正需要的是太多的定義,string_indexer以便稍後再進行管理。

from pyspark.ml.feature import StringIndexer

string_indexer = StringIndexer(inputCol="category", outputCol="index_category")
fitted_indexer = string_indexer.fit(train_df)
indexed_df = fitted_indexer.transform(train_df)
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

接下來,將功能規範化,這是一個很好的做法StandardScaler

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
fitted_scaler = scaler.fit(indexed_df)
scaled_df = fitted_scaler.transform(indexed_df)
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4
print("The result of indexing and scaling. Each transformation adds new columns to the data frame:")
scaled_df.show(10)
  • 1
  • 2
  • 1
  • 2
The result of indexing and scaling. Each transformation adds new columns to the data frame:
+--------------------+--------+--------------+--------------------+
|            features|category|index_category|     scaled_features|
+--------------------+--------+--------------+--------------------+
|[0.0,0.0,0.0,0.0,...| Class_8|           2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_8|           2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[0.0,1.0,0.0,1.0,...| Class_6|           1.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_9|           4.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_3|           3.0|[-0.2535060296260...|
|[0.0,0.0,4.0,0.0,...| Class_8|           2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_7|           5.0|[-0.2535060296260...|
+--------------------+--------+--------------+--------------------+
only showing top 10 rows

Keras深度學習模式

現在我們有一個具有處理特徵和標籤的資料框架,我們定義一個深層神經網路,我們可以使用它來解決分類問題。你有機會來這裡,因為你知道一兩件關於深入學習的東西。如果是這樣,下面的模型看起來很簡單。我們通過選擇一組三個連續的密集層來建立一個keras模型,其中包含退出和ReLU啟用。對於這個問題肯定有更好的架構,但是我們只是想在這裡展示一般的流程。

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.utils import np_utils, generic_utils

nb_classes = train_df.select("category").distinct().count()
input_dim = len(train_df.select("features").first()[0])

model = Sequential()
model.add(Dense(512, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam')
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

分散式Elephas模型

為了將上述Keras提升model到Spark,我們定義了一個Estimator一個Estimator是,仍然有被訓練模型的星火的化身。它本質上只有一個(必需)的方法,即fit一旦我們呼叫fit了資料框架,我們就回到了一個Model,這是一個訓練有素的模型,transform用來預測標籤方法。

我們通過初始化ElephasEstimator和設定一些屬性來實現。到目前為止,我們的輸入資料框架將有很多列,我們必須通過列名告訴模型在哪裡查詢功能和標籤。然後我們提供Keras模型和Elephas優化器的序列化版本。我們不能直接插入keras模型Estimator,因為Spark將不得不將其序列化為與工作人員的溝通,所以最好自己提供序列化。事實上,雖然pyspark知道如何序列化model,但它是非常低效的,如果模型變得太大,可能會破裂。Spark ML對引數特別挑剔(正確地),或多或少地禁止您提供後者的非原子型別和陣列。大多數剩餘的引數是可選的,而且是自我解釋的。加,許多人,你知道如果你以前曾經執行過克拉斯模型。我們只是將他們包括在內,以顯示全套培訓配置。

from elephas.ml_model import ElephasEstimator
from elephas import optimizers as elephas_optimizers

# Define elephas optimizer (which tells the model how to aggregate updates on the Spark master)
adadelta = elephas_optimizers.Adadelta()

# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
estimator.setFeaturesCol("scaled_features")             # These two come directly from pyspark,
estimator.setLabelCol("index_category")                 # hence the camel case. Sorry :)
estimator.set_keras_model_config(model.to_yaml())       # Provide serialized Keras model
estimator.set_optimizer_config(adadelta.get_config())   # Provide serialized Elephas optimizer
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)  # We just use one worker here. Feel free to adapt it.
estimator.set_nb_epoch(20) 
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
ElephasEstimator_415398ab22cb1699f794

SparkML管道

現在的簡單部分:定義管道真的像列出流水線階段一樣簡單。我們可以提供TransformersEstimators真正的任何配置,但是這裡我們只需要先前定義的三個元件。請注意,string_indexerscaler和互換,而estimator有些明明已經來到最後的管道。

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

安裝和評估管道

現在的最後一步是適應管道的培訓資料和評估。我們對訓練資料進行評估,即轉換,因為只有在這種情況下,我們有標籤來檢查模型的準確性。如果你喜歡,你也可以改造test_df

from pyspark.mllib.evaluation import MulticlassMetrics

fitted_pipeline = pipeline.fit(train_df) # Fit model to data

prediction = fitted_pipeline.transform(train_df) # Evaluate on train data.
# prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("index_category", "prediction")
pnl.show(100)

prediction_and_label = pnl.map(lambda row: (row.index_category, row.prediction))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.precision())
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
61878/61878 [==============================] - 0s     
+--------------+----------+
|index_category|prediction|
+--------------+----------+
|           2.0|       2.0|
|           2.0|       2.0|
|           0.0|       0.0|
|           1.0|       1.0|
|           4.0|       4.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           3.0|       3.0|
|           2.0|       2.0|
|           5.0|       0.0|
|           0.0|       0.0|
|           4.0|       4.0|
|           0.0|       0.0|
|           4.0|       1.0|
|           2.0|       2.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           6.0|       0.0|
|           2.0|       2.0|
|           1.0|       1.0|
|           2.0|       2.0|
|           8.0|       8.0|
|           1.0|       1.0|
|           5.0|       0.0|
|           0.0|       0.0|
|           0.0|       3.0|
|           0.0|       0.0|
|           1.0|       1.0|
|           4.0|       4.0|
|           2.0|       2.0|
|           0.0|       3.0|
|           3.0|       3.0|
|           0.0|       0.0|
|           3.0|       0.0|
|           1.0|       5.0|
|           3.0|       3.0|
|           2.0|       2.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           2.0|       2.0|
|           2.0|       2.0|
|           1.0|       1.0|
|           6.0|       6.0|
|           1.0|       1.0|
|           0.0|       3.0|
|           7.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           1.0|       1.0|
|           1.0|       1.0|
|           6.0|       6.0|
|           0.0|       0.0|
|           0.0|       3.0|
|           2.0|       2.0|
|           0.0|       0.0|
|           2.0|       2.0|
|           0.0|       0.0|
|           4.0|       4.0|
|           0.0|       0.0|
|           6.0|       6.0|
|           2.0|       5.0|
|           0.0|       3.0|
|           3.0|       0.0|
|           0.0|       0.0|
|           3.0|       3.0|
|           4.0|       4.0|
|           0.0|       3.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           4.0|       4.0|
|           3.0|       0.0|
|           2.0|       2.0|
|           1.0|       1.0|
|           7.0|       7.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       3.0|
|           1.0|       1.0|
|           1.0|       1.0|
|           5.0|       4.0|
|           1.0|       1.0|
|           1.0|       1.0|
|           4.0|       4.0|
|           3.0|       3.0|
|           0.0|       0.0|
|           2.0|       2.0|
|           4.0|       4.0|
|           7.0|       7.0|
|           2.0|       2.0|
|           0.0|       0.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           4.0|       4.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       3.0|
|           0.0|       3.0|
|           0.0|       0.0|
+--------------+----------+
only showing top 100 rows

0.764132648114

結論

當然,需要一些時間掌握Keras和Spark的原理和語法,這取決於您來自哪裡。然而,我們也希望您得出結論,一旦您超越了定義模型和預處理資料的艱鉅階段,構建和使用SparkML流水線的業務是非常優雅和有用的。

如果您喜歡您所看到的,請考慮進一步改善對Ceras或Spark的影響。你對這款膝上型電腦有什麼建設性的意見嗎?有什麼要我澄清嗎?無論如何,請隨時與我聯絡。