1. 程式人生 > >Spark機器學習:TF-IDF實現原理

Spark機器學習:TF-IDF實現原理

先簡單地介紹下什麼是TF-IDF(詞頻-逆文件頻率),它可以反映出語料庫中某篇文件中某個詞的重要性。假設t表示某個詞,d表示一篇文件,則詞頻TF(t,d)是某個詞t在文件d中出現的次數,而文件DF(t,D)是包含詞t的文件數目。為了過濾掉常用的片語,如"the" "a" "of" "that",我們使用逆文件頻率來度量一個詞能提供多少資訊的數值: 

IDF(t,D)=log(|D|+1)/(DF(t,D)+1)

這裡|D|表示語料庫的文件總數,為了不讓分母為了0,在此進行了加1平滑操作。而詞頻-逆文件頻率就是TF和IDF的簡單相乘:

TFIDF(t,d,D)=TF(t,d)*IDF(t,D)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.IDF

object TF_IDF_Test {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TfIdfTest")
    val sc = new SparkContext(conf)
    // Load documents (one per line).
    val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
    val hashingTF = new HashingTF()
    val tf: RDD[Vector] = hashingTF.transform(documents)
    tf.cache()
    val idf = new IDF().fit(tf)
    val tfidf: RDD[Vector] = idf.transform(tf)
  }

下面對程式碼進行詳細的解釋:

1.首先看資料來源documents,它作為hashingTF.transform的引數,要求每一行為一篇文件的內容。

2.下面在看hashingTF.transform的方法原始碼,其呼叫了HashingTF類自身的transform方法對每一篇文件進行處理

  /**
   * Transforms the input document to term frequency vectors.
   */
  @Since("1.1.0")
  def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
    dataset.map(this.transform)
  }
3.HashingTF類自身的transform方法,這裡的引數document是按空格劃分了的單詞序列,numFeatures為HashingTF類的成員變數預設為2^20,也就是hash的維數。

最終我們獲得的是一個稀疏向量,其下index就是單詞的雜湊值,value就是單詞的頻數

* Transforms the input document into a sparse term frequency vector.
*/
@Since("1.1.0")
def transform(document: Iterable[_]): Vector = {
//hash(單詞的hash碼,單詞頻數)
val termFrequencies = mutable.HashMap.empty[Int, Double]
//遍歷文件的單詞
document.foreach { term =>
  val i = indexOf(term) //獲得單詞的hash碼
  //單詞頻數統計
  termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
}
//把結果轉換成稀疏向量
Vectors.sparse(numFeatures, termFrequencies.toSeq)
}
3.1 indexof 方法,term.## 等價於獲得物件term的 雜湊值,使用Utils.nonNegativeMod對於獲得的雜湊值模numFeatures取正餘
  /**
   * Returns the index of the input term.
   */
  @Since("1.1.0")
  def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)

Utils.nonNegativeMod

 /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
  * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
  * so function return (x % mod) + mod in that case.
  */
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }
4.val idf = new IDF().fit(tf),這裡的tf為RDD[Vector],每個稀疏向量的內容參考3。
  @Since("1.1.0")
  def fit(dataset: RDD[Vector]): IDFModel = {
    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
          minDocFreq = minDocFreq))(
      seqOp = (df, v) => df.add(v),
      combOp = (df1, df2) => df1.merge(df2)
    ).idf()
    new IDFModel(idf)
  }

treeAggregate和Aggregate類似,它把IDF.DocumentFrequencyAggregator作為初始值,seqop為分割槽類的聚合操作,而comop為分割槽間的聚合操作。下面具體看下DocumentFrequencyAggregator的內容,其使用成員變數df(密集向量)來記錄index(單詞hash碼)在多少個文件中出現過。使用add方法來合併一個新的文件,並更新df和m的值;因為密集和稀疏操作類似,下面以匹配密集為例,values(j) > 0.0,說明j對應的單詞在這篇文件出現過,df(j)+=1。然後使用merge來合併分割槽間的統計結果(這裡只是進行簡單的相加)。最後使用idf()方法對treeAggregate的結果使用公式1得到IDF,把結果封裝到IDFModel類並返回。

class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
    //語料庫的文件總數
    private var m = 0L
	//BDV為一個密集向量的別名,df對應的值為該index(單詞hash碼)在多少個文件中出現過
    private var df: BDV[Long] = _

    def this() = this(0)

   //新增一個新的文件
    def add(doc: Vector): this.type = {
      if (isEmpty) {
        df = BDV.zeros(doc.size) //初始化0操作
      }
      doc match {
		//如果是稀疏向量
        case SparseVector(size, indices, values) =>
          val nnz = indices.size
          var k = 0
          while (k < nnz) {
            if (values(k) > 0) {
              df(indices(k)) += 1L
            }
            k += 1
          }
		//如果是密集向量
        case DenseVector(values) =>
          val n = values.size
          var j = 0
          while (j < n) {
			//values(j) > 0.0,說明j對應的單詞在這篇文件出現過
            if (values(j) > 0.0) {
              df(j) += 1L
            }
            j += 1
          }
        case other =>
          throw new UnsupportedOperationException(
            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
      m += 1L //語料庫的文件數加1
      this
    }

    //合併其他的文件,對文件總數和df進行簡單的相加
    def merge(other: DocumentFrequencyAggregator): this.type = {
      if (!other.isEmpty) {
        m += other.m
        if (df == null) {
          df = other.df.copy
        } else {
          df += other.df
        }
      }
      this
    }

    private def isEmpty: Boolean = m == 0L

    /** Returns the current IDF vector. */
    def idf(): Vector = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
      var j = 0
      while (j < n) {
        if (df(j) >= minDocFreq) {
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
        }
        j += 1
      }
      Vectors.dense(inv)
    }
  }
}
5.val tfidf: RDD[Vector] = idf.transform(tf),對4得到的idf(IDFModel)乘上tf即得到最終的結果。 下面先把idf進行廣播,然後和各個分割槽的tf對應相乘
def transform(dataset: RDD[Vector]): RDD[Vector] = {
    val bcIdf = dataset.context.broadcast(idf)
    dataset.mapPartitions(iter => iter.map(v => IDFModel.transform(bcIdf.value, v)))
  }

對應相乘

def transform(idf: Vector, v: Vector): Vector = {
    val n = v.size
    v match {
      case SparseVector(size, indices, values) =>
        val nnz = indices.size
        val newValues = new Array[Double](nnz)
        var k = 0
        while (k < nnz) {
          newValues(k) = values(k) * idf(indices(k))
          k += 1
        }
        Vectors.sparse(n, indices, newValues)
      case DenseVector(values) =>
        val newValues = new Array[Double](n)
        var j = 0
        while (j < n) {
          newValues(j) = values(j) * idf(j)
          j += 1
        }
        Vectors.dense(newValues)
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
}

tfidf最後的輸出格式為Sparse向量,具體的例子可以參看:Spark-MLib之TFIDF例項講解