PaddlePaddle之資料預處理
根據官網的資料,總結出PaddlePaddle支援多種不同的資料格式,包括四種資料型別和三種序列格式:
四種資料型別:
- dense_vector:稠密的浮點數向量。
- sparse_binary_vector:稀疏的二值向量,即大部分值為0,但有值的地方必須為1。
- sparse_float_vector:稀疏的向量,即大部分值為0,但有值的部分可以是任何浮點數。
- integer:整型格式
api如下:
-
paddle.v2.data_type.dense_vector(dim, seq_type=0)
- dim(int) 向量維度
- seq_type(int)輸入的序列格式
- 說明:稠密向量,輸入特徵是一個稠密的浮點向量。舉個例子,手寫數字識別裡的輸入圖片是28*28的畫素,Paddle的神經網路的輸入應該是一個784維的稠密向量。
- 引數:
- 返回型別:InputType
-
paddle.v2.data_type.sparse_binary_vector(dim, seq_type=0)
- 說明:稀疏的二值向量。輸入特徵是一個稀疏向量,這個向量的每個元素要麼是0,要麼是1
- 引數:同上
- 返回型別:同上
-
paddle.v2.data_type.sparse_vector(dim, seq_type=0)
- 說明:稀疏向量,向量裡大多數元素是0,其他的值可以是任意的浮點值
- 引數:同上
- 返回型別:同上
-
paddle.v2.data_type.integer_value(value_range, seq_type=0)
- seq_type(int):輸入的序列格式
- value_range(int):每個元素的範圍
- 說明:整型格式
- 引數:
- 返回型別:InputType
不同的資料型別和序列模式返回的格式不同,如下表:

其中f表示浮點數,i表示整數
注意:對sparse_binary_vector和sparse_float_vector,PaddlePaddle存的是有值位置的索引。例如,
-
對一個5維非序列的稀疏01向量 [0, 1, 1, 0, 0] ,型別是sparse_binary_vector,返回的是 [1, 2] 。(因為只有第1位和第2位有值)
-
對一個5維非序列的稀疏浮點向量 [0, 0.5, 0.7, 0, 0] ,型別是sparse_float_vector,返回的是 [(1, 0.5), (2, 0.7)] 。(因為只有第一位和第二位有值,分別是0.5和0.7)
PaddlePaddle的資料讀取方式
我們瞭解了上文的四種基本資料格式和三種序列模式後,在處理自己的資料時可以根據需求選擇,但是處理完資料後如何把資料放到模型裡去訓練呢?我們知道,基本的方法一般有兩種:
-
一次性載入到記憶體:模型訓練時直接從記憶體中取資料,不需要大量的IO消耗,速度快,適合少量資料。
-
載入到磁碟/HDFS/共享儲存等:這樣不用佔用記憶體空間,在處理大量資料時一般採取這種方式,但是缺點是每次資料載入進來也是一次IO的開銷,非常影響速度。
在PaddlePaddle中我們可以有三種模式來讀取資料:分別是reader、reader creator和reader decorator,這三者有什麼區別呢?
-
reader:從本地、網路、分散式檔案系統HDFS等讀取資料,也可隨機生成資料,並返回一個或多個數據項。
-
reader creator:一個返回reader的函式。
-
reader decorator:裝飾器,可組合一個或多個reader。
reader
我們先以reader為例,為房價資料(斯坦福吳恩達的公開課第一課舉例的資料)建立一個reader:
- 建立一個reader,實質上是一個迭代器,每次返回一條資料(此處以房價資料為例)
reader = paddle.dataset.uci_housing.train()
- 建立一個shuffle_reader,把上一步的reader放進去,配置buf_size就可以讀取buf_size大小的資料自動做shuffle,讓資料打亂,隨機化
shuffle_reader = paddle.reader.shuffle(reader,buf_size= 100)
- 建立一個batch_reader,把上一步混洗好的shuffle_reader放進去,給定batch_size,即可建立。
batch_reader = paddle.batch(shuffle_reader,batch_size = 2)
這三種方式也可以組合起來放一塊:
reader = paddle.batch( paddle.reader.shuffle( uci_housing.train(), buf_size = 100), batch_size=2) 複製程式碼
可以以一個直觀的圖來表示:

reader creator
如果想要生成一個簡單的隨機資料,以reader creator為例:
def reader_creator():def reader():while True:yield numpy.random.uniform(-1,1,size=784)return reader 原始碼見creator.py, 支援四種格式:np_array,text_file,RecordIO和cloud_reader __all__ = ['np_array', 'text_file', "cloud_reader"] def np_array(x): """ Creates a reader that yields elements of x, if it is a numpy vector. Or rows of x, if it is a numpy matrix. Or any sub-hyperplane indexed by the highest dimension. :param x: the numpy array to create reader from. :returns: data reader created from x. """ def reader(): if x.ndim < 1: yield x for e in x: yield e return reader def text_file(path): """ Creates a data reader that outputs text line by line from given text file. Trailing new line ('\\\\n') of each line will be removed. :path: path of the text file. :returns: data reader of text file """ def reader(): f = open(path, "r") for l in f: yield l.rstrip('\n') f.close() return reader def recordio(paths, buf_size=100): """ Creates a data reader from given RecordIO file paths separated by ",", glob pattern is supported. :path: path of recordio files, can be a string or a string list. :returns: data reader of recordio files. """ import recordio as rec import paddle.v2.reader.decorator as dec import cPickle as pickle def reader(): if isinstance(paths, basestring): path = paths else: path = ",".join(paths) f = rec.reader(path) while True: r = f.read() if r is None: break yield pickle.loads(r) f.close() return dec.buffered(reader, buf_size) pass_num = 0 def cloud_reader(paths, etcd_endpoints, timeout_sec=5, buf_size=64): """ Create a data reader that yield a record one by one from the paths: :paths: path of recordio files, can be a string or a string list. :etcd_endpoints: the endpoints for etcd cluster :returns: data reader of recordio files. ..code-block:: python from paddle.v2.reader.creator import cloud_reader etcd_endpoints = "http://127.0.0.1:2379" trainer.train.( reader=cloud_reader(["/work/dataset/uci_housing/uci_housing*"], etcd_endpoints), ) """ import os import cPickle as pickle import paddle.v2.master as master c = master.client(etcd_endpoints, timeout_sec, buf_size) if isinstance(paths, basestring): path = [paths] else: path = paths c.set_dataset(path) def reader(): global pass_num c.paddle_start_get_records(pass_num) pass_num += 1 while True: r, e = c.next_record() if not r: if e != -2: print "get record error: ", e break yield pickle.loads(r) return reader 複製程式碼
reader decorator
如果想要讀取同時讀取兩部分的資料,那麼可以定義兩個reader,合併後對其進行shuffle。如我想讀取所有使用者對比車系的資料和瀏覽車系的資料,可以定義兩個reader,分別為contrast()和view(),然後通過預定義的reader decorator快取並組合這些資料,在對合並後的資料進行亂序操作。原始碼見decorator.py
data = paddle.reader.shuffle( paddle.reader.compose( paddle.reader(contradt(contrast_path),buf_size = 100), paddle.reader(view(view_path),buf_size = 200),500) 複製程式碼
這樣有一個很大的好處,就是組合特徵來訓練變得更容易了!傳統的跑模型的方法是,確定label和feature,儘可能多的找合適的feature扔到模型裡去訓練,這樣我們就需要做一張大表,訓練完後我們可以分析某些特徵的重要性然後重新增加或減少一些feature來進行訓練,這樣我們有需要對原來的label-feature表進行修改,如果資料量小沒啥影響,就是麻煩點,但是資料量大的話需要每一次增加feature,和主鍵、label來join的操作都會很耗時,如果採取這種方式的話,我們可以對某些同一類的特徵做成一張表,資料存放的地址存為一個變數名,每次跑模型的時候想選取幾類特徵,就建立幾個reader,用reader decorator 組合起來,最後再shuffle灌倒模型裡去訓練。這!樣!是!不!是!很!方!便!
如果沒理解,我舉一個例項,假設我們要預測使用者是否會買車,label是買車 or 不買車,feature有瀏覽車系、對比車系、關注車系的功能偏好等等20個,傳統的思維是做成這樣一張表:

如果想要減少feature_2,看看feature_2對模型的準確率影響是否很大,那麼我們需要在這張表裡去掉這一列,想要增加一個feature的話,也需要在feature裡增加一列,如果用reador decorator的話,我們可以這樣做資料集:

把相同型別的feature放在一起,不用頻繁的join減少時間,一共做四個表,建立4個reador:
data = paddle.reader.shuffle( paddle.reader.compose( paddle.reader(table1(table1_path),buf_size = 100), paddle.reader(table2(table2_path),buf_size = 100), paddle.reader(table3(table3_path),buf_size = 100), paddle.reader(table4(table4_path),buf_size = 100), 500) 複製程式碼
如果新發現了一個特徵,想嘗試這個特徵對模型提高準確率有沒有用,可以再單獨把這個特徵資料提取出來,再增加一個reader,用reader decorator組合起來,shuffle後放入模型裡跑就行了。
PaddlePaddle的資料預處理例項
還是以手寫數字為例,對資料進行處理後並劃分train和test,只需要4步即可:
1.指定資料地址
import paddle.v2.dataset.common import subprocess import numpy import platform __all__ = ['train', 'test', 'convert'] URL_PREFIX = 'http://yann.lecun.com/exdb/mnist/' TEST_IMAGE_URL = URL_PREFIX + 't10k-images-idx3-ubyte.gz' TEST_IMAGE_MD5 = '9fb629c4189551a2d022fa330f9573f3' TEST_LABEL_URL = URL_PREFIX + 't10k-labels-idx1-ubyte.gz' TEST_LABEL_MD5 = 'ec29112dd5afa0611ce80d1b7f02629c' TRAIN_IMAGE_URL = URL_PREFIX + 'train-images-idx3-ubyte.gz' TRAIN_IMAGE_MD5 = 'f68b3c2dcbeaaa9fbdd348bbdeb94873' TRAIN_LABEL_URL = URL_PREFIX + 'train-labels-idx1-ubyte.gz' TRAIN_LABEL_MD5 = 'd53e105ee54ea40749a09fcbcd1e9432' 複製程式碼
2.建立reader creator
def reader_creator(image_filename, label_filename, buffer_size): # 建立一個reader def reader(): if platform.system() == 'Darwin': zcat_cmd = 'gzcat' elif platform.system() == 'Linux': zcat_cmd = 'zcat' else: raise NotImplementedError() m = subprocess.Popen([zcat_cmd, image_filename], stdout=subprocess.PIPE) m.stdout.read(16) l = subprocess.Popen([zcat_cmd, label_filename], stdout=subprocess.PIPE) l.stdout.read(8) try:# reader could be break. while True: labels = numpy.fromfile( l.stdout, 'ubyte', count=buffer_size).astype("int") if labels.size != buffer_size: break# numpy.fromfile returns empty slice after EOF. images = numpy.fromfile( m.stdout, 'ubyte', count=buffer_size * 28 * 28).reshape( (buffer_size, 28 * 28)).astype('float32') images = images / 255.0 * 2.0 - 1.0 for i in xrange(buffer_size): yield images[i, :], int(labels[i]) finally: m.terminate() l.terminate() return reader 複製程式碼
3.建立訓練集和測試集
def train(): """ 建立mnsit的訓練集 reader creator 返回一個reador creator,每個reader裡的樣本都是圖片的畫素值,在區間[0,1]內,label為0~9 返回:training reader creator """ return reader_creator( paddle.v2.dataset.common.download(TRAIN_IMAGE_URL, 'mnist', TRAIN_IMAGE_MD5), paddle.v2.dataset.common.download(TRAIN_LABEL_URL, 'mnist', TRAIN_LABEL_MD5), 100) def test(): """ 建立mnsit的測試集 reader creator 返回一個reador creator,每個reader裡的樣本都是圖片的畫素值,在區間[0,1]內,label為0~9 返回:testreader creator """ return reader_creator( paddle.v2.dataset.common.download(TEST_IMAGE_URL, 'mnist', TEST_IMAGE_MD5), paddle.v2.dataset.common.download(TEST_LABEL_URL, 'mnist', TEST_LABEL_MD5), 100) 複製程式碼
4.下載資料並轉換成相應格式
def fetch(): paddle.v2.dataset.common.download(TRAIN_IMAGE_URL, 'mnist', TRAIN_IMAGE_MD5) paddle.v2.dataset.common.download(TRAIN_LABEL_URL, 'mnist', TRAIN_LABEL_MD5) paddle.v2.dataset.common.download(TEST_IMAGE_URL, 'mnist', TEST_IMAGE_MD5) paddle.v2.dataset.common.download(TEST_LABEL_URL, 'mnist', TRAIN_LABEL_MD5) def convert(path): """ 將資料格式轉換為 recordio format """ paddle.v2.dataset.common.convert(path, train(), 1000, "minist_train") paddle.v2.dataset.common.convert(path, test(), 1000, "minist_test") 複製程式碼
如果想換成自己的訓練資料,只需要按照步驟改成自己的資料地址,建立相應的reader creator(或者reader decorator)即可。
這是影象的例子,如果我們想訓練一個文字模型,做一個情感分析,這個時候如何處理資料呢?步驟也很簡單。假設我們有一堆資料,每一行為一條樣本,以 \t 分隔,第一列是類別標籤,第二列是輸入文字的內容,文字內容中的詞語以空格分隔。以下是兩條示例資料:
- positive 今天終於試了自己理想的車 外觀太騷氣了 而且中控也很棒
- negative 這臺車好貴 而且還費油 價效比太低了
現在開始做資料預處理
1.建立reader
def train_reader(data_dir, word_dict, label_dict): def reader(): UNK_ID = word_dict["<UNK>"] word_col = 0 lbl_col = 1 for file_name in os.listdir(data_dir): with open(os.path.join(data_dir, file_name), "r") as f: for line in f: line_split = line.strip().split("\t") word_ids = [ word_dict.get(w, UNK_ID) for w in line_split[word_col].split() ] yield word_ids, label_dict[line_split[lbl_col]] return reader 複製程式碼
返回型別為: paddle.data_type.integer_value_sequence(詞語在字典的序號)和 paddle.data_type.integer_value(類別標籤)
2.組合讀取方式
train_reader = paddle.batch( paddle.reader.shuffle( reader.train_reader(train_data_dir, word_dict, lbl_dict), buf_size=1000), batch_size=batch_size) 複製程式碼
完整的程式碼如下(加上了劃分train和test部分):
train_reader = paddle.batch( paddle.reader.shuffle( reader.train_reader(train_data_dir, word_dict, lbl_dict), buf_size=1000), batch_size=batch_size) 完整的程式碼如下(加上了劃分train和test部分): import os def train_reader(data_dir, word_dict, label_dict): """ 建立訓練資料reader :param data_dir: 資料地址. :type data_dir: str :param word_dict: 詞典地址, 詞典裡必須有 "UNK" . :type word_dict:python dict :param label_dict: label 字典的地址 :type label_dict: Python dict """ def reader(): UNK_ID = word_dict["<UNK>"] word_col = 1 lbl_col = 0 for file_name in os.listdir(data_dir): with open(os.path.join(data_dir, file_name), "r") as f: for line in f: line_split = line.strip().split("\t") word_ids = [ word_dict.get(w, UNK_ID) for w in line_split[word_col].split() ] yield word_ids, label_dict[line_split[lbl_col]] return reader def test_reader(data_dir, word_dict): """ 建立測試資料reader :param data_dir: 資料地址. :type data_dir: str :param word_dict: 詞典地址, 詞典裡必須有 "UNK" . :type word_dict:python dict """ def reader(): UNK_ID = word_dict["<UNK>"] word_col = 1 for file_name in os.listdir(data_dir): with open(os.path.join(data_dir, file_name), "r") as f: for line in f: line_split = line.strip().split("\t") if len(line_split) < word_col: continue word_ids = [ word_dict.get(w, UNK_ID) for w in line_split[word_col].split() ] yield word_ids, line_split[word_col] return reader 複製程式碼
總結
這篇文章主要講了在paddlepaddle裡如何載入自己的資料集,轉換成相應的格式,並劃分train和test。我們在使用一個框架的時候通常會先去跑幾個簡單的demo,但是如果不用常見的demo的資料,自己做一個實際的專案,完整的跑通一個模型,這才代表我們掌握了這個框架的基本應用知識。跑一個模型第一步就是資料預處理,在paddlepaddle裡,提供的方式非常簡單,但是有很多優點:
- shuffle資料非常方便
- 可以將資料組合成batch訓練
- 可以利用reader decorator來組合多個reader,提高組合特徵執行模型的效率
- 可以多執行緒讀取資料
而我之前使用過mxnet來訓練車牌識別的模型,50w的圖片資料想要一次訓練是非常慢的,這樣的話就有兩個解決方法:一是批量訓練,這一點大多數的框架都會有, 二是轉換成mxnet特有的rec格式,提高讀取效率,可以通過im2rec.py將圖片轉換,比較麻煩,如果是tesnorflow,也有相對應的特定格式tfrecord,這幾種方式各有優劣,從易用性上,paddlepaddle是比較簡單的。
轉載:寬客線上