Caffe Framework Source Code Analysis (3): The Data Layer (DataLayer)
阿新 · Published 2018-10-31
When a Caffe network runs its forward pass, the DataLayer is the first layer to execute: it reads samples from the data source and feeds them to the next layer up, for example a convolution layer. On the backward pass there is nothing to propagate into a data layer, so its Backward_cpu() and Backward_gpu() are empty functions. The class hierarchy is BaseDataLayer → BasePrefetchingDataLayer → DataLayer, with BasePrefetchingDataLayer additionally inheriting from InternalThread.
We start with the parent class BaseDataLayer, declared in the base_data_layer.hpp header:
template <typename Dtype>
class BaseDataLayer : public Layer<Dtype> {
 public:
  // Constructor
  explicit BaseDataLayer(const LayerParameter& param);
  // Performs the common data-layer setup, then calls DataLayerSetUp()
  virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // Data layers can be shared by multiple solvers running in parallel
  virtual inline bool ShareInParallel() const { return true; }
  // Empty setup hook (virtual; subclasses override it)
  virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {}
  // Data layers have no bottom blobs, so Reshape is a no-op
  virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {}
  // Backward passes are empty functions
  virtual void Backward_cpu(const vector<Blob<Dtype>*>& top,
      const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}
  virtual void Backward_gpu(const vector<Blob<Dtype>*>& top,
      const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}

 protected:
  TransformationParameter transform_param_;
  shared_ptr<DataTransformer<Dtype> > data_transformer_;
  // Whether the layer also outputs labels
  bool output_labels_;
};
The corresponding base_data_layer.cpp implementation:
// Constructor
template <typename Dtype>
BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
    : Layer<Dtype>(param),
      transform_param_(param.transform_param()) {
}

template <typename Dtype>
void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
  // If there is more than one top blob, the second one carries labels
  if (top.size() == 1) {
    output_labels_ = false;
  } else {
    output_labels_ = true;
  }
  data_transformer_.reset(
      new DataTransformer<Dtype>(transform_param_, this->phase_));
  // Initialize the random number generator
  data_transformer_->InitRand();
  // Call the setup hook overridden by subclasses
  DataLayerSetUp(bottom, top);
}
Next comes the subclass BasePrefetchingDataLayer. It inherits not only from BaseDataLayer but also from InternalThread, and therefore overrides InternalThread's virtual function InternalThreadEntry(). Its two BlockingQueue members implement a producer-consumer hand-off between the prefetch thread and the forward pass, as sketched right after the class declaration below.
template <typename Dtype>
class BasePrefetchingDataLayer :
    public BaseDataLayer<Dtype>, public InternalThread {
 public:
  explicit BasePrefetchingDataLayer(const LayerParameter& param);
  // Setup function
  void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // CPU forward pass
  virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // GPU forward pass
  virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // Number of batches to prefetch
  static const int PREFETCH_COUNT = 3;

 protected:
  // Thread function, overriding the InternalThread virtual
  virtual void InternalThreadEntry();
  // Loads one batch; pure virtual, implemented by the subclass DataLayer
  virtual void load_batch(Batch<Dtype>* batch) = 0;

  Batch<Dtype> prefetch_[PREFETCH_COUNT];
  BlockingQueue<Batch<Dtype>*> prefetch_free_;
  BlockingQueue<Batch<Dtype>*> prefetch_full_;
  Blob<Dtype> transformed_data_;
};
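The two queues implement a producer-consumer hand-off: the prefetch thread pops an empty Batch from prefetch_free_, fills it, and pushes it onto prefetch_full_; the forward pass pops a filled Batch from prefetch_full_ and, once its contents have been copied out, returns it to prefetch_free_. For intuition, here is a minimal sketch of what such a blocking queue does, written with std::mutex and std::condition_variable. It is not Caffe's implementation (the real BlockingQueue in caffe/util/blocking_queue.hpp is boost-based and also offers peek() and wait logging), just the underlying idea:

#include <condition_variable>
#include <mutex>
#include <queue>

// Minimal blocking-queue sketch: push() never blocks,
// pop() blocks until an element is available.
template <typename T>
class SimpleBlockingQueue {
 public:
  void push(const T& t) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(t);
    }
    cond_.notify_one();  // wake one waiting consumer
  }
  T pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return !queue_.empty(); });  // block while empty
    T t = queue_.front();
    queue_.pop();
    return t;
  }

 private:
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable cond_;
};

Because PREFETCH_COUNT is 3, at most three batches circulate between the two queues, so loading can run ahead of computation while memory use stays bounded.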
The implementation, again in base_data_layer.cpp:
template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
    const LayerParameter& param)
    : BaseDataLayer<Dtype>(param),
      prefetch_free_(), prefetch_full_() {
  // Initially, every preallocated Batch is free
  for (int i = 0; i < PREFETCH_COUNT; ++i) {
    prefetch_free_.push(&prefetch_[i]);
  }
}

template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  // Call the parent class LayerSetUp first
  BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
  // Allocate CPU and GPU memory before the thread starts,
  // to prevent failures on some GPUs
  for (int i = 0; i < PREFETCH_COUNT; ++i) {
    prefetch_[i].data_.mutable_cpu_data();
    if (this->output_labels_) {
      prefetch_[i].label_.mutable_cpu_data();
    }
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    for (int i = 0; i < PREFETCH_COUNT; ++i) {
      prefetch_[i].data_.mutable_gpu_data();
      if (this->output_labels_) {
        prefetch_[i].label_.mutable_gpu_data();
      }
    }
  }
#endif
  DLOG(INFO) << "Initializing prefetch";
  // Initialize the random number generator
  this->data_transformer_->InitRand();
  // Start the prefetch thread
  StartInternalThread();
  DLOG(INFO) << "Prefetch initialized.";
}
// Thread body, started by StartInternalThread()
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
#ifndef CPU_ONLY
  // On the GPU, use a CUDA stream for asynchronous loading
  cudaStream_t stream;
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  }
#endif

  try {
    while (!must_stop()) {
      // Take an empty batch from the free queue
      Batch<Dtype>* batch = prefetch_free_.pop();
      // Fill it; load_batch() is implemented by the subclass DataLayer
      load_batch(batch);
#ifndef CPU_ONLY
      if (Caffe::mode() == Caffe::GPU) {
        // data() returns the blob's shared_ptr<SyncedMemory>; async_gpu_push()
        // enqueues an asynchronous host-to-device copy on the stream
        batch->data_.data().get()->async_gpu_push(stream);
        CUDA_CHECK(cudaStreamSynchronize(stream));
      }
#endif
      // Hand the filled batch to the consumer
      prefetch_full_.push(batch);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamDestroy(stream));
  }
#endif
}
// CPU forward pass
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  // Block until the prefetch thread has produced a filled batch
  Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
  // Reshape top[0] to the dimensions of the batch data
  top[0]->ReshapeLike(batch->data_);
  // Copy the batch data into top blob[0]
  caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
             top[0]->mutable_cpu_data());
  DLOG(INFO) << "Prefetch copied";
  // If the layer also outputs labels
  if (this->output_labels_) {
    // Reshape top[1] to the dimensions of the batch labels
    top[1]->ReshapeLike(batch->label_);
    // Copy the batch labels into top blob[1]
    caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
               top[1]->mutable_cpu_data());
  }
  // Return the consumed batch to the free queue for reuse
  prefetch_free_.push(batch);
}

// In CPU_ONLY mode, stub out Forward_gpu
// (Backward_gpu is already an empty function in BaseDataLayer)
#ifdef CPU_ONLY
STUB_GPU_FORWARD(BasePrefetchingDataLayer, Forward);
#endif
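STUB_GPU_FORWARD comes from caffe/util/device_alternate.hpp: in a CPU_ONLY build it generates a Forward_gpu() body that aborts with a fatal log message if anyone tries to run the layer on the GPU. Roughly, paraphrasing the macros from memory (check the header for the exact definitions):

// Sketch of the macros in caffe/util/device_alternate.hpp (CPU_ONLY build)
#define NO_GPU LOG(FATAL) << "Cannot use GPU in CPU-only Caffe: check mode."

#define STUB_GPU_FORWARD(classname, funcname) \
template <typename Dtype> \
void classname<Dtype>::funcname##_##gpu(const vector<Blob<Dtype>*>& bottom, \
    const vector<Blob<Dtype>*>& top) { NO_GPU; }

No stub is needed for Backward_gpu because BaseDataLayer already defines it as an empty function.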
Finally, the concrete subclass DataLayer. Since most of the machinery lives in its parent classes, this class is simple: it overrides only two virtual functions, DataLayerSetUp() and load_batch(). Rather than reading the database itself, it pulls Datum protobufs from a DataReader, which exposes the same free/full pair of blocking queues seen above, this time holding Datum* rather than Batch*.
template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
 public:
  explicit DataLayer(const LayerParameter& param);
  virtual ~DataLayer();
  // Setup hook, overriding the virtual function
  virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // DataLayer uses DataReader instead for sharing for parallelism
  virtual inline bool ShareInParallel() const { return false; }
  virtual inline const char* type() const { return "Data"; }
  virtual inline int ExactNumBottomBlobs() const { return 0; }
  virtual inline int MinTopBlobs() const { return 1; }
  virtual inline int MaxTopBlobs() const { return 2; }

 protected:
  // Loads one batch, overriding the pure virtual function
  virtual void load_batch(Batch<Dtype>* batch);
  // DataReader object
  DataReader reader_;
};
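Note the blob-count constraints declared here: ExactNumBottomBlobs() returning 0 means the layer takes no input blobs at all, and MinTopBlobs()/MaxTopBlobs() allow either one top ("data" alone) or two ("data" plus "label"), which is exactly the top.size() distinction that BaseDataLayer::LayerSetUp() used to set output_labels_.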
The corresponding data_layer.cpp file:
// Constructor
template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
    : BasePrefetchingDataLayer<Dtype>(param),
      reader_(param) {
}

// Destructor
template <typename Dtype>
DataLayer<Dtype>::~DataLayer() {
  // Stop the prefetch thread
  this->StopInternalThread();
}

template <typename Dtype>
void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
  const int batch_size = this->layer_param_.data_param().batch_size();
  // Peek at one datum to initialize the top blob dimensions
  Datum& datum = *(reader_.full().peek());
  // Infer the shape of a single data item from the datum
  vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
  this->transformed_data_.Reshape(top_shape);
  // Set the batch dimension
  top_shape[0] = batch_size;
  // Reshape
  top[0]->Reshape(top_shape);
  for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
    // Reshape, allocating data memory
    this->prefetch_[i].data_.Reshape(top_shape);
  }
  // Log the output dimensions
  LOG(INFO) << "output data size: " << top[0]->num() << ","
      << top[0]->channels() << "," << top[0]->height() << ","
      << top[0]->width();
  // label
  if (this->output_labels_) {
    vector<int> label_shape(1, batch_size);
    top[1]->Reshape(label_shape);
    for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
      // Reshape, allocating label memory
      this->prefetch_[i].label_.Reshape(label_shape);
    }
  }
}
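To make the shape bookkeeping concrete, here is a small self-contained example with hypothetical numbers (28×28 grayscale data and a batch_size of 64; neither value comes from the code above). InferBlobShape() returns the shape of a single item, and overwriting index 0 with the batch size gives the final top blob shape:

#include <cassert>
#include <vector>

int main() {
  // Shape of one 28x28 grayscale datum, as InferBlobShape() would return it
  std::vector<int> top_shape = {1, 1, 28, 28};  // {num, channels, height, width}
  const int batch_size = 64;   // hypothetical data_param().batch_size()
  top_shape[0] = batch_size;   // -> {64, 1, 28, 28}
  int count = 1;
  for (int d : top_shape) count *= d;
  assert(count == 64 * 1 * 28 * 28);  // 50176 values per batch
  return 0;
}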
// Called from the InternalThreadEntry thread function
template<typename Dtype>
void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
  CPUTimer batch_timer;
  batch_timer.Start();
  double read_time = 0;
  double trans_time = 0;
  CPUTimer timer;
  CHECK(batch->data_.count());
  CHECK(this->transformed_data_.count());
  // Peek at one datum to initialize the top blob dimensions, as above
  const int batch_size = this->layer_param_.data_param().batch_size();
  Datum& datum = *(reader_.full().peek());
  vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
  this->transformed_data_.Reshape(top_shape);
  top_shape[0] = batch_size;
  batch->data_.Reshape(top_shape);

  Dtype* top_data = batch->data_.mutable_cpu_data();
  Dtype* top_label = NULL;  // suppress warnings about uninitialized variables
  if (this->output_labels_) {
    top_label = batch->label_.mutable_cpu_data();
  }
  // Load the batch item by item
  for (int item_id = 0; item_id < batch_size; ++item_id) {
    timer.Start();
    // Pop one datum from the reader (blocks until data is ready)
    Datum& datum = *(reader_.full().pop("Waiting for data"));
    // Accumulate read time
    read_time += timer.MicroSeconds();
    timer.Start();
    // Compute the pointer offset of this item within the batch
    int offset = batch->data_.offset(item_id);
    this->transformed_data_.set_cpu_data(top_data + offset);
    // Transform the datum and write it into the batch
    this->data_transformer_->Transform(datum, &(this->transformed_data_));
    // Copy the label
    if (this->output_labels_) {
      top_label[item_id] = datum.label();
    }
    // Accumulate transform time
    trans_time += timer.MicroSeconds();
    // Return the datum to the reader's free queue for reuse
    reader_.free().push(const_cast<Datum*>(&datum));
  }
  timer.Stop();
  // Total time spent loading the batch
  batch_timer.Stop();
  // Log the timing breakdown
  DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
  DLOG(INFO) << "     Read time: " << read_time / 1000 << " ms.";
  DLOG(INFO) << "Transform time: " << trans_time / 1000 << " ms.";
}