1. 程式人生 > >TensorFlow學習系列(五):如何使用佇列和多執行緒優化輸入管道

TensorFlow學習系列(五):如何使用佇列和多執行緒優化輸入管道

這篇教程是翻譯Morgan寫的TensorFlow教程,作者已經授權翻譯,這是原文

目錄

TensorFlow 1.0版本已經出來了,隨著這次更新,一些不錯的指導建議出現在官網上面。其中一個我比較關心的是 feed_dict 系統,當你在呼叫 sess.run() 時:

導致效能低下的一個常見原因是未充分利用GPU,或者沒有設定一個有效的資料通道。除非情況特殊或者只是一個示例程式碼,否則不要將 Python 變數傳送到 session 中…

當然,到目前為止,我一直專門使用 feed_dict 系統來訓練我的模型…所以,讓我們一起來改變這個習慣吧。

已經有一個關於 TF 佇列的官方文件和 TF 網站上的一些非常好的視覺化過程(我非常建議你去看一下它們)。為了避免冗餘,我們將重點介紹具有完整程式碼的基本案例。

我們將探索佇列,QueueRunner和協調器,以提高我們的訓練速度。在一個非常基本的例子中,由於多執行緒和優化的記憶體處理,我們能得到33%的訓練速度。而且,我們還將密切關注我們在單GPU(nvidia GTX Titan X)上面的效能。

讓我們從一個最簡單的神經網路開始,使用 feed_dict 系統來訓練一個樸素的任務。然後我們將修改我們的程式碼,以便能體現利用佇列的好處,並刪除這個依賴。

那麼,我們就從下面的程式碼開始分析:

import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_inputs_data = tf.random_normal([128, 1024], mean=0, stddev=1)
# We will try to predict this law:
# predict 1 if the sum of the elements is positive and 0 otherwise y_inputs_data = tf.cast(tf.reduce_sum(x_inputs_data, axis=1, keep_dims=True) > 0, tf.int32) # We build our small model: a basic two layers neural net with ReLU with tf.variable_scope("placeholder"): input = tf.placeholder(tf.float32, shape=[None, 1024]) y_true = tf.placeholder(tf.int32, shape=[None, 1]) with tf.variable_scope('FullyConnected'): w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1)) b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1)) z = tf.matmul(input, w) + b y = tf.nn.relu(z) w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1)) b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1)) z = tf.matmul(y, w2) + b2 with tf.variable_scope('Loss'): losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z) loss_op = tf.reduce_mean(losses) with tf.variable_scope('Accuracy'): y_pred = tf.cast(z > 0, tf.int32) accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32)) accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:") # We add the training operation, ... adam = tf.train.AdamOptimizer(1e-2) train_op = adam.minimize(loss_op, name="train_op") startTime = time.time() with tf.Session() as sess: # ... init our variables, ... sess.run(tf.global_variables_initializer()) # ... check the accuracy before training, ... x_input, y_input = sess.run([x_inputs_data, y_inputs_data]) sess.run(accuracy, feed_dict={ input: x_input, y_true: y_input }) # ... train ... for i in range(5000): # ... by sampling some input data (fetching) ... x_input, y_input = sess.run([x_inputs_data, y_inputs_data]) # ... and feeding it to our model _, loss = sess.run([train_op, loss_op], feed_dict={ input: x_input, y_true: y_input }) # We regularly check the loss if i % 500 == 0: print('iter:%d - loss:%f' % (i, loss)) # Finally, we check our final accuracy x_input, y_input = sess.run([x_inputs_data, y_inputs_data]) sess.run(accuracy, feed_dict={ input: x_input, y_true: y_input }) print("Time taken: %f" % (time.time() - startTime))

這個程式在我的GPU上面執行,並且能得到下面的分析結果:

一些備註:

  • “檔案系統模擬”是不可信的,但我們還是會在所有測試中進行這一行為,所以我們可以忽略它的影響。
  • 我們使用 feed_dict 系統將資料提供給我們的模型,這會使得 TF 建立一個 Python 資料的副本到會話中。
  • 在一整個訓練中,我們只使用了大約 31% 的GPU。
  • 訓練這個神經網路大約需要 18 秒。

人們可能認為這就是我們能完成的簡單任務,但是也有人不這麼想,比如:

  • 在這個指令碼中,一切都是同步和單執行緒的(你必須等待一個Python指令碼呼叫完成,然後才會進行下一個Python指令碼)。
  • 我們在Python和底層C++之間來回移動。

那麼如何避免這些陷阱呢?

解決方案是使用 TF 的佇列系統,你可以把它想象成設計你的資料輸入管道,然後直接進入圖,並且停止使用Python輸入!事實上,我們將嘗試從輸入管道中刪除任何 Python 依賴。

由於去除了 feed_dict 系統,這將會更好的使用多執行緒,非同步性和記憶體優化(這是非常酷的事,因為如果你計劃在分散式系統上訓練你的模型,那麼TF會給你意想不到的驚喜)。

但首先,讓我們通過簡單的例子來探索 TF 中的佇列。再次,請跟著我的註釋來閱讀:

import tensorflow as tf

# We simulate some raw input data
# let's start with only 3 samples of 1 data point
x_input_data = tf.random_normal([3], mean=-1, stddev=4)

# We build a FIFOQueue inside the graph 
# You can see it as a waiting line that holds waiting data
# In this case, a line with only 3 positions
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# We need an operation that will actually fill the queue with our data
# "enqueue_many" slices "x_input_data" along the 0th dimension to make multiple queue elements
enqueue_op = q.enqueue_many(x_input_data) # <- x1 - x2 -x3 |

# We need a dequeue op to get the next elements in the queue following the FIFO policy.
input = q.dequeue() 
# The input tensor is the equivalent of a placeholder now 
# but directly connected to the data sources in the graph

# Each time we use the input tensor, we print the number of elements left
# in the queue
input = tf.Print(input, data=[q.size()], message="Nb elements left:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual
with tf.Session() as sess:
    # We first run the enqueue_op to load our data into the queue
    sess.run(enqueue_op)
    # Now, our queue holds 3 elements, it's full. 
    # We can start to consume our data
    sess.run(y)
    sess.run(y) 
    sess.run(y) 
    # Now our queue is empty, if we call it again, our program will hang right here
    # waiting for the queue to be filled by at least one more datum
    sess.run(y) 

這裡發生了什麼呢?為什麼程式不再往下執行了呢?

那麼,這就是 TF 的具體實現,如果佇列是空的,那麼出對操作會導致真個圖去等待更多的資料。但是,這種行為只有在你手動使用佇列時才會發生,但這顯然是非常麻煩的,甚至是完全沒用的,因為我們仍然只有一個執行緒在呼叫入隊和出隊操作。

注意:如果要進行非同步操作,那麼它們必須都在自己的執行緒中,而不是主執行緒。正如我的發過奶奶曾經說過的那樣:如果許多廚師去使用同一個刀具來做飯,那麼他們不會比只有一個廚師要快。

為了解決這個問題,讓我來介紹一下 QueueRunner 和協調器,它們的唯一目的是在自己的執行緒中處理佇列,並確保同步(啟動,排隊,出隊,停止等等)。

QueueRunner 需要做 2 件事情:

  • 一個佇列

  • 一些入隊操作(你可以對一個佇列,進行多個入隊操作)

協調器不需要做任何事:它是一個方便的高階API,專門用來處理 “tf.train” 名稱空間下的佇列。如果你像我一樣建立自定義佇列,並天機器一個 QueueRunner 來處理它。只要你不要忘記將 QueueRunner 新增到 TF 的 QUEUE_RUNNERS 集合中,則可以安全地使用高階 API 。

讓我們先來看看以前的例子,在原來的例子中我們讓每一個執行緒去處理自己的佇列:

import tensorflow as tf

# This time, let's start with 6 samples of 1 data point
x_input_data = tf.random_normal([6], mean=-1, stddev=4)

# Note that the FIFO queue has still a capacity of 3
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# To check what is happening in this case:
# we will print a message each time "x_input_data" is actually computed
# to be used in the "enqueue_many" operation
x_input_data = tf.Print(x_input_data, data=[x_input_data], message="Raw inputs data generated:", summarize=6)
enqueue_op = q.enqueue_many(x_input_data)

# To leverage multi-threading we create a "QueueRunner"
# that will handle the "enqueue_op" outside of the main thread
# We don't need much parallelism here, so we will use only 1 thread
numberOfThreads = 1 
qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
# Don't forget to add your "QueueRunner" to the QUEUE_RUNNERS collection
tf.train.add_queue_runner(qr) 

input = q.dequeue() 
input = tf.Print(input, data=[q.size(), input], message="Nb elements left, input:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual ...
with tf.Session() as sess:
    # But now we build our coordinator to coordinate our child threads with
    # the main thread
    coord = tf.train.Coordinator()
    # Beware, if you don't start all your queues before runnig anything
    # The main threads will wait for them to start and you will hang again
    # This helper start all queues in tf.GraphKeys.QUEUE_RUNNERS
    threads = tf.train.start_queue_runners(coord=coord)

    # The QueueRunner will automatically call the enqueue operation
    # asynchronously in its own thread ensuring that the queue is always full
    # No more hanging for the main process, no more waiting for the GPU
    sess.run(y)
    sess.run(y) 
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)

    # We request our child threads to stop ...
    coord.request_stop()
    # ... and we wait for them to do so before releasing the main thread
    coord.join(threads)

小練習:
在檢視日誌之前,思考一下 tf.random_normal 被呼叫了多少次?

具體日誌結果如下:

Logs of the queue exercise

正如你所看到的,x_input_data 被呼叫了 3 次。並且每次我們嘗試推送更多的元素而不是佇列容量,額外的元素不會像預期的那樣被丟棄,他們會等待別的執行緒(或佇列)來呼叫。

所以我們只需要在第四和第十個呼叫中填滿佇列中的空位,在佇列中只剩下 2 個元素。(因為我們現在是非同步的,所以列印語句的順序可能會有點混亂。)

注意:我不會深入的探討佇列和TF的生態系統,因為它太讓人心動了。你一定要更加熟悉它,你可以多閱讀以下最後面的連結。

感謝所有這些新知識,我們終於可以利用這個佇列系統來更新我們的第一個指令碼,看看是否有任何改進!

import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_input_data = tf.random_normal([128, 1024], mean=0, stddev=1)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("queue"):
    q = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
    # We use the "enqueue" operation so 1 element of the queue is the full batch
    enqueue_op = q.enqueue(x_input_data)
    numberOfThreads = 1
    qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
    tf.train.add_queue_runner(qr)
    input = q.dequeue() # It replaces our input placeholder
    # We can also compute y_true right into the graph now
    y_true = tf.cast(tf.reduce_sum(input, axis=1, keep_dims=True) > 0, tf.int32)

with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2

with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)

with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training op ...
adam = tf.train.AdamOptimizer(1e-2)
train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... add the coordinator, ...
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # ... check the accuracy before training (without feed_dict!), ...
    sess.run(accuracy)

    # ... train ...
    for i in range(5000):
        #  ... without sampling from Python and without a feed_dict !
        _, loss = sess.run([train_op, loss_op])

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    sess.run(accuracy)

    coord.request_stop()
    coord.join(threads)

print("Time taken: %f" % (time.time() - startTime))

Training phase monitoring of the second example with logs and nvidia-smi

最後總結:

  • 在佇列系統之外,我們使用了與以前完全相同的程式碼。

  • y_true 是在圖內部進行計算的,您可以比較當人們必須將其輸入資料分割為輸入資料和標籤時的情況。

  • 不需要任何 feed_dict,不再浪費記憶體。

  • 我們現在對GPU的使用率 ~43%,優於 31% 。這意味著我們的程式至少多使用了12%的GPU資源。在這種情況下,這意味著你可以增加 batch_size 的值,但是你要小心,batch_size 將會影響你的收斂速度。

  • 訓練時間需要 ~11.5秒,這大約是以前的33%的訓練時間,太棒了!

Reference:

如果覺得內容有用,幫助多多分享哦 :)

長按或者掃描如下二維碼,關注 “CoderPai” 微訊號(coderpai)。新增底部的 coderpai 小助手,新增小助手時,請備註 “演算法” 二字,小助手會拉你進演算法群。如果你想進入 AI 實戰群,那麼請備註 “AI”,小助手會拉你進AI實戰群。

相關推薦

TensorFlow學習系列如何使用佇列執行優化輸入管道

這篇教程是翻譯Morgan寫的TensorFlow教程,作者已經授權翻譯,這是原文。 目錄 TensorFlow 1.0版本已經出來了,隨著這次更新,一些不錯的指導建議出現在官網上面。其中一個我比較關心的是 f

TensorFlow學習系列形狀動態維度

這篇教程是翻譯Morgan寫的TensorFlow教程,作者已經授權翻譯,這是原文。 目錄 這篇教程主要去理解一些關於Tensorflow的維度形狀概念,希望在後續的教程中能幫你節約除錯程式的時間 :) 。

深度學習系列一個簡單深度學習工具箱

本節主要介紹一個深度學習的matlab版工具箱, 該工具箱中的程式碼很簡單,感覺比較適合用來學習演算法。裡面有常見的網路結構,包括深度網路(NN),稀疏自編碼網路(SAE),CAE,深度信念網路(DBN)(基於玻爾茲曼RBM實現),卷積神經網路(CNN

tensorflow學習筆記TensorFlow變數共享資料讀取

  這一節我們提及了三個內容:變數共享、執行緒和佇列和資料讀取,這些都是TensorFlow官方指導中的內容。會在程式中經常遇到所以放在一起進行敘述。前面都是再利用已有的資料進行tensorflow的學習,這一節我們要學習怎麼從檔案中讀取我們需要的各類資料。

TensorFlow學習系列變數更新控制依賴

這篇教程是翻譯Morgan寫的TensorFlow教程,作者已經授權翻譯,這是原文。 目錄 在本文中,我們將圍繞變數更新和控制依賴討論更深層次的 TensorFlow 能力。 變數更新 到目前為止,我們已經

強化學習系列蒙特卡羅方法Monte Carlo)

一、前言 在強化學習系列(四):動態規劃中,我們介紹了採用DP (動態規劃)方法求解environment model 已知的MDP(馬爾科夫決策過程),那麼當environment model資訊不全的時候,我們會採用什麼樣的方法求解呢?蒙特卡洛方法(Mon

TensorFlow學習系列儲存/恢復混合個模型

這篇教程是翻譯Morgan寫的TensorFlow教程,作者已經授權翻譯,這是原文。 目錄 在學習這篇部落格之前,我希望你已經掌握了Tensorflow基本的操作。如果沒有,你可以閱讀這篇入門文章。 為什麼要

pytorch學習筆記儲存載入模型

# 儲存和載入整個模型 torch.save(model_object, 'model.pkl') model = torch.load('model.pkl') # 僅儲存和載入模型引數(推薦使

Servlet學習筆記ServletConfigServletContext詳解

2)獲取:SevletConfig 物件中維護了 ServletContext 物件的引用,通過 SevletConfig.getServletContext()方法獲取ServletContext 物件。或者直接呼叫封裝好的getServletContext ()方法即可獲取。  3)作用:由於一個Web

hive學習教程hiveHbase整合

一、Hive整合HBase原理 Hive與HBase整合的實現是利用兩者本身對外的API介面互相進行通訊,相互通訊主要是依靠hive-hbase-handler-0.9.0.jar工具類,如下圖 Hive與HBase通訊示意圖 二、具體步驟

北京大學MOOC C++學習筆記虛擬函式

虛擬函式: 在類的定義中,前面有 virtual 關鍵字的成員函式就是虛擬函式。 class base { virtual int get() ; }; int base::get() { } virtual 關鍵字只用在類定義裡的函式宣告中,寫函式體時不用。 多型的表現

c++11執行程式設計joiningdetaching 執行

Joining執行緒 執行緒一旦啟動,另一個執行緒可以通過呼叫std::thread物件上呼叫join()函式等待這個執行緒執行完畢std::thread th(funcPtr); th.join(); 看一個例子主執行緒啟動10個工作執行緒,啟動完畢後,main函式等待

各種音視訊編解碼學習詳解之 編解碼學習筆記Mpeg系列——AAC音訊

     最近在研究音視訊編解碼這一塊兒,看到@bitbit大神寫的【各種音視訊編解碼學習詳解】這篇文章,非常感謝,佩服的五體投地。奈何大神這邊文章太長,在這裡我把它分解成很多小的篇幅,方便閱讀。大神部落格傳送門:https://www.cnblogs.com/skyo

機器學習實戰系列SVM支援向量機

課程的所有資料和程式碼在我的Github:Machine learning in Action,目前剛開始做,有不對的歡迎指正,也歡迎大家star。除了 版本差異,程式碼裡的部分函式以及程式碼正規化也和原書不一樣(因為作者的程式碼實在讓人看的彆扭,我改過後看起來舒服多了)

周志華《機器學習》課後習題解答系列Ch4

本章概要 本章講述決策樹(decision tree),相關內容包括: 決策樹生成(construction) 子決策(sub-decision)、遞迴生成演算法(basic algorithm)、最優劃分屬性、純度(purity)、

【開源】OSharp框架學習系列1總體設計及系列導航

正是 html 組織 內聚性 權限 是什麽 enc 3-0 分發 OSharp是什麽?   OSharp是個快速開發框架,但不是一個大而全的包羅萬象的框架,嚴格的說,OSharp中什麽都沒有實現。與其他大而全的框架最大的不同點,就是OSharp只做抽象封裝,不做實現。依賴註

javascript學習筆記異常捕獲事件處理

log 類型 按鈕 輸入 button lan yellow logs 代碼 異常捕獲 Try{   發生異常的代碼塊 }catch(err){   異常信息處理 } 1 <!DOCTYPE html> 2 <html> 3 <head

Unity3D之Mecanim動畫系統學習筆記Animator Controller

浮點 key 發現 菜單 融合 stat mon 好的 project 簡介 Animator Controller在Unity中是作為一種單獨的配置文件存在的文件類型,其後綴為controller,Animator Controller包含了以下幾種功能: 可以對

Scala入門系列面向對象之類

important ica back ember const 就會 out 不用 spa // 定義類,包含field以及method class HelloWorld { private var name = "Leo" def sayHello() { prin

Docker學習系列windows下安裝docker

阻止 statistic pro nta 雙擊 copyright ner notebook 現在 本文目錄如下: windows按照docker的基本要求 具體安裝步驟 開始使用 安裝遠程連接工具連接docker 安裝中遇到的問題 Docker的更新 Dock