1. 程式人生 > >高效能分散式執行框架——Ray

高效能分散式執行框架——Ray

原文地址:http://www.cnblogs.com/fanzhidongyzby/p/7901139.html

RayUC Berkeley RISELab新推出的高效能分散式執行框架,它使用了和傳統分散式計算系統不一樣的架構和對分散式計算的抽象方式,具有比Spark更優異的計算效能。

Ray目前還處於實驗室階段,最新版本為0.2.2版本。雖然Ray自稱是面向AI應用的分散式計算框架,但是它的架構具有通用的分散式計算抽象。本文對Ray進行簡單的介紹,幫助大家更快地瞭解Ray是什麼,如有描述不當的地方,歡迎不吝指正。

一、簡單開始

首先來看一下最簡單的Ray程式是如何編寫的。

# 匯入ray,並初始化執行環境
import ray
ray.init()

# 定義ray remote函式
@ray.remote
def hello():
    return "Hello world !"

# 非同步執行remote函式,返回結果id
object_id = hello.remote()

# 同步獲取計算結果
hello = ray.get(object_id)

# 輸出計算結果
print hello

在Ray裡,通過Python註解@ray.remote定義remote函式。使用此註解宣告的函式都會自帶一個預設的方法remote,通過此方法發起的函式呼叫都是以提交分散式任務的方式非同步執行的,函式的返回值是一個物件id,使用ray.get內建操作可以同步獲取該id對應的物件。熟悉Java裡的Future機制的話對此應該並不陌生,或許會有人疑惑這和普通的非同步函式呼叫沒什麼大的區別,但是這裡最大的差異是,函式hello是分散式非同步執行的。

remote函式是Ray分散式計算抽象中的核心概念,通過它開發者擁有了動態定製計算依賴(任務DAG)的能力。比如:

@ray.remote
def A():
    return "A"

@ray.remote
def B():
    return "B"

@ray.remote
def C(a, b):
    return "C"

a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

例子程式碼中,對函式A、B的呼叫是完全並行執行的,但是對函式C的呼叫依賴於A、B函式的返回結果。Ray可以保證函式C需要等待A、B函式的結果真正計算出來後才會執行。如果將函式A、B、C類比為DAG的節點的話,那麼DAG的邊就是函式C引數對函式A、B計算結果的依賴,自由的函式呼叫方式允許Ray可以自由地定製DAG的結構和計算依賴關係。另外,提及一點的是Python的函式可以定義函式具有多個返回值,這也使得Python的函式更天然具備了DAG節點多入和多出的特點。


二、系統架構

Ray是使用什麼樣的架構對分散式計算做出如上抽象的呢,一下給出了Ray的系統架構(來自Ray論文,參考文獻1)。


作為分散式計算系統,Ray仍舊遵循了典型的Master-Slave的設計:Master負責全域性協調和狀態維護,Slave執行分散式計算任務。不過和傳統的分散式計算系統不同的是,Ray使用了

混合任務排程的思路。在叢集部署模式下,Ray啟動了以下關鍵元件:

  1. GlobalScheduler:Master上啟動了一個全域性排程器,用於接收本地排程器提交的任務,並將任務分發給合適的本地任務排程器執行。
  2. RedisServer:Master上啟動了一到多個RedisServer用於儲存分散式任務的狀態資訊(ControlState),包括物件機器的對映、任務描述、任務debug資訊等。
  3. LocalScheduler:每個Slave上啟動了一個本地排程器,用於提交任務到全域性排程器,以及分配任務給當前機器的Worker程序。
  4. Worker:每個Slave上可以啟動多個Worker程序執行分散式任務,並將計算結果儲存到ObjectStore。
  5. ObjectStore:每個Slave上啟動了一個ObjectStore儲存只讀資料物件,Worker可以通過共享記憶體的方式訪問這些物件資料,這樣可以有效地減少記憶體拷貝和物件序列化成本。ObjectStore底層由Apache Arrow實現。
  6. Plasma(現在改名為arrow):每個Slave上的ObjectStore都由一個名為Plasma的物件管理器進行管理,它可以在Worker訪問本地ObjectStore上不存在的遠端資料物件時,主動拉取其它Slave上的物件資料到當前機器。

需要說明的是,Ray的論文中提及,全域性排程器可以啟動一到多個,而目前Ray的實現文件裡討論的內容都是基於一個全域性排程器的情況。我猜測可能是Ray尚在建設中,一些機制還未完善,後續讀者可以留意此處的細節變化。

Ray的任務也是通過類似Spark中Driver的概念的方式進行提交的,有所不同的是:

  1. Spark的Driver提交的是任務DAG,一旦提交則不可更改。
  2. 而Ray提交的是更細粒度的remote function,任務DAG依賴關係由函式依賴關係自由定製。

論文給出的架構圖裡並未畫出Driver的概念,因此我在其基礎上做了一些修改和擴充。


Ray的Driver節點和和Slave節點啟動的元件幾乎相同,不過卻有以下區別:

  1. Driver上的工作程序DriverProcess一般只有一個,即使用者啟動的PythonShell。Slave可以根據需要建立多個WorkerProcess。
  2. Driver只能提交任務,卻不能接收來自全域性排程器分配的任務。Slave可以提交任務,也可以接收全域性排程器分配的任務。
  3. Driver可以主動繞過全域性排程器給Slave傳送Actor呼叫任務(此處設計是否合理尚不討論)。Slave只能接收全域性排程器分配的計算任務。

三、核心操作

基於以上架構,我們簡單討論一下Ray中關鍵的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()可以在本地啟動ray,包括Driver、HeadNode(Master)和若干Slave。
import ray
ray.init()
如果是直連已有的Ray叢集,只需要指定RedisServer的地址即可。
ray.init(redis_address="<redis-address>")

本地啟動Ray得到的輸出如下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>> 
本地啟動Ray時,可以看到Ray的WebUI的訪問地址。

2. ray.put()

使用ray.put()可以將Python物件存入本地ObjectStore,並且非同步返回一個唯一的ObjectID。通過該ID,Ray可以訪問叢集中任一個節點上的物件(遠端物件通過查閱Master的物件表獲得)。

物件一旦存入ObjectStore便不可更改,Ray的remote函式可以將直接將該物件的ID作為引數傳入。使用ObjectID作為remote函式引數,可以有效地減少函式引數的寫ObjectStore的次數。

@ray.remote
def f(x):
    pass

x = "hello"

# 物件x往ObjectStore拷貝里10次
[f.remote(x) for _ in range(10)]

# 物件x僅往ObjectStore拷貝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通過ObjectID獲取ObjectStore內的物件並將之轉換為Python物件。對於陣列型別的物件,Ray使用共享記憶體機制減少資料的拷貝成本。而對於其它物件則需要將資料從ObjectStore拷貝到程序的堆記憶體中。

如果呼叫ray.get()操作時,物件尚未建立好,則get操作會阻塞,直到物件建立完成後返回。get操作的關鍵流程如下:

  1. Driver或者Worker程序首先到ObjectStore內請求ObjectID對應的物件資料。
  2. 如果本地ObjectStore沒有對應的物件資料,本地物件管理器Plasma會檢查Master上的物件表檢視物件是否儲存其它節點的ObjectStore。
  3. 如果物件資料在其它節點的ObjectStore內,Plasma會發送網路請求將物件資料拉到本地ObjectStore。
  4. 如果物件資料還沒有建立好,Master會在物件建立完成後通知請求的Plasma讀取。
  5. 如果物件資料已經被所有的ObjectStore移除(被LRU策略刪除),本地排程器會根據任務血緣關係執行物件的重新建立工作。
  6. 一旦物件資料在本地ObjectStore可用,Driver或者Worker程序會通過共享記憶體的方式直接將物件記憶體區域對映到自己的程序地址空間中,並反序列化為Python物件。

另外,ray.get()可以一次性讀取多個物件的資料:

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用註解@ray.remote可以宣告一個remote function。remote函式時Ray的基本任務排程單元,remote函式定義後會立即被序列化儲存到RedisServer中,並且分配了一個唯一的ID,這樣就保證了叢集的所有節點都可以看到這個函式的定義。

不過,這樣對remote函式定義有了一個潛在的要求,即remote函式內如果呼叫了其它的使用者函式,則必須提前定義,否則remote函式無法找到對應的函式定義內容。

remote函式內也可以呼叫其它的remote函式,Driver和Slave每次呼叫remote函式時,其實都是向叢集提交了一個計算任務,從這裡也可以看到Ray的分散式計算的自由性。

Ray中呼叫remote函式的關鍵流程如下:

  1. 呼叫remote函式時,首先會建立一個任務物件,它包含了函式的ID、引數的ID或者值(Python的基本物件直接傳值,複雜物件會先通過ray.put()操作存入ObjectStore然後返回ObjectID)、函式返回值物件的ID。
  2. 任務物件被髮送到本地排程器。
  3. 本地排程器決定任務物件是在本地排程還是傳送給全域性排程器。如果任務物件的依賴(引數)在本地的ObejctStore已經存在且本地的CPU和GPU計算資源充足,那麼本地排程器將任務分配給本地的WorkerProcess執行。否則,任務物件被髮送給全域性排程器並存儲到任務表(TaskTable)中,全域性排程器根據當前的任務狀態資訊決定將任務發給叢集中的某一個本地排程器。
  4. 本地排程器收到任務物件後(來自本地的任務或者全域性排程分配的任務),會將其放入一個任務佇列中,等待計算資源和本地依賴滿足後分配給WorkerProcess執行。
  5. Worker收到任務物件後執行該任務,並將函式返回值存入ObjectStore,並更新Master的物件表(ObjectTable)資訊。

@ray.remote註解有一個引數num_return_vals用於宣告remote函式的返回值個數,基於此實現remote函式的多返回值機制。

@ray.remote(num_return_vals=2)
def f():
    return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2
@ray.remote註解的另一個引數num_gpus可以為任務指定GPU的資源。使用內建函式ray.get_gpu_ids()可以獲取當前任務可以使用的GPU資訊。
@ray.remote(num_gpus=1)
def gpu_method():
    return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作支援批量的任務等待,基於此可以實現一次性獲取多個ObjectID對應的資料。
# 啟動5個remote函式呼叫任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,超時時間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5個ObjectID,使用ray.wait操作可以一直等待有4個任務完成後返回,並將完成的資料物件放在第一個list型別返回值內,未完成的ObjectID放在第二個list返回值內。如果設定了超時時間,那麼在超時時間結束後仍未等到預期的返回值個數,則已超時完成時的返回值為準。

6. ray.error_info()

使用ray.error_info()可以獲取任務執行時產生的錯誤資訊。
>>> import time
>>> @ray.remote
>>> def f():
>>>     time.sleep(5)
>>>     raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<stdin>", line 4, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函式只能處理無狀態的計算需求,有狀態的計算需求需要使用Ray的Actor實現。在Python的class定義前使用@ray.remote可以宣告Actor。
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value
使用如下方式建立Actor物件。
a1 = Counter.remote()
a2 = Counter.remote()
Ray建立Actor的流程為:
  1. Master選取一個Slave,並將Actor建立任務分發給它的本地排程器。
  2. 建立Actor物件,並執行它的建構函式。

從流程可以看出,Actor物件的建立時並行的。

通過呼叫Actor物件的方法使用Actor。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1
呼叫Actor物件的方法的流程為:
  1. 首先建立一個任務。
  2. 該任務被Driver直接分配到建立該Actor對應的本地執行器執行,這個操作繞開了全域性排程器(Worker是否也可以使用Actor直接分配任務尚存疑問)。
  3. 返回Actor方法呼叫結果的ObjectID。

為了保證Actor狀態的一致性,對同一個Actor的方法呼叫是序列執行的。

四、安裝Ray

如果只是使用Ray,可以使用如下命令直接安裝。

pip intall ray
如果需要編譯Ray的最新原始碼進行安裝,按照如下步驟進行(MaxOS):
# 更新編譯依賴包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下載原始碼編譯安裝
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 測試
python test/runtest.py

# 安裝WebUI需要的庫[可選]
pip install jupyter ipywidgets bokeh

# 編譯Ray文件[可選]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html
我在MacOS上安裝jupyter時,遇到了Python的setuptools庫無法升級的情況,原因是MacOS的安全性設定問題,可以使用如下方式解決:
  1. 重啟電腦,啟動時按Command+R進入Mac保護模式。
  2. 開啟命令列,輸入命令csrutils disable關閉系統安全策略。
  3. 重啟電腦,繼續安裝jupyter。
  4. 安裝完成後,重複如上的方式執行csrutils enable,再次重啟即可。

進入PythonShell,輸入程式碼本地啟動Ray:

import ray
ray.init()
瀏覽器內開啟WebUI介面如下:


參考資料