1. 程式人生 > >Caffe2-Detectron原始碼解讀-資料載入

Caffe2-Detectron原始碼解讀-資料載入

Coordinator 類

由於 RoIDataLoader 類將 Coordinator 類物件作為成員變數, 因此我們先看一下這個類的作用和底層實現, 該類位於detectron/utils/coordinator.py檔案中, 定義如下:

#detectron/utils/coordinator.py

# 從名字可以看出, 該類的作用主要是協調各個資料載入管道之間的資訊同步
# 實現上, 該類主要封裝了threading多執行緒模組的一些功能
class Coordinator(object):

    def __init__(self):
        # import threading
self._event = threading.Event() def request_stop(self): log.debug("Coordinator stopping") self._event.set() def should_stop(self): # 當Event()物件使用set()方法後, is_set()方法返回鎮 return self._event.is_set() #... @contextlib.contextmanager 上下文環境管理器 def
stop_on_exception(self): try: yield except Exception: if not self.should_stop(): traceback.print_exc() self.request_stop() def coordinated_get(coordinator, queue): while not coordinator.should_stop(): try: # 從佇列中獲取資料
return queue.get(block=True, timeout=1.0) except Queue.Empty: continue raise Exception("Coordinator stopped during get()") def coordinated_put(coordinator, queue, element): while not coordinator.shuold_stop(): try: queue.put(element, block=True, timeout=1.0) return except Queue.Full: continue raise Exception("Coordinator stopped during put()")

RoIDataLoader 類

在之前分析的tools/train_net.py 檔案中, 關於資料載入的部分被封裝在了detectron/roi_data/loader.py檔案中的RoIDataLoader類中, 而資料載入對於任何模型和工程來說, 都是非常重要的一步, 下面, 我們就具體看看這個類的底層實現是怎麼樣的.

檔案開頭, 有一段非常詳細的註釋:

# detectron/roi_data/loader.py

"""Detectron data loader. The design is generic and abstracted away from any
details of the minibatch. A minibatch is a dictionary of blob name keys and
their associated numpy (float32 or int32) ndarray values.

Outline of the data loader design:

loader thread\
loader thread \                    / GPU 1 enqueue thread -> feed -> EnqueueOp
...           -> minibatch queue ->  ...
loader thread /                    \ GPU N enqueue thread -> feed -> EnqueueOp
loader thread/

<---------------------------- CPU -----------------------------|---- GPU ---->

A pool of loader threads construct minibatches that are put onto the shared
minibatch queue. Each GPU has an enqueue thread that pulls a minibatch off the
minibatch queue, feeds the minibatch blobs into the workspace, and then runs
an EnqueueBlobsOp to place the minibatch blobs into the GPU's blobs queue.
During each fprop the first thing the network does is run a DequeueBlobsOp
in order to populate the workspace with the blobs from a queued minibatch.
"""

從上面的註釋我們可以看出, 這個檔案定義了Detectron的資料載入器data loader, 這個類的設計是一種抽象的一般化的設計, 並且會與所有minibatch的細節隔離開來. 在這個類中, minibatch被記錄為一個字典結構, 它的key值為blob name, 其value值為對應的numpy ndarray.

每一個GPU都具有一個enqueue執行緒, 可以從minibatch queue中獲取資料, 然後會將minibatch blobs喂到workspace中去, 之後執行 EnqueueBlobsOp 來將minibatch blobs 放置到 GPU的blob queue中.

在每一次前向傳播過程中, 模型最先做的事情就是執行 DequeueBlobsOp 來構建工作空間.

下面, 看一下RoIDataLoader類的具體實現:

# detectron/roi_data/loader.py
class RoIDataLoader(object):
    def __init__(
        self,
        roidb,
        num_loaders = 4,
        minibatch_queue_size=64,
        blobs_queue_capacity=8
    ):
        self._roidb = roidb
        self._lock = threading.Lock()
        self._perm = deque(range(len(self._roidb)))
        self._cur = 0 # _perm cursor
        # minibatch佇列會在CPU記憶體當中持有準備好的訓練資料
        # 當訓練N>1個GPUs時, 在minibatch佇列中的每一個元素
        # 實際上是隻是一部分minibatch, 對整個minibatch貢獻了
        # 1/N的樣例
        # from six.moves import queue as Queue
        self._minibatch_queue = Queue.Queue(maxsize=minibatch_queue_size)
        # TODO, 其他引數的初始化


        # from detectron.utils.coordinator import Coordinator
        self.coordinator = Coordinator()

    # 載入mini-batches, 並且將它們放進mini-batch 佇列中.
    def minibatch_loader_thread(slef):
        # coordinator的上下文管理器, 當有異常出現時會呼叫coordinator.request_stop()方法
        with self.coordinator.stop_on_exception():
            while not self.coordinator.should_stop():
                # RoIDataLoader的成員函式, 返回用於下一個minibatch的blobs,
                # 函式內部呼叫了另一個成員函式_get_next_minibatch_inds()
                # 該函式返回下一個minibatch的roidb的下標
                # 還呼叫了detectron/roi_data/minibatch.py檔案中的get_minibatch方法
                # 該方法會在給定roidb的情況下, 從中構造一個minibatch
                blobs = self.get_next_minibatch()

                # from collections import OrderedDict
                # Blobs必須根據self.get_output_names()在佇列中進行排序
                ordered_blobs = OrderedDict()
                for key in self.get_output_names():
                    assert blobs[key].dtype in (np.int32, np.float32), \
                        "Blob {} of dtype {} must have dtype of" \
                        "np.int32 or np.float32".format(key, blobs[key].dtype)

                    ordered_blobs[key] = blobs[key]
                # from detectron.utils.coordinator import coordianted_put
                # 此處是將minibatch中資料blobs放入佇列的關鍵程式碼
                coordinated_put(self.coordinator, self._minibatch_queue, ordered_blobs)
        logger.info("Stopping mini-batch loading thread")

    # 將mini-batches從mini-batch佇列中轉移到BlobsQueue中.
    def enqueue_blobs_thread(self, gpu_id, blob_names):
        with self.coordinator.stop_on_exception():
            while not self.coordinator.should_stop():
                if self._minibatch_queue.qsize == 0:
                    logger.warning("Mini-batch queue is empty")
                blobs = coordinated_get(self.coordinate, self._minibatch_queue)