1. 程式人生 > >Tensorflow 大規模數據集訓練方法

Tensorflow 大規模數據集訓練方法

捕獲 cep ati 來看 排列 允許 同步 filename 函數

本文轉自:Tensorflow】超大規模數據集解決方案:通過線程來預取 原文地址:https://blog.csdn.net/mao_xiao_feng/article/details/73991787

現在讓我們用Tensorflow實現一個具體的Input pipeline,我們使用CoCo2014作為處理對象,網上應該可以下載到CoCo訓練集,train2014這個文件。下載鏈接:

http://msvocds.blob.core.windows.net/coco2014/train2014.zip

技術分享圖片

一共13.5G,解壓完以後大概會有8萬多張圖,這個數據集算得上超大規模級別了,那麽問題來了,這麽多圖片我們怎麽下手呢?難道和以前一樣讀到內存?如此笨重的數據集,如果仍然用內存暴力解決,那就太耗費時間空間資源了。能否在訓練的同時,讀數據,預處理數據呢?現在,讓我們用隊列+多線程去解決這個問題。

一.Beginning of an input pipeline

在輸入pipeline的開始,我們要構造一個隊列生成器,tensorflow中的tf.train.string_input_producer函數可以幫助我們解決這個問題:

string_input_producer(string_tensor,num_epochs=None,shuffle=True,seed=None,capacity=32

,shared_name=None,name=None,cancel_op=None)

該函數輸入字符串的Tensor或者List,返回一個字符串隊列,一共8個輸入參數,忽略最後三個不常用的參數,其中

string_tensor:一維的字符串Tensor,註意這個參數不傳入Tensor也是可以的,也可以傳入字符串的List

num_epochs:控制數量的一個參數,字符串List被放入隊列的重復次數

shuffle:很好理解,是否打亂字符串List

seed:shuffle需要的隨機數種子,一般可以不指定

capacity:隊列的大小

類似的函數還有:

tf.train.range_input_producer

tf.train.slice_input_producer

需要註意的是這裏返回的隊列是添加了QueueRunner的,也就是我們需要調用線程來操作隊列。還有很重要的一點,千萬不要以為創建完隊列以後,string_tensor的所有值就都入隊了,入隊也是流程化的,而入隊操作通常由分線程來做,任何時刻我們都不關註隊列的狀態,只關註入隊了什麽,出隊了什麽。

二.Batching at the end of an input pipeline

在Input pipeline的末尾則是抽取Batch的流程,這裏要用到的函數是tf.train.batch:

batch(tensors,batch_size,num_threads=1,capacity=32,enqueue_many=False

,shapes=None,dynamic_pad=False,allow_smaller_final_batch=False,shared_name=None,name=None)

該函數創建輸入的Tensor中的一些batches,同樣這個Tensor也可以是一個List,參數解析:

tensors:需要註意的是,為了構成pipeline,保持一致性,這個函數也是以隊列形式運行,所以Tensor的輸入可以和上面描述的類似。

batch_size:一個batch的數量

num_threads:執行操作的線程數量

capacity:該函數運行的隊列的長度

enqueue_many:控制是否可以一次入隊多個,一般為false

dynamic_pad:動態填充,填充維度為None的區域,不常用

allow_smaller_final_batch:控制是否允許小於batchsize的batch,如果不允許,則那幾個樣本會被丟棄

類似函數:

tf.train.batch_join

tf.train.maybe_batch

tf.train.shuffle_batch

同樣很重要的一點,batching操作也是在Input pipeline裏面,所以他也不是對全部數據來取batch,我們不需要關註當前隊列中要多少樣本,只需要關註取出了那些樣本。

三.示例程序

讓我們用一個程序來演示上面的過程。

為了演示需要,從train2014文件夾裏面取20張圖,然後連著文件夾拷貝到工程路徑:

技術分享圖片

技術分享圖片

from os import listdir
from os.path import isfile, join
import tensorflow as tf
 
dataset_path=train2014
 
with tf.Session() as sess:
    filenames = [join(dataset_path, f) for f in listdir(dataset_path) if isfile(join(dataset_path, f))]
    print number of images:, len(filenames)
    filename_queue = tf.train.string_input_producer(filenames, shuffle=False,num_epochs=1)
 
    reader = tf.WholeFileReader()
    name, img_bytes = reader.read(filename_queue)
    image = tf.image.decode_jpeg(img_bytes, channels=3)
    dataname = tf.train.batch([name], 2, dynamic_pad=True)
 
    sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        while not coord.should_stop():
            print sess.run(dataname)
 
    except tf.errors.OutOfRangeError:
        print(Done training -- epoch limit reached)
    finally:
        coord.request_stop()
    coord.join(threads)

解決的思路:首先產生一個存儲路徑下所有文件名的List,然後用它作為輸入產生字符串隊列,這是pipeline的前端。得到了出隊的字符串序列,我們可以使用tensorflow中的filereader,將文件內容讀取出來,reader.read函數返回{文件名,文件內容}鍵值對,文件內容即是我們需要處理的對象(這裏為了直觀,我們使用了文件名作為輸出),這個階段是pipeline的中端。最後得到了文件名序列,再用batch函數提取一個batch,這個作為pipeline的末端。這樣一個程序就完成了。

運行以後結果如下:

number of images: 20
 
[train2014/COCO_train2014_000000000109.jpg
 train2014/COCO_train2014_000000000071.jpg]
[train2014/COCO_train2014_000000000092.jpg
 train2014/COCO_train2014_000000000094.jpg]
[train2014/COCO_train2014_000000000064.jpg
 train2014/COCO_train2014_000000000025.jpg]
[train2014/COCO_train2014_000000000072.jpg
 train2014/COCO_train2014_000000000110.jpg]
[train2014/COCO_train2014_000000000086.jpg
 train2014/COCO_train2014_000000000030.jpg]
[train2014/COCO_train2014_000000000113.jpg
 train2014/COCO_train2014_000000000009.jpg]
[train2014/COCO_train2014_000000000061.jpg
 train2014/COCO_train2014_000000000077.jpg]
[train2014/COCO_train2014_000000000081.jpg
 train2014/COCO_train2014_000000000034.jpg]
[train2014/COCO_train2014_000000000078.jpg
 train2014/COCO_train2014_000000000036.jpg]
[train2014/COCO_train2014_000000000089.jpg
 train2014/COCO_train2014_000000000049.jpg]
Done training -- epoch limit reached

這裏我沒有shuffle數據集,需要shuffle只要把string_input_producer中的shuffle參數改為True。

四.具體項目

這裏沒有為大家展示一個具體網絡怎麽調用以上過程來訓練。因為能輸出batch的數據其實已經達到我們的意圖了。如果有需要一個取數據+訓練完整程序的同學,請參考github上圖像風格遷移的repo,這個工程使用了以上方法,並有完整的訓練過程。源碼地址如下:

https://github.com/hzy46/fast-neural-style-tensorflow

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

下面是原理的講解部分

一.Tensorflow中的隊列機制

隊列和線程是Temsorflow中實現異步的重要工具。為什麽要異步?用一個形象的例子來解釋這個問題。

可以把數據導入的過程看作io操作,在數據規模極大的情況下,io請求需要大量時間執行。同步意味著我們一次處理完io請求,然後再執行程序後面的操作,所以之前在【Tensorflow】怎樣為你的網絡預加工和打包訓練數據?(二):小數據集的處理方案和【Tensorflow】怎樣為你的網絡預加工和打包訓練數據?(一)中的處理都可以看作同步的,因為都是一次處理完所有的數據,然後在feed給我們的網絡。而這裏,我們需要擴展一個異步的概念,所謂異步,也就是開辟一個線程來單獨處理這個io請求,處理完成,就通知給主程序告訴它“我已經完成了”,在此期間,主程序可以去做其他的事,也就是io請求並不耽誤程序的執行。所以異步方式可以顯著提高效率,那是不是說異步一定比同步好呢,當然不是,異步只適用於等待時間很長的情況,如果處理小數據集,就不如同步方式了。

Tensorflow是怎樣實現異步的?這裏需要用到隊列這個數據結構,先讓我們看看Tensorflow是如何實現隊列的。

我們來看一個簡單的例子。 創建一個“先進先出”隊列(FIFOQueue)並填充零。然後,創建一個將元素從隊列中取出的圖,將該元素加一,並將其放回隊列的末尾。緩慢地,隊列上的數字增加。用如下的圖來表示

技術分享圖片

和如上圖類似,tensorflow中使用RandomShuffleQueue作為輸入框架,來準備訓練數據。這種機制的運行流程大致如下:

1.使用多線程來準備訓練樣本,並把他們放入隊列

2.一個訓練線程執行優化,並吧mini-batch數據提取出隊列

TensorFlow Session對象是多線程的,所以多個線程可以輕松地使用相同的Session並並行運行op。但是,實現一個驅動線程的Python程序並不容易。所有線程必須能夠一起停止,異常必須被捕獲和報告,隊列必須在停止時正確關閉。TensorFlow提供了兩個類來幫助線程驅動:tf.train.Coordinator和tf.train.QueueRunner。這兩個類被設計為一起使用,協調器類幫助多個線程一起停止,並向等待其停止的程序報告異常。QueueRunner類用於創建多個線程,用於協調在同一隊列中放入張量。

二.線程協調器

先看一下一些關鍵方法:

tf.train.Coordinator.should_stop : returns True if the threads should stop

tf.train.Coordinator.request_stop :requests that threads should stop

tf.train.Coordinator.join :waits until the specified threads have stopped

首先創建一個Coordinator對象,然後創建一些使用協調器的線程。線程執行運行循環,當should_stop()返回True時停止。任何線程都可以決定計算是否停止。它只需要調用request_stop(),其他線程將停止,因為should_stop()將返回True。

官方也提供了協調器的代碼模板:

# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
  while not coord.should_stop():
    ...do something...
    if ...some condition...:
      coord.request_stop()
 
# Main thread: create a coordinator.
coord = tf.train.Coordinator()
 
# Create 10 threads that run ‘MyLoop()‘
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]
 
# Start the threads and wait for all of them to stop.
for t in threads:
  t.start()
coord.join(threads)

三.隊列運行器

QueueRunner類創建了一些重復執行入隊操作的線程。 這些線程可以使用協調器一起停止, 此外,隊列運行器運行一個隊列關閉的線程,一旦向協調器報告異常,則會自動關閉隊列。

創建TensorFlow隊列(例如tf.RandomShuffleQueue)用作樣本輸入的過程如下:

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use inputs to build the training part of the graph...

然後我們要為這個隊列創建一個隊列運行器,使用多線程來執行入隊操作:

# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
 
# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)

四.具體的使用方法

在訓練當中如何結合以上的工具呢?tf.train.QueueRunner對象要求您在運行任何訓練步驟之前調用tf.train.start_queue_runners,否則線程將永久掛起。所以在訓練之前要調用tf.train.start_queue_runners啟動輸入流水線的線程,填充樣本隊列,以使得樣本的出隊操作能成功執行。Tensorflow提供了一個參考代碼模板:

# Create the graph, etc.
init_op = tf.global_variables_initializer()
 
# Create a session for running operations in the Graph.
sess = tf.Session()
 
# Initialize the variables (like the epoch counter).
sess.run(init_op)
 
# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
 
try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)
 
except tf.errors.OutOfRangeError:
    print(Done training -- epoch limit reached)
finally:
    # When done, ask the threads to stop.
    coord.request_stop()
 
# Wait for threads to finish.
coord.join(threads)
sess.close()

ok,到這裏讓我們詳細解釋一下上面的代碼可以做什麽,首先我們創建計算圖,第一階段獲得文件名路徑並將其排入文件名隊列。第二階段使用文件名,生成樣本,並將它們排列在樣本隊列中。一旦啟動運行這些入隊操作的線程,我們的訓練循環就可以從樣本隊列中取出訓練樣本用以訓練。用一張圖來表示:

技術分享圖片

但是在我們實際使用中,要定義一個以上所描述的pipeline是非常復雜的,所以一般就只使用一個文件名隊列,而把訓練數據的預處理放在主程序當中,因為預處理一個batch的訓練數據通常不需要花費太多時間。我們可以使用tf.train.add_queue_runner來添加一個QueueRunner,創建完圖後,tf.train.start_queue_runners函數將在圖中查詢每個QueueRunner,以啟動其運行入隊操作的線程。此外,我們還要設定epoch的限制,保證在epoch到達時,程序會正確停止,並拋出一個tf.errors.OutOfRangeError異常。

五.一些小的示例程序

首先我們來看第一個先進先出隊列例子的實現代碼

import tensorflow as tf
 
 
q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(vals=[[0., 0., 0.],])
 
x = q.dequeue()
y = x+1
q_inc = q.enqueue([y])
 
with tf.Session() as sess:
    init.run()
    q_inc.run()
    q_inc.run()
    q_inc.run()
    q_inc.run()
    print sess.run(x)
    print sess.run(x)
    print sess.run(x)

需要註意的是,enqueue_many方法在實現的時候還是有比較多的坑的,參數vals必須二維,而且要寫成[[0,0,0]]的形式而不是,[[0],[0],[0]],這個要重點記一下。

整個流程的動圖上面貼過了,這裏再貼一遍吧

技術分享圖片

然後就是RandomShuffleQueue的例子:

import tensorflow as tf
 
 
q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=0, dtypes="string")
 
sess = tf.Session()
for i in range(10):
    sess.run(q.enqueue(File:+str(i)))
 
for i in range(10):
    print(sess.run(q.dequeue()))

因為是RandomShuffleQueue,所以出隊順序是隨機的,一直到這裏,入隊都是用主線程來做的,以上簡單版程序沒有用多線程。

最後是加上多線程的終極版,讓我們來看看:

import tensorflow as tf
 
 
 
q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=0, dtypes="string")
 
enqueue_op = q.enqueue(File:)
 
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
 
sess = tf.Session()
 
enqueue_threads = qr.create_threads(sess, start=True)
 
for i in range(100):
 
  print(sess.run(q.dequeue()))

好了,運行這個程序,我們會發現輸出了100個File,說明在出隊的時候,是又有一個線程在做入隊操作不停的補充隊列的。但是發現一個問題,貌似這個程序自動終止不了?因為我們創建的線程還在運行。

所以要加上協調器,來使得程序得以終止:

import tensorflow as tf
 
 
 
q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=0, dtypes="string")
 
enqueue_op = q.enqueue(File:)
 
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
 
sess = tf.Session()
 
coord = tf.train.Coordinator()
 
enqueue_threads = qr.create_threads(sess, start=True,coord=coord)
 
for i in range(100):
 
  print(sess.run(q.dequeue()))
 
coord.request_stop()
 
coord.join(enqueue_threads)

這樣程序就得以終止了,然後,如果你想再設置一些epoch limit的異常:

import tensorflow as tf
 
q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=0, dtypes="string")
enqueue_op = q.enqueue(File:)
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
sess = tf.Session()
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, start=True,coord=coord)
try:
    for i in range(100):
        print(sess.run(q.dequeue()))
        if i>=50:
            coord.request_stop()
            coord.join(enqueue_threads)
except tf.errors.OutOfRangeError:
    print(Done training -- epoch limit reached)
finally:
    coord.request_stop()
    coord.join(enqueue_threads)

Tensorflow 大規模數據集訓練方法