
MLlib in practice (1): LinearRegressionWithSGD (consolidating several examples from around the web)

package mllib;

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{ LabeledPoint, LinearRegressionWithSGD }

object App {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("kimiYang")
    val sc = new SparkContext(conf)
    //val data = sc.textFile("/test/kimi.txt")

    // Many articles use the LIBSVM format directly:
    //   label index1:value1 index2:value2 ...
    // Here we use a plain labeled format instead:
    //   label,feature1 feature2 feature3 ...
    // i.e. y, x1 x2
    val data = sc.textFile("/home/hadoop/mllibdata/kimi.txt");

    val parseData = data.map { line =>
      val parts = line.split(',') // split the label from the features on the comma
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    }.cache() // parse each line into a LabeledPoint and cache for reuse

    // Recent articles mostly use the Dataset-based spark.ml API; here we still
    // pass in an RDD (see the sketch after the full listing).
    println("data rdd print:")
    parseData.foreach(println)

    //val model = LinearRegressionWithSGD.train(parseData, 100, 0.1) // build the model (no intercept)

    // The direct train(...) call above is commented out in favor of the form
    // that can fit an intercept term.
    val lr = new LinearRegressionWithSGD().setIntercept(true)
    // 100 iterations, step size (learning rate) 0.1, miniBatchFraction 1.0
    // (each iteration uses the full data set).
    lr.optimizer.setNumIterations(100).setStepSize(0.1).setMiniBatchFraction(1.0)
    val model = lr.run(parseData)

    // Print the weights and the intercept: y = i + w1*x1 + w2*x2,
    // where i = model.intercept and w = model.weights.
    println("model weights:")
    println(model.weights)
    println("model intercept:")
    println(model.intercept)

    // Evaluate the model by predicting on the training samples
    val prediction = model.predict(parseData.map(_.features))
    val predictionAndLabel = prediction.zip(parseData.map(_.label))
    // take the first 100 samples for display
    val show_predict = predictionAndLabel.take(100)
    val n = parseData.count()

    println("Prediction" + "\t" + "Label" + "\t" + "Diff")
    for (i <- show_predict.indices) {
      val diff = show_predict(i)._1 - show_predict(i)._2
      println(show_predict(i)._1 + "\t" + show_predict(i)._2 + "\t" + diff)
    }
    // Sum the squared errors; RMSE = sqrt(loss / n)
    val loss = predictionAndLabel.map {
      case (p, l) =>
        val err = p - l
        err * err
    }.reduce(_ + _)

    val rmse = math.sqrt(loss / n)
    println("RMSE result:", rmse)

    // Predict for the feature point (2, 1)
    val result = model.predict(Vectors.dense(2, 1))
    println("result:")
    println(result) // print the prediction
    // Predict for the feature point (1, 1). The evaluation above already
    // prints a prediction for this point; it matches, and can be checked by
    // hand against y = i + w1*x1 + w2*x2:
    // 4.944400642057738 = 1.2558550451436383 + 2.337819138213189*1 + 1.3507264587009111*1

    val result1 = model.predict(Vectors.dense(1, 1))
    println("result1:")
    println(result1) // print the prediction
    sc.stop()
  }
}
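
As noted in the code, newer write-ups use the Dataset-based spark.ml API instead of RDDs. Below is a minimal sketch of the equivalent model with org.apache.spark.ml.regression.LinearRegression, assuming Spark 2.x; the object name AppDF is illustrative. Note that spark.ml's LinearRegression does not optimize with SGD, so the fitted weights will differ slightly from the run above.

package mllib

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression

object AppDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("kimiYangDF").getOrCreate()
    import spark.implicits._

    // The same six points as kimi.txt, as a DataFrame with label/features columns.
    val training = Seq(
      (5.0, Vectors.dense(1.0, 1.0)),
      (7.0, Vectors.dense(2.0, 1.0)),
      (9.0, Vectors.dense(3.0, 2.0)),
      (11.0, Vectors.dense(4.0, 1.0)),
      (19.0, Vectors.dense(5.0, 3.0)),
      (18.0, Vectors.dense(6.0, 2.0))
    ).toDF("label", "features")

    // Fit with an intercept, mirroring setIntercept(true) above.
    val lr = new LinearRegression().setMaxIter(100).setFitIntercept(true)
    val model = lr.fit(training)

    println("coefficients: " + model.coefficients + " intercept: " + model.intercept)
    spark.stop()
  }
}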

Console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/10/16 17:34:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/10/16 17:34:22 WARN Utils: Your hostname, dblab-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.4 instead (on interface enp0s3)
18/10/16 17:34:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
data rdd print:
(5.0,[1.0,1.0])
(7.0,[2.0,1.0])
(9.0,[3.0,2.0])
(11.0,[4.0,1.0])
(19.0,[5.0,3.0])
(18.0,[6.0,2.0])
18/10/16 17:34:25 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_2_0]
18/10/16 17:34:26 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
18/10/16 17:34:26 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
model weights:
[2.337819138213189,1.3507264587009111]
model intercept:
1.2558550451436383
Prediction	Label	Diff
4.944400642057738	5.0	-0.05559935794226156
7.282219780270927	7.0	0.2822197802709274
10.970765377185028	9.0	1.9707653771850282
11.957858056697306	11.0	0.9578580566973063
16.997130112312316	19.0	-2.002869887687684
17.984222791824592	18.0	-0.01577720817540751
(RMSE result:,1.2176400829045526)
result:
7.282219780270927
result1:
4.944400642057738

The contents of /home/hadoop/mllibdata/kimi.txt are trivial; copy the lines below into the file yourself:

5,1 1
7,2 1
9,3 2
11,4 1
19,5 3
18,6 2
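
For reference (not used by the code above), the same six rows in LIBSVM format (label index:value, with 1-based feature indices) would look like:

5 1:1 2:1
7 1:2 2:1
9 1:3 2:2
11 1:4 2:1
19 1:5 2:3
18 1:6 2:2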