深度學習小白——Tensorflow(三) 讀取資料
Tensorflow 程式讀取資料一共有3種方法:
- 供給資料(feeding):在程式執行的每一步,讓Python程式碼來供給資料
- 從檔案讀取資料: 讓一個輸入管線從檔案中讀取資料
- 預載入資料:在tensorflow圖中定義常量或變數來儲存所有資料(適用於資料量小的時候)
一個典型的檔案讀取管線會包含下面這些步驟:
- 檔名列表
- 可配置的 檔名亂序(shuffling)
- 可配置的 最大訓練迭代數(epoch limit)
- 檔名佇列
- 針對輸入檔案格式的閱讀器
- 紀錄解析器
- 可配置的前處理器
- 樣本佇列
filenames=[os.path.join(data_dir,'data_batch_%d.bin'%i) for i in range(1,6)] #得到一個檔名列表 for f in filenames: if not tf.gfile.Exists(f): raise ValueError('Failed to find file: '+ f)
此處用list表示檔名列表,然後依次檢驗檔案是否存在,以丟擲異常
2.將檔名列表交給tf.train.string_input_producer函式,得到一個先入先出的佇列(Queue),檔案閱讀器會需要它來讀取資料
其中可配置引數中有shuffle,是bool值,判斷要不要用亂序操作
filename_queue=tf.train.string_input_producer(filenames)#生成一個先入先出佇列,需要用檔案閱讀器來讀取其資料
3.得到檔名佇列後,針對輸入檔案格式,建立閱讀器進行讀取
例如:若從CSV檔案中讀取資料,需要使用TextLineReader和decode_csv來進行讀取和解碼
若是CIFAR-10 dataset檔案,因為每條記錄的長度固定,一個位元組的標籤+3072畫素資料
所以此處採用FixedLengthRecordReader()和decode_raw來進行讀取和解碼
每次read的執行都會從檔案中讀取一行內容, decode_csv 操作會解析這一行內容並將其轉為張量列表。如果輸入的引數有缺失,record_default引數可以根據張量的型別來設定預設值。
在呼叫run或者eval去執行read之前, 你必須呼叫tf.train.start_queue_runners來將檔名填充到佇列。否則read操作會被阻塞到檔名佇列中有值為止。
本例中將讀取資料單獨寫到一個函式中
def read_cifar10(filename_queue):
"""Reads and parses(解析) examples from CIFAR10 data files
Args:
filename_queue:A queue of strings with the filenames to read from
Returns:
An object representing a single example, with the following fields:
height:行數32
width:列數32
depth:顏色通道數3
key:a scalar string Tensor describing the filename & record number for this example
label: an int32 Tensor with the label in the range 0~9
uint8 image: a [height, width, depth] uint8 Tensor with the image data
"""
class CIFAR10Record:
pass
result=CIFAR10Record()
#CIFAR10資料庫中圖片的維度
label_bytes=1 #2 for CIFAR-100
result.height=32
result.width=32
result.depth=3
image_bytes=result.height*result.width*result.depth
#每個記錄都由一個位元組的標籤和3072位元組的影象資料組成,長度固定
record_bytes=label_bytes+image_bytes
#read a record, getting filenames from the filename_queue
reader=tf.FixedLengthRecordReader(record_bytes=record_bytes)
result.key,value=reader.read(filename_queue)#注意這裡read每次只讀取一行!
#Convert from a string to a vector of uint8 that is record_bytes long
record_bytes=tf.decode_raw(value,tf.uint8)#decode_raw可以將一個字串轉換為一個uint8的張量
#The first bytes represent the label, which we convert from uint8->int32
result.label=tf.cast(tf.strided_slice(record_bytes,[0],[label_bytes]),tf.int32)
#將剩下的影象資料部分reshape為【depth,height,width】的形式
depth_major=tf.reshape(tf.strided_slice(record_bytes,[label_bytes],[label_bytes+image_bytes]),[result.depth,result.height,result.width])
#from【depth,height,width】to【height,width,depth】
result.uint8image=tf.transpose(depth_major,[1,2,0])
return result #返回的是一個類的物件!
read_cifar10返回了一個訓練樣本,包括result.label和reaule.uint8image兩個資料成員4.預處理
針對輸入進來的一個樣本,進行的預處理可以使加噪,新增失真,翻轉等
read_input=read_cifar10(filename_queue)
reshaped_image=tf.cast(read_input.uint8image,tf.float32)
height=IMAGE_SIZE
width=IMAGE_SIZE
#Image processing for training the network. Note the many random
#distrotions applied to the image 預處理圖片,新增噪聲,失真等。
#Randomly crop(裁剪) a [height,width]section of the image
distorted_image=tf.random_crop(reshaped_image,[height,width,3])
#隨機水平翻轉圖片
distorted_image=tf.image.random_flip_left_right(distorted_image)
distorted_image=tf.image.random_brightness(distorted_image,max_delta=63)
distorted_image=tf.image.random_contrast(distorted_image,lower=0.2,upper=1.8)
#減去均值畫素,併除以畫素方差 (圖片標準化)
float_image=tf.image.per_image_standardization(distorted_image)
#set the shapes of tensors
float_image.set_shape([height,width,3])
read_input.label.set_shape([1])
#確保隨機亂序有好的混合效果
min_fraction_of_examples_in_queue=0.4
min_queue_examples= int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN* min_fraction_of_examples_in_queue)
print('Filling queue with %d CIFAR images before starting to train.'% min_queue_examples+'This will take a few minutes.')
#Generate a batch of images and labels by building up a queue of examples
return _generate_image_and_label_batch(float_image,read_input.label,min_queue_examples,batch_size,shuffle=True)
5.得到樣本佇列
在資料輸入管線的末端,我們需要有另一個佇列來執行輸入樣本的training, evaluation, inference,要對樣本進行批處理
所以使用tf.train.shuffle_batch函式用16個不同的reader讀到的樣本組成batch返回
def _generate_image_and_label_batch(image,label,min_queue_examples,batch_size,shuffle):
"""
Construct a queued batch of images and labels.
:param image: 3-D Tensor of[Height, width,3] of type.float32
:param label:1-D Tensor of type.int32
:param min_queue_examples: int32,minimum number of samples to retain in the queue that provides of batches of examples
:param batch_size: Number of images per batch
:param shuffle: boolean indicating whether to use shuffling queue (亂序)
:return:
images: Images. 4D tensor of [batch_size,height,width,3]size
labels: Labels.1D tensor of [batch_size]size
"""
#Create a queue that shuffles the examples, and then
#read 'batch_size' images +labels from the example queue\
num_preprocess_threads=16
#讀取圖片加上預處理要花費不少時間,所以我們在16個獨自執行緒上執行它們,which fill a TensorFlow queue
#這種方案可以保證同一時刻只在一個檔案中進行讀取操作(但讀取速度依然優於單執行緒),而不是同時讀取多個檔案
#優點是:
#避免了兩個不同的執行緒從同一個檔案中讀取同一個樣本
#避免了過多的磁碟搜尋操作
if shuffle:
#建立bathces of ‘batch_size’個圖片和'batch_size'個labels
images,label_batch=tf.train.shuffle_batch(
[image,label],
batch_size=batch_size,
num_threads=num_preprocess_threads,
capacity=min_queue_examples+3*batch_size,#capacity必須比min_after_dequeue大
min_after_dequeue=min_queue_examples) #min_after_dequeue 定義了我們會從多大的buffer中隨機取樣
#大的值意味著更好的亂序但更慢的開始,和更多記憶體佔用
else: #不亂序
images,label_batch=tf.train.batch(
[image,label],
batch_size=batch_size,
num_threads=num_preprocess_threads,
capacity=min_queue_examples+3*batch_size)
#display the training images in the visualizer
tf.summary.image('images',images)
return images,tf.reshape(label_batch,[batch_size])
執行緒和佇列
佇列就是tensorFlow圖中的節點,這是一種有狀態的節點,就像變數一樣,其他節點可以修改它的內容。
具體來說,其他節點可以把新元素插入到佇列後端(rear),也可以把前端元素刪除
佇列的使用:
佇列型別有先進先出(FIFO Queue),或者是隨機的(RandomShuffleQueue)
FIFO Que
建立一個先進先出佇列,以及一個“出隊,+1,入隊”操作:
import tensorflow as tf
#建立的圖:一個先入先出佇列,以及初始化,出隊,+1,入隊操作
q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(([0.1, 0.2, 0.3],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])
#開啟一個session,session是會話,會話的潛在含義是狀態保持,各種tensor的狀態保持
with tf.Session() as sess:
sess.run(init)
for i in range(2):
sess.run(q_inc)
quelen = sess.run(q.size())
for i in range(quelen):
print (sess.run(q.dequeue()))
輸出結果:
0.3
1.1
1.2
注意先入先出的規則!
一個典型的輸入結構:是使用一個RandomShuffleQueue來作為模型訓練的輸入,多個執行緒準備訓練樣本,並且把這些樣本推入佇列,一個訓練執行緒執行一個訓練操作,此操作會從佇列中移出最小批次的樣本(mini-batches)
之前的例子中,入隊操作都在主執行緒中進行,Session中可以多個執行緒一起執行。 在資料輸入的應用場景中,入隊操作從硬碟上讀取輸入,放到記憶體當中,速度較慢。 使用QueueRunner可以建立一系列新的執行緒進行入隊操作,讓主執行緒繼續使用資料。如果在訓練神經網路的場景中,就是訓練網路和讀取資料是非同步的,主執行緒在訓練網路,另一個執行緒在將資料從硬碟讀入記憶體。
再舉一個例子:
import tensorflow as tf
import sys
q=tf.FIFOQueue(1000,"float")
#計數器
counter=tf.Variable(0.0)
#操作:給計數器加一
increment_op=tf.assign_add(counter,tf.constant(1.0))
#操作:將計數器加入佇列
enqueue_op=q.enqueue(counter)
#建立一個佇列管理器QueueRunner,用這兩個操作向q中新增元素,目前我們只使用一個執行緒:
qr=tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
#主執行緒:
sess=tf.Session()
sess.run(tf.global_variables_initializer())
enqueue_threads=qr.create_threads(sess,start=True) #啟動入隊執行緒
#主執行緒:
for i in range(0,5):
print(sess.run(q.dequeue()))
結果是:2.0
172.0
225.0
272.0
367.0
並不是如普通計數器一樣的1,2,3,4,5,原因就是+1操作和入隊操作不同步!可能+1操作執行了很多次之後,才會進行一次入隊操作,並且出隊結束後,本應程式要結束,但是因為入隊執行緒沒有顯示結束,所以,整個程式就跟掛起一樣,也結束不了。
Tensorflow 的session物件是支援多執行緒的,因此多個執行緒可以很方便地使用同一個會話(session),並且並行地執行操作。
然而,在Python程式實現並行運算並不容易,所有執行緒都必須被同步終止,異常必須能被正常地捕獲並報告,會話終止的時候,佇列必須能被正確地關閉。
所幸TensorFlow提供了兩個類來幫助多執行緒的實現:tf.Coordinator和 tf.QueueRunner。從設計上這兩個類必須被一起使用。Coordinator類可以用來同時停止多個工作執行緒並且向那個在等待所有工作執行緒終止的程式報告異常。QueueRunner類用來協調多個工作執行緒同時將多個張量推入同一個佇列中。
使用tf.train.Coordinator來終止其他執行緒,Coordinator類主要有如下幾個方法:
- should_stop():如果執行緒應該停止則返回True
- request_stop(<exception>):請求該執行緒停止
- join(<list of threads>):等待被指定的執行緒終止
import tensorflow as tf
import sys
q=tf.FIFOQueue(1000,"float")
#計數器
counter=tf.Variable(0.0)
#操作:給計數器加一
increment_op=tf.assign_add(counter,tf.constant(1.0))
#操作:將計數器加入佇列
enqueue_op=q.enqueue(counter)
#建立一個佇列管理器QueueRunner,用這兩個操作向q中新增元素,目前我們只使用一個執行緒:
qr=tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
#主執行緒:
sess=tf.Session()
sess.run(tf.global_variables_initializer())
coord=tf.train.Coordinator()
enqueue_threads=qr.create_threads(sess,coord=coord,start=True) #啟動入隊執行緒,Coordinator是執行緒的引數
#主執行緒:
for i in range(0,5):
print(sess.run(q.dequeue()))
coord.request_stop() #通知其他執行緒關閉
coord.join(enqueue_threads)#其他所有執行緒關之後,這一函式才能返回
返回結果為:3.0
28.0
48.0
73.0
94.0
雖然執行緒都結束了,但計數器仍沒有正常工作,有如下方法改進:
參考:
http://wiki.jikexueyuan.com/project/tensorflow-zh/how_tos/threading_and_queues.html
http://blog.csdn.net/shenxiaolu1984/article/details/53024513
http://blog.csdn.net/lujiandong1/article/details/53369961