1. 程式人生 > >Django非同步任務執行緒池

Django非同步任務執行緒池

當資料庫資料量很大時(百萬級),許多批量資料修改請求的響應會非常慢,一些不需要即時響應的任務可以放到後臺的非同步執行緒中完成,發起非同步任務的請求就可以立即響應

選擇用執行緒池的原因是:執行緒比程序更為可控。不像子程序,子執行緒會在所屬程序結束時立即結束。執行緒可共享記憶體。

請求任務非同步處理的原理

使用python manage.py runserver模式啟動的Django應用只有一個程序,對於每個請求,主執行緒會開啟一個子執行緒來處理請求。請求子執行緒向主執行緒申請一個新執行緒,然後把耗時的任務交給新執行緒,自身立即響應,這就是請求任務非同步處理的原理。

視覺化執行緒池

如果想要管理這批非同步執行緒,知道他們是否在執行中,可以使用執行緒池(ThreadPoolExecutor)。

執行緒池會先啟動若干數量的執行緒,並讓這些執行緒都處於睡眠狀態,當向執行緒池submit一個任務後,會喚醒執行緒池中的某一個睡眠執行緒,讓它來處理這個任務,當處理完這個任務,執行緒又處於睡眠狀態。

submit任務後會返回一個期程(future),這個物件可以檢視執行緒池中執行此任務的執行緒是否仍在處理中

因此可以構建一個全域性視覺化執行緒池:

from concurrent.futures.thread import ThreadPoolExecutor


class ThreadPool(object):
    def __init__(self):
        # 執行緒池
        self.executor = ThreadPoolExecutor(20)
        # 用於儲存每個專案批量任務的期程
        self.future_dict = {}

    # 檢查某個專案是否有正在執行的批量任務
    def is_project_thread_running(self, project_id):
        future = self.future_dict.get(project_id, None)
        if future and future.running():
            # 存在正在執行的批量任務
            return True
        return False

    # 展示所有的非同步任務
    def check_future(self):
        data = {}
        for project_id, future in self.future_dict.items():
            data[project_id] = future.running()
        return data

    def __del__(self):
        self.executor.shutdown()

# 主執行緒中的全域性執行緒池
# global_thread_pool的生命週期是Django主執行緒執行的生命週期
global_thread_pool = ThreadPool()

使用:

# 檢查非同步任務
if global_thread_pool.is_project_thread_running(project_id):
    raise exceptions.ValidationError(detail='存在正在處理的批量任務,請稍後重試')

# 提交一個非同步任務
future = global_thread_pool.executor.submit(self.batch_thread, project_id)
global_thread_pool.future_dict[project_id] = future

# 檢視所有非同步任務
@login_required
def check_future(request):
    data = global_thread_pool.check_future()
    return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))

序列執行

使用執行緒鎖

在全域性執行緒池中初始化執行緒鎖

class ThreadPool(object):
    def __init__(self):
        self.executor = ThreadPoolExecutor(20)
        self.future_dict = {}
        self.lock = threading.Lock()

然後執行執行緒前需要獲取鎖並再執行結束後釋放鎖

def batch_thread(self):
    global_thread_pool.lock.acquire()
    try:
        ...
        global_thread_pool.lock.release()
    except Exception:
        trace_log = traceback.format_exc()
        logger.error('非同步任務執行失敗:\n %s' % trace_log)
        global_thread_pool.lock.release()

需要捕捉異常預防子執行緒出錯而無法釋放鎖的情況

非同步執行緒任務執行前先檢查資料庫連線是否可用,然後關掉不可用連線

由於django的資料庫連線是儲存到執行緒本地變數中的,通過ThreadPoolExecutor建立的執行緒會儲存各自的資料庫連線。

當連線被儲存的時間超過mysql連線的最大超時時間,連線失效,但不會被執行緒釋放。

之後再調起執行緒執行涉及到資料庫操作的非同步任務時,會用到失效的資料庫連線,導致報錯“MySQL server has gone away”。

解決方案是線上程池的所有非同步任務執行前先檢查資料庫連線是否可用,然後關掉不可用連線

def batch_thread(self):
    for conn in connections.all():
        conn.close_if_unusable_or_obsolete()
    ...