1. 程式人生 > >Spark:聚類演算法之LDA主題模型演算法

Spark:聚類演算法之LDA主題模型演算法

Spark上實現LDA原理

LDA主題模型演算法

Spark實現LDA的GraphX基礎

在Spark 1.3中,MLlib現在支援最成功的主題模型之一,隱含狄利克雷分佈(LDA)。LDA也是基於GraphX上構建的第一個MLlib演算法,GraphX是實現它最自然的方式。

有許多演算法可以訓練一個LDA模型。我們選擇EM演算法,因為它簡單並且快速收斂。因為用EM訓練LDA有一個潛在的圖結構,在GraphX之上構建LDA是一個很自然的選擇。

LDA主要有兩類資料:詞和文件。我們把這些資料存成一個偶圖(如下所示),左邊是詞節點,右邊是文件節點。每個詞節點儲存一些權重值,表示這個詞語和哪個主題相關;類似的,每篇文章節點儲存當前文章討論主題的估計。

每當一個詞出現在一篇文章中,圖中就有一個邊連線對應的詞節點和文章節點。例如,在上圖中,文章1包含詞語“hockey” 和“system”

這些邊也展示了這個演算法的流通性。每輪迭代中,每個節點通過收集鄰居資料來更新主題權重資料。下圖中,文章2通過從連線的詞節點收集資料來更新它的主題估計。

GraphX因此是LDA自然的選擇。隨著MLlib的成長,我們期望未來可以有更多圖結構的學習演算法!

可擴充套件性

LDA的並行化並不直觀,已經有許多研究論文提出不同的策略來實現。關鍵問題是所有的方法都需要很大量的通訊。這在上圖中很明顯:詞和文件需要在每輪迭代中用新資料更新相鄰節點,而相鄰節點太多了。

我們選擇了EM演算法的部分原因就是它通過很少輪的迭代就能收斂。更少的迭代,更少的通訊。

Note: Spark的貢獻者正在開發更多LDA演算法:線上變分貝葉斯(一個快速近似演算法)和吉布斯取樣(一個更慢但是有時更準確的演算法)。

PySpark.ml庫中Clustering LDA簡介

LDA通過 setOptimizer 函式支援不同的推斷演算法。EMLDAOptimizer 對於似然函式用 expectation-maximization 演算法學習聚類,然後獲得一個合理的結果。OnlineLDAOptimizer使用迭代的mini-batch抽樣來進行 online variational inference,它通常對記憶體更友好。

LDA接收文件集合表示的詞頻向量,和下列引數(使用builder模式進行設定):

  • k: 主題數(也就是聚類中心數)
  • optimizer: 優化計算方法,目前支援"em", "online"。學習LDA模型使用的優化器,EMLDAOptimizer 或者 OnlineLDAOptimizer。
  • docConcentration: 文件-主題分佈的先驗Dirichlet引數。值越大,推斷的分佈越平滑。文章分佈的超引數(Dirichlet分佈的引數)。只支援對稱的先驗,因此在提供的k維向量中所有值都相等。所有值也必須大於1.0。
  • topicConcentration: 主題-詞語分佈的先驗Dirichlet引數。值越大,推斷的分佈越平滑。主題分佈的超引數(Dirichlet分佈的引數),必需>1.0。
  • maxIterations: 迭代次數的限制
  • checkpointInterval: 迭代計算時檢查點的間隔。如果你使用checkpointing(在Spark配置中設定),該引數設定checkpoints建立的次數,如果maxIterations過大,使用checkpointing可以幫助減少磁碟上shuffle檔案的大小,然後幫助失敗恢復。
  • setSeed:隨機種子

引數設定

Expectation Maximization

docConcentration: 提供Vector(-1)會導致預設值 (uniform k dimensional vector with value (50/k))+1。

topicConcentration: 提供-1會導致預設值0.1 加1。

Online Variational Bayes

docConcentration:Providing Vector(-1) results indefault behavior (uniform k dimensional vector with value (1.0/k)).

topicConcentration: Providing -1 results in defaulting to a value of (1.0/k).

[Latent Dirichlet allocation (LDA)]

所有spark.mllib的 LDA 模型都支援:

  • describeTopics: 返回主題,它是最重要的term組成的陣列和term對應的權重組成的陣列。
  • topicsMatrix: 返回一個 vocabSize*k 維的矩陣,每一個列是一個topic。

注意:LDA仍然是一個正在開發的實驗特性。某些特性只在兩種優化器/由優化器生成的模型中的一個提供。目前,分散式模型可以轉化為本地模型,反過來不可以。

LDA求解的優化器/模型

Expectation Maximization

Implemented in EMLDAOptimizer and DistributedLDAModel.

提供給LDA的引數有:

  • docConcentration: 只支援對稱的先驗,因此在提供的k維向量中所有值都相等。所有值也必須大於1.0。提供Vector(-1)會導致預設值 (uniform k dimensional vector with value (50/k))+1。
  • topicConcentration: 只支援對稱的先驗,所有值也必須大於1.0。提供-1會導致預設值0.1 加1。
  • maxIterations: EM迭代的最大次數。

注意:做足夠多次迭代是重要的。在早期的迭代中,EM經常會有一些無用的topics,但是這些topics經過更多次的迭代會有改善。依賴你的資料集,如果使用至少20個topic,可能需要50-100次的迭代。

EMLDAOptimizer 會產生 DistributedLDAModel, 它不只儲存推斷的主題,還有所有的訓練語料,以及訓練語料庫中每個文件的主題分佈:

  • topTopicsPerDocument: 訓練語料庫中每個文件的前若干個主題和對應的權重
  • topDocumentsPerTopic: 每個主題下的前若干個文件和文件中對應的主題的權重
  • logPrior: 基於超參doc Concentration 和 topic Concentration,估計的主題和文件-主題分佈的對數概率。
  • logLikelihood: 基於推斷的主題和文件-主題分佈,訓練語料的對數似然。

Online Variational Bayes

Implemented in OnlineLDAOptimizer and LocalLDAModel.

提供給LDA的引數有:

  • docConcentration: 通過傳遞每個維度值都等於Dirichlet引數的向量使用不對稱的先驗,值應該大於等於0 。提供 Vector(-1) 會使用預設值(uniform k dimensional vector with value (1.0/k))。
  • topicConcentration: 只支援對稱的先驗,值必須大於等於0。提供值-1會使用預設值 (1.0/k)
  • maxIterations: 提交的minibatches的最大次數。

此外,OnlineLDAOptimizer 接收下列引數:

  • miniBatchFraction: 每次迭代使用的語料庫抽樣比例。
  • optimizeDocConcentration: 如果設定為true,每次 minibatch 之後計算引數 docConcentration (aka alpha) 的最大似然估計,然後在返回的 LocalLDAModel 使用優化了的docConcentration。
  • τ0κ: 用作學習率衰減,用 (τ0+iter)κ 計算,這裡 iter是目前的迭代次數。

OnlineLDAOptimizer 生成 LocalLDAModel,它只儲存了推斷的主題。LocalLDAModel支援:

  • logLikelihood(documents): 給定推斷的主題,計算提供的文件的下界。
  • logPerplexity(documents): 給定推斷的主題,計算提供的文件的複雜度的上界。
[PySpark.ml庫中的Clustering]

Spark實現LDA例項

步驟

1)載入資料

返回的資料格式為:documents: RDD[(Long, Vector)],其中:Long為文章ID,Vector為文章分詞後的詞向量;使用者可以讀取指定目錄下的資料,通過分詞以及資料格式的轉換,轉換成RDD[(Long, Vector)]即可。

2)建立模型

模型引數設定說明見上面的簡介

3)結果輸出

topicsMatrix以及topics(word,topic))輸出。

注意事項

從Pyspark LDA model中獲取document-topic matrix

mllib上的lda不是分散式的,目前好像只儲存topic的資訊,而不儲存doc的資訊,所以是沒法獲取doc-topic矩陣的。

要獲取的話,只能使用ml中的lda,

或者使用scala版本的lda:

val ldaModel = lda.run(documents)

val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

distLDAModel.topicDistributions

使用pyspark實現LDA

# -*- coding: utf-8 -*-
[dev_pipi]
corpus_filename = /home/pipi/files/DATASETS/SparkMLlib/sample_lda_data.txt
;corpus_filename = hdfs://...
SPARK_HOME = /home/pipi/ENV/spark
PYSPARK_PYTHON = /home/pipi/ENV/ubuntu_env/bin/python
SPARK_LOCAL_IP = 127.0.0.1
JAVA_HOME = /home/pipi/ENV/jdk
;topic modelK = 3
alpha = 5
beta = 5
max_iter = 20
seed = 0
checkin_point_interval = 10
optimizer = em

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
__title__ = 'Spark MLlib LDA例代'
__author__ = 'pipi'
__mtime__ = '16-10-24'
__email__ = '[email protected]'
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
              ┏┓      ┏┓
            ┏┛┻━━━┛┻┓
            ┃      ☃      ┃
            ┃  ┳┛  ┗┳  ┃
            ┃      ┻      ┃
            ┗━┓      ┏━┛
                ┃      ┗━━━┓
                ┃  神保佑    ┣┓
                ┃ 永無BUG!   ┏┛
                ┗┓┓┏━┳┓┏┛
                  ┃┫┫  ┃┫┫
                  ┗┻┛  ┗┻┛
"""
def config():
'''
行前的引數配置
    '''
import configparser, os
    SECTION = 'dev_pipi'
conf = configparser.ConfigParser()
    conf.read(os.path.join(os.path.split(os.path.realpath(__file__))[0], 'config.ini'))

    global corpus_filename, K, alpha, beta, max_iter, seed, checkin_point_interval, optimizer
    corpus_filename = conf.get(SECTION, 'corpus_filename')
    K = conf.getint(SECTION, 'K')
    alpha = conf.getfloat(SECTION, 'alpha')
    beta = conf.getfloat(SECTION, 'beta')
    max_iter = conf.getint(SECTION, 'max_iter')
    seed = conf.getint(SECTION, 'seed')
    checkin_point_interval = conf.getint(SECTION, 'checkin_point_interval')
    optimizer = conf.get(SECTION, 'optimizer')

    # spark environment settings
import sys, os
    os.environ['SPARK_HOME'] = conf.get(SECTION, 'SPARK_HOME')
    sys.path.append(os.path.join(conf.get(SECTION, 'SPARK_HOME'), 'python'))
    os.environ["PYSPARK_PYTHON"] = conf.get(SECTION, 'PYSPARK_PYTHON')
    os.environ['SPARK_LOCAL_IP'] = conf.get(SECTION, 'SPARK_LOCAL_IP')
    os.environ['JAVA_HOME'] = conf.get(SECTION, 'JAVA_HOME')

    import logging
    logging.basicConfig(filename=os.path.join(os.path.split(os.path.realpath(__file__))[0], 'log.txt'), level=logging.DEBUG)


config()

from pyspark import SparkContext
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

if __name__ == '__main__':
sc = SparkContext(master='local[4]', appName='lda')

    data = sc.textFile(corpus_filename).map(lambda line: Vectors.dense([float(i) for i in line.strip().split()]))
    corpus = data.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
    # print(corpus.take(5))
lda_model = LDA.train(rdd=corpus, maxIterations=max_iter, seed=seed, checkpointInterval=checkin_point_interval, k=K,
                          optimizer=optimizer, docConcentration=alpha, topicConcentration=beta)
    topics = lda_model.topicsMatrix()
    for tid in range(3):
print('Topic' + str(tid) + ':')
        for wid in range(0, lda_model.vocabSize()):
print(' ' + str(topics[wid, tid]))
    lda_model.describeTopics(4)
    sc.stop()

    # df = pyspark.createDataFrame([[1, Vectors.dense([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})],], ["id", "features"])
資料及結果:

1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9
3 1 1 9 3 0 2 0 0 1 3
4 2 0 3 4 5 1 1 1 4 0
2 1 0 3 0 0 5 0 2 2 9
1 1 1 9 2 1 2 0 0 1 3
4 4 0 3 4 2 1 3 0 0 0
2 8 2 0 3 0 2 0 2 7 2
1 1 1 9 0 2 2 0 0 3 3
4 1 0 0 4 5 1 3 0 1 0

Topic0:        Topic1:      Topic2:       
7.37834974184 9.73045219375 8.89119806441
 6.59081862005 11.1175108178 11.2916705621
 3.49398022369 4.20302495549 4.30299482082
 22.1867881493 8.95779840996 8.8554134407 
 5.66785332714 10.4148634185 8.91728325435
 4.66999543003 9.0609229138  8.26908165618
 12.0788314276 8.65705135654 10.2641172159
 2.15391819    4.20420496512 3.64187684489
 2.92593942578 2.42997556379 2.64408501043
 7.77320999456 7.84974291061 8.37704709483
 19.3787362983 6.39079305857 7.2304706431 

使用scala的spark實現LDA

[spark的python, scala, java示例程式碼:spark/examples/src]

ref: