1. 程式人生 > >CAFFE原始碼學習筆記之十-data_layer

CAFFE原始碼學習筆記之十-data_layer

一、前言

CAFFE在搭建CNN網路的時候,第一層就是資料層,所以本節梳理一下同樣很龐大的DataLayer層。
先給一個網路結構:
這裡寫圖片描述

Layer類: 層的基類;

BaseDataLayer類:資料層的基類;

BasePrefetchingDataLayer類:預取層,主要是預先讀取若干批次的資料,平衡CPU與GPU頻寬和GPU計算速度。從繼承關係可以看出,該層是多執行緒系統主要發揮作用的地方。
其實多執行緒系統在caffe中主要就是為GPU服務,準備資料的。

BasePrefetchingDataLayer類要做的就是資料的加工了。這一部分主要完成兩件事:

1、確定資料層最終的輸出(可以不輸出label的)
2、完成資料層預處理(通常要做一些白化資料的簡單工作,比如減均值,乘係數)

DataLayer類:資料層,網路結構的第一層。Caffe的DataLayer的主要目標是讀入兩種DB的訓練資料作為輸入,而兩種DB記憶體儲的格式預設是一種叫Datum的資料結構。該層就是將Datum讀取到blob中。

message Datum {
  optional int32 channels = 1;
  optional int32 height = 2;
  optional int32 width = 3;
  // the actual image data, in bytes
  optional bytes data = 4;
  optional int32 label = 5
; // Optionally, the datum could also hold float data. repeated float float_data = 6; // If true data contains an encoded image that need to be decoded optional bool encoded = 7 [default = false]; }

其餘層都是資料的儲存層,主要儲存的格式有:
1、HDF5格式;包括將資料從硬碟讀出,將資料寫入硬碟;
2、ImageDataLayer:影象檔案直接讀取
3、MemoryDatalayer:從記憶體中讀取,直觀感覺是速度快;
4、WindowDataLayer:從影象資料的視窗,一般是opencv相關的吧。
5、DummyDataLayer:通過Filler產生的資料。

二、base_data_layer檔案

在base_data_layer.hpp和base_data_layer.cpp檔案中,分別定義了三個類:BaseDataLayer,Batch,BasePrefetchingDataLayer。
1、Batch類
Batch實際就是資料和標籤,其資料型別就是Blob。

template <typename Dtype>
class Batch {
 public:
  Blob<Dtype> data_, label_;
};

2、BaseDataLayer類
該類是datalayer的基類,其中由該類自己實現的成員函式只有兩個:
a、建構函式
由於其繼承了Layer類,所以首先構造基類Layer;
然後用transform_param()初始化其成員變數,為轉換資料的維度或者預處理做準備。

template <typename Dtype>
BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
    : Layer<Dtype>(param),
      transform_param_(param.transform_param()) {
}

b、LayerSetUp函式
資料層的初始化,初始化時根據top的大小來確定,如果大小為1,表明只需要輸出資料即可,不輸出類標誌。

template <typename Dtype>
void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,const vector<Blob<Dtype>*>& top) {
  if (top.size() == 1) {
    output_labels_ = false;
  } else {
    output_labels_ = true;
  }
  data_transformer_.reset(
      new DataTransformer<Dtype>(transform_param_, this->phase_));//初始化DataTransformer例項,進行資料的預處理
  data_transformer_->InitRand();
  // The subclasses should setup the size of bottom and top
  DataLayerSetUp(bottom, top);//實際的Layer初始化是呼叫DataLayerSetUp函式,對特殊的層進行初始化的。該函式是純虛擬函式,繼承類必須自己實現。
}

3、BasePrefetchingDataLayer 類
該類繼承自InternalThread和BaseDataLayer類,所以預取操作是採用多執行緒的系統。

a、功能
因為GPU計算速度和頻寬跟CPU都有較大的差距,所以需要在GPU在計算的時候預先取出若干批次的資料。而該類就是實現這個功能的。
b、成員變數
可以看出,在batch級別的預取操作中,使用了雙阻塞佇列。


vector<shared_ptr<Batch<Dtype> > > prefetch_:預先讀取的若干批次資料的容器;

BlockingQueue<Batch<Dtype>*> prefetch_free_;生產者佇列

BlockingQueue<Batch<Dtype>*> prefetch_full_;消費者佇列

Batch<Dtype>* prefetch_current_;指向當前批次的資料的指標

Blob<Dtype> transformed_data_;需要注意的是之前的成員變數都是batch級別的,而該變數則是Blob型資料。
template <typename Dtype>
class BasePrefetchingDataLayer :
    public BaseDataLayer<Dtype>, public InternalThread {
 public:
  explicit BasePrefetchingDataLayer(const LayerParameter& param);
  // LayerSetUp: implements common data layer setup functionality, and calls
  // DataLayerSetUp to do special data layer setup for individual layer types.
  // This method may not be overridden.
  void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);

  virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);

 protected:
  virtual void InternalThreadEntry();//開啟預取執行緒
  virtual void load_batch(Batch<Dtype>* batch) = 0;//純虛擬函式,主要是需要data_layer自己實現

  vector<shared_ptr<Batch<Dtype> > > prefetch_;//預取的若干batch
  BlockingQueue<Batch<Dtype>*> prefetch_free_;
  BlockingQueue<Batch<Dtype>*> prefetch_full_;
  Batch<Dtype>* prefetch_current_;//指向當前批次的指標

  Blob<Dtype> transformed_data_;//被修正過的資料
};

c、建構函式

template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
    const LayerParameter& param)
    : BaseDataLayer<Dtype>(param),
      prefetch_(param.data_param().prefetch()),
      prefetch_free_(), prefetch_full_(), prefetch_current_() {//預設初始化阻塞佇列
  for (int i = 0; i < prefetch_.size(); ++i) {
    prefetch_[i].reset(new Batch<Dtype>());//根據預取的size初始化prefetch_
    prefetch_free_.push(prefetch_[i].get());//根據prefetch_初始化生產者的阻塞佇列
  }
}

d、LayerSetUp函式
初始化相關資料結構之後,開啟預取執行緒。

template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  BaseDataLayer<Dtype>::LayerSetUp(bottom, top);//呼叫父類的setup函式


  for (int i = 0; i < prefetch_.size(); ++i) {
    prefetch_[i]->data_.mutable_cpu_data();
    if (this->output_labels_) {
      prefetch_[i]->label_.mutable_cpu_data();
    }//作者解釋:在開啟預取執行緒之前,必須主動呼叫mutable_cpu_data或者mutable_gpu_data,防止執行緒同時呼叫兩個函式。這是因為在某些GPU上不這麼做會發生錯誤。
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    for (int i = 0; i < prefetch_.size(); ++i) {
      prefetch_[i]->data_.mutable_gpu_data();
      if (this->output_labels_) {
        prefetch_[i]->label_.mutable_gpu_data();
      }
    }
  }
#endif
  DLOG(INFO) << "Initializing prefetch";
  this->data_transformer_->InitRand();//初始化隨機數種子
  StartInternalThread();//開啟預取執行緒,執行緒啟動的工作是搬運全域性資源,初始化boost::thread等。
  DLOG(INFO) << "Prefetch initialized.";
}

e、InternalThreadEntry()
在之前的internel thread模組提到,InternalThreadEntry()函式沒有實現,是在繼承類中由繼承者實現的。

template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
#ifndef CPU_ONLY
  cudaStream_t stream;//建立流
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  }
#endif

  try {
    while (!must_stop()) {
      Batch<Dtype>* batch = prefetch_free_.pop();//從生產者佇列中pop出一個batch的資料
      load_batch(batch);//load_batch是純虛擬函式,任何繼承該類的繼承類都必須自己實現。
#ifndef CPU_ONLY
      if (Caffe::mode() == Caffe::GPU) {
        batch->data_.data().get()->async_gpu_push(stream);//如果是GPU模式,則是使用非同步流同步向GPU推送資料,該函式在syncmem中就已經總結了。
        if (this->output_labels_) {
          batch->label_.data().get()->async_gpu_push(stream);
        }
        CUDA_CHECK(cudaStreamSynchronize(stream));
      }
#endif
      prefetch_full_.push(batch);//將batch裝載進消費者佇列中
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamDestroy(stream));//銷燬流
  }
#endif
}

f、datalayer中的foward_cpu()

template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  if (prefetch_current_) {
    prefetch_free_.push(prefetch_current_);
  }
  prefetch_current_ = prefetch_full_.pop("Waiting for data");//從消費者佇列中彈出一個batch,這中間會有條件變數進行多執行緒下的資源同步
  // 根據batch的形狀修改top的形狀
  top[0]->ReshapeLike(prefetch_current_->data_);
  top[0]->set_cpu_data(prefetch_current_->data_.mutable_cpu_data());//初始化top
  if (this->output_labels_) {
    // Reshape to loaded labels.
    top[1]->ReshapeLike(prefetch_current_->label_);
    top[1]->set_cpu_data(prefetch_current_->label_.mutable_cpu_data());
  }
}

三、data_layer
該層繼承自預取層,
標頭檔案如下:

template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
 public:
  explicit DataLayer(const LayerParameter& param);
  virtual ~DataLayer();
  virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // DataLayer uses DataReader instead for sharing for parallelism?
  //我看的這個版本中沒有DataReader
  virtual inline bool ShareInParallel() const { return false; }//是否並行訓練時共享資料
  virtual inline const char* type() const { return "Data"; }
  virtual inline int ExactNumBottomBlobs() const { return 0; }
  virtual inline int MinTopBlobs() const { return 1; }
  virtual inline int MaxTopBlobs() const { return 2; }

 protected:
  void Next();//遊標移動
  bool Skip();//跳過某些資料
  virtual void load_batch(Batch<Dtype>* batch);//將影象資料從資料庫中讀取到batch中

//下面三個變數在之前的版本是用DataReader類表示的。現在看樣子是沒有了。
  shared_ptr<db::DB> db_;//資料庫格式資料
  shared_ptr<db::Cursor> cursor_;//遊標,配合資料庫取數
  uint64_t offset_;//偏移量,在blob中offset可以算出當前影象在batch中的位置
};

具體實現:

template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
  : BasePrefetchingDataLayer<Dtype>(param),
    offset_() {//建構函式中的基類開啟執行緒
  db_.reset(db::GetDB(param.data_param().backend()));//protobuf引數初始化資料庫型別
  db_->Open(param.data_param().source(), db::READ);//開啟資料庫檔案
  cursor_.reset(db_->NewCursor());//初始化遊標
}

template <typename Dtype>
DataLayer<Dtype>::~DataLayer() {
  this->StopInternalThread();//解構函式是結束執行緒
}

template <typename Dtype>
void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
  const int batch_size = this->layer_param_.data_param().batch_size();//每批大小

  Datum datum;//表示一個影象資料
  datum.ParseFromString(cursor_->value());//從資料庫中的map中根據遊標讀取影象檔案


  vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);//根據datum的形狀推測top的形狀
  this->transformed_data_.Reshape(top_shape);根據推測出的形狀重塑
  // Reshape top[0] and prefetch_data according to the batch_size.  
  // 既然獲取了資料的形狀(channel,height,width),那麼這裡再設定一下batch_size  
  // top_shape[0]=batch_size  
  // top_shape[1]=channel  
  // top_shape[2]=height  
  // top_shape[3]=width 
  top_shape[0] = batch_size;
  top[0]->Reshape(top_shape);
  for (int i = 0; i < this->prefetch_.size(); ++i) {
    this->prefetch_[i]->data_.Reshape(top_shape);
  }//設定預取資料的形狀
  LOG_IF(INFO, Caffe::root_solver())
      << "output data size: " << top[0]->num() << ","
      << top[0]->channels() << "," << top[0]->height() << ","
      << top[0]->width();
  // label
  if (this->output_labels_) {
    vector<int> label_shape(1, batch_size);
    top[1]->Reshape(label_shape);
    for (int i = 0; i < this->prefetch_.size(); ++i) {
      this->prefetch_[i]->label_.Reshape(label_shape);
    }
  }
}

template <typename Dtype>
bool DataLayer<Dtype>::Skip() {
  int size = Caffe::solver_count();//並行訓練的個數
  int rank = Caffe::solver_rank();//並行訓練的序號
  bool keep = (offset_ % size) == rank ||
              // In test mode, only rank 0 runs, so avoid skipping
              this->layer_param_.phase() == TEST;
  return !keep;//跳過了哪些資料?
}

template<typename Dtype>
void DataLayer<Dtype>::Next() {
  cursor_->Next();
  if (!cursor_->valid()) {
    LOG_IF(INFO, Caffe::root_solver())
        << "Restarting data prefetching from start.";
    cursor_->SeekToFirst();//說明遊標到了末尾
  }
  offset_++;//遊標偏移量的移動
}

// This function is called on prefetch thread
template<typename Dtype>
void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {//將資料庫中的資料載入到batch中。
  CPUTimer batch_timer;
  batch_timer.Start();
  double read_time = 0;
  double trans_time = 0;
  CPUTimer timer;
  CHECK(batch->data_.count());
  CHECK(this->transformed_data_.count());
  const int batch_size = this->layer_param_.data_param().batch_size();

  Datum datum;//單個影象資料
  for (int item_id = 0; item_id < batch_size; ++item_id) {
    timer.Start();
    while (Skip()) {
      Next();
    }
    datum.ParseFromString(cursor_->value());//從資料庫中獲取的影象資料
    read_time += timer.MicroSeconds();

    if (item_id == 0) {
      //根據每個batch的第一個資料來推測形狀
      //一個Blob的shape,[batch_size,channels,height,width],後三個shape都可以由Datum推斷出來。
      vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
      this->transformed_data_.Reshape(top_shape);
      top_shape[0] = batch_size;
      batch->data_.Reshape(top_shape);
    }//Transformer提供了一個由Datum堆砌成Blob的途徑

    timer.Start();
    int offset = batch->data_.offset(item_id);//根據該批次內的編號設定偏移量
    //每個Datum在Blob的偏移位置必須計算出來,只要偏移offset=Blob.offset(i)即可,i 為一個Batch內的樣本資料下標

//Blob具體的shape必須提前計算出來,而且必須啟動SyncedMemory自動機,分配實際記憶體
    Dtype* top_data = batch->data_.mutable_cpu_data();
    this->transformed_data_.set_cpu_data(top_data + offset);
    this->data_transformer_->Transform(datum, &(this->transformed_data_));

    if (this->output_labels_) {
      Dtype* top_label = batch->label_.mutable_cpu_data();
      top_label[item_id] = datum.label();
    }
    trans_time += timer.MicroSeconds();
    Next();
  }
  timer.Stop();
  batch_timer.Stop();
  DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
  DLOG(INFO) << "     Read time: " << read_time / 1000 << " ms.";
  DLOG(INFO) << "Transform time: " << trans_time / 1000 << " ms.";
}

別忘了例項化該類,以及註冊層

INSTANTIATE_CLASS(DataLayer);  
REGISTER_LAYER_CLASS(Data);  

四、總結
在大多數解釋該模組的文章中都有datareader這個模組,整個資料層就可以描述成一個兩級緩衝的系統。
第一級為從資料庫中讀取當個的影象檔案,按照batch_size
儲存在一個batch中。
第二級則是以batch為單位,使用雙阻塞佇列將若干batch存入prefetch_容器中。
如圖可以說明問題:
這裡寫圖片描述
但是我發現現在的版本中沒有了DataReader類,而是直接從資料庫中讀取檔案了。不過大致的流程沒有改變。

第一級從資料庫中將Datum檔案按照Blob的格式存放到batch中,根據Blob中總結的偏移量計算得到座標就可以對號入座了。