機器學習筆記(十九):TensorFlow實戰十一(多執行緒輸入資料)
1 - 引言
為了加速模型訓練的時間,TensorFlow提供了一套多執行緒處理輸入資料的框架。
下面我們來詳細的介紹如何使用多執行緒來加速我們的模型訓練速度
2 - 佇列與多執行緒
在TensorFlow中,佇列和變數類似,我們可以修改它們的狀態。下面給出一個示例來展示如何在TensorFlow中操作一個佇列
import tensorflow as tf #建立一個佇列,指定佇列長度為2,型別為Int32 q = tf.FIFOQueue(2,"int32") #使用enqueue_many函式初始化佇列中的元素。和變數初始化類似。 init = q.enqueue_many(([0,10],)) #使用Dequeue函式將佇列中的第一個元素出佇列 x = q.dequeue() y= x+1 #將Y重新加入佇列 q_inc = q.enqueue([y]) with tf.Session() as sess: #執行初始化佇列的操作 init.run() for _ in range(5): v,_ = sess.run([x,q_inc]) print(v)
0
10
1
11
2
TensorFlow提供了FIFOQueue和RandomShuffleQueue兩種佇列
- FIFOQueue
先進先出佇列 - RandomShuffleQueue
從所有元素中隨機一個出隊
因為在訓練神經網路時希望每次使用的訓練資料儘量隨機。
在TensorFlow中,多個執行緒可以同時向一個佇列中寫元素,或者同時讀取一個佇列中的元素,主要使用
- tf.Coordinator
- tf.QueueRunner
這兩個類來完成多執行緒協同的功能
下面給出一個示例來展示tf.Coordinator的用法:
import tensorflow as tf import numpy as np import threading import time #執行緒中執行的程式 def MyLoop(coord,worker_id): #使用tf.Coordinator類提供工具判斷是否需要停止 while not coord.should_stop(): #隨機停止所有的執行緒 if np.random.rand() < 0.1: print("Stoping from id:%d\n" % worker_id) #呼叫coord.request_stop()函式來通知其他執行緒停止 coord.request_stop() else: #列印當前執行緒的id print("working on id: %d\n" % worker_id) time.sleep(1) #宣告一個tf.train.Coordinator類來協同多個執行緒 coord = tf.train.Coordinator() #宣告建立5個執行緒 threads = [threading.Thread(target=MyLoop,args=(coord,i,)) for i in range(5)] #啟動所有的執行緒 for t in threads: t.start() #等待所有執行緒退出 coord.join(threads)
working on id: 0
working on id: 1
working on id: 2
working on id: 3
working on id: 4
working on id: 0
Stoping from id:3
working on id: 4
tf.QueueRunner主要用於啟動多個執行緒來操作同一個佇列,啟動的這些執行緒可以通過上面介紹的tf.Coordinator類來同一管理。以下程式碼示例展示瞭如何管理多執行緒佇列操作
import tensorflow as tf import numpy as np import threading import time #宣告一個先進先出的佇列,最多100個元素,元素為實數 queue = tf.FIFOQueue(100,"float") #定義佇列的入隊操作 enqueue_op = queue.enqueue([tf.random_normal([1])]) #使用tf.train.QueueRunner來建立多個執行緒執行佇列的入隊操作 #tf.train.QueueRunner的第一個引數給出了被操作的佇列,[enqueue_op]*5 #表示了需要啟動5個執行緒,每個執行緒中執行的是enqueue_op操作。 qr = tf.train.QueueRunner(queue,[enqueue_op]*5) #將定義過的QueueRunner加入TensorFlow計算圖上指定的集合 tf.train.add_queue_runner(qr) #定義出隊操作 out_tensor = queue.dequeue() with tf.Session() as sess: #使用tf.train.Coordinator來協同啟動執行緒 coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess,coord=coord) #獲取佇列中的值 for _ in range(3):print(sess.run(out_tensor)[0]) coord.request_stop() coord.join(threads)
0.19481687
-0.41217336
-0.38111967
3 - 輸入檔案佇列
當輸入資料較大時,可以將資料分成多個TFRecord檔案來提高處理效率。檔案在加入佇列之前會被打亂順序,所以出隊的順序也是隨機的。隨機打亂檔案順序以及加入輸入佇列的過程會跑在一個單獨的執行緒上,這樣可以加快獲取檔案的速度
首先我們先建立2個TFRecord檔案
import tensorflow as tf
# 建立TFRecord檔案的幫助函式
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
# 模擬海量資料情況下將資料寫入不同的檔案。num_shards定義了總共寫入多少個檔案
# instances_per_shard定義了每個檔案有多少個數據
num_sards = 2
instances_per_shard = 2
for i in range(num_sards):
# 將資料分為多個檔案時,可以將不同檔案以類似0000n-of-0000m的字尾區分。其中m表示
# 資料總共被存在了多少個檔案,n表示當前檔案的編號。式樣的方式既方便了通過正則表示式
# 獲取檔案列表,又在檔名中加入更多的資訊。
filename = ('/path/to/data.tfrecords-%.5d-of-%.5d' % (i, num_sards))
writer = tf.python_io.TFRecordWriter(filename)
# 將資料封裝成Example結構並寫入TFRecord檔案
for j in range(instances_per_shard):
# Example結構僅包含當前樣例屬於第幾個檔案以及是當前檔案的第幾個樣本
example = tf.train.Example(features=tf.train.Features(feature={
'i':_int64_feature(i),
'j':_int64_feature(j)
}))
writer.write(example.SerializeToString())
writer.close()
如下圖所示建立了2個檔案
檔案佇列的生成主要使用兩個函式
- tf.train.match_filenames_once():獲取符合正則表示式的檔案列表
- tf.train.string_input_producer():用檔案列表建立一個輸入佇列
通過設定 shuffle 引數為 True,string_input_producer 會將檔案的入隊順序打亂,所以出隊順序是隨機的。隨機打亂檔案順序和入隊操作會跑在一個單獨的執行緒上,不會影響出隊的速度
當輸入佇列中的所有檔案都處理完後,它會將檔案列表中的檔案重新加入佇列。可以通過設定 num_epochs 引數來限制載入初始檔案列表的最大輪數
import tensorflow as tf
# 獲取檔案列表
files = tf.train.match_filenames_once('/path/to/data.tfrecords-*')
# 建立檔案輸入佇列
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# 讀取並解析Example
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features={
'i': tf.FixedLenFeature([], tf.int64),
'j': tf.FixedLenFeature([], tf.int64)
})
with tf.Session() as sess:
# 使用match_filenames_once需要用local_variables_initializer初始化一些變數
sess.run(
[tf.global_variables_initializer(),
tf.local_variables_initializer()])
# 列印檔名
print(sess.run(files))
# 用Coordinator協同執行緒,並啟動執行緒
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
# 獲取資料
for i in range(6):
print(sess.run([features['i'], features['j']]))
coord.request_stop()
coord.join(threads)
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]
在不打亂檔案列表的情況下,會依次讀出樣例資料中的每一個樣例。
4 - 組合訓練資料(batching)
將多個輸入樣例組成一個batch可以提高模型訓練的效率。所以在得到單個樣例的預處理結果之後,還需要將它們組成batch,然後再提供神經網路的輸入層TensorFlow提供了tf.train.batch和tf.train.shuffle_batch函式來將單個的樣例組織成batch的形式輸出
import tensorflow as tf
# 獲取檔案列表
files = tf.train.match_filenames_once('/path/to/data.tfrecords-*')
# 建立檔案輸入佇列
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# 讀取並解析Example
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features={
'i': tf.FixedLenFeature([], tf.int64),
'j': tf.FixedLenFeature([], tf.int64)
})
# i代表特徵向量,j代表標籤
example, label = features['i'], features['j']
# 一個batch中的樣例數
batch_size = 3
# 檔案佇列中最多可以儲存的樣例個數
capacity = 1000 + 3 * batch_size
# 組合樣例
example_batch, label_batch = tf.train.batch(
[example, label], batch_size=batch_size, capacity=capacity)
with tf.Session() as sess:
# 使用match_filenames_once需要用local_variables_initializer初始化一些變數
sess.run(
[tf.global_variables_initializer(),
tf.local_variables_initializer()])
# 用Coordinator協同執行緒,並啟動執行緒
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
# 獲取並列印組合之後的樣例。真實問題中一般作為神經網路的輸入
for i in range(2):
cur_example_batch, cur_label_batch = sess.run(
[example_batch, label_batch])
print(cur_example_batch, cur_label_batch)
coord.request_stop()
coord.join(threads)
[0 0 1] [0 1 0]
[1 0 0] [1 0 1]
可以看到單個的資料被組織成 3 個一組的 batch
以下是使用 tf.train.shuffle_batch() 的方法,min_after_dequeue 引數限制了出隊時佇列中元素的最少個數,當佇列元素個數太少時,隨機的意義就不大了
# 和tf.train.batch的樣例程式碼一樣產生example和label
example, label = features['i'], features['j']
# 使用tf.train,shuffle_batch函式來組合樣例。tf.train.shuffle_batch函式
# 的引數大部分都和tf.train.batch函式相似,但是min_after_dequeue引數是
# tf.train.shuffle_batch函式特有的。min_after_dequeue引數限制了出隊時佇列中
# 元素的最少個數。當佇列中元素太少時,隨機打亂樣側順序的作用就不大。所以
# tf.train.shuffle_batch函式提供了限制出隊時最少元素的個數來保證隨機打亂順序的
# 作用。當出隊函式被呼叫但是出隊中元素不夠時,出隊操作將等待更多的元素入隊才會完成
# 如果min_after_dequeue引數被設定,capacity也應該相應調整來滿足效能需求
example_batch, label_batch = tf.train.shuffle_batch(
[example, label], batch_size=batch_size,
capacity=capacity, min_after_dequeue=30
)
5 - 完整的輸入訓練框架
以上,已經介紹完了利用佇列 多執行緒從多個 TFRecord 檔案中組織成訓練資料 batch 的方法。這裡給出一個完整的處理和訓練框架
import tensorflow as tf
files = tf.train.match_filenames_once("/path/to/file_pattern-*")
filename_queue = tf.train.string_input_producer(files, shuffle=False)
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features={'image': tf.FixedLenFeature([], tf.string),
'label': tf.FixedLenFeature([], tf.int64),
'height': tf.FixedLenFeature([], tf.int64),
'width': tf.FixedLenFeature([], ft.int64),
'channels': tf.FixedLenFeature([], tf.int64),
})
image, label = features['image'], features['label']
height, width = features['height'], features['width']
channels = features['channels']
# 從原始影象中解析出畫素矩陣,並根據圖片尺寸還原影象
decoded_image = tf.decode_raw(image, tf.uint8)
decoded_image.set_shape([height, width, channels])
# 定義神經網路輸入圖片大小
image_size = 299
# 假設 preprocess_for_train 為圖片隨機處理函式,具體參考第 11 節
''' 由於這裡採用了多執行緒輸入框架,所以影象處理過程不會造成訓練的瓶頸 '''
distorted_image = preprocess_for_train(decoded_image, image_size, image_size, None)
# 通過 tf.train.shuffle_batch 組織成 batch 訓練資料
min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue + 3 * batch_size
image_batch, label_batch = tf.train.shuffle_batch(
[distorted_image, label], batch_size=batch_size,
capacity=capacity, min_after_dequeue=min_after_dequeue)
# 定義神經網路傳輸和優化過程
logit = inference(image_batch)
loss = calc_loss(logit, label_batch)
# 採用隨機梯度下降演算法優化損失函式
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
with tf.Session() as sess:
# 初始化變數
tf.initialize_all_variables().run()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 訓練神經網路過程
for i in range(TRAINING_STEPS):
sess.run(train_step)
coord.request_stop()
coord.join(threads)
完整的輸入資料框架如下圖所示:
通過這種方式,可以有效地提高資料預處理的效率,避免資料預處理成為神經網路模型訓練過程中的效能瓶頸