1. 程式人生 > >【深度學習&分散式】Parameter Server 詳解

【深度學習&分散式】Parameter Server 詳解

Parameter Server 詳解

MXNet 是李沐和陳天奇等各路英雄豪傑打造的開源深度學習框架(最近不能更火了),其中最吸引我的是它的分散式訓練的特性;而提供支援其分散式訓練特性的正是當年李少帥和 Alex Smola 等人開發的 parameter server.

本部落格以“Scaling Distributed Machine Learning with the Parameter Server” 為主,從易用性、通訊高效性、可擴充套件性等角度介紹 parameter server .

Background

在機器學習和深度學習領域,分散式的優化已經成了一種先決條件。因為單機已經解決不了目前快速增長的資料和引數了。
現實中,訓練資料的數量可能達到1TB到1PB之間,而訓練過程中的引數可能會達到10

91012。而往往這些模型的引數需要被所有的worker節點頻繁的訪問,這就會帶來很多問題和挑戰:

  1. 訪問這些巨量的引數,需要大量的網路頻寬支援;
  2. 很多機器學習演算法都是連續型的,只有上一次迭代完成(各個worker都完成)之後,才能進行下一次迭代,這就導致瞭如果機器之間效能差距大(木桶理論),就會造成效能的極大損失;
  3. 在分散式中,容錯能力是非常重要的。很多情況下,演算法都是部署到雲環境中的(這種環境下,機器是不可靠的,並且job也是有可能被搶佔的);

Related Work

對於機器學習分散式優化,有很多大公司在做了,包括:Amazon,Baidu,Facebook,Google,Microsoft 和 Yahoo。也有一些開源的專案,比如:YahooLDA 和 Petuum 和Graphlab。 總結一下:

此處輸入圖片的描述

李少帥的這個ParameterServer 屬於第三代的parameter server。

第一代 parameter server:缺少靈活性和效能 —— 僅使用memcached(key, value) 鍵值對儲存作為同步機制。 YahooLDA 通過改進這個機制,增加了一個專門的伺服器,提供使用者能夠自定義的更新操作(set, get, update)。
第二代 parameter server:用bounded delay模型來改進YahooLDA,但是卻進一步限制了worker執行緒模型。
第三代 parameter server 能夠解決這些侷限性。

首先來比較一下parameter server 跟通用的分散式系統之間的差別吧。

通用的分散式系統通常都是:每次迭代都強制同步,通常在幾十個節點上,它們的效能可以表現的很好,但是在大規模叢集中,這樣的每次迭代強制同步的機制會因為木桶效應變得很慢。
Mahout 基於 HadoopMLI 基於 Spark,它們(Spark與MLI)採用的都是 Iterative MapReduce 的架構。它們能夠保持迭代之間的狀態,並且執行策略也更加優化了。但是,由於這兩種方法都採用同步迭代的通訊方式,使得它們很容易因為個別機器的低效能導致全域性效能的降低。

直觀感受一下,當其中一個節點執行時間過長會發生什麼:

此處輸入圖片的描述

為了解決這個問題,Graphlab 採用圖形抽象的方式進行非同步排程通訊。但是它缺少了以 MapReduce 為基礎架構的彈性擴充套件性,並且它使用粗粒度的snapshots來進行恢復,這兩點都會阻礙到可擴充套件性。parameter server 正是吸取Graphlab非同步機制的優勢,並且解決了其在可擴充套件性方面的劣勢。

看看非同步迭代是如何提高效能的:

此處輸入圖片的描述

Parameter Server 優勢

說完了其他的分散式系統的缺點,該回到本部落格的主題了(誇ps),parameter server 有哪些features?

1. Efficient communication:
由於是非同步的通訊,因此,不需要停下來等一些機器執行完一個iteration(除非有必要),這大大減少了延時。為機器學習任務做了一些優化(後續會細講),能夠大大減少網路流量和開銷;

2. Flexible consistency models:
寬鬆的一致性要求進一步減少了同步的成本和延時。parameter server 允許演算法設計者根據自身的情況來做演算法收斂速度和系統性能之間的trade-off。

3. Elastic Scalability:
使用了一個分散式hash表使得新的server節點可以隨時動態的插入到集合中;因此,新增一個節點不需要重新執行系統。

4. Fault Tolerance and Durability:
我們都知道,節點故障是不可避免的,特別是在大規模商用伺服器叢集中。從非災難性機器故障中恢復,只需要1秒,而且不需要中斷計算。Vector clocks 保證了經歷故障之後還是能執行良好;

5. Ease of Use
全域性共享的引數可以被表示成各種形式:vector,matrices 或者相應的sparse型別,這大大方便了機器學習演算法的開發。並且提供的線性代數的資料型別都具有高效能的多執行緒庫。

Parameter Server 系統架構

在parameter server中,每個 server 實際上都只負責分到的部分引數(servers共同維持一個全域性的共享引數),而每個 work 也只分到部分資料和處理任務;

此處輸入圖片的描述

上圖中,每個子節點都只維護自己分配到的引數(圖中的黑色),自己部分更新之後,將計算結果(例如:梯度)傳回到主節點,進行全域性的更新(比如平均操作之類的),主節點再向子節點傳送新的引數;

servers 與 workers 之間的通訊如下:

此處輸入圖片的描述

此處輸入圖片的描述

server 節點可以跟其他 server 節點通訊,每個server負責自己分到的引數,server group 共同維持所有引數的更新。
server manager node 負責維護一些元資料的一致性,比如各個節點的狀態,引數的分配情況等;
worker 節點之間沒有通訊,只跟自己對應的server進行通訊。每個worker group有一個task scheduler,負責向worker分配任務,並且監控worker的執行情況。當有新的worker加入或者退出,task scheduler 負責重新分配任務。

(key, value),Range Push and Pull

parameter server 中,引數都是可以被表示成(key, value)的集合,比如一個最小化損失函式的問題,key就是feature ID,而value就是它的權值。對於稀疏引數,不存在的key,就可以認為是0.

此處輸入圖片的描述

把引數表示成k-v, 形式更自然, 易於理,更易於程式設計解;

workers 跟 servers 之間通過 pushpull 來通訊。worker 通過 push 將計算好的梯度傳送到server,然後通過 pull 從server更新引數。為了提高計算效能和頻寬效率,parameter server 允許使用者使用 Range PushRange Pull 操作;

假設 R 是需要push或pull的 key 的range,那麼可以進行如下操作:

w.push(R, dest)
w.pull(R, dest)

意思應該很明顯吧,就是傳送和接送特定Range中的w.

Asynchronous Tasks and Dependency

體會一下Asynchronous Task 跟 Synchronous Task 的區別:

此處輸入圖片的描述

如果 iter1 需要在 iter0 computation,push 跟 pull 都完成後才能開始,那麼就是Synchronous,反之就是Asynchronous.

Asynchronous Task:能夠提高系統的效率(因為節省了很多等待的過程),但是,它的缺點就是容易降低演算法的收斂速率;

所以,系統性能跟演算法收斂速率之間是存在一個trade-off的,你需要同時考慮:

  1. 演算法對於引數非一致性的敏感度;
  2. 訓練資料特徵之間的關聯度;
  3. 硬碟的儲存容量;

考慮到使用者使用的時候會有不同的情況,parameter server 為使用者提供了多種任務依賴方式:

此處輸入圖片的描述

Sequential: 這裡其實是 synchronous task,任務之間是有順序的,只有上一個任務完成,才能開始下一個任務;
Eventual: 跟 sequential 相反,所有任務之間沒有順序,各自獨立完成自己的任務,
Bounded Delay: 這是sequential 跟 eventual 之間的trade-off,可以設定一個 τ 作為最大的延時時間。也就是說,只有 >τ 之前的任務都被完成了,才能開始一個新的任務;極端的情況:
τ=0,情況就是 Sequential;
τ=,情況就是 Eventual;

看一個bounded delay 的 PGD (proximal gradient descent)演算法的系統執行流程:

此處輸入圖片的描述

注意: τ 不是越大越好的,具體要取多大,得看具體情況,貼一張李少帥做的實驗(Ad click prediction):

此處輸入圖片的描述

Implementation

servers 使用 一致性hash 來儲存引數k-v鍵值對。用鏈式複製方式來提高系統容錯性。使用 range based communication 和 range based vector clocks 來進一步優化系統;

Vector Clock

parameter server 使用 vector clock 來記錄每個節點中引數的時間戳,能夠用來跟蹤狀態或避免資料的重複傳送。但是,假設有n個節點,m個引數,那麼vector clock的空間複雜度就是O(nm). 顯然,當有幾千個節點和幾十億的引數時,對於記憶體和頻寬來說都是不可實現的。

好在,parameter server 在push跟pull的時候,都是rang-based,這就帶來了一個好處:這個range裡面的引數共享的是同一個時間戳,這顯然可以大大降低了空間複雜度。

此處輸入圖片的描述

每次從一個range裡再提取一個range,最多會生成3個新的 vector clocks(一分為三) . 假設k是演算法中產生的所有的range,那麼空間複雜度就變成了O(km),因為k 遠小於引數個數,所以空間複雜度大大降低了;

Messages

一條 message 包括:時間戳,len(range)對k-v.

[vc(R),(k1,v1),...,(kp,vp)]kjRandj{1,...p}

這是parameter server 中最基本的通訊格式,不僅僅是共享的引數才有,task 的message也是這樣的格式,只要把這裡的(key, value) 改成 (task ID, 引數/返回值)。

由於機器學習問題通常都需要很高的網路頻寬,因此資訊的壓縮是必須的。

key的壓縮: 因為訓練資料通常在分配之後都不會發生改變,因此worker沒有必要每次都發送相同的key,只需要接收方在第一次接收的時候快取起來就行了。第二次,worker不再需要同時傳送key和value,只需要傳送value 和 key list的hash就行。這樣瞬間減少了一般的通訊量。
value的壓縮: 假設引數時稀疏的,那麼就會有大量的0存在。因此,為了進一步壓縮,我們只需要傳送非0值。parameter server使用 Snappy 快速壓縮庫來壓縮資料、高效去除0值。

key 的壓縮和 value 的壓縮可以同時進行。

使用者自定義過濾:
對於機器學習優化問題比如梯度下降來說,並不是每次計算的梯度對於最終優化都是有價值的,使用者可以通過自定義的規則過濾一些不必要的傳送,再進一步壓縮頻寬cost:
1. 傳送很小的梯度值是低效的:
因此可以自定義設定,只在梯度值較大的時候傳送;
2. 更新接近最優情況的值是低效的:
因此,只在非最優的情況下發送,可通過KKT來判斷;

Replication and Consistency

parameter server 在資料一致性上,使用的是傳統的一致性雜湊演算法,引數key與server node id被插入到一個hash ring中。

具體的一致性hash演算法不是本文的重點,這裡不過多介紹了,不清楚的同學建議看看其他文獻熟悉一下。
只要知道它的作用是在分散式系統中,動態增加和移除節點的同時還能保證系統儲存與key分配的效能效率;

此處輸入圖片的描述

從上圖可以看出,每個節點都複製了它逆時鐘方向的k個節點中的key。圖中,k=2S1 賦值了 S2S3 內的key。

兩種方式保證slave跟master之間的資料一致性:

1.預設的複製方式: Chain replication (強一致性, 可靠):

此處輸入圖片的描述
a. 更新:只能發生在資料頭節點,然後更新逐步後移,直到更新到達尾節點,並由尾節點向客戶確認更新成功;
b. 查詢:為保證強一致性,客戶查詢只能在尾節點進行;

2.Replication after Aggregation

此處輸入圖片的描述

兩個worker 節點分別向server傳送x和y。server 首先通過一定方式(如:f(x+y) )進行aggregate,然後再進行復制操作;

當有n個worker的時候,複製只需要k/n的頻寬。通常來說,k(複製次數)是一個很小的常數,而n的值大概是幾百到幾千;

Server Management

要想實現系統的容錯以及動態的擴充套件系統規模,必須要求系統能夠支援動態新增和移除節點。

當有一個 server節點新增 進來的時候會發生什麼呢?
1. server manager 會對新的節點分配一些range 的key,這會造成其他server節點的key的變化;
2. 新節點會獲取資料做為訓練用,另外會複製k份到slave。
3. server manager 將節點的變化情況廣播出去。接收方可能會移除不再屬於自己的資料,並且將未完成的任務提交給新來的節點;

當有一個 worker節點(W)新增 進來的時候會發生什麼呢? 跟server差不多,相對更簡單一些:
1. task scheduler 為W分配資料;
2. 這個 worker 節點通過網路或者檔案系統得到分配到的訓練資料。接著,W會從伺服器pull引數;
3. task scheduler 會廣播節點的變化情況,可能會使一些節點釋放一部分訓練資料;

參考文獻

【1】Mu Li. Scaling Distributed Machine Learning with the Parameter Server.
【2】CMU. http://parameterserver.org/
【3】Joseph E.Gonzalez. Emerging Systems For Large-scale Machine Learning.