1. 程式人生 > >python分散式程式設計(轉)

python分散式程式設計(轉)

本文程式碼轉載廖雪峰老師的python3教程

分散式程式設計的難點在於:

1.伺服器之間的通訊,主節點如何瞭解從節點的執行進度,並在從節點之間進行負載均衡和任務排程;

2.如何讓多個伺服器上的程序訪問同一資源的不同部分進行執行

第一部分涉及到網路程式設計的底層細節

第二個問題讓我聯想到hdfs的一些功能。

首先分散式程序還是解決的是單機單程序無法處理的大資料量大計算量的問題,希望能加通過一份程式碼(最多主+從兩份)來並行執行一個大任務。

這就面臨兩個問題,首先將程式分佈到多臺伺服器,其次將輸入資料分配給多臺伺服器。

第一個問題相對比較簡單,畢竟程式一般不會太長,即便是超級jar包的spark程式,也不過百兆。

但資料裡不同,如今企業級別的資料動輒GB、TB,如果在分散式程式執行之前首先要進行大容量資料的轉移,顯然是不可取的。

這時候我們就需要一箇中央共享資料來源,所有伺服器都可以對這個資料來源進行並行存取(塊block),這就已經非常接近hdfs的功能。

因為在hdfs中,叢集中的多臺伺服器共享同一個hdfs,每臺機器訪問hdfs就像訪問本地資料一樣(還是稍微慢一點);

計算任務執行完之後,每臺伺服器還可以將自己的計算結果寫回hdfs,每臺伺服器的結果被儲存成了結果目錄中的小檔案。

# task_master.py

import random, time, queue
from multiprocessing.managers import
BaseManager # 傳送任務的佇列: task_queue = queue.Queue() # 接收結果的佇列: result_queue = queue.Queue() # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass # 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('
get_result_queue', callable=lambda: result_queue) # 繫結埠5000, 設定驗證碼'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網路訪問的Queue物件: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result佇列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.')

 

# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連線到伺服器,也就是執行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證碼注意保持與task_master.py設定的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網路連線:
m.connect()
# 獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task佇列取任務,並把結果寫入result佇列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')