Deeplearning4j in Action (2): Handwritten-Digit Recognition with Deeplearning4j on Spark

In the post from a couple of days ago, we used Deeplearning4j to classify the MNIST dataset, our first deep-learning application. For an open dataset like MNIST, with small black-and-white images, training locally is perfectly feasible: we used a relatively simple architecture such as LeNet, and the local machine had around 8 GB of RAM. In real production settings, however, there are far more images, the images are much larger, and the networks are deeper ones such as AlexNet or GoogLeNet, so we usually need a cluster to supply enough compute. Since Deeplearning4j implements distributed training of neural networks on top of Spark, that is the solution we will use here.

We will again use the MNIST dataset as the example for our first Spark-based Deeplearning4j application. Building on the previous post, first add a new dependency to the pom:

	     <dependency>
	    	<groupId>org.nd4j</groupId>
	    	<artifactId>nd4j-kryo_${scala.binary.version}</artifactId>
	    	<version>${nd4j.version}</version>
	     </dependency>
This switches ND4J's serialization from Java's default mechanism to Kryo, which is considerably more efficient. If you do not register the Kryo serializer for ND4J types in your code, training will throw an exception.
The code then splits into two parts. The first builds the MNIST data locally as a JavaRDD<DataSet>, saves it to disk, and finally pushes it to HDFS as the input for the Spark job. The second trains the model and saves it.

The first part works roughly as follows: create a local Spark job --> collect the paths of all MNIST images --> read each image, extract its features and attach its label, wrapping each image in a DataSet --> save the resulting JavaRDD<DataSet>.

Here the raw MNIST data is stored as individual image files rather than in the original binary format. Handling it this way also lets us read ordinary images the same way later. The MNIST images look like this:

[Figure: sample MNIST digit images]

        SparkConf conf = new SparkConf()
                        .setMaster("local[*]")	//local mode
                        .set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator")
                        .setAppName("Mnist Java Spark (Java)");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        final List<String> lstLabelNames = Arrays.asList("零","一","二","三","四","五","六","七","八","九");  //Chinese Label
        final ImageLoader imageLoader = new ImageLoader(28, 28, 1);             //Load Image
        final DataNormalization scaler = new ImagePreProcessingScaler(0, 1);    //Normalize
        
        String srcPath = args[0];
        FileSystem hdfs = FileSystem.get(URI.create(srcPath), jsc.hadoopConfiguration());    //Hadoop FileSystem API: works for both local paths and HDFS
        FileStatus[] fileList = hdfs.listStatus(new Path(srcPath));
        List<String> lstFilePath = new ArrayList<>();
        for( FileStatus fileStatus :  fileList){
            lstFilePath.add(srcPath + "/" + fileStatus.getPath().getName());
        }
        JavaRDD<String> javaRDDImagePath = jsc.parallelize(lstFilePath);
        JavaRDD<DataSet> javaRDDImageTrain = javaRDDImagePath.map(new Function<String, DataSet>() {

            @Override
            public DataSet call(String imagePath) throws Exception {
                FileSystem fs = FileSystem.get(new Configuration());
                DataInputStream in = fs.open(new Path(imagePath));
                INDArray features = imageLoader.asRowVector(in);            //features tensor
                String[] tokens = imagePath.split("\\/");
                String label = tokens[tokens.length-1].split("\\.")[0];     
                int intLabel = Integer.parseInt(label);
                INDArray labels = Nd4j.zeros(10);                           //labels tensor                     
                labels.putScalar(0, intLabel, 1.0);
                DataSet trainData = new DataSet(features, labels);          //DataSet, wrapper of features and labels
                trainData.setLabelNames(lstLabelNames);
                scaler.preProcess(trainData);                               //normalize
                in.close();                                                 //close the stream; the FileSystem instance is cached and shared, so don't close it
                return trainData;
            }
        });
        javaRDDImageTrain.saveAsObjectFile("mnistNorm.dat");		//save training data
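After the job finishes, the locally written object file can be pushed to HDFS with the standard Hadoop CLI (the target directory below is a placeholder; substitute your own path):

hdfs dfs -put mnistNorm.dat <your-hdfs-dir>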
A few points deserve explanation.
1. We use the Hadoop FileSystem API to read the files. Plain Java File I/O would work just as well; the advantage of this approach is that the same code can read from both the local file system and HDFS.

2. The ImageLoader class reads image files. There is a similar class called NativeImageLoader; the difference is that NativeImageLoader processes images through OpenCV and is therefore faster, so NativeImageLoader is the recommended choice.
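As a minimal sketch, NativeImageLoader is a drop-in replacement here; the (height, width, channels) constructor arguments mirror the ImageLoader above:

        final NativeImageLoader nativeLoader = new NativeImageLoader(28, 28, 1);  //OpenCV-backed loader, same dimensions
        INDArray features = nativeLoader.asRowVector(in);                          //same call as imageLoader.asRowVector(in)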

The saved RDD looks like this on disk:

[Figure: the saved JavaRDD<DataSet> object files]

Next, the logic of the training job: read the MNIST data stored on HDFS in DataSet form --> set up the parameter-averaging service --> define the network architecture (LeNet) --> train the network --> save the trained model. First, the first two steps:
        SparkConf conf = new SparkConf()
                            .set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator")  //register kryo for nd4j
                            .setAppName("Mnist Java Spark (Java)");
        final String imageFilePath = args[0];
        final int numEpochs = Integer.parseInt(args[1]);
        final String modelPath = args[2];
        final int numBatch = Integer.parseInt(args[3]);
        //
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //
        JavaRDD<DataSet> javaRDDImageTrain = jsc.objectFile(imageFilePath);     //load image data from hdfs
        ParameterAveragingTrainingMaster trainMaster = new ParameterAveragingTrainingMaster.Builder(numBatch)   //parameter averaging service
                                                            .workerPrefetchNumBatches(0)
                                                            .saveUpdater(true)
                                                            .averagingFrequency(5)
                                                            .batchSizePerWorker(numBatch)
                                                            .build();           //the builder chain must end with build()
Here we read in several arguments: the HDFS path of the data, the path for saving the final model, the mini-batch size (values such as 32, 64, or 128 usually work well; feel free to experiment), and the total number of training epochs.
The class that needs explaining is ParameterAveragingTrainingMaster. It collects the weights computed independently on the Spark worker nodes back to the driver, averages them, and broadcasts the updated weights back to the workers; in other words, the mean of the workers' parameters becomes the global parameter value. This is a form of data parallelism in distributed machine learning.
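Conceptually, one averaging step just takes the element-wise mean of the workers' flattened parameter vectors. A toy ND4J sketch of that step (an illustration only, not DL4J's actual TrainingMaster internals):

        //toy illustration of one parameter-averaging step
        static INDArray averageParams(List<INDArray> workerParams) {
            INDArray avg = Nd4j.zeros(workerParams.get(0).shape());
            for (INDArray w : workerParams) {
                avg.addi(w);                          //element-wise sum of each worker's parameters
            }
            return avg.divi(workerParams.size());     //global parameters = mean over workers
        }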

Next, defining the network architecture and training it:

        int nChannels = 1;
        int outputNum = 10;
        int iterations = 1;
        int seed = 123;
        MultiLayerConfiguration.Builder builder = new NeuralNetConfiguration.Builder()  //define LeNet
                .seed(seed)
                .iterations(iterations)
                .regularization(true).l2(0.0005)
                .learningRate(0.1)
                .learningRateScoreBasedDecayRate(0.5)
                .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
                .updater(Updater.ADAM)
                .list()
                .layer(0, new ConvolutionLayer.Builder(5, 5)
                        .nIn(nChannels)
                        .stride(1, 1)
                        .nOut(20)
                        .weightInit(WeightInit.XAVIER)
                        .activation("relu")
                        .build())
                .layer(1, new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
                        .kernelSize(2, 2)
                        .build())
                .layer(2, new ConvolutionLayer.Builder(5, 5)
                        .nIn(20)
                        .nOut(50)
                        .stride(2,2)
                        .weightInit(WeightInit.XAVIER)
                        .activation("relu")
                        .build())
                .layer(3, new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
                        .kernelSize(2, 2)
                        .build())
                .layer(4, new DenseLayer.Builder().activation("relu")
                        .weightInit(WeightInit.XAVIER)
                        .nOut(500).build())
                .layer(5, new OutputLayer.Builder(LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
                        .nOut(outputNum)
                        .weightInit(WeightInit.XAVIER)
                        .activation("softmax")
                        .build())
                .backprop(true).pretrain(false);
        new ConvolutionLayerSetup(builder,28,28,1);

        MultiLayerConfiguration netconf = builder.build();
        MultiLayerNetwork net = new MultiLayerNetwork(netconf);
        net.setListeners(new ScoreIterationListener(1));
        net.init();
        SparkDl4jMultiLayer sparkNetwork = new SparkDl4jMultiLayer(jsc, net, trainMaster);
        //train the network on Spark
        for( int i = 0; i < numEpochs; ++i ){
            sparkNetwork.fit(javaRDDImageTrain);
            System.out.println("----- Epoch " + i + " complete -----");
            Evaluation evalActual = sparkNetwork.evaluate(javaRDDImageTrain);
            System.out.println(evalActual.stats());
        }
There is nothing special in this part; it is much like the single-machine version. One point worth noting: after each epoch we evaluate by predicting on the entire training set, without any cross-validation. Holding out a test set would of course work just as well, as sketched below.
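For example, a simple hold-out split; randomSplit is a standard Spark RDD method, and the 80/20 fractions are only illustrative:

        JavaRDD<DataSet>[] split = javaRDDImageTrain.randomSplit(new double[]{0.8, 0.2}, 123L);
        JavaRDD<DataSet> trainSet = split[0];                      //80% for training
        JavaRDD<DataSet> testSet = split[1];                       //20% held out
        sparkNetwork.fit(trainSet);
        Evaluation evalHoldOut = sparkNetwork.evaluate(testSet);   //evaluate on unseen data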
The final part saves the model to HDFS:

        //save model
        FileSystem hdfs = FileSystem.get(jsc.hadoopConfiguration());
        Path hdfsPath = new Path(modelPath);
        FSDataOutputStream outputStream = hdfs.create(hdfsPath);
        MultiLayerNetwork trainedNet = sparkNetwork.getNetwork();
        ModelSerializer.writeModel(trainedNet, outputStream, true);
        outputStream.close();                                       //flush and close the HDFS output stream
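To load the model back from HDFS later, ModelSerializer can also read from an input stream; a minimal sketch:

        //restore the saved model from HDFS (sketch)
        FSDataInputStream inputStream = hdfs.open(hdfsPath);
        MultiLayerNetwork restoredNet = ModelSerializer.restoreMultiLayerNetwork(inputStream);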
That concludes the coding: we have built distributed training of a deep neural network on Spark and saved the resulting model. The spark-submit command looks like this:

spark-submit --master yarn-cluster --executor-memory 5g --num-executors 16 --driver-memory 8g --conf "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=2921225472"  --conf spark.yarn.executor.memoryOverhead=5000

The --conf options deserve explanation. When ND4J computes, it actually needs two kinds of memory: on-heap memory, the JVM memory used for objects, and off-heap memory on the C++ side. For performance, ND4J calls down into C++ through JavaCPP to do its computation. If you do not explicitly request C++ memory, by default only about 10% of the on-heap allocation is carved out for off-heap use, which may not be enough, so we request off-heap memory explicitly.
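Note that the flag above only raises the executors' off-heap limit. If the driver also allocates INDArrays (as ours does when evaluating), it presumably needs the same treatment; a hedged sketch of the extra option:

--conf "spark.driver.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=2921225472"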
[Figure: a normal Deeplearning4j training run as shown in the Spark UI]

Now let's look at the training results:

----- Epoch 149 complete -----

Examples labeled as 0 classified by model as 0: 4011 times
Examples labeled as 0 classified by model as 1: 2 times
Examples labeled as 0 classified by model as 2: 14 times
Examples labeled as 0 classified by model as 4: 9 times
Examples labeled as 0 classified by model as 5: 11 times
Examples labeled as 0 classified by model as 6: 28 times
Examples labeled as 0 classified by model as 7: 6 times
Examples labeled as 0 classified by model as 8: 40 times
Examples labeled as 0 classified by model as 9: 11 times
Examples labeled as 1 classified by model as 0: 1 times
Examples labeled as 1 classified by model as 1: 4598 times
Examples labeled as 1 classified by model as 2: 20 times
Examples labeled as 1 classified by model as 3: 7 times
Examples labeled as 1 classified by model as 4: 12 times
Examples labeled as 1 classified by model as 5: 3 times
Examples labeled as 1 classified by model as 6: 8 times
Examples labeled as 1 classified by model as 7: 10 times
Examples labeled as 1 classified by model as 8: 20 times
Examples labeled as 1 classified by model as 9: 5 times
Examples labeled as 2 classified by model as 0: 13 times
Examples labeled as 2 classified by model as 1: 20 times
Examples labeled as 2 classified by model as 2: 3910 times
Examples labeled as 2 classified by model as 3: 63 times
Examples labeled as 2 classified by model as 4: 22 times
Examples labeled as 2 classified by model as 5: 5 times
Examples labeled as 2 classified by model as 6: 4 times
Examples labeled as 2 classified by model as 7: 70 times
Examples labeled as 2 classified by model as 8: 54 times
Examples labeled as 2 classified by model as 9: 16 times
Examples labeled as 3 classified by model as 0: 2 times
Examples labeled as 3 classified by model as 1: 10 times
Examples labeled as 3 classified by model as 2: 55 times
Examples labeled as 3 classified by model as 3: 4104 times
Examples labeled as 3 classified by model as 4: 5 times
Examples labeled as 3 classified by model as 5: 53 times
Examples labeled as 3 classified by model as 6: 2 times
Examples labeled as 3 classified by model as 7: 42 times
Examples labeled as 3 classified by model as 8: 56 times
Examples labeled as 3 classified by model as 9: 22 times
Examples labeled as 4 classified by model as 0: 5 times
Examples labeled as 4 classified by model as 1: 6 times
Examples labeled as 4 classified by model as 2: 5 times
Examples labeled as 4 classified by model as 4: 3960 times
Examples labeled as 4 classified by model as 5: 3 times
Examples labeled as 4 classified by model as 6: 22 times
Examples labeled as 4 classified by model as 7: 9 times
Examples labeled as 4 classified by model as 8: 16 times
Examples labeled as 4 classified by model as 9: 46 times
Examples labeled as 5 classified by model as 0: 5 times
Examples labeled as 5 classified by model as 1: 7 times
Examples labeled as 5 classified by model as 2: 5 times
Examples labeled as 5 classified by model as 3: 40 times
Examples labeled as 5 classified by model as 4: 8 times
Examples labeled as 5 classified by model as 5: 3626 times
Examples labeled as 5 classified by model as 6: 27 times
Examples labeled as 5 classified by model as 7: 5 times
Examples labeled as 5 classified by model as 8: 66 times
Examples labeled as 5 classified by model as 9: 6 times
Examples labeled as 6 classified by model as 0: 9 times
Examples labeled as 6 classified by model as 1: 6 times
Examples labeled as 6 classified by model as 2: 5 times
Examples labeled as 6 classified by model as 3: 2 times
Examples labeled as 6 classified by model as 4: 47 times
Examples labeled as 6 classified by model as 5: 34 times
Examples labeled as 6 classified by model as 6: 3990 times
Examples labeled as 6 classified by model as 8: 43 times
Examples labeled as 6 classified by model as 9: 1 times
Examples labeled as 7 classified by model as 0: 6 times
Examples labeled as 7 classified by model as 1: 15 times
Examples labeled as 7 classified by model as 2: 57 times
Examples labeled as 7 classified by model as 3: 45 times
Examples labeled as 7 classified by model as 4: 22 times
Examples labeled as 7 classified by model as 5: 4 times
Examples labeled as 7 classified by model as 7: 4168 times
Examples labeled as 7 classified by model as 8: 21 times
Examples labeled as 7 classified by model as 9: 63 times
Examples labeled as 8 classified by model as 0: 15 times
Examples labeled as 8 classified by model as 1: 11 times
Examples labeled as 8 classified by model as 2: 23 times
Examples labeled as 8 classified by model as 3: 17 times
Examples labeled as 8 classified by model as 4: 19 times
Examples labeled as 8 classified by model as 5: 27 times
Examples labeled as 8 classified by model as 6: 35 times
Examples labeled as 8 classified by model as 7: 15 times
Examples labeled as 8 classified by model as 8: 3848 times
Examples labeled as 8 classified by model as 9: 53 times
Examples labeled as 9 classified by model as 0: 21 times
Examples labeled as 9 classified by model as 1: 3 times
Examples labeled as 9 classified by model as 2: 8 times
Examples labeled as 9 classified by model as 3: 26 times
Examples labeled as 9 classified by model as 4: 109 times
Examples labeled as 9 classified by model as 5: 23 times
Examples labeled as 9 classified by model as 6: 6 times
Examples labeled as 9 classified by model as 7: 62 times
Examples labeled as 9 classified by model as 8: 42 times
Examples labeled as 9 classified by model as 9: 3888 times


==========================Scores========================================
 Accuracy:        0.9548
 Precision:       0.9546
 Recall:          0.9547
 F1 Score:        0.9547
========================================================================
After 150 epochs of training, the model reaches an accuracy of 95.48%; the misclassifications are listed in the confusion output above.

This completes training and evaluating the MNIST dataset on Spark. To summarize: we first save the data to HDFS in RDD form, then the training job reads the RDD back and fits the model. Storing the raw images directly on HDFS is also an option, but HDFS blocks are typically sized at 64 MB or 128 MB, so vast numbers of small image files are very wasteful of the cluster's storage. Moreover, with many images we would repeatedly open and close network connections to HDFS just to read them, which also burns HDFS resources. That is why we chose to build and save the RDD locally first.

Distributed machine learning actually offers many strategies, such as data parallelism and model parallelism; I have only skimmed over them here and will write more once I have studied them properly. Finally, there is hyperparameter tuning, which we did not cover either, even though it is extremely important. There is no authoritative or standard tuning recipe, because every training run is different, so you can only tune against your own runs. Some general rules of thumb: when the loss stops decreasing, lower the learning rate and try a smaller batch size to see the effect; also, the learning rate for distributed training is usually somewhat larger than on a single machine.