1. 程式人生 > >Spark MLlib LDA 基於GraphX實現原理及原始碼分析

Spark MLlib LDA 基於GraphX實現原理及原始碼分析

LDA背景

LDA(隱含狄利克雷分佈)是一個主題聚類模型,是當前主題聚類領域最火、最有力的模型之一,它能通過多輪迭代把特徵向量集合按主題分類。目前,廣泛運用在文字主題聚類中。
LDA的開源實現有很多。目前廣泛使用、能夠分散式並行處理大規模語料庫的有微軟的LightLDA,谷歌plda、plda+,sparkLDA等等。下面介紹這3種LDA:

LightLDA依賴於微軟自己實現的multiverso引數伺服器,伺服器底層使用mpi或zeromq傳送訊息。LDA模型(word-topic矩陣)由引數伺服器儲存,它為文件訓練程序提供引數查詢、更新服務。

plda、plda+使用mpi訊息通訊,將mpi程序分為word、doc倆部分。doc程序訓練文件,word程序為doc程序提供模型的查詢、更新功能。

spark LDA有兩種實現:1.基於gibbs sampling原理和使用GraphX實現的版本(即spark文件上所說的EMLDAOptimizer and DistributedLDAModel),2.基於變分推斷原理實現的版本(即spark文件上的OnlineLDAOptimizer and LocalLDAModel)。

LightLDA,plda、plda+,spark LDA比較

論能夠處理預料庫的規模大小,LihgtLDA要遠遠好於plda和spark LDA
經過測試,在10個伺服器(8核40GB)叢集規模下:
LihgtLDA能夠處理上億文件、百萬詞彙的語料庫,能夠訓練上百萬主題數。這樣的處理能力使得LihgtLDA能夠輕鬆訓練絕大多數語料庫。微軟號稱使用幾十機器的叢集便能訓練Bing搜尋引擎爬下資料的十分之一。
相對於LihgtLDA ,plda+能夠處理規模小的多,上限是:詞彙數目*主題數(模型大小) < 5億。當語料庫規模達到上限後,mpi叢集會因記憶體不夠而終止,或因為記憶體資料頻繁切換,迭代速度十分緩慢。雖然plda+對語料庫的詞彙數目和訓練的主題數目很敏感,但對文件的規模並不是很敏感,在詞彙數目和主題數目較小的情況下,1000萬級別的文件也能夠輕鬆解決。
spark LDA的GraphX版處理規模衡量標準是圖的頂點資料,即(文件數 + 詞彙數目)*主題數目,上限是 文件數*主題數 < 50億(由於詞彙數目相對於文件數目往往較小,近似等於 文件數*主題數)。當超過這個規模後,spark叢集進入假死狀態。不停有節點出現OOM,直至任務以失敗告終。
變分推斷實現的spark LDA瓶頸是 詞彙數目*主題數目,這個值也就是我們所說的模型大小,上限約1億。為什麼存在這個瓶頸呢?是因為變分推斷的實現過程中,模型使用矩陣本地儲存,各個分割槽計算模型的部分值,然後在driver上將矩陣reduce疊加。當模型過大,driver節點的記憶體就無法承受各個分割槽發過來的模型。
收斂速度上,LightLDA要遠快於plda、plda+和spark LDA。小規模語料庫(30萬文件,10萬詞,1000主題)測試,LightLDA : plda+ : spark LDA(graphx) = 1:4:50

為什麼各種LDA的能夠處理語料庫規模的衡量標準不一樣呢?這與它們的實現方式有關,不同的LDA有不同的瓶頸,我們這裡單講spark LDA,其他lda後續介紹。

spark LDA

spark機器學習庫MLlib實現了2個版本的LDA,這裡分別叫做Spark EM LDA和Spark Online LDA。它們使用同樣的資料輸入,但是內部的實現和依據的原理完全不同。Spark EM LDA使用GraphX實現的,通過對圖的邊和頂點資料的操作來訓練模型。而Spark Online LDA採用抽樣的方式,每次抽取一些文件訓練模型,通過多次訓練,得到最終模型。在引數估計上,Spark EM LDA使用gibbs取樣原理估計模型引數,Spark Online LDA使用貝葉斯變分推斷原理估計引數。在模型儲存上,Spark EM LDA將訓練的主題-詞模型儲存在GraphX圖頂點上,屬於分散式儲存方式。Spark Online使用矩陣來儲存主題-詞模型,屬於本地模型。通過這些差異,可以看出Spark EM LDA和Spark Online LDA的不同之處,同時他們各自也有各自的瓶頸。Spark EM LDA在訓練時shuffle量相當大,嚴重拖慢速度。而Spark Online LDA使用矩陣儲存模型,矩陣規模直接限制訓練文件集的主題數和詞的數目。另外,Spark EM LDA每輪迭代完畢後更新模型,Spark Online LDA每訓練完抽樣的文字更新模型,因而Spark Online LDA模型更新更及時,收斂速度更快。

Spark EM LDA之GraphX實現原理

Spark EM LDA基於gibbs取樣原理估計引數,凡是基於gibbs取樣原理推斷引數的LDA訓練過程大都如下:
這裡寫圖片描述
LDA中文件裡的每個詞都屬於一個主題,LDA訓練過程的大體思路是,一輪迭代中,為每篇文件裡的每一個詞重新選擇主題,選擇的依據是gibbs取樣公式,詳細原理參見Parameter estimation for text analysis這篇文章。
這裡寫圖片描述
這裡寫圖片描述
LDA實現演算法的核心是,為每篇文件的每個詞重新選取主題。這個過程GraphX做了巧妙的實現,它以文件到詞作為邊,以詞頻作為邊資料,把語料庫構造成圖,把對語料庫中每篇文件的每個詞操作轉化為在圖中每條邊上的操作,而對邊RDD處理是GraphX中最常見的的處理方法。
GraphX把nkmnkt矩陣儲存在文件頂點和詞頂點上,把詞頻資訊儲存在邊上。它把整個文件聚類結果矩陣、模型矩陣和語料庫詞頻矩陣都表達在圖結構中,把LDA演算法處理過程表達為對邊的遍歷處理過程。由於基於gibbs採用的LDA可方便的建模成圖,又由機器學習的多輪迭代性質,Spark將其簡單高效地實現在GraphX之上,形成了Spark MLlib LDA。

Spark EM LDA初始化

Spark LDA的輸入資料為詞頻矩陣RDD[(Long, Vector)],其儲存格式如下表所示:
這裡寫圖片描述
為了將文件頂點和詞頂點統一編號,Spark LDA將文件頂點和詞頂點的頂點ID進行了分配。文件頂點ID編號從0遞增,詞頂點編號從-1遞減。上表中詞頻矩陣轉換為下表所示:
這裡寫圖片描述
Spark LDA根據文件到邊的關係生成的GraphX邊,如下圖所示,邊的格式為[(源頂點ID,目的頂點ID,詞頻)],如下表所示。
這裡寫圖片描述

(0, -1, 2.0), (0, -2, 1.0), (0, -3, 3.0), (0, -4, 4.0) …
(1, -1, 3.0), (1, -2, 0.0), (1, -3, 2.0), (1, -4, 5.0) …
…
  • Spark LDA邊構建
M     語料庫中文件數目
V     詞頻矩陣中詞的數目
D     M*V詞頻矩陣

1       for document m from [0,M-1]:
2           for word w in document m, w from [-1,-V]:
3               generate an edge (m, w, D[m][w]) as an element of EdgeRDD

將預料庫中所有文件到詞構建成RDD[(Long, Long, Double)],GraphX進一步在RDD分割槽中建立索引,進行優化,形成邊RDD。

  • Spark LDA頂點向量構建
    GraphX使用邊RDD初始化頂點RDD。
    這裡寫圖片描述
    Spark LDA初始完後,語料庫被描述為GraphX的圖物件,它擁有包括文件和詞的頂點RDD和文件指向詞的邊RDD。頂點RDD擁有一個K維主題分佈向量,邊擁有詞頻資料。

Spark LDA迭代

虛擬碼:
這裡寫圖片描述

原始碼:

    val sendMsg: EdgeContext[TopicCounts, TokenCount, (Boolean, TopicCounts)] => Unit =
      (edgeContext) => {
        // Compute N_{wj} gamma_{wjk}
        val N_wj = edgeContext.attr
        // E-STEP: Compute gamma_{wjk} (smoothed topic distributions), scaled by token count
        // N_{wj}.
        val scaledTopicDistribution: TopicCounts =
          computePTopic(edgeContext.srcAttr, edgeContext.dstAttr, N_k, W, eta, alpha) *= N_wj
        edgeContext.sendToDst((false, scaledTopicDistribution))
        edgeContext.sendToSrc((false, scaledTopicDistribution))
      }
    // The Boolean is a hack to detect whether we could modify the values in-place.
    // TODO: Add zero/seqOp/combOp option to aggregateMessages. (SPARK-5438)
    val mergeMsg: ((Boolean, TopicCounts), (Boolean, TopicCounts)) => (Boolean, TopicCounts) =
      (m0, m1) => {
        val sum =
          if (m0._1) {
            m0._2 += m1._2
          } else if (m1._1) {
            m1._2 += m0._2
          } else {
            m0._2 + m1._2
          }
        (true, sum)
      }
    // M-STEP: Aggregation computes new N_{kj}, N_{wk} counts.
    val docTopicDistributions: VertexRDD[TopicCounts] =
      graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg)
        .mapValues(_._2)

原始碼中sendMsg對應於虛擬碼中7-8行,mergeMsg對應於虛擬碼第9行。
虛擬碼中第2步,計算所有詞頂點的向量疊加值WV。Spark對頂點RDD使用filter運算元過濾,得到詞頂點RDD。再對詞頂點RDD的values呼叫fold進行頂點向量求和。scala程式碼實現如下:

graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _)

虛擬碼中第3-9步則是由GraphX的aggregateMasseges方法來實現。第3-8步,屬於aggregateMasseges map階段,它由邊三元組edge(srcId, dv, freq, dstId, wv)生成訊息msg(k維向量),並將msg發往兩端頂點。aggregateMasseges訊息產生依據的公式是虛擬碼中的第4-5步,它跟LDA gibbs實現中第13步,主題選取依據的gibbs取樣公式是一樣的。

第9步,屬於aggregateMasseges reduce階段,頂點將收到的所有msg疊加。使用者傳入的訊息聚合函式mergeMsg是向量相加。

綜上,Spark LDA迭代過程主要分為兩個過程,(1)計算所有詞頂點資料疊加的值——向量WV;(2)呼叫aggregateMasseges方法進行訊息產生、傳送、聚合; (3)頂點將聚合後的訊息作為其新的頂點資料。

Spark LDA實現與LDA gibbs實現演算法略有差別。LDA gibbs實現演算法中第13步為文件每個單詞選擇主題之前,會取消詞當前選擇主題,重新選擇完主題後更新全域性計數器nkmntknmnk。LDA實現演算法中,每個詞處理完畢,文件-主題分佈nkm、主題-詞分佈ntk都會改變,從而影響下一個詞的主題選取。因而,LDA gibbs實現演算法是一個細粒度的演算法。另外,LDA gibbs演算法輸入是文件分詞後的向量,並非詞頻矩陣。因而,文件中相同的詞會先後多次處理。

Spark LDA在迭代過程中,頂點向量和詞頂點疊加向量WV保持不變,等效於全域性計數器nkmntknmnk在迭代過程中保持不變,即文件-主題分佈、主題-詞分佈保持不變。map時,文件中所有詞都產生主題選取msg(k維向量),reduce會將該msg聚集(疊加)到相應文件頂點和詞頂點上。迭代完成後,以聚集後的頂點向量作為圖的頂點資料,從而完成統計資料的更新。Spark LDA每輪迭代完成之後全域性計數器nkmntknmnk才做一次更新,更新結果只能影響下一輪迭代各詞的msg計算。因而,Spark LDA實現是一個粗粒度地控制主題分佈影響的演算法。另外,Spark LDA的輸入文件是詞頻矩陣,所以虛擬碼中第6步需要將歸一化的主題分佈乘以詞頻,相當於LDA實現演算法對文件中相同詞的多次處理。

優缺點分析

Spark LDA使用圖頂點儲存文件、詞的主題統計資訊,以圖邊來儲存文件和詞的關係,以遍歷圖邊的方式訓練模型,完美表達了LDA訓練邏輯。然而,GraphX的aggregateMasseges 方法處理邊三元組需要從兩端頂點拉取頂點資料,生成訊息後把需要訊息發往兩端頂點。這裡拉取資料和訊息匯聚都會引起大量的shuffle過程。Spark shuffle是指兩個分割槽間的資料移動。shuffle完整過程分為:(1)傳送方將待發送的資料寫入本地磁碟;(2)資料序列化後經網路傳輸;(3)接收方接收流流資料,寫在本地磁碟;(4)接收方反序列化資料。這裡涉及兩次序列化、兩次讀寫磁碟,序列化耗CPU,讀寫磁碟耗時間。大量的shuffle大大地拖累了Spark LDA的迭代速度。因而,GraphX的效能並不高,不及使用MPI進行訊息通訊的LightLDA和plda。可以說,GraphX為LDA在Spark上實現提供一個完美的結構,而效能卻不敢恭維!!但是,spark由於其強大的普適性,為了減少資料多平臺跨越的煩惱,在可接受範圍內,使用Spark訓練語料庫還是可行的。

作者介紹

唐黎哲,國防科學技術大學 並行與分散式計算國家重點實驗室(PDL)碩士,從事spark、圖計算、文字分析研究,歡迎交流,請多指教。
郵箱:[email protected]

相關推薦

Spark MLlib LDA 基於GraphX實現原理原始碼分析

LDA背景 LDA(隱含狄利克雷分佈)是一個主題聚類模型,是當前主題聚類領域最火、最有力的模型之一,它能通過多輪迭代把特徵向量集合按主題分類。目前,廣泛運用在文字主題聚類中。 LDA的開源實現有很多。目前廣泛使用、能夠分散式並行處理大規模語料庫的有微軟的Li

HashMap實現原理原始碼分析(轉載)

作者: dreamcatcher-cx 出處: <http://www.cnblogs.com/chengxiao/>        雜湊表(hash table)也叫散列表,是一種非常重要的資料結構,應用場景及其豐富,

併發程式設計(三)—— ReentrantLock實現原理原始碼分析

  ReentrantLock是Java併發包中提供的一個可重入的互斥鎖。ReentrantLock和synchronized在基本用法,行為語義上都是類似的,同樣都具有可重入性。只不過相比原生的Synchronized,ReentrantLock增加了一些高階的擴充套件功能,比如它可以實現公平鎖,同時也可以

HashMap、ConcurrentHashMap實現原理原始碼分析

HashMap:https://www.cnblogs.com/chengxiao/p/6059914.html ConcurrentHashMap:https://blog.csdn.net/dingjianmin/article/details/79776646   遺留問

【java基礎】ConcurrentHashMap實現原理原始碼分析

  ConcurrentHashMap是Java併發包中提供的一個執行緒安全且高效的HashMap實現(若對HashMap的實現原理還不甚瞭解,可參考我的另一篇文章),ConcurrentHashMap在併發程式設計的場景中使用頻率非常之高,本文就來分析下Concurre

HashMap實現原理原始碼分析

雜湊表(hash table)也叫散列表,是一種非常重要的資料結構,應用場景及其豐富,許多快取技術(比如memcached)的核心其實就是在記憶體中維護一張大的雜湊表,而HashMap的實現原理也常常出現在各類的面試題中,重要性可見一斑。本文會對java集合框架中的對

ConcurrentHashMap實現原理原始碼分析

ConcurrentHashMap是Java併發包中提供的一個執行緒安全且高效的HashMap實現(若對HashMap的實現原理還不甚瞭解,可參考我的另一篇文章),ConcurrentHashMap在併發程式設計的場景中使用頻率非常之高,本文就來分析下Concurrent

[轉]HashMap實現原理原始碼分析

目錄: 一、什麼是雜湊表 二、HashMap實現原理 三、為何HashMap的陣列長度一定是2的次冪? 四、為什麼重寫equals方法需同時重寫hashCode方法 一、什麼是雜湊表 雜湊表(hash table)也叫散列表,是一

(轉)HashMap實現原理原始碼分析

雜湊表(hash table)也叫散列表,是一種非常重要的資料結構,應用場景及其豐富,許多快取技術(比如memcached)的核心其實就是在記憶體中維護一張大的雜湊表,而HashMap的實現原理也常常出現在各類的面試題中,重要性可見一斑。本文會對java集合框架中的對應實

JDK8中的HashMap實現原理原始碼分析

本篇所述原始碼基於JDK1.8.0_121 在寫上一篇線性表的文章的時候,筆者看的是Android原始碼中support24中的Java程式碼,當時發現這個ArrayList和LinkedList的原始碼和Java官方的沒有什麼區別,然而在閱讀HashMap原

ReentrantLock實現原理原始碼分析

       ReentrantLock實現原理及原始碼分析        ReentrantLock是Java併發包中提供的一個可重入的互斥鎖。ReentrantLock和synchronized在基本用法,行為語

google PLDA + 實現原理原始碼分析

LDA背景 LDA(隱含狄利克雷分佈)是一個主題聚類模型,是當前主題聚類領域最火、最有力的模型之一,它能通過多輪迭代把特徵向量集合按主題分類。目前,廣泛運用在文字主題聚類中。 LDA的開源實現有很多。目前廣泛使用、能夠分散式並行處理大規模語料庫的有微軟的Li

JDK1.8中ArrayList的實現原理原始碼分析

一、概述              ArrayList是Java開發中使用比較頻繁的一個類,通過對原始碼的解讀,可以瞭解ArrayList的內部結構以及實現方法,清楚它的優缺點,以便我們在程式設計時靈活運用。 二、原始碼分析 2.1 類結構  JDK1.8原始碼中的A

Go語言GC實現原理原始碼分析

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com/archives/475 > > 本文使用的 Go 的原始碼1.15.7 ## 介紹 ### 三色標記法 三色標記法將物件的顏色分為了黑、灰、白,三種顏色。 - 黑色:該物件已經被標

【原創】大資料基礎之Spark(5)Shuffle實現原理程式碼解析

一 簡介 Shuffle,簡而言之,就是對資料進行重新分割槽,其中會涉及大量的網路io和磁碟io,為什麼需要shuffle,以詞頻統計reduceByKey過程為例, serverA:partition1: (hello, 1), (word, 1)serverB:partition2: (hell

CocurrentHashMap實現原理原始碼解析

##1、CocurrentHashMap概念      CocurrentHashMap是jdk中的容器,是hashmap的一個提升,結構圖: 這裡對比在對比hashmap的結構: 可以看出CocurrentHashMap對比HashMa

C# winform框架 音樂播放器開發 聯網下載音樂功能的實現原理原始碼(純原創--)

首先 ,我做下載音樂功能;主要是為了探究它是怎麼實現的;所以介面很醜,不要在意哈---- 接下來進入正題: 1.首先: 介面中下載音樂的部分主要是由3個segment組成:: 一個textbox,用於輸入比如你喜歡的歌曲名/歌手;; 第二個是button1 這是主

Go中定時器實現原理原始碼解析

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的go的原始碼15.7,需要注意的是由於timer是1.14版本進行改版,但是1.14和1.15版本的timer並無很大區別 我在春節期間寫了一篇文章有關時間輪的:https

ConcurrentHashMap JDK1.8中結構原理原始碼分析

注:本文根據網路和部分書籍整理基於JDK1.7書寫,如有雷同敬請諒解  歡迎指正文中的錯誤之處。 資料結構       ConcurrentHashMap 1.8 拋棄了Segment分段鎖機制,採用Node + CAS + Synchronized來保證併發安全進行實現

HashMap, ConcurrentHashMap 最詳細的原理原始碼分析

網上關於 HashMap 和 ConcurrentHashMap 的文章確實不少,不過缺斤少兩的文章比較多,所以才想自己也寫一篇,把細節說清楚說透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說不清楚。 終歸是希望能降低大家學習的成本,不希望大家到處找各種不是很