
Spark Chinese Text Classification

I recently needed to build a text classification model for Chinese Dianping review data. The plan was to train the model in Spark, then package the TF step, the IDF step, and the trained model into a single Java class and write it to Redis, so predictions can be served without Spark. However, I ran into trouble with the IDF step in Spark, and would appreciate advice from anyone experienced. First, the code:
package com.meituan.spark.model

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.SQLContext
import scala.collection.JavaConverters._
import com.meituan.nlp.util.WordUtil
import com.meituan.nlp.util.TextUtil
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.feature.HashingTF
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.NaiveBayes
import com.meituan.model.util.NaiveBaysianBean
import com.alibaba.fastjson.JSON
object Test {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("AnalyzeSlotPrice").setMaster("local[16]")
    val start = System.currentTimeMillis()
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val rdd = sc.textFile("/Users/shuubiasahi/Desktop/一個月內資料.csv")
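    // Quick sanity check: print the top 10 label counts (column 1 of the tab-separated file).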
    rdd.map { x =>
      val spt = x.split("\t")
      spt(1)
    }.map(x => (x, 1)).reduceByKey(_ + _).take(10).foreach(println)

    val data = rdd.map { x =>
      val spt = x.split("\t")
      val content = spt(0)
      // Normalize (strip noise characters, convert traditional to simplified Chinese),
      // segment with Ansj, then drop stopwords and single-character terms.
      val result = ToAnalysis.parse(TextUtil.fan2Jian(WordUtil.replaceAll(content)))
        .getTerms.asScala.map(_.getName)
        .filter(x => !WordUtil.isStopword(x) && x.length() > 1)
        .mkString(" ")
      val label = spt(1)
      // val label = if (spt(1) == "-1") "-1" else "1"
      (label, result)
    }
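    // Label distribution again, this time after parsing, as a consistency check.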
    data.map(x => (x._1, 1)).reduceByKey(_ + _).take(10).foreach(println)
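    // 70/30 random split into training and test sets.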
    val splits = data.randomSplit(Array(0.7, 0.3))
    val train = splits(0)
    val test = splits(1)

    // Hashing trick: project each token into a fixed 500,000-dimension feature space.
    val hashingTF = new HashingTF(500000)
    val tf_num_pairs_train = train.map(x => (x._1, hashingTF.transform(x._2.split(" ").toSeq)))
    // Fit IDF on the training TF vectors only; the same model is reused for the test set below.
    val idf = new IDF().fit(tf_num_pairs_train.values)
    // Keep the transformed vectors sparse: the original Vectors.dense(x._2.toArray)
    // materializes 500,000 doubles per document and quickly exhausts memory.
    val num_idf_pairs_train = tf_num_pairs_train.mapValues(v => idf.transform(v)).map(x => LabeledPoint(x._1.toDouble, x._2))

    // val num_idf_pairs_train = tf_num_pairs_train.map(x => LabeledPoint(x._1.toDouble, Vectors.dense(x._2.toArray)))

    // Multinomial Naive Bayes; lambda is the additive-smoothing parameter.
    val model = NaiveBayes.train(num_idf_pairs_train, lambda = 5, modelType = "multinomial")

    val tf_num_pairs_test = test.map(x => (x._1, hashingTF.transform(x._2.split(" ").toSeq)))
    // Transform the test set with the IDF model fitted on the training set. Fitting a
    // second IDF on the test data (the original idf1) weights the test vectors
    // inconsistently with what the model was trained on.
    val num_idf_pairs_test = tf_num_pairs_test.mapValues(v => idf.transform(v)).map(x => LabeledPoint(x._1.toDouble, x._2))

    // val num_idf_pairs_test = tf_num_pairs_test.map(x => LabeledPoint(x._1.toDouble, Vectors.dense(x._2.toArray)))

    // Evaluate: fraction of test documents whose predicted label matches the true label.
    val testpredictionAndLabel = num_idf_pairs_test.map(p => (model.predict(p.features), p.label))
    val testaccuracy = 1.0 * testpredictionAndLabel.filter(x => x._1 == x._2).count() / num_idf_pairs_test.count()
    println("test accuracy: " + testaccuracy)
    

    /* val modelJson = JSON.toJSON(model).toString()
       val naiveBaysianModel = new NaiveBaysianBean()
       naiveBaysianModel.setHashingTf(hashingTF)
       naiveBaysianModel.setIdf(idf)
       naiveBaysianModel.setModelBase64(modelJson) */

  }

}


As the code shows, the TF step works on a Seq[String], but the IDF step operates on an RDD[Vector], and that is where I am stuck: I do not see how to wrap the pipeline into a Java class that no longer depends on Spark and can be called from other services. Advice from anyone experienced would be appreciated.
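One observation that may untangle this: only fitting the IDF needs an RDD. The fitted IDFModel boils down to a plain Array[Double] of per-index weights (idf.idf.toArray), and applying it to a single document is just elementwise multiplication. Likewise, spark.mllib's HashingTF with its default "native" algorithm maps a term to nonNegativeMod(term.hashCode, numFeatures), and the trained NaiveBayesModel exposes labels, pi, and theta, with pi and theta already in log space. With those three artifacts exported, prediction needs no Spark at all. Below is a minimal sketch under those assumptions; the class StandaloneNaiveBayesScorer is hypothetical, not part of the original code, and the hashing default is worth verifying against your Spark version.

import scala.collection.mutable

class StandaloneNaiveBayesScorer(
    numFeatures: Int,            // same value passed to HashingTF (500000 above)
    idfWeights: Array[Double],   // idf.idf.toArray from the fitted IDFModel
    labels: Array[Double],       // model.labels
    pi: Array[Double],           // model.pi (log class priors)
    theta: Array[Array[Double]]) // model.theta (log feature likelihoods)
  extends Serializable {

  // Replicates HashingTF's default "native" indexing: nonNegativeMod(term.hashCode, numFeatures).
  private def indexOf(term: String): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  def predict(tokens: Seq[String]): Double = {
    // Term frequencies via the same hashing trick HashingTF uses.
    val tf = mutable.Map.empty[Int, Double]
    tokens.foreach { t =>
      val i = indexOf(t)
      tf(i) = tf.getOrElse(i, 0.0) + 1.0
    }
    // Applying the IDF model is elementwise multiplication by the exported weights.
    val tfidf = tf.map { case (i, v) => (i, v * idfWeights(i)) }
    // Multinomial NB decision rule: argmax_k ( pi(k) + sum_i x(i) * theta(k)(i) ),
    // with pi and theta already stored as logs by mllib.
    val scores = pi.indices.map { k =>
      pi(k) + tfidf.map { case (i, v) => v * theta(k)(i) }.sum
    }
    labels(scores.indices.maxBy(scores))
  }
}

On the driver, after training, something like new StandaloneNaiveBayesScorer(500000, idf.idf.toArray, model.labels, model.pi, model.theta) captures everything prediction needs; Java-serialize it (or store the arrays as JSON) in Redis and rebuild it wherever you score. One caveat: theta holds numClasses x 500,000 doubles, so a smaller feature dimension may be worth considering if the Redis payload must stay compact.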