這篇教程是翻譯Morgan寫的TensorFlow教程,作者已經授權翻譯,這是原文

目錄

TensorFlow 1.0版本已經出來了,隨著這次更新,一些不錯的指導建議出現在官網上面。其中一個我比較關心的是 feed_dict 系統,當你在呼叫 sess.run() 時:

導致效能低下的一個常見原因是未充分利用GPU,或者沒有設定一個有效的資料通道。除非情況特殊或者只是一個示例程式碼,否則不要將 Python 變數傳送到 session 中…

當然,到目前為止,我一直專門使用 feed_dict 系統來訓練我的模型…所以,讓我們一起來改變這個習慣吧。

已經有一個關於 TF 佇列的官方文件和 TF 網站上的一些非常好的視覺化過程(我非常建議你去看一下它們)。為了避免冗餘,我們將重點介紹具有完整程式碼的基本案例。

我們將探索佇列,QueueRunner和協調器,以提高我們的訓練速度。在一個非常基本的例子中,由於多執行緒和優化的記憶體處理,我們能得到33%的訓練速度。而且,我們還將密切關注我們在單GPU(nvidia GTX Titan X)上面的效能。

讓我們從一個最簡單的神經網路開始,使用 feed_dict 系統來訓練一個樸素的任務。然後我們將修改我們的程式碼,以便能體現利用佇列的好處,並刪除這個依賴。

那麼,我們就從下面的程式碼開始分析:

import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_inputs_data = tf.random_normal([128, 1024], mean=0, stddev=1)
# We will try to predict this law:
# predict 1 if the sum of the elements is positive and 0 otherwise
y_inputs_data = tf.cast(tf.reduce_sum(x_inputs_data, axis=1, keep_dims=True) > 0, tf.int32)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("placeholder"):
    input = tf.placeholder(tf.float32, shape=[None, 1024])
    y_true = tf.placeholder(tf.int32, shape=[None, 1])
with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2
with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)
with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training operation, ...
adam = tf.train.AdamOptimizer(1e-2)
train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... check the accuracy before training, ...
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

    # ... train ...
    for i in range(5000):
        #  ... by sampling some input data (fetching) ...
        x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
        # ... and feeding it to our model
        _, loss = sess.run([train_op, loss_op], feed_dict={
            input: x_input,
            y_true: y_input
        })

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

print("Time taken: %f" % (time.time() - startTime))

這個程式在我的GPU上面執行,並且能得到下面的分析結果:

一些備註:

  • “檔案系統模擬”是不可信的,但我們還是會在所有測試中進行這一行為,所以我們可以忽略它的影響。
  • 我們使用 feed_dict 系統將資料提供給我們的模型,這會使得 TF 建立一個 Python 資料的副本到會話中。
  • 在一整個訓練中,我們只使用了大約 31% 的GPU。
  • 訓練這個神經網路大約需要 18 秒。

人們可能認為這就是我們能完成的簡單任務,但是也有人不這麼想,比如:

  • 在這個指令碼中,一切都是同步和單執行緒的(你必須等待一個Python指令碼呼叫完成,然後才會進行下一個Python指令碼)。
  • 我們在Python和底層C++之間來回移動。

那麼如何避免這些陷阱呢?

解決方案是使用 TF 的佇列系統,你可以把它想象成設計你的資料輸入管道,然後直接進入圖,並且停止使用Python輸入!事實上,我們將嘗試從輸入管道中刪除任何 Python 依賴。

由於去除了 feed_dict 系統,這將會更好的使用多執行緒,非同步性和記憶體優化(這是非常酷的事,因為如果你計劃在分散式系統上訓練你的模型,那麼TF會給你意想不到的驚喜)。

但首先,讓我們通過簡單的例子來探索 TF 中的佇列。再次,請跟著我的註釋來閱讀:

import tensorflow as tf

# We simulate some raw input data
# let's start with only 3 samples of 1 data point
x_input_data = tf.random_normal([3], mean=-1, stddev=4)

# We build a FIFOQueue inside the graph 
# You can see it as a waiting line that holds waiting data
# In this case, a line with only 3 positions
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# We need an operation that will actually fill the queue with our data
# "enqueue_many" slices "x_input_data" along the 0th dimension to make multiple queue elements
enqueue_op = q.enqueue_many(x_input_data) # <- x1 - x2 -x3 |

# We need a dequeue op to get the next elements in the queue following the FIFO policy.
input = q.dequeue() 
# The input tensor is the equivalent of a placeholder now 
# but directly connected to the data sources in the graph

# Each time we use the input tensor, we print the number of elements left
# in the queue
input = tf.Print(input, data=[q.size()], message="Nb elements left:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual
with tf.Session() as sess:
    # We first run the enqueue_op to load our data into the queue
    sess.run(enqueue_op)
    # Now, our queue holds 3 elements, it's full. 
    # We can start to consume our data
    sess.run(y)
    sess.run(y) 
    sess.run(y) 
    # Now our queue is empty, if we call it again, our program will hang right here
    # waiting for the queue to be filled by at least one more datum
    sess.run(y) 

這裡發生了什麼呢?為什麼程式不再往下執行了呢?

那麼,這就是 TF 的具體實現,如果佇列是空的,那麼出對操作會導致真個圖去等待更多的資料。但是,這種行為只有在你手動使用佇列時才會發生,但這顯然是非常麻煩的,甚至是完全沒用的,因為我們仍然只有一個執行緒在呼叫入隊和出隊操作。

注意:如果要進行非同步操作,那麼它們必須都在自己的執行緒中,而不是主執行緒。正如我的發過奶奶曾經說過的那樣:如果許多廚師去使用同一個刀具來做飯,那麼他們不會比只有一個廚師要快。

為了解決這個問題,讓我來介紹一下 QueueRunner 和協調器,它們的唯一目的是在自己的執行緒中處理佇列,並確保同步(啟動,排隊,出隊,停止等等)。

QueueRunner 需要做 2 件事情:

  • 一個佇列

  • 一些入隊操作(你可以對一個佇列,進行多個入隊操作)

協調器不需要做任何事:它是一個方便的高階API,專門用來處理 “tf.train” 名稱空間下的佇列。如果你像我一樣建立自定義佇列,並天機器一個 QueueRunner 來處理它。只要你不要忘記將 QueueRunner 新增到 TF 的 QUEUE_RUNNERS 集合中,則可以安全地使用高階 API 。

讓我們先來看看以前的例子,在原來的例子中我們讓每一個執行緒去處理自己的佇列:

import tensorflow as tf

# This time, let's start with 6 samples of 1 data point
x_input_data = tf.random_normal([6], mean=-1, stddev=4)

# Note that the FIFO queue has still a capacity of 3
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# To check what is happening in this case:
# we will print a message each time "x_input_data" is actually computed
# to be used in the "enqueue_many" operation
x_input_data = tf.Print(x_input_data, data=[x_input_data], message="Raw inputs data generated:", summarize=6)
enqueue_op = q.enqueue_many(x_input_data)

# To leverage multi-threading we create a "QueueRunner"
# that will handle the "enqueue_op" outside of the main thread
# We don't need much parallelism here, so we will use only 1 thread
numberOfThreads = 1 
qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
# Don't forget to add your "QueueRunner" to the QUEUE_RUNNERS collection
tf.train.add_queue_runner(qr) 

input = q.dequeue() 
input = tf.Print(input, data=[q.size(), input], message="Nb elements left, input:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual ...
with tf.Session() as sess:
    # But now we build our coordinator to coordinate our child threads with
    # the main thread
    coord = tf.train.Coordinator()
    # Beware, if you don't start all your queues before runnig anything
    # The main threads will wait for them to start and you will hang again
    # This helper start all queues in tf.GraphKeys.QUEUE_RUNNERS
    threads = tf.train.start_queue_runners(coord=coord)

    # The QueueRunner will automatically call the enqueue operation
    # asynchronously in its own thread ensuring that the queue is always full
    # No more hanging for the main process, no more waiting for the GPU
    sess.run(y)
    sess.run(y) 
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)

    # We request our child threads to stop ...
    coord.request_stop()
    # ... and we wait for them to do so before releasing the main thread
    coord.join(threads)

小練習:
在檢視日誌之前,思考一下 tf.random_normal 被呼叫了多少次?

具體日誌結果如下:

Logs of the queue exercise

正如你所看到的,x_input_data 被呼叫了 3 次。並且每次我們嘗試推送更多的元素而不是佇列容量,額外的元素不會像預期的那樣被丟棄,他們會等待別的執行緒(或佇列)來呼叫。

所以我們只需要在第四和第十個呼叫中填滿佇列中的空位,在佇列中只剩下 2 個元素。(因為我們現在是非同步的,所以列印語句的順序可能會有點混亂。)

注意:我不會深入的探討佇列和TF的生態系統,因為它太讓人心動了。你一定要更加熟悉它,你可以多閱讀以下最後面的連結。

感謝所有這些新知識,我們終於可以利用這個佇列系統來更新我們的第一個指令碼,看看是否有任何改進!

import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_input_data = tf.random_normal([128, 1024], mean=0, stddev=1)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("queue"):
    q = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
    # We use the "enqueue" operation so 1 element of the queue is the full batch
    enqueue_op = q.enqueue(x_input_data)
    numberOfThreads = 1
    qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
    tf.train.add_queue_runner(qr)
    input = q.dequeue() # It replaces our input placeholder
    # We can also compute y_true right into the graph now
    y_true = tf.cast(tf.reduce_sum(input, axis=1, keep_dims=True) > 0, tf.int32)

with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2

with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)

with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training op ...
adam = tf.train.AdamOptimizer(1e-2)
train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... add the coordinator, ...
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # ... check the accuracy before training (without feed_dict!), ...
    sess.run(accuracy)

    # ... train ...
    for i in range(5000):
        #  ... without sampling from Python and without a feed_dict !
        _, loss = sess.run([train_op, loss_op])

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    sess.run(accuracy)

    coord.request_stop()
    coord.join(threads)

print("Time taken: %f" % (time.time() - startTime))

Training phase monitoring of the second example with logs and nvidia-smi

最後總結:

  • 在佇列系統之外,我們使用了與以前完全相同的程式碼。

  • y_true 是在圖內部進行計算的,您可以比較當人們必須將其輸入資料分割為輸入資料和標籤時的情況。

  • 不需要任何 feed_dict,不再浪費記憶體。

  • 我們現在對GPU的使用率 ~43%,優於 31% 。這意味著我們的程式至少多使用了12%的GPU資源。在這種情況下,這意味著你可以增加 batch_size 的值,但是你要小心,batch_size 將會影響你的收斂速度。

  • 訓練時間需要 ~11.5秒,這大約是以前的33%的訓練時間,太棒了!

Reference:

如果覺得內容有用,幫助多多分享哦 :)

長按或者掃描如下二維碼,關注 “CoderPai” 微訊號(coderpai)。新增底部的 coderpai 小助手,新增小助手時,請備註 “演算法” 二字,小助手會拉你進演算法群。如果你想進入 AI 實戰群,那麼請備註 “AI”,小助手會拉你進AI實戰群。

.