Big Data with Spark (Part 7) --- Spark Machine Learning, Naive Bayes, a Wine Rating and Classification Case Study, a Spam Filtering Case Study, E-commerce Product Recommendation, and a Movie Recommendation Case Study

I. Introduction to Spark Machine Learning
------------------------------------------------------------------
    1. Supervised learning
        a. Uses a training dataset of labeled, well-formed examples
        b. From the dataset, an inference function is derived
        c. The inference function is applied to new data to produce predictions
        d. Common uses of supervised learning: spam classification of e-mail; labeling web pages by content; speech recognition; etc.
        e. Common supervised learning algorithms:
            Neural networks
            SVM
            Bayesian classifiers, etc.

    2. Unsupervised learning
        a. No training data
        b. Analyzes the available data to find patterns and trends, then clusters and groups the data
        c. Common techniques:
            k-means
            Self-organizing maps (SOM)
            Hierarchical clustering, etc.

    3. Recommendation
        Collaborative filtering: recommends the closest matches based on past purchase, click, and rating behavior, e.g. "guess what you like" and "people you may know".


II. The Naive Bayes Algorithm
-----------------------------------------------
    1. Formula
        P(B|A) = P(A|B) * P(B) / P(A)

    2. Explanation
        A: event A
        B: event B
        P(B|A): the conditional (posterior) probability of B given that A has occurred
        P(A|B): the conditional probability of A given that B has occurred
        P(B): the prior probability of B, independent of A
        P(A): the prior probability of A, independent of B
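
    3. A worked example (illustrative numbers only)
        Suppose 20% of all mail is spam, and the word "loan" appears in 50% of spam but in only 5% of normal mail. With A = "message contains loan" and B = "message is spam":

        P(B|A) = P(A|B) * P(B) / P(A)
               = 0.5 * 0.2 / (0.5 * 0.2 + 0.05 * 0.8)
               = 0.1 / 0.14
               ≈ 0.71

        so a message containing "loan" is spam with roughly 71% probability.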


III. The Spark Machine Learning Library
----------------------------------------------------
    [Estimator]
        Estimator: an algorithm that operates on a DataFrame.
        It runs on a DataFrame containing features and a label (the outcome) and trains on that data to build a Model.
        The model is then used for later predictions.

    [Transformer]
        DataFrame transformer.
        Transforms a DataFrame containing features into a DataFrame that also contains predictions.
        The Model built by an Estimator is itself a Transformer.

    [Parameter]
        Data used by Estimators and Transformers, usually specific to the ML algorithm.
        The Spark API exposes a uniform parameter API across algorithms.

    [Pipeline]
        Chains Estimators and Transformers together into a machine learning workflow, as sketched below.
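
    [Example]
        A minimal sketch tying the four concepts together (the toy data and column names are invented for illustration; the spam filter in section VI follows the same shape):

        import org.apache.spark.ml.Pipeline
        import org.apache.spark.ml.classification.LogisticRegression
        import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
        import org.apache.spark.sql.SparkSession

        object PipelineSketch {
            def main(args: Array[String]): Unit = {
                val spark = SparkSession.builder().appName("sketch").master("local[2]").getOrCreate()

                //toy DataFrame with a text column and a label column
                val training = spark.createDataFrame(Seq(
                    ("cheap loan now", 1.0),
                    ("see you tomorrow", 0.0)
                )).toDF("text", "label")

                val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")     //Transformer
                val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features") //Transformer
                val lr = new LogisticRegression().setMaxIter(10)  //Estimator; setMaxIter sets a Parameter
                //a Pipeline chains the stages into one workflow
                val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

                //fit() trains the Estimator stages and returns a PipelineModel, itself a Transformer
                val model = pipeline.fit(training)
                model.transform(training).select("text", "label", "prediction").show()
            }
        }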


IV. Wine Rating [Linear Regression] and Classification [Logistic Regression] Case Study
----------------------------------------------------------
    1. Dataset download: http://archive.ics.uci.edu/ml/datasets/Wine+Quality (winequality-white.csv is semicolon-separated and starts with a header row)

    2. Wine rating with linear regression --- Scala version
        
        import org.apache.spark.ml.linalg.Vectors
        import org.apache.spark.ml.regression.LinearRegression
        import org.apache.spark.sql.SparkSession

        //linear regression for wine quality rating on a 0-10 scale
        object SparkScalaML1 {

            def main(args: Array[String]): Unit = {
                val sess = SparkSession.builder()
                        .appName("ml")
                        .master("local[4]")
                        .getOrCreate();

                val sc = sess.sparkContext;
                //data directory
                val dataDir = "file:///D:/share/spark/ml/winequality-white.csv"
                //define the case class
                case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                                CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                                FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double, PH:
                                Double, Sulphates: Double, Alcohol: Double, Quality: Double)

                //transform: the UCI file is semicolon-separated; skip its quoted header row
                val wineDataRDD = sc.textFile(dataDir)
                        .filter(!_.startsWith("\""))
                        .map(_.split(";"))
                        .map(
                            w => Wine(w(0).toDouble, w(1).toDouble,
                                w(2).toDouble, w(3).toDouble, w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble
                                , w(9).toDouble, w(10).toDouble, w(11).toDouble)
                        )

                //convert the RDD to a DataFrame
                import sess.implicits._
                val trainingDF = wineDataRDD.map(w => (w.Quality,
                        Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
                            w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                            w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")

                trainingDF.show()
                println("=============================================")

                //create the linear regression object
                val lr = new LinearRegression()
                //set the maximum number of iterations
                //lr.setMaxIter(10).setRegParam(0.01)
                lr.setMaxIter(10)
                //fit the training data with linear regression to produce the model
                val model = lr.fit(trainingDF)

                //create the test DataFrame (labels included so the predictions can be checked)
                val testDF = sess.createDataFrame(Seq(
                    (5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
                    (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
                    (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))
                )).toDF("label", "features")

                //show the test data
                testDF.show();
                println("========================")
                //predict on the labeled test data to assess model quality
                testDF.createOrReplaceTempView("test")
                val tested = model.transform(testDF).select("features", "label", "prediction")
                tested.show();

                println("========================")
                //predict on unlabeled test data
                val predictDF = sess.sql("SELECT features FROM test")
                //prediction results
                val predicted = model.transform(predictDF).select("features", "prediction")
                predicted.show();
            }
        }


    3. Java implementation =====> TODO

    4. Analysis of the ML steps in the Wine case
        a. Read the training data and build the training DataFrame
        b. Create the LinearRegression object
        c. Fit a model on the training data, completing the estimation pipeline
        d. Read test data that carries the true ratings and build the test DataFrame [the test data must include the correct ratings so the model's predictions can be checked]
        e. Transform the test data with the model, producing a new DataFrame: features are extracted and predicted ratings are produced and printed (a metric-based check is sketched below)
        f. Once the model looks good, transform production data with it to rate that data
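
        Step e can be checked with a metric instead of reading the output by eye. A hedged sketch, assuming the `tested` DataFrame from the code above (only its label and prediction columns are needed):

            import org.apache.spark.ml.evaluation.RegressionEvaluator

            //root-mean-square error between the true rating and the predicted rating
            val evaluator = new RegressionEvaluator()
                    .setLabelCol("label")
                    .setPredictionCol("prediction")
                    .setMetricName("rmse")
            println("RMSE = " + evaluator.evaluate(tested))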

    5. Wine classification with logistic regression
        
            import org.apache.spark.ml.classification.LogisticRegression
            import org.apache.spark.ml.linalg.Vectors
            import org.apache.spark.sql.SparkSession


            /**
              * Logistic regression for wine classification -- good/bad as 1/0
              */
            object SparkML2 {

                def main(args: Array[String]): Unit = {
                    val sess = SparkSession.builder()
                            .appName("ml")
                            .master("local[4]")
                            .getOrCreate();

                    val sc = sess.sparkContext;
                    //data directory
                    val dataDir = "file:///D:/share/spark/ml/winequality-white.csv"
                    //define the case class
                    case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                                    CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                                    FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double, PH:
                                    Double, Sulphates: Double, Alcohol: Double, Quality: Double)

                    //transform: the UCI file is semicolon-separated; skip its quoted header row
                    val wineDataRDD = sc.textFile(dataDir)
                            .filter(!_.startsWith("\""))
                            .map(_.split(";"))
                            .map(
                                w => Wine(w(0).toDouble, w(1).toDouble,
                                    w(2).toDouble, w(3).toDouble, w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble
                                    , w(9).toDouble, w(10).toDouble, w(11).toDouble)
                            )

                    //convert the RDD to a DataFrame, binarizing the label: quality < 7 -> 0, else 1
                    import sess.implicits._
                    val trainingDF = wineDataRDD.map(w => (if(w.Quality < 7) 0D else
                        1D, Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
                        w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                        w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")

                    trainingDF.show()
                    println("=============================================")

                    //create the logistic regression object
                    val lr = new LogisticRegression()
                    //set the maximum number of iterations and the regularization parameter
                    lr.setMaxIter(10).setRegParam(0.01)
                    //fit the training data with logistic regression to produce the model
                    val model = lr.fit(trainingDF)

                    //create the test DataFrame; labels are binarized the same way as the training data
                    //(the original qualities were 5, 5, and 7, i.e. 0.0, 0.0, 1.0)
                    val testDF = sess.createDataFrame(Seq(
                        (0.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
                        (0.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
                        (1.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))
                    )).toDF("label", "features")

                    //show the test data
                    testDF.show();
                    println("========================")
                    //predict on the labeled test data to assess model quality
                    testDF.createOrReplaceTempView("test")
                    val tested = model.transform(testDF).select("features", "label", "prediction")
                    tested.show();

                    println("========================")
                    //predict on unlabeled test data
                    val predictDF = sess.sql("SELECT features FROM test")
                    //prediction results
                    val predicted = model.transform(predictDF).select("features", "prediction")
                    predicted.show();
                }

            }
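
        A hedged sketch for scoring the classifier rather than eyeballing its output, assuming the `model` and `testDF` values above; the evaluator reads the rawPrediction column that LogisticRegression adds during transform:

            import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

            //area under the ROC curve (1.0 = perfect ranking, 0.5 = random guessing)
            val evaluator = new BinaryClassificationEvaluator()
                    .setLabelCol("label")
                    .setMetricName("areaUnderROC")
            println("AUC = " + evaluator.evaluate(model.transform(testDF)))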

    6. Java implementation =====> TODO


V. Saving and Loading Models
------------------------------------------------------
    1. Persisting a model with save
        //create the logistic regression object
        val lr = new LogisticRegression()
        //set the maximum number of iterations
        //lr.setMaxIter(10).setRegParam(0.01)
        lr.setMaxIter(10)
        //fit the training data with logistic regression to produce the model
        val model = lr.fit(trainingDF)
        model.save("file:///d:/share/spark/model")

    2. Loading a model (load with the model class matching what was saved)
        val loadmodel = LogisticRegressionModel.load("file:///d:/share/spark/model");
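
    3. Overwriting an existing model directory
        save() fails if the target path already exists; a hedged sketch of the overwrite variant via the MLWriter API:

        //overwrite instead of failing when the directory already exists
        model.write.overwrite().save("file:///d:/share/spark/model")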


VI. Spam Filtering [Hashed Term Frequency + Logistic Regression]
------------------------------------------------------
    
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.sql.SparkSession
    /**
      * Spam filtering
      */
    object SparkMLSpamFilter {

        def main(args: Array[String]): Unit = {

            val spark = SparkSession.builder()
                    .appName("ml")
                    .master("local[4]")
                    .getOrCreate();

            val sc = spark.sparkContext;

            //training data
            val training = spark.createDataFrame(Seq(
                ("[email protected]", "hope you are well", 0.0),
                ("[email protected]", "nice to hear from you", 0.0),
                ("[email protected]", "happy holidays", 0.0),
                ("[email protected]", "see you tomorrow", 0.0),
                ("[email protected]", "save money", 1.0),
                ("[email protected]", "low interest rate", 1.0),
                ("[email protected]", "cheap loan", 1.0))
            ).toDF("email", "message", "label")

            training.show();

            //tokenizer: lower-cases the input, splits it on whitespace, and emits a new column
            val tokenizer = new Tokenizer().setInputCol("message").setOutputCol("words")

            //hashed term frequency --- set the number of buckets, the input column, and the output column
            val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features")

            //logistic regression object --- 10 iterations, regularization parameter 0.01
            val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

            //build the pipeline
            val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr));

            //fit the pipeline on the training data to produce the model
            val model = pipeline.fit(training)

            //test data
            val test = spark.createDataFrame(Seq(
                ("[email protected]", "how are you"),
                ("[email protected]", "hope doing well"),
                ("[email protected]", "want some money"),
                ("[email protected]", "secure loan"),
                ("[email protected]", "need loan"))
            ).toDF("email", "message")

            test.show()

            //results on the test data
            val prediction = model.transform(test).select("email", "message", "prediction")
            //show the test results
            prediction.show()

            println("==================================")
            //tokenize the training data, emitting the words column
            val wordsDF = tokenizer.transform(training)
            wordsDF.show();
            println("==================================")

            val featurizedDF = hashingTF.transform(wordsDF)
            featurizedDF.show()
            println("==================================")

            featurizedDF.createOrReplaceTempView("featurized")
            val selectedFeaturizedFieldsDF = spark.sql("SELECT words, features FROM featurized")
            selectedFeaturizedFieldsDF.show()
        }
    }
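
    Beyond the hard 0/1 prediction, the fitted pipeline also emits a probability column; a hedged sketch for inspecting the model's confidence (column names as produced by LogisticRegression):

        //show the class-probability vector next to the prediction
        model.transform(test)
                .select("email", "message", "probability", "prediction")
                .show(false)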

VII. Recommendation [Alternating Least Squares]
-------------------------------------------------------
    1. Alternating least squares (ALS)
        ALS factorizes the user-item rating matrix into user factors x(u) and item factors y(i) by minimizing the regularized sum of squared errors between observed and predicted ratings, solving alternately for the user factors and the item factors:

        min Σ ( r(u,i) - x(u)·y(i) )^2 + λ * ( Σ |x(u)|^2 + Σ |y(i)|^2 )

    2. Training data [test.data] -- each line is userId,productId,rating
        1,0,1.0
        1,1,2.0
        1,2,5.0
        1,3,5.0
        1,4,5.0
        2,0,1.0
        2,1,2.0
        2,2,5.0
        2,5,5.0
        2,6,4.5
        3,1,2.5
        3,2,5.0
        3,3,4.0
        3,4,3.0
        4,0,5.0
        4,1,5.0
        4,2,5.0
        4,3,0.0

    3. Scala implementation
        
        package test.spark.examples.mllib

        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.mllib.recommendation.ALS
        import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
        import org.apache.spark.mllib.recommendation.Rating

        /**
          * Recommendation
          */
        object RecomendationDemo {
            def main(args: Array[String]): Unit = {

                val conf = new SparkConf().setAppName("CollaborativeFilteringExample").setMaster("local[*]")

                val sc = new SparkContext(conf)
                // load and parse the data
                val data = sc.textFile("file:///D:/share/spark/ml/data/mllib/als/test.data")

                //convert each line into a Rating object
                val ratings = data.map(
                    _.split(',') match
                    {
                        case Array(user, item, rate) =>
                            Rating(user.toInt, item.toInt, rate.toDouble)
                    }
                )

                println("==========    ratings: original dataset    ===============")
                ratings.collect().foreach(println);

                // build the model with alternating least squares
                val rank = 10
                val numIterations = 10
                val model = ALS.train(ratings, rank, numIterations, 0.01)

                //check the model's accuracy against test data
                //prepare the test data by dropping the ratings
                val usersProducts = ratings.map { case Rating(user, product, rate) =>
                    (user, product)
                }

                println("==========    usersProducts: test data    ===============")
                usersProducts.collect().foreach(println)

                //predict a rate for each usersProducts pair
                val predictions =
                    model.predict(usersProducts).map { case Rating(user, product, rate) =>
                        ((user, product), rate)
                    }
                println("==========    predictions: prediction results    ===============")
                predictions.collect().foreach(println)

                //compare
                val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
                    ((user, product), rate)
                }.join(predictions)
                println("==========    ratings + predictions: actual vs. predicted    ===============")
                ratesAndPreds.collect().foreach(println)

                //recommend 5 products to user 2
                val res = model.recommendProducts(2, 5);
                println("==========    res: recommendations for user 2    ===============")
                res.foreach(println)

                //compute the error
                val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
                    val err = (r1 - r2)
                    err * err
                }.mean()
                println("Mean Squared Error = " + MSE)

                // saving and loading the model
                model.save(sc, "target/tmp/myCollaborativeFilter")
                val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
            }
        }
    4. Java implementation
        
        package test.spark.examples.mllib;

        import scala.Tuple2;
        import org.apache.spark.api.java.*;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.mllib.recommendation.ALS;
        import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
        import org.apache.spark.mllib.recommendation.Rating;
        import org.apache.spark.SparkConf;

        public class RecomendationDemoJava {

            public static void main(String[] args) {
                SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example").setMaster("local[*]");
                JavaSparkContext jsc = new JavaSparkContext(conf);

                //load and parse the data
                String path = "file:///D:/share/spark/ml/data/mllib/als/test.data";
                JavaRDD<String> data = jsc.textFile(path);
                JavaRDD<Rating> ratings = data.map(
                        new Function<String, Rating>() {
                            public Rating call(String s) {
                                String[] sarray = s.split(",");
                                return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                                        Double.parseDouble(sarray[2]));
                            }
                        }
                );
                System.out.println("==========    ratings: original dataset    ===============");
                ratings.collect().forEach( x -> System.out.println(x));


                //build the model with alternating least squares
                int rank = 10;
                int numIterations = 10;
                MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

                //check the model's accuracy against test data
                //prepare the test data by dropping the ratings
                JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
                        new Function<Rating, Tuple2<Object, Object>>() {
                            public Tuple2<Object, Object> call(Rating r) {
                                return new Tuple2<Object, Object>(r.user(), r.product());
                            }
                        }
                );

                System.out.println(("==========    usersProducts: test data    ==============="));
                userProducts.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));

                //predict a rate for each userProducts pair
                JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
                        model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
                                new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
                                        return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
                                    }
                                }
                        ));

                System.out.println(("==========    predictions: prediction results    ==============="));
                predictions.collect().forEach(x-> System.out.println(x._1 + ":" + x._2));

                System.out.println(("==========    recommend 5 products to user 2    ==============="));
                Rating[] ratings1 = model.recommendProducts(2, 5);
                for(Rating r : ratings1)
                {
                    System.out.println(r.user() + ":" + r.product() + ":" + r.rating());
                }

                JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
                        JavaPairRDD.fromJavaRDD(ratings.map(
                                new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
                                        return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
                                    }
                                }
                        )).join(predictions).values();

                System.out.println(("==========    ratings + predictions: actual vs. predicted    ==============="));
                ratesAndPreds.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));

                //compute the error
                double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
                        new Function<Tuple2<Double, Double>, Object>() {
                            public Object call(Tuple2<Double, Double> pair) {
                                Double err = pair._1() - pair._2();
                                return err * err;
                            }
                        }
                ).rdd()).mean();
                System.out.println("Mean Squared Error = " + MSE);

                // saving and loading the model
                //model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
                //MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
                //"target/tmp/myCollaborativeFilter");

                jsc.stop();
            }
        }
    5. "You may also like" -- recommendation APIs
        /******* recommend n products to one user: 8 products for user 5 ********/
        val res = model.recommendProducts(5, 8);

        /******* recommend one product to n users: product 3 to 5 users ********/
        val res = model.recommendUsers(3, 5)

        /******* recommend the top 3 products to every user (printed in the sketch below) ********/
        val res = model.recommendProductsForUsers(3)
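
        The third call returns an RDD of (userId, Array[Rating]) rather than a plain array; a hedged sketch for printing it, assuming the `model` from the Scala example above:

        //one line per user, listing the recommended products and their predicted ratings
        model.recommendProductsForUsers(3).collect().foreach { case (user, recs) =>
            println(user + " -> " + recs.map(r => r.product + ":" + r.rating).mkString(", "))
        }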


VIII. Movie Recommendation Case Study
---------------------------------------------------------------
    1. Sample data -- each line is userId::movieId::rating::timestamp
        0::2::3::1424380312
        0::3::1::1424380312
        0::5::2::1424380312
        0::9::4::1424380312
        0::11::1::1424380312
        0::12::2::1424380312
        1::15::1::1424380312
        2::17::1::1424380312
        2::19::1::1424380312
        ...

    2. Scala implementation
        
        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.ml.recommendation.ALS
        import org.apache.spark.sql.SparkSession


        object MovieRecc {

            //rating case class [note: never define the case class inside main; it will fail to resolve]
            case class Rating0(userId: Int, movieId: Int, rating: Float, timestamp: Long)

            def main(args: Array[String]): Unit = {
                val conf = new SparkConf();
                conf.setAppName("movieRecomm");
                conf.setMaster("local[4]")

                val spark = SparkSession.builder().config(conf).getOrCreate() ;
                import spark.implicits._

                //parse one rating line
                def parseRating(str: String): Rating0 = {
                    val fields = str.split("::")
                    assert(fields.size == 4)
                    Rating0(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
                }

                //convert to a DataFrame of Rating0 objects
                val ratings = spark.sparkContext.textFile("file:///D:\\share\\spark\\ml\\data\\mllib\\als\\sample_movielens_ratings.txt");
                val ratings0 = ratings.map(parseRating)
                val df = ratings0.toDF()
                println("=========  df: source data  ==========================")
                df.collect().foreach(println)

                //randomly split the data into an array of two DataFrames: the first is training, the second is test
                val Array(training, test) = df.randomSplit(Array(0.99, 0.01))

                //build the ALS recommender and set its parameters
                val als = new ALS().setMaxIter(5)
                        .setRegParam(0.01)
                        .setUserCol("userId")
                        .setItemCol("movieId")
                        .setRatingCol("rating")

                //fit the training data with the als object to produce the recommendation model
                val model = als.fit(training)

                //predict the ratings of the test set with the model
                val predictions = model.transform(test);
                println("=========  predictions  ==========================")
                predictions.collect().foreach(println)
            }
        }
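
        Two follow-ups, sketched assuming the `model` and `predictions` values above. Predictions for users or movies unseen during training come back as NaN (the cold-start problem), so they are dropped before computing RMSE; recommendForAllUsers (Spark 2.2+) then produces top-N lists:

        import org.apache.spark.ml.evaluation.RegressionEvaluator

        //drop cold-start NaN rows, then compute the root-mean-square error
        val evaluator = new RegressionEvaluator()
                .setMetricName("rmse")
                .setLabelCol("rating")
                .setPredictionCol("prediction")
        println("RMSE = " + evaluator.evaluate(predictions.na.drop()))

        //top 5 movie recommendations for every user
        model.recommendForAllUsers(5).show(false)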


    3. Java implementation
        package mllib;

        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.ml.recommendation.ALS;
        import org.apache.spark.ml.recommendation.ALSModel;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;

        public class MovieReccJava {

            public static void main(String[] args) {
                SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("movie");
                SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

                //parse each line into a Rating0 bean (the Rating0 class is sketched below)
                JavaRDD<Rating0> ratingsRDD = spark
                        .read().textFile("file:///D:\\share\\spark\\ml\\data\\mllib\\als\\sample_movielens_ratings.txt").javaRDD()
                        .map(Rating0::parseRating);

                Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating0.class);
                Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.99, 0.01});
                Dataset<Row> training = splits[0];
                Dataset<Row> test = splits[1];

                System.out.println(("=========  training data  =========================="));
                training.show();
                System.out.println(("=========  test data  =========================="));
                test.show();


                // Build the recommendation model using ALS on the training data
                ALS als = new ALS()
                        .setMaxIter(5)
                        .setRegParam(0.01)
                        .setUserCol("userId")
                        .setItemCol("movieId")
                        .setRatingCol("rating");
                ALSModel model = als.fit(training);

                Dataset<Row> predictions = model.transform(test);

                System.out.println(("=========  predictions  =========================="));
                predictions.show();
            }
        }
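
        The code above references a Rating0 bean that these notes never define; a minimal sketch completing it (modeled on Spark's JavaALSExample; the getter names must match the userId/movieId/rating columns used by the ALS object):

        package mllib;

        import java.io.Serializable;

        //JavaBean backing createDataFrame: the public getters define the DataFrame columns
        public class Rating0 implements Serializable {
            private int userId;
            private int movieId;
            private float rating;
            private long timestamp;

            public Rating0() {}

            public Rating0(int userId, int movieId, float rating, long timestamp) {
                this.userId = userId;
                this.movieId = movieId;
                this.rating = rating;
                this.timestamp = timestamp;
            }

            public int getUserId() { return userId; }
            public int getMovieId() { return movieId; }
            public float getRating() { return rating; }
            public long getTimestamp() { return timestamp; }

            //parses "userId::movieId::rating::timestamp"
            public static Rating0 parseRating(String str) {
                String[] fields = str.split("::");
                if (fields.length != 4) {
                    throw new IllegalArgumentException("Each line must contain 4 fields");
                }
                return new Rating0(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]),
                        Float.parseFloat(fields[2]), Long.parseLong(fields[3]));
            }
        }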
        }