Spark Study Notes: Text Processing Techniques

1. Building a TF-IDF Model

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{SparseVector => SV}
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF

/**
  * Created by common on 17-5-6.
  */
object TFIDF {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

//    val path = "hdfs://master:9000/user/common/20Newsgroups/20news-bydate-train/*"
    val path = "file:///media/common/工作/kaggle/test/*"
    val rdd = sc.wholeTextFiles(path)

    // Extract the text content from each (filename, content) pair
    val text = rdd.map { case (file, text) => text }
    //    print(text.count())

    val regex = """[^0-9]*""".r

    // Stop words to exclude
    val stopwords = Set(
      "the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not",
      "with", "as", "was", "if",
      "they", "are", "this", "and", "it", "have", "from", "at", "my",
      "be", "that", "to"
    )

    // Split the raw documents on non-word characters using a regular expression
    val nonWordSplit = text.flatMap(t =>
      t.split("""\W+""").map(_.toLowerCase))

    // Filter out numbers and tokens that contain digits
    val filterNumbers = nonWordSplit.filter(token =>
      regex.pattern.matcher(token).matches)

    // Exclude rarely occurring tokens based on frequency; this requires a first pass over the entire dataset
    val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
    val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map {
      case (k, v) => k
    }.collect.toSet

    // Preprocessing (tokenization) function applied to each document
    def tokenize(line: String): Seq[String] = {
      line.split("""\W+""")
        .map(_.toLowerCase)
        .filter(token => regex.pattern.matcher(token).matches)
        .filterNot(token => stopwords.contains(token))
        .filterNot(token => rareTokens.contains(token))
        .filter(token => token.size >= 2) // drop single-character tokens
        .toSeq
    }
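    // Illustrative example (hypothetical input; assumes "spark" and "api" are not
    // rare tokens in this corpus): tokenize("The Spark API, by Spark 2")
    // would return Seq("spark", "api", "spark"): "the" and "by" are stop words
    // and "2" is dropped by the digit filter.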

    // After preprocessing, each document becomes a Seq[String] of tokens
    val tokens = text.map(doc => tokenize(doc)).cache()

    println(tokens.distinct.count)
    // Tokens of the first document after preprocessing
    println(tokens.first())
    println(tokens.first().length)

    // Generate 2^18-dimensional feature vectors
    val dim = math.pow(2, 18).toInt
    val hashingTF = new HashingTF(dim)
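    // Feature hashing maps each term to an index in [0, dim) with a hash function,
    // so no vocabulary needs to be built; with 2^18 buckets, hash collisions are
    // possible but rare for typical vocabularies.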

    // HashingTF's transform function maps each input document (a sequence of terms) to an MLlib Vector
    val tf = hashingTF.transform(tokens)
    // tf contains one vector per document, i.e. a documents-by-features matrix
    tf.cache

    // Get the vector of the first document
    val v = tf.first.asInstanceOf[SV]
    println(v.size)
    // v.values and v.indices have the same length: values holds the term frequencies, indices the positions of the non-zero entries
    println(v.values.size)
    println(v.indices.size)
    println(v.values.toSeq)
    println(v.indices.take(10).toSeq)

    // Compute the inverse document frequency for each term
    val idf = new IDF().fit(tf)
    // Transform the term-frequency vectors into TF-IDF vectors
    val tfidf = idf.transform(tf)
    val v2 = tfidf.first.asInstanceOf[SV]
    println(v2.values.size)
    println(v2.values.take(10).toSeq)
    println(v2.indices.take(10).toSeq)

    // Compute the minimum and maximum TF-IDF weight across the whole corpus
    val minMaxVals = tfidf.map { v =>
      val sv = v.asInstanceOf[SV]
      (sv.values.min, sv.values.max)
    }
    val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
      (math.min(min1, min2), math.max(max1, max2))
    }
    println(globalMinMax)

    // Compare the TF-IDF weights of a few common words against some uncommon ones
    val common = sc.parallelize(Seq(Seq("you", "do", "we")))
    val tfCommon = hashingTF.transform(common)
    val tfidfCommon = idf.transform(tfCommon)
    val commonVector = tfidfCommon.first.asInstanceOf[SV]
    println(commonVector.values.toSeq)

    val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
    val tfUncommon = hashingTF.transform(uncommon)
    val tfidfUncommon = idf.transform(tfUncommon)
    val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
    println(uncommonVector.values.toSeq)

  }


}
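As a follow-up sketch (not part of the original notes), the TF-IDF vectors computed above can be compared with cosine similarity. The snippet below is a minimal illustration: cosineSimilarity is a hypothetical helper, it assumes the tfidf RDD and the SV alias from the listing, and it would go inside main after tfidf has been computed (it also assumes the corpus contains at least two documents).

    // Hypothetical helper: cosine similarity between two MLlib sparse vectors,
    // computed directly from their indices/values arrays
    def cosineSimilarity(a: SV, b: SV): Double = {
      val aMap = a.indices.zip(a.values).toMap
      // Dot product over the indices that are non-zero in both vectors
      val dot = b.indices.zip(b.values).map { case (i, v) => aMap.getOrElse(i, 0.0) * v }.sum
      val normA = math.sqrt(a.values.map(x => x * x).sum)
      val normB = math.sqrt(b.values.map(x => x * x).sum)
      dot / (normA * normB)
    }

    // Similarity between the first two documents of the corpus
    val firstTwo = tfidf.take(2).map(_.asInstanceOf[SV])
    println(cosineSimilarity(firstTwo(0), firstTwo(1)))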