初學者入門-用Spark ML來處理超大資料

本文同樣轉譯自 KDnuggets 的文章:微軟的 Dmitry Petrov 介紹了如何用 Spark ML 來處理超過記憶體大小的資料。原文的 Link






下面要做的事情就是配置 Spark 環境。原文的例子是跑在 Spark 1.5.1 的環境下,我實際用的是 Spark 1.5.2 搭配 Hadoop 2.6。具體的安裝步驟請參考另外一篇博文:hadoop叢集的搭建指令碼及構思(N):一個簡化的Hadoop+Spark on Yarn叢集快速搭建



//import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

/**
 * Binary text classifier over a posts XML dump, built as a Spark ML pipeline
 * (Tokenizer -> HashingTF -> LogisticRegression).
 *
 * A post is labeled 1.0 when its `Tags` attribute contains the target tag
 * ("java") and 0.0 otherwise. Roughly 90% of each class is sampled for
 * training; the remaining rows are used to compute the area under the ROC
 * curve. A single hand-written post is also scored as a smoke test.
 *
 * Written against the Spark 1.5.x API (`SQLContext`, `DataFrame.unionAll`).
 */
object postsClassifier {
  // The file-level `import scala.xml._` is commented out, so XML is imported
  // locally; without it `XML.loadString` does not resolve.
  import scala.xml.XML

  /** Input used when no path is passed on the command line. */
  private val DefaultPostsPath = "hdfs://Master1:9000/xml/Posts.xml"

  /**
   * Splits `labeled` into a random training sample and the complementary
   * test set.
   *
   * The test set is obtained with a left outer join against the sampled
   * rows: rows of `labeled` that found no partner (their `Flag` is null)
   * were not sampled and therefore belong to the test set.
   *
   * @param labeled       DataFrame with columns Id, Tags, Text and Label
   * @param trainFraction approximate fraction of rows sampled for training
   * @return (training rows, test rows)
   */
  private def splitTrainTest(labeled: DataFrame, trainFraction: Double): (DataFrame, DataFrame) = {
    val train = labeled.sample(false, trainFraction)
    // Rename Label so the joined column can be told apart from labeled("Label").
    val trainKeys = train.withColumnRenamed("Label", "Flag").select("Id", "Flag")
    val test = labeled
      // Column equality in the DataFrame DSL is `===`, not `==`.
      .join(trainKeys, labeled("Id") === trainKeys("Id"), "leftouter")
      .filter("Flag is null")
      // Qualify every column with its DataFrame: "Id" exists on both join sides.
      .select(labeled("Id"), labeled("Tags"), labeled("Text"), labeled("Label"))
    (train, test)
  }

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the default posts XML path
   */
  def main(args: Array[String]): Unit = {
    // Outside the Spark shell there is no predefined `sc`; build it explicitly.
    val conf = new SparkConf().setAppName("BinaryClassifier")
    val sc = new SparkContext(conf)
    try {
      val fileName = args.headOption.getOrElse(DefaultPostsPath)

      // One XML element per line; drop the XML declaration and the <posts> wrapper.
      val postsXml = sc.textFile(fileName)
        .map(_.trim)
        .filter(!_.startsWith("<?xml version="))
        .filter(_ != "<posts>")
        .filter(_ != "</posts>")

      val postsRDD = postsXml.map { line =>
        val xml = XML.loadString(line)
        val id = (xml \ "@Id").text
        val tags = (xml \ "@Tags").text
        val title = (xml \ "@Title").text
        val body = (xml \ "@Body").text
        // Replace markup tags with spaces, then flatten newlines and runs of spaces.
        val bodyPlain = "<\\S+>".r.replaceAllIn(body, " ")
        val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ")
        Row(id, tags, text)
      }

      val schemaString = "Id Tags Text"
      val schema = StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

      // Outside the Spark shell there is no predefined `sqlContext` either.
      val sqlContext = new SQLContext(sc)
      val postsDf = sqlContext.createDataFrame(postsRDD, schema)

      val targetTag = "java"
      // NOTE(review): this is a substring match, so any tag that merely contains
      // "java" (e.g. "javascript") is also labeled positive -- confirm this is intended.
      val hasTargetTag: String => Double =
        (tags: String) => if (tags.contains(targetTag)) 1.0 else 0.0
      val labelUdf = udf(hasTargetTag)
      val postsLabeled = postsDf.withColumn("Label", labelUdf(col("Tags")))

      // The filter expression is passed as one complete SQL string.
      val positive = postsLabeled.filter("Label > 0.0")
      val negative = postsLabeled.filter("Label < 1.0")

      // Split each class separately so both keep the same train/test ratio.
      val (positiveTrain, positiveTest) = splitTrainTest(positive, 0.9)
      val (negativeTrain, negativeTest) = splitTrainTest(negative, 0.9)
      val training = positiveTrain.unionAll(negativeTrain)
      val testing = negativeTest.unionAll(positiveTest)

      val numFeatures = 64000
      val numEpochs = 30
      val regParam = 0.02

      // Wire the stages together explicitly: each stage reads the previous
      // stage's output column, and the final column names match the
      // select("Prediction", "Label") below.
      val tokenizer = new Tokenizer()
        .setInputCol("Text")
        .setOutputCol("Words")
      val hashingTF = new HashingTF()
        .setNumFeatures(numFeatures)
        .setInputCol(tokenizer.getOutputCol)
        .setOutputCol("Features")
      val lr = new LogisticRegression()
        .setMaxIter(numEpochs)
        .setRegParam(regParam)
        .setFeaturesCol("Features")
        .setLabelCol("Label")
        .setRawPredictionCol("Score")
        .setPredictionCol("Prediction")
      val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
      val model = pipeline.fit(training)

      // Smoke test on one hand-written post.
      val testTitle = "Easiest way to merge a release into one JAR file"
      val testBody = """Is there a tool or script which easily merges a bunch of href="http:/en.wikipedia.org/wiki/JAR_%28file_format %29" JAR files into one JAR file? A bonus would be to easily set the main-file manifest and make it executable. I would like to run it with something like: As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy single-file tool, but the downloaded ZIP file contains a lot of libraries."""
      // Join with a space, as the training text does, so the last title word
      // and the first body word remain separate tokens.
      val testText = testTitle + " " + testBody
      // The label value is a placeholder; transform() does not use it.
      val testDF = sqlContext
        .createDataFrame(Seq((99.0, testText)))
        .toDF("Label", "Text")
      val result = model.transform(testDF)
      // Read the prediction by column name rather than by positional index.
      val prediction = result.select("Prediction").head().getDouble(0)
      // println instead of printf: the text is not a format string, and each
      // result gets its own line.
      println("Prediction: " + prediction)

      // Evaluate on the held-out rows as (prediction, label) pairs.
      val testingResult = model.transform(testing)
      val testingResultScores = testingResult
        .select("Prediction", "Label").rdd
        .map(r => (r.getDouble(0), r.getDouble(1)))
      val bc = new BinaryClassificationMetrics(testingResultScores)
      val roc = bc.areaUnderROC
      println("Area under the ROC:" + roc)
    } finally {
      // Release cluster resources even when a stage fails.
      sc.stop()
    }
  }
}