1. 程式人生 > >【Multiprocessing】Python多程序記憶體共享資料佇列SMQueue

【Multiprocessing】Python多程序記憶體共享資料佇列SMQueue

0x00 前言

自從先前研究了下Python的多程序計算(原文連結)之後,
深深地感受到多程序處理的美好,並決定運用到模型訓練時,
作為 feed data 的資料處理模組使用,實現工具類 SharedMemoryQueue

但是眾所周知,python的程序間通訊比較複雜,更別提大量程序的時候……想想就頭疼……
但是python本身就是萬金油,為啥我非要用python的機制呢,用python呼叫C不就好啦!
簡要記錄一下解決方案(我覺得聰明的小夥伴看到這個import,就已經懂了一半了):

from ctypes import c_int, c_float, cast, POINTER
from multiprocessing.
sharedctypes import Array, Value

0x01 基本思路

在C語言中,宣告變數時是可以開一塊記憶體區域的,
並且,在知道這塊記憶體指標的情況下可以通過指標呼叫這一塊資料。
然後對於當前需求中這種 “生產者—消費者” 的模式:
1)生產者為多個,即多個程序分別生產資料,誰生產好了誰就寫一塊;
2)消費者為一個,即一個程序每次需求資料,從已生產資料中拿一塊;
那麼就很明瞭了,佇列這種資料結構就可以解決這個問題。

舉個例子說明一下具體流程(即生產者—消費者模式):
對於【A(即預設每塊資料佔用記憶體量) x 10 (即佇列最大長度)】的記憶體,
開多個程序生成資料(生產者),對於每個程序,生成之後把不足預設佔用記憶體的部分padding(補零),
然後寫入佇列(這裡需要寫入鎖,體現為 with write_lock

),將寫入指標後移一個 A 的偏移地址,
當偏移地址超過預設最大長度(此處為 10)的時候,回到前面第一塊記憶體的位置(取模 write_idx % queue_size操作),
特別的,當 write_idx - read_idx 等於預設佇列最大長度即佇列已滿的時候進入等待狀態,
對於讀取資料的程序(消費者),直接向佇列讀取當前讀取(此處為了防止多個消費者也加了讀取鎖),
如果資料的使用不需要padding,讀取完畢後記得把先前padding的部分去掉,並將讀取指標後移一個 A 的偏移地址。

0x02 Source Code

Code 思路來源於 @lihongwei / @lhw446
個人進行了Feature的增加與修改,包括但不限於: 冷啟動

超時重置的處理,
主要修改在基於該佇列的HVDprocessor,就放在下篇裡再寫吧~

# coding=utf8
# ========================================================
#   Copyright (C) 2017-2018 All rights reserved.
# 
#   filename : shared_memory_queue.py
#   author   : lihongwei / [email protected]
#   update   : chendian / [email protected]
#   date     : 2017-11-28
#   desc     : a shared memory queue for data processor
# ========================================================

import time
import numpy as np
import multiprocessing
from ctypes import c_int, c_float, cast, POINTER
from multiprocessing.sharedctypes import Array, Value


class SMQueue(object):
    ''' a shared memory queue for data processor '''

    # pylint: disable=protected-access
    def __init__(self, queue_size, f_data_size, i_data_size):
        queue_size += 1  # plus 1 is for the one consumer space
        f_cdatasets = Array('f', np.zeros((queue_size * f_data_size), dtype=np.float32))
        i_cdatasets = Array('i', np.zeros((queue_size * i_data_size), dtype=np.int32))
        self.f_cbuffer = f_cdatasets._obj._wrapper
        self.i_cbuffer = i_cdatasets._obj._wrapper
        self.read_idx = Value('i', 0)
        self.write_idx = Value('i', 0)
        self.queue_size = queue_size
        self.f_data_size = f_data_size
        self.i_data_size = i_data_size
        self.read_lock = multiprocessing.Lock()
        self.write_lock = multiprocessing.Lock()

    def get(self, time_gap=0.1, time_out=1234, cold_boot=False):
        ''' get f_data, i_data from queue '''
        with self.read_lock:
            time_cnt = 0
            while self.read_idx.value == self.write_idx.value:
                time_cnt += 1
                if time_cnt >= time_out and not cold_boot:
                    return None, None
                time.sleep(time_gap)
            index = self.read_idx.value % self.queue_size
            f_buffer_ptr = cast(self.f_cbuffer.get_address() + index * self.f_data_size * 4,
                                POINTER(c_float))
            i_buffer_ptr = cast(self.i_cbuffer.get_address() + index * self.i_data_size * 4,
                                POINTER(c_int))
            f_data = np.ctypeslib.as_array(f_buffer_ptr, shape=(self.f_data_size, ))
            i_data = np.ctypeslib.as_array(i_buffer_ptr, shape=(self.i_data_size, ))
            self.read_idx.value += 1
            return f_data, i_data

    def put(self, f_data, i_data, time_gap=0.1):
        ''' put f_data and i_data to queue '''
        with self.write_lock:
            # only use queue_size-1 space
            while self.write_idx.value - self.read_idx.value == self.queue_size - 1:
                time.sleep(time_gap)
            index = self.write_idx.value % self.queue_size
            f_buffer_ptr = cast(self.f_cbuffer.get_address() + index * self.f_data_size * 4,
                                POINTER(c_float))
            i_buffer_ptr = cast(self.i_cbuffer.get_address() + index * self.i_data_size * 4,
                                POINTER(c_int))
            o_f_data = np.ctypeslib.as_array(f_buffer_ptr, shape=(self.f_data_size,))
            o_i_data = np.ctypeslib.as_array(i_buffer_ptr, shape=(self.i_data_size,))
            o_f_data[:] = f_data
            o_i_data[:] = i_data
            self.write_idx.value += 1

    def push(self, f_data, i_data, time_gap=0.1):
        self.put(f_data, i_data, time_gap)

    def reset(self):
        self.read_idx = Value('i', 0)
        self.write_idx = Value('i', 0)

    def queue_info(self):
        return "{}/{}".format(self.write_idx.value - self.read_idx.value, self.queue_size)