1. 程式人生 > >TensorFlow 資料讀取方法總結

TensorFlow 資料讀取方法總結

作者:黑暗星球
原文地址:https://blog.csdn.net/u014061630/article/details/80712635

====================下一篇:tf.data 官方教程====================

====================推薦:如何構建高效能的輸入 pipeline====================

TensorFlow輸入資料的方法有四種:

  • tf.data API:可以很容易的構建一個複雜的輸入通道(pipeline)(首選資料輸入方式)(Eager模式必須使用該API來構建輸入通道)
  • Feeding
    :使用Python程式碼提供資料,然後將資料feeding到計算圖中。
  • QueueRunner:基於佇列的輸入通道(在計算圖計算前從佇列中讀取資料)
  • Preloaded data:用一個constant常量將資料集載入到計算圖中(主要用於小資料集)

目錄

1. tf.data API

關於tf.data.Dataset的更詳盡解釋請看《programmer’s guide》。tf.data API能夠從不同的輸入或檔案格式中讀取、預處理資料,並且對資料應用一些變換(例如,batching、shuffling、mapping function over the dataset),tf.data API 是舊的 feeding、QueueRunner的升級。

tf.data的教程見下篇: TensorFlow匯入資料(tf.data)

2. Feeding

注意:Feeding是資料輸入效率最低的方式,應該只用於小資料集和除錯(debugging)

TensorFlow的Feeding機制允許我們將資料輸入計算圖中的任何一個Tensor。因此可以用Python來處理資料,然後直接將處理好的資料feed到計算圖中 。

run()eval()中用feed_dict來將資料輸入計算圖:

with tf.Session():
  input = tf.placeholder(tf.float32)
  classifier = ...
  print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))
  
  • 1
  • 2
  • 3
  • 4

雖然你可以用feed data替換任何Tensor的值(包括variables和constants),但最好的使用方法是使用一個tf.placeholder節點(專門用於feed資料)。它不用初始化,也不包含資料。一個placeholder沒有被feed資料,則會報錯。

使用placeholder和feed_dict的一個例項(資料集使用的是MNIST)見tensorflow/examples/tutorials/mnist/fully_connected_feed.py

3. QueueRunner

注意:這一部分介紹了基於佇列(Queue)API構建輸入通道(pipelines),這一方法完全可以使用 tf.data API來替代。

一個基於queue的從檔案中讀取records的通道(pipline)一般有以下幾個步驟:

  1. 檔名列表(The list of filenames)
  2. 檔名打亂(可選)(Optional filename shuffling)
  3. epoch限制(可選)(Optional epoch limit)
  4. 檔名佇列(Filename queue)
  5. 與檔案格式匹配的Reader(A Reader for the file format)
  6. decoder(A decoder for a record read by the reader)
  7. 預處理(可選)(Optional preprocessing)
  8. Example佇列(Example queue)

3.1 Filenames, shuffling, and epoch limits

對於檔名列表,有很多方法:1. 使用一個constant string Tensor(比如:["file0", "file1"])或者[("file%d" %i) for i in range(2)];2. 使用 tf.train.match_filenames_once 函式;3. 使用 tf.gfile.Glob(path_pattern)

將檔名列表傳給 tf.train.string_input_producer 函式。string_input_producer 建立一個 FIFO 佇列來儲存(holding)檔名,以供Reader使用。

string_input_producer 可以對檔名進行shuffle(可選)、設定一個最大迭代 epochs 數。在每個epoch,一個queue runner將整個檔名列表新增到queue,如果shuffle=True,則新增時進行shuffle。This procedure provides a uniform sampling of files, so that examples are not under- or over- sampled relative to each other。

queue runner執行緒獨立於reader執行緒,所以enqueuing和shuffle不會阻礙reader。

3.2 File formats

要選擇與輸入檔案的格式匹配的reader,並且要將檔名佇列傳遞給reader的 read 方法。read 方法輸出一個 key identifying the file and record(在除錯過程中非常有用,如果你有一些奇怪的 record)

3.2.1 CSV file

為了讀取逗號分隔符分割的text檔案(csv),要使用一個 tf.TextLineReader 和一個 tf.decode_csv。例如:

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
  # Start populating the filename queue.
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(1200):
    # Retrieve a single instance:
    example, label = sess.run([features, col5])

  coord.request_stop()
  coord.join(threads)
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

read 方法每執行一次,會從檔案中讀取一行。然後 decode_csv 將讀取的內容解析成一個Tensor列表。引數 record_defaults 決定解析產生的Tensor的型別,另外,如果輸入中有缺失值,則用record_defaults 指定的預設值來填充。

在使用run或者eval 執行 read 方法前,你必須呼叫 tf.train.start_queue_runners 去填充 queue。否則,read 方法將會堵塞(等待 filenames queue 中 enqueue 檔名)。

3.2.2 Fixed length records

為了讀取二進位制檔案(二進位制檔案中,每一個record都佔固定bytes),需要使用一個 tf.FixedLengthRecordReadertf.decode_rawdecode_raw 將 reader 讀取的 string 解析成一個uint8 tensor。

例如,二進位制格式的CIFAR-10資料集中的每一個record都佔固定bytes:label佔1 bytes,然後後面的image資料佔3072 bytes。當你有一個unit8 tensor時,通過切片便可以得到各部分並reformat成需要的格式。對於CIFAR-10資料集的reading和decoding,可以參照:tensorflow_models/tutorials/image/cifar10/cifar10_input.py或這個教程

3.2.3 Standard TensorFlow format

另一個方法是將資料集轉換為一個支援的格式。這個方法使得資料集和網路的混合和匹配變得簡單(make it easier to mix and match data sets and network architectures)。TensorFlow中推薦的格式是 TFRecords檔案,TFRecords中包含 tf.train.Example protocol buffers (在這個協議下,特徵是一個欄位).
你寫一小段程式來獲取資料,然後將資料填入一個Example protocol buffer,並將這個 protocol buffer 序列化(serializes)為一個string,然後用 tf.python_io.TFRcordWriter 將這個string寫入到一個TFRecords檔案中。例如,tensorflow/examples/how_tos/reading_data/convert_to_records.py 將MNIST資料集轉化為TFRecord格式。

讀取TFRecord檔案的推薦方式是使用 tf.data.TFRecordDataset,像這個例子一樣:

dataset = tf.data.TFRecordDataset(filename)
dataset = dataset.repeat(num_epochs)

# map takes a python function and applies it to every sample
dataset = dataset.map(decode)
  
  • 1
  • 2
  • 3
  • 4
  • 5

為了完成相同的任務,基於queue的輸入通道需要下面的程式碼(使用的decode和上一段程式碼一樣):

filename_queue = tf.train.string_input_producer([filename], num_epochs=num_epochs)
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
image,label = decode(serialized_example)
  
  • 1
  • 2
  • 3
  • 4

3.3 Preprocessing

然後你可以對examples進行你想要的預處理(preprocessing)。預處理是獨立的(不依賴於模型引數)。常見的預處理有:資料的標準化(normalization of your data)、挑選一個隨機的切片,新增噪聲(noise)或者畸變(distortions)等。具體的例子見:tensorflow_models/tutorials/image/cifar10/cifar10_input.py

3.4 Batching

在pipeline的末端,我們通過呼叫 tf.train.shuffle_batch 來建立兩個queue,一個將example batch起來 for training、evaluation、inference;另一個來shuffle examples的順序。

例子:

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue defines how big a buffer we will randomly sample
  #   from -- bigger means better shuffling but slower start up and more
  #   memory used.
  # capacity must be larger than min_after_dequeue and the amount larger
  #   determines the maximum we will prefetch.  Recommendation:
  #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

如果你需要更多的並行或者打亂不同檔案中example,使用多個reader,然後使用 tf.train.shuffle_batch_join將多個reader讀取的內容整合到一起。(If you need more parallelism or shuffling of examples between files, use multiple reader instances using the tf.train.shuffle_batch_join

例子:

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example_list = [read_my_file_format(filename_queue)
                  for _ in range(read_threads)]
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch_join(
      example_list, batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

所有的reader共享一個filename queue。這種方式保證了不同的reader在同一個epoch,讀取不同的檔案,直到所有的檔案的已經讀取完,然後在下一個epoch,重新從所有的檔案讀取(You still only use a single filename queue that is shared by all the readers. That way we ensure that the different readers use different files from the same epoch until all the files from the epoch have been started. (It is also usually sufficient to have a single thread filling the filename queue.))。

另一個可選的方法是去通過呼叫 tf.train.shuffle_batch 使用單個的reader,但是將引數 num_threads 引數設定為大於1的值。這將使得在同一時間只能從一個檔案讀取內容(但是比 1 執行緒快),而不是同時從N個檔案中讀取。這可能很重要:

  • 如果你的num_threads引數值比檔案的數量多,那麼很有可能:有兩個threads會一前一後從同一個檔案中讀取相同的example。這是不好的,應該避免。
  • 或者,如果並行地讀取N個檔案,可能或導致大量的磁碟搜尋(意思是,多個檔案存在於磁碟的不同位置,而磁頭只能有一個位置,所以會增加磁碟負擔)

那麼需要多少個執行緒呢?tf.train.shuffle_batch*函式會給計算圖新增一個summary來記錄 example queue 的使用情況。如果你有足夠的reading threads,這個summary將會總大於0。你可以用TensorBoard來檢視訓練過程中的summaries

3.5 Creating threads to prefetch using QueueRunner objects

使用QueueRunner物件來建立threads來prefetch資料

說明tf.train裡的很多函式會新增tf.train.QueueRunner物件到你的graph。這些物件需要你在訓練或者推理前,呼叫tf.train.start_queue_runners,否則資料無法讀取到圖中。呼叫tf.train.start_queue_runners會執行輸入pipeline需要的執行緒,這些執行緒將example enqueue到佇列中,然後dequeue操作才能成功。這最好和tf.train.Coordinator配合著用,當有錯誤時,它會完全關閉掉開啟的threads。如果你在建立pipline時設定了迭代epoch數限制,將會建立一個epoch counter的區域性變數(需要初始化)。下面是推薦的程式碼使用模板:

# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

這裡的程式碼是怎麼工作的?
首先,我們建立整個圖。它的input pipeline將有幾個階段,這些階段通過Queue連在一起。第一個階段將會產生要讀取的檔案的檔名,並將檔名enqueue到filename queue。第二個階段使用一個Reader來dequeue檔名並讀取,產生example,並將example enqueue到一個example queue。根據你的設定,你可能有很多第二階段(並行),所以你可以從並行地讀取多個檔案。最後一個階段是一個enqueue操作,將example enqueue成一個queue,然後等待下一步操作。我們想要開啟多個執行緒執行著些enqueue操作,所以我們的訓練loop能夠從example queue中dequeue examples。
這裡寫圖片描述

tf.train裡的輔助函式(建立了這些queue、enqueuing操作)會呼叫tf.train.add_queue——runner新增一個tf.train.QueueRunner到圖中。每一個QueueRunner負責一個階段。一旦圖構建好,tf.train.start_queue_runners函式會開始圖中每一個QueueRunner的入隊操作。

如果一切進行順利,你現在可以執行訓練step(後臺執行緒會填滿queue)。如果你設定了epoch限制,在達到固定的epoch時,在進行dequeuing會得到tf.errors.OutOfRangeError。這個錯誤等價於EOF(end of file),意味著已經達到了固定的epochs。

最後一部分是tf.train.Coordinator。它主要負責通知所有的執行緒是否應該停止。在大多數情況下,這通常是因為遇到了一個異常(exception)。例如,某一個執行緒在執行某些操作時出錯了(或者python的異常)。

關於threading、queues、QueueRunners、Coordinators的更多細節見這裡

3.6 Filtering records or producing multiple examples per record

一個example的shape是 [x,y,z],一個batch的example的shape為 [batch, x, y, z]。如果你想去過濾掉這個record,你可以把 batch size 設定為 0;如果你想讓每一個record產生多個example,你可以把batch size設定為大於1。然後,在呼叫呼叫batching函式(shuffle_batchshuffle_batch_join)時,設定enqueue_many=True

3.7 Sparse input data

queues在SparseTensors的情況下不能很好的工作。如果你使用SparseTensors,你必須在batching後用tf.sparse_example來decode string records(而不是在batching前使用tf.parse_single_example來decode)

4. Preloaded data

這僅僅適用於小資料集,小資料集可以被整體載入到記憶體。預載入資料集主要有兩種方法:

  • 將資料集儲存成一個constant
  • 將資料集儲存在一個variable中,一旦初始化或者assign to後,便不再改變。

使用一個constant更簡單,但是需要更多的記憶體(因為所有的常量都儲存在計算圖中,而計算圖可能需要進行多次複製)。

training_data = ...
training_labels = ...
with tf.Session():
  input_data = tf.constant(training_data)
  input_labels = tf.constant(training_labels)
  ...
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

為了使用一個varibale,在圖構建好後,你需要去初始化它。

training_data = ...
training_labels = ...
with tf.Session() as sess:
  data_initializer = tf.placeholder(dtype=training_data.dtype,
                                    shape=training_data.shape)
  label_initializer = tf.placeholder(dtype=training_labels.dtype,
                                     shape=training_labels.shape)
  input_data = tf.Variable(data_initializer, trainable=False, collections=[])
  input_labels = tf.Variable(label_initializer, trainable=False, collections=[])
  ...
  sess.run(input_data.initializer,
           feed_dict={data_initializer: training_data})
  sess.run(input_labels.initializer,
           feed_dict={label_initializer: training_labels})
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

設定trainable=False將使variable不加入GraphKeys.TRAINABLE_VARIABLES容器,所以我們不用在訓練過程中更新它。設定collections=[]將會使variable不加入GraphKeys.GLOBAL_VARIABLES容器(這個容器主要用於儲存和恢復checkpoints)。

無論哪種方式,tf.train.slice_input_producer都能夠用來產生一個slice。這在整個epoch上shuffle了example,所以batching時,進一步的shuffling不再需要。所以不再使用shuffle_batch函式,而使用tf.train.batch函式。為了使用多個預處理執行緒,設定num_threads引數大於1。

MNIST資料集上使用constant來preload資料的例項見tensorflow/examples/how_tos/reading_data/fully_connected_preloaded.py;使用variable來preload資料的例子見tensorflow/examples/how_tos/reading_data/fully_connected_preloaded_var.py,你可以通過 fully_connected_feed和 fully_connected_feed版本來對比兩種方式。

4. Multiple input pipelines

一般,你想要去在一個數據集上訓練,而在另一個數據集上評估模型。實現這個想法的一種方式是:以兩個程序,建兩個獨立的圖和session:

  • 訓練程序讀取訓練資料,並且週期性地將模型的所有訓練好的變數儲存到checkpoint檔案中。
  • 評估程序從checkpoint檔案中恢復得到一個inference模型,這個模型讀取評估資料。

estimators裡和CIFAR-10模型示例裡,採用就是上面的方法。該方法主要有兩個好處:

  • 你的評估是在一個訓練好的模型的快照上進行的。
  • 在訓練完成或中斷後,你也可以進行評估。

你可以在同一個程序中同一個圖中進行訓練和評估,並且訓練和評估共享訓練好的引數和層。關於共享變數,詳見這裡

為了支援單個圖方法(single-graph approach),tf.data也提供了高階的iterator型別,它將允許使用者去在不重新構建graph和session的情況下,改變輸入pipeline。

注意:儘管上面的實現很好,但很多op(比如tf.layers.batch_normalizationtf.layers.dropout)與模型模式有關(訓練和評估時,計算不一致),你必須很小心地去設定這些,如果你更改資料來源。

英文版:https://tensorflow.google.cn/api_guides/python/reading_data#_tf_data_API

				<script>
					(function(){
						function setArticleH(btnReadmore,posi){
							var winH = $(window).height();
							var articleBox = $("div.article_content");
							var artH = articleBox.height();
							if(artH > winH*posi){
								articleBox.css({
									'height':winH*posi+'px',
									'overflow':'hidden'
								})
								btnReadmore.click(function(){
									if(typeof window.localStorage === "object" && typeof window.csdn.anonymousUserLimit === "object"){
										if(!window.csdn.anonymousUserLimit.judgment()){
											window.csdn.anonymousUserLimit.Jumplogin();
											return false;
										}else if(!currentUserName){
											window.csdn.anonymousUserLimit.updata();
										}
									}
									
									articleBox.removeAttr("style");
									$(this).parent().remove();
								})
							}else{
								btnReadmore.parent().remove();
							}
						}
						var btnReadmore = $("#btn-readmore");
						if(btnReadmore.length>0){
							if(currentUserName){
								setArticleH(btnReadmore,3);
							}else{
								setArticleH(btnReadmore,1.2);
							}
						}
					})()
				</script>