1. 程式人生 > >tensorflow學習筆記——多執行緒輸入資料處理框架

tensorflow學習筆記——多執行緒輸入資料處理框架

  之前我們學習使用TensorFlow對影象資料進行預處理的方法。雖然使用這些影象資料預處理的方法可以減少無關因素對影象識別模型效果的影響,但這些複雜的預處理過程也會減慢整個訓練過程。為了避免影象預處理成為神經網路模型訓練效率的瓶頸,TensorFlow提供了一套多執行緒處理輸入資料的框架。

  下面總結了一個經典的輸入資料處理的流程:

   下面我們首先學習TensorFlow中佇列的概念。在TensorFlow中,佇列不僅是一種資料結構,它更提供了多執行緒機制。佇列也是TensorFlow多執行緒輸入資料處理框架的基礎。然後再學習上面的流程。最後這個流程將處理好的單個訓練資料整理成訓練資料 batch,這些batch就可以作為神經網路的輸入。

準備知識:多執行緒的簡單介紹

  在傳統作業系統中,每個程序有一個地址空間,而且預設就有一個控制執行緒。執行緒顧名思義,就是一條流水線工作的過程(流水線的工作需要電源,電源就相當於CPU),而一條流水線必須屬於一個車間,一個車間就是一個程序,車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一條流水線。所以,程序只是用來把資源集中到一起(程序只是一個資源單位,或者說資源集合),而執行緒才是CPU上的執行單位。

  多執行緒(即多個控制執行緒)的概念就是:在一個程序中存在多個執行緒,多個執行緒共享該程序的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。比如成都地鐵和西安地鐵是不同的程序,而成都地鐵3號線是一個執行緒,成都地鐵所有的執行緒共享成都所有的資源,比如成都所有的乘客可以被所有線拉。

  開啟多執行緒的方式:

import time
import random
from threading import Thread


def study(name):
    print("%s is learning" % name)
    time.sleep(random.randint(1, 3))
    print("%s is playing " % name)


if __name__ == '__main__':
    t = Thread(target=study, args=('james', ))
    t.start()
    print("主執行緒開始執行")

'''
結果展示:
james is learning
主執行緒開始執行
james is playing 
'''

    t.start() 將開啟程序的訊號發給作業系統後,作業系統要申請記憶體空間,讓好拷貝父程序地址空間到子程序,開銷遠大於執行緒。

1,佇列與多執行緒

  在TensorFlow中,佇列和變數類似,都是計算圖上有狀態的節點。其他的計算節點可以修改他們的狀態。對於變數,可以通過賦值操作修改變數的取值。對於佇列,修改佇列狀態的操作主要有Enqueue,EnqueueMany和Dequeue。下面程式展示瞭如何使用這些函式來操作一個佇列。

#_*_coding:utf-8_*_
import tensorflow as tf

# 建立一個先進先出的佇列,指定佇列中最多可以儲存兩個元素,並指定型別為整數
q = tf.FIFOQueue(2, 'int32')
# 使用enqueue_many 函式來初始化佇列中的元素。
# 和變數初始化類似,在使用佇列之前需要明確的呼叫這個初始化過程
init = q.enqueue_many(([0, 10], ))
# 使用Dequeue 函式將佇列中的第一個元素出佇列。這個元素的值將被存在變數x中
x = q.dequeue()
# 將得到的值加1
y = x + 1
# 將加 1 後的值在重新加入佇列
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # 執行初始化佇列的操作
    init.run()
    for _ in range(6):
        #執行q_inc 將執行資料出佇列,出隊的元素 +1 ,重新加入佇列的整個過程
        v, _ = sess.run([x, q_inc])
        # 打印出隊元素的取值
        print('%s'%v)

'''
佇列開始有[0, 10] 兩個元素,第一個出隊的為0, 加1之後為[10, 1]
第二次出隊的為10, 加1之後入隊的為11, 得到的佇列為[1, 11]
以此類推,最後得到的輸出為:
0
10
1
11
2
'''

  TensorFlow中提供了FIFOQueue 和 RandomShuffleQueue 兩種佇列。在上面的程式中,已經展示瞭如何使用FIFOQueue,它的實現的一個先進先出佇列。 RandomShuffleQueue 會將佇列中的元素打亂,每次出隊操作得到的是從當前佇列所有元素中隨機選擇的一個。在訓練審計網路時希望每次使用的訓練資料儘量隨機。 RandomShuffleQueue 就提供了這樣的功能。

  在TensorFlow中,佇列不僅僅是一種資料結構,還是非同步計算張量取值的一個重要機制。比如多個執行緒可以同時向一個佇列中寫元素,或者同時讀取一個佇列中的元素。在後面我們會學習TensorFlow是如何利用佇列來實現多執行緒輸入資料處理的。

  TensorFlow提供了 tf.Coordinator 和 tf.QueueRunner 兩個類來完成多執行緒協同的功能。tf.Coordinator 主要用於協同多個執行緒一起停止,並提供了 should_stop, request_stop 和 join 三個函式。在啟動執行緒之前,需要先宣告一個 tf.Coordinator 類,並將這個類傳入每一個建立的執行緒中。啟動的執行緒需要一直查詢 tf.Coordinator 類中提供的 should_stop 函式,當這個函式的返回值為 True時,則當前執行緒也需要退出。每一個啟動的執行緒都可以通過呼叫 request_stop 函式來通知其他執行緒退出。當某一個執行緒呼叫  request_stop 函式之後, should_stop 函式的返回值將被設定為 TRUE,這樣其他的執行緒就可以同時終止了。下面程式展示瞭如何使用 tf.Coordinator。

#_*_coding:utf-8_*_
import tensorflow as tf
import numpy as np
import threading
import time

# 執行緒中執行的程式,這個程式每隔1秒判斷是否停止並列印自己的ID
def MyLoop(coord, worker_id):
    # 使用 tf.Coordinator 類提供的協同工具判斷當前是否需要停止
    while not coord.should_stop():
        # 隨機停止所有的執行緒
        if np.random.rand() < 0.1:
            print("Stopping from id: %d\n" % worker_id)
            # 呼叫 coord.request_stop() 函式來通知其他執行緒停止
            coord.request_stop()
        else:
            # 列印當前執行緒的 ID
            print("Working on id: %d\n" % worker_id)
        # 暫停1 s
        time.sleep(1)

# 宣告一個  tf.train.Coordinator 類來協同多個執行緒
coord = tf.train.Coordinator()

# 宣告建立 5 個執行緒
threads = [
    threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)
]

# 啟動所有的執行緒
for t in threads:
    t.start()

# 等待所有執行緒退出
coord.join(threads)
'''
Working on id: 0
Working on id: 1
Working on id: 2
Working on id: 3
Working on id: 4

Working on id: 0
Working on id: 1
Working on id: 3
Working on id: 2
Working on id: 4

Working on id: 0
Working on id: 2
Working on id: 1
Working on id: 3
Working on id: 4

Working on id: 2
Working on id: 1
Working on id: 0
Working on id: 3
Working on id: 4

Working on id: 3
Working on id: 0
Working on id: 1
Working on id: 2
Working on id: 4
Working on id: 1
Stopping from id: 0
'''

  當所有執行緒啟動之後,每個執行緒會列印各自的ID,於是前面4行打印出了他們的ID。然後在暫停1秒之後,所有的執行緒又開始第二遍列印ID。在這個時候有一個執行緒推出的條件達到,於是呼叫了coord.request_stop 函式來停止所有其他的執行緒。然而在列印Stoping_from_id:4之後,可以看到有執行緒仍然在輸出。這是因為這些執行緒已經執行完 coord.should_stop 的判斷,於是仍然會繼續輸出自己的ID。但在下一輪判斷是否需要停止時將推出執行緒。於是在列印一次ID之後就不會再有輸出了。

  tf.QueueRunner 主要用於啟動多個執行緒來操作同一個佇列,啟動的這些執行緒可以通過上面介紹的 tf.Coordinator 類來統一管理,下面程式碼展示瞭如何使用 tf.QueueRunner 和 tf.Coordinator 來管理多執行緒佇列操作。

#_*_coding:utf-8_*_
import tensorflow as tf

# 宣告一個先進先出的佇列,佇列中最多100個元素,型別為實數
queue = tf.FIFOQueue(100, 'float')
# 定義佇列的入隊操作
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 使用 tf.train.QueueRunner 來建立多個執行緒執行佇列的入隊操作
# tf.train.QueueRunner 的第一個引數給出了被操作的佇列
# [enqueue_op] * 5 表示了需要啟動5個執行緒,每個執行緒執行的是equeue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op]*5)

# 將定義過的 QueueRunner 加入 TensorFlow計算圖上指定的集合
# tf.train.add_queue_runner 函式沒有指定集合
# 則加入預設集合 tf.GraphKeys.QUEUE_RUNNERS
# 下面的函式就是講剛剛定義的qr加入預設的tf.GraphKeys.QUEUE_RUNNERS集合
tf.train.add_queue_runner(qr)
# 定義出隊操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用 tf.train.coordinator 來協同啟動的執行緒
    coord = tf.train.Coordinator()
    # 使用tf.train.QueueRunner時,需要明確呼叫 tf.train.start_queue_runnsers來啟動所有執行緒
    # 否則因為沒有執行緒執行入隊操作,當調用出隊操作時,程式會一直等待入隊操作被執行。
    # tf.train.start_queue_runners 函式會預設啟動 tf.GraphKeys.QUEUE_RUNNERS集合
    # 所說的 tf.train.add_queue_runner 函式和 tf.train.start_queue_runners 函式會指定同一個集合
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 獲取佇列中的取值
    for _ in range(3):
        print(sess.run(out_tensor)[0])

    # s使用 tf.train.Coordinator 來停止所有的執行緒
    coord.request_stop()
    coord.join(threads)

'''
-0.88587755
-0.6659831
-2.9722364
'''

  

輸入檔案佇列

  下面將學習如何使用TensorFlow中的佇列管理輸入檔案列表。這裡假設所有的輸入資料都已經整理成了TFRecord 格式。雖然一個 TFRecord 檔案中可以儲存多個訓練樣例,但是當訓練資料量較大時,可以將資料分成多個 TFRecord 檔案來提高處理效率。 TensorFlow 提供了 tf.train.match_filenames_once 函式來獲取符合一個正則表示式的所有檔案,得到的檔案列表可以通過 tf.train.string_input_producer 函式進行有效的管理。

  tf.train.string_input_producer 函式會使用初始化時提供的檔案列表建立一個輸入佇列,輸入對壘中原始的元素為檔案列表中的所有檔案。如上面的程式碼所示,建立好的輸入佇列可以作為檔案讀取函式的引數。每次呼叫檔案讀取函式時,該函式會先判斷當前是否已有開啟的檔案可讀,如果沒有或者開啟的檔案以及讀完,這個函式會從輸入佇列中出隊一個檔案並從這個檔案中讀取資料。

  通過設定 shuffle 引數,tf.train.string_input_producer 函式支援隨機打亂檔案列表中檔案出隊的順序。當 shuffle 引數為 TRUE時,檔案在加入佇列之前會被打亂順序,所以出隊的順序也是隨機的。隨機打亂檔案順序以及加入輸入佇列的過程會泡在一個單獨的執行緒上,這樣不會影響獲取檔案的速度。tf.train.string_input_producer 函式生成的輸入佇列可以同時被多個檔案讀取執行緒操作,而且輸入佇列會將佇列中的檔案均勻的分給不同的執行緒,不出現有些檔案被處理過多次而有些檔案還沒有被處理過的情況。

  當一個輸入佇列中的所有檔案都被處理完後,它會將初始化時提供的檔案列表中的檔案全部重新加入佇列。tf.train.string_input_producer 函式可以設定 num_epochs 引數來限制載入初始檔案列表的最大輪數。當所有檔案都已經被使用了設定的輪數後,如果繼續嘗試讀取新的檔案,輸入佇列會報 OutOfRange 的錯誤。在測試神經網路模型時,因為所有測試資料只需要使用一次,所以可以將 num_epochs 引數設定為1,這樣在計算完一輪之後程式將自動停止。在展示  tf.train.match_filenames_once 和 tf.train.string_input_producer 函式的使用方法之前,我們可以先給出一個簡單的程式來生成資料。

#_*_coding:utf-8_*_
import tensorflow as tf

# 建立TFReocrd檔案的幫助函式
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 模擬海量資料情況下降資料寫入不同的檔案,num_shards 定義了總共寫入多少檔案
# instances_per_shard 定義了每個檔案中有多少個數據
num_shards = 2
instances_per_shard = 2
for i in range(num_shards):
    # 將資料分為多個檔案時,可以將不同檔案以類似0000n-of-0000m 的字尾區分
    # 其中m表示了資料總共被存在了多少個檔案中,n表示當前檔案的編號
    # 式樣的方式既方便了通過正則表示式獲取檔案列表,又在檔名中加入了更多的資訊
    filename = ('data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
    writer = tf.python_io.TFRecordWriter(filename)
    # 將資料封裝成Example結構並寫入 TFRecord 檔案
    for j in range(instances_per_shard):
        # Example 結構僅包含當前樣例屬於第幾個檔案以及時當前檔案的第幾個樣本
        example = tf.train.Example(features=tf.train.Features(
            feature={
                'i': _int64_feature(i),
                'j': _int64_feature(j)
            }
        ))
        writer.write(example.SerializeToString())
    writer.close()

  程式執行之後,在指定的目錄下生產兩個檔案,每一個檔案中儲存了兩個樣例,在生成了樣例資料之後,下面程式碼展示了 tf.train.match_filenames_once 函式 和 tf.train.string_input_producer 函式的使用方法:

#_*_coding:utf-8_*_
import tensorflow as tf

# 使用tf.train.match_filenames_once 函式獲取檔案列表
files = tf.train.match_filenames_once('path/data.tfrecords-*')
# print(files)

# 輸入佇列中的檔案列表為 tf.train.match_filenames_once 函式獲取的檔案列表
# 這裡將 shuffle引數設定為FALSE來避免隨機打亂讀檔案的順序
# 但是一般在解決真實問題,會將shuffle引數設定為TRUE
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# print(filename_queue)
# 讀取並解析一個樣本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

with tf.Session() as sess:
    # 雖然在本段程式中沒有宣告任何變數
    # 但在使用 tf.train.match_filenames_once 函式時需要初始化一些變數
    # init = tf.global_variables_initializer()
    # init = tf.initialize_all_variables()
    init = tf.local_variables_initializer()
    sess.run(init)
    # sess.run(files)
    # sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    print(sess.run(files))

    # 宣告 tf.train.Coordinator 類來協同不同執行緒,並啟動執行緒
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # 多次執行獲取資料的操作
    for i in range(6):
        print(sess.run([features['i'], features['j']]))
    coord.request_stop()
    coord.join(threads)

  列印結果如下:

[b'path\\data.tfrecords-00000-of-00002'
 b'path\\data.tfrecords-00001-of-00002']
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]

  在不打亂檔案列表的情況下,會依次獨處樣例資料中的每一個樣例。而且當所有樣例都被讀完之後,程式會自動從頭開始。如果限制 num_epochs=1,那麼程式會報錯。

組合訓練資料(batching)

  在上面,我們已經學習瞭如何從檔案列表中讀取單個樣例,將這些單個樣例通過預處理方法進行處理,就可以得到提高給神經網路輸入層的訓練資料了。在之前學習過,將多個輸入樣例組織成一個batch可以提高模型訓練的效率。所以在得到單個樣例的預處理結果之後,還需要將他們組織成batch,然後再提供給審計網路的輸入層。TensorFlow提供了 tf.train.batch 和 tf.train.shuffle_batch 函式來將單個的樣例組織成 batch 的形式輸出。這兩個函式都會生成一個佇列,佇列的入隊操作時生成單個樣例的方法,而每次出隊得到的時一個batch的樣例。他們唯一的區別自安於是否會將資料順序打亂。下面程式碼展示了這兩個函式的使用方法。

   下面程式碼展示了 tf.train.batch函式的用法:

#_*_coding:utf-8_*_
import tensorflow as tf

# 讀取解析得到樣例,這裡假設Example結構中 i表示一個樣例的特徵向量
# 比如一張影象的畫素矩陣,而j表示該樣例對應的標籤


# 使用tf.train.match_filenames_once 函式獲取檔案列表
files = tf.train.match_filenames_once('path/data.tfrecords-*')

# 輸入佇列中的檔案列表為 tf.train.match_filenames_once 函式獲取的檔案列表
# 這裡將 shuffle引數設定為FALSE來避免隨機打亂讀檔案的順序
# 但是一般在解決真實問題,會將shuffle引數設定為TRUE
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# print(filename_queue)
# 讀取並解析一個樣本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

example, label = features['i'], features['j']

# 一個 batch 中樣例的個數
batch_size = 2
# 組合樣例的佇列中最多可以儲存的樣例個數。這個佇列如果太大,
# 那麼需要佔用很多記憶體資源,如果太小,那麼出隊操作可能會因為
# 沒有資料而被阻礙(block),從而導致訓練效率降低,一般來說
# 這個佇列的大小會和每一個batch的大小相關,下面程式碼給出了設定
# 佇列大小的一種方式。
capacity = 1000 + 3 * batch_size

# 使用 tf.train.batch 函式來組合樣例。[example, label] 引數給
# 出了需要組合的元素,一般 example 和 label分別代表訓練樣本和這個樣本
# 對應的正確標籤。batch_size 引數給出了每個batch中樣例的個數。
# capacity 給出了佇列的最大容量。當佇列長度等於容量時,TensorFlow將暫停
# 入隊操作,而只是等待元素出隊。當元素個數小於容量時,TensorFlow將自動重新啟動入隊操作
example_batch, label_batch = tf.train.batch(
    [example, label], batch_size=batch_size, capacity=capacity
)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # 獲取並列印組合之後的樣例,在真實問題中,這個輸出一般會作為神經網路的輸入
    for i in range(3):
        cur_example_batch, cur_label_batch = sess.run(
            [example_batch, label_batch]
        )
        print(cur_example_batch, cur_label_batch)
    coord.request_stop()
    coord.join(threads)

'''
執行上面的程式會得到下面的輸出:
[0 0] [0 1]
[1 1] [0 1]
[0 0] [0 1]
從這個輸出可以看到 tf.train.batch函式可以將單個的資料組織成3個一組的batch
在 example, lable 中讀取的資料依次為:
example:0  label:0
example:0  label:1
example:1  label:1
example:0  label:1
example:0  label:0
example:0  label:1
    這是因為 tf.train.batch 函式不會隨機打亂順序,所以在組合之後得到的資料
    組成了上面給出的輸出。
'''

  下面程式碼展示了 tf.train.shuffle_batch 函式的使用方法:

import tensorflow as tf

#_*_coding:utf-8_*_
import tensorflow as tf

# 讀取解析得到樣例,這裡假設Example結構中 i表示一個樣例的特徵向量
# 比如一張影象的畫素矩陣,而j表示該樣例對應的標籤


# 使用tf.train.match_filenames_once 函式獲取檔案列表
files = tf.train.match_filenames_once('path/data.tfrecords-*')

# 輸入佇列中的檔案列表為 tf.train.match_filenames_once 函式獲取的檔案列表
# 這裡將 shuffle引數設定為FALSE來避免隨機打亂讀檔案的順序
# 但是一般在解決真實問題,會將shuffle引數設定為TRUE
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# print(filename_queue)
# 讀取並解析一個樣本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

example, label = features['i'], features['j']

# 一個 batch 中樣例的個數
batch_size = 2
# 組合樣例的佇列中最多可以儲存的樣例個數。這個佇列如果太大,
# 那麼需要佔用很多記憶體資源,如果太小,那麼出隊操作可能會因為
# 沒有資料而被阻礙(block),從而導致訓練效率降低,一般來說
# 這個佇列的大小會和每一個batch的大小相關,下面程式碼給出了設定
# 佇列大小的一種方式。
capacity = 1000 + 3 * batch_size

# 使用 tf.train.shuffle_batch 函式來組合樣例。[example, label] 引數給
# 出了需要組合的元素,一般 example 和 label分別代表訓練樣本和這個樣本
# 對應的正確標籤。batch_size 引數給出了每個batch中樣例的個數。
# capacity 給出了佇列的最大容量。min_after_dequeue引數是
# tf.train.shuffle_batch 特有的。min_after_dequeue引數限制了出隊時
# 佇列中元素的最小個數,當佇列中元素太小時,隨機打亂樣例的順序作用就不大了
# 所以 tf.train.shuffle_batch 函式提供了限制出隊時的最小元素的個數來保證
# 隨機打亂順序的作用。當出隊函式被呼叫但是佇列中元素不夠時,出隊操作將等待更多
# 的元素入隊才會完成。如果min_after_dequeue引數被設定,capacity也應該來調整
example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=batch_size, capacity=capacity,
    min_after_dequeue=30
)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # 獲取並列印組合之後的樣例,在真實問題中,這個輸出一般會作為神經網路的輸入
    for i in range(3):
        cur_example_batch, cur_label_batch = sess.run(
            [example_batch, label_batch]
        )
        print(cur_example_batch, cur_label_batch)
    coord.request_stop()
    coord.join(threads)

'''
執行上面的程式會得到下面的輸出:
[0 1] [0 0]
[1 0] [0 0]
[1 0] [0 1]
從這個輸出可以看到 tf.train.shuffle_batch函式已經將樣例順序打亂了
'''

  tf.train.batch 函式 和 tf.train.shuffle_batch 函式除了將單個訓練資料整理成輸入 batch,也提供了並行化處理輸入資料的方法。tf.train.batch 函式 和 tf.train.shuffle_batch 函式並行化的方式一樣,所以我們執行應用更多的 tf.train.shuffle_batch 函式為例。通過設定tf.train.shuffle_batch 函式中的 num_threads引數,可以指定多個執行緒同時執行入隊操作。tf.train.shuffle_batch 函式的入隊操作就是資料讀取以及預處理的過程。當 num_threads 引數大於1時,多個執行緒會同時讀取一個檔案中的不同樣例並進行預處理。如果需要多個執行緒處理不同檔案中的樣例時,可以使用tf.train.shuffle_batch_join 函式。此函式會從輸入檔案佇列中獲取不同的檔案分配給不同的執行緒。一般來說,輸入檔案佇列時通過 tf.train.string_input_producer 函式生成的。這個函式會分均分配檔案以保證不同檔案中的資料會被儘量平均地使用。

  tf.train.shuffle_batch 函式 和 tf.train.shuffle_batch_join 函式都可以完成多執行緒並行的方式來進行資料預處理,但是他們各有優劣。對於tf.train.shuffle_batch 函式,不同執行緒會讀取同一個檔案。如果一個檔案中的樣例比較相似(比如都屬於同一個類別),那麼神經網路的訓練效果有可能會受到影響。所以在使用 tf.train.shuffle_batch 函式時,需要儘量將同一個TFRecord 檔案中的樣例隨機打亂。而是用 tf.train.shuffle_batch_join 函式時,不同執行緒會讀取不同檔案。如果讀取資料的執行緒數比總檔案數還大,那麼多個執行緒可能會讀取同一個檔案中相近部分的資料。而卻多個執行緒讀取多個檔案可能導致過多的硬碟定址,從而使得讀取的效率降低。不同的並行化方式各有所長。具體採用哪一種方法需要根據具體情況來確定。

輸入資料處理框架

  前面已經學習了開始給出的流程圖中的所有步驟,下面將這些步驟串成一個完成的TensorFlow來處理輸入資料,下面程式碼給出了這個步驟:

#_*_coding:utf-8_*_
import tensorflow as tf

# 建立檔案佇列,並通過檔案列表建立輸入檔案佇列
# 需要統一所有原始資料的格式並將他們儲存到TFRecord檔案中
# 下面給出的檔案列表應該包含所有提供訓練資料的TFRecord檔案
files = tf.train.match_filenames_once('path/output.tfrecords')
filename_queue = tf.train.string_input_producer([files])

# 解析TFRecord檔案中的資料,這裡假設image中儲存的時影象的原始資料
# label為該樣例所對應的標籤。height,width 和 channels 給出了圖片的維度
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
# 用FixedLenFeature 將讀入的Example解析成 tensor
features = tf.parse_single_example(
    serialized_example,
    features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'pixels': tf.FixedLenFeature([], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64)
    }
)
# 從原始影象資料解析出畫素矩陣,並根據影象尺寸還原影象
decoded_images = tf.decode_raw(features['image_raw'], tf.uint8)
labels = tf.cast(features['label'], tf.int32)
pixels = tf.cast(features['pixels'], tf.int32)

retyped_images = tf.cast(decoded_images, tf.float32)
images = tf.reshape(retyped_images, [784])


# 將處理後的影象和標籤資料通過 tf.train.shuffle_batch 整理成
# 神經網路訓練訓練時需要的batch
# 將檔案以100個為一組打包
min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue + 3 * batch_size

image_batch, label_batch = tf.train.shuffle_batch([images, labels],
                                                  batch_size=batch_size,
                                                  capacity=capacity,
                                                  min_after_dequeue=min_after_dequeue)

# 訓練模型 計算審計網路的前向傳播結果
def inference(input_tensor, weights1, biases1, weights2, biases2):
    # 引入啟用函式讓每一層去線性化 tf.nn.relu()
    layer1 = tf.nn.relu(tf.matmul(input_tensor, weights1) + biases1)
    return tf.matmul(layer1, weights2) + biases2

# 模型相關的引數
INPUT_NODE = 784
OUTPUT_NODE = 10
LAYER1_NODE = 500
REGULARAZTION_RATE = 0.0001
TREINING_STEPS = 5000

# 生成隱藏層的引數
weights1 = tf.Variable(tf.truncated_normal([INPUT_NODE, LAYER1_NODE], stddev=0.1))
biases1 = tf.Variable(tf.constant(0.1, shape=[LAYER1_NODE]))

# 生成輸出層的引數
weights2 = tf.Variable(tf.truncated_normal([LAYER1_NODE, OUTPUT_NODE], stddev=0.1))
biases2 = tf.Variable(tf.constant(0.1, shape=[OUTPUT_NODE]))

y = inference(image_batch, weights1, biases1, weights2, biases2)

# 計算交叉熵及其平均值(對於分類問題,通常將交叉熵與softmax迴歸一起使用
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y,
                                                               labels=label_batch)
cross_entropy_mean = tf.reduce_mean(cross_entropy)

# 損失函式的計算
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
# 計算模型的正則化損失,一般只計算神經網路邊上的權重的正則化損失,而不是用偏置項
regularization = regularizer(weights1) + regularizer(weights2)
# 總損失等於交叉熵損失和正則化損失的和
loss = cross_entropy_mean + regularization

# 優化損失函式
# 一般優化器的目的是優化權重W和偏差 biases,最小化損失函式的結果
train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 初始化會話,並開始訓練過程
with tf.Session() as sess:
    # 由於使用了Coordinator,必須對local 和 global 變數進行初始化
    sess.run(tf.local_variables_initializer())
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 迴圈的訓練神經網路
    for i in range(TREINING_STEPS):
        if i %1000 == 0:
            print("After %d training step(s), loss is %g " % (i, sess.run(loss)))

        sess.run(train_step)
    coord.request_stop()
    coord.join(threads)

  下面程式碼是生成TFRecord檔案的(資料是MNIST資料)程式碼:

#_*_coding:utf-8_*_
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np

# 生成整數型的屬性
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 生成字串型的屬性
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

mnist = input_data.read_data_sets(
    'data', dtype=tf.uint8, one_hot=True
)
images = mnist.train.images
# 訓練資料所對應的正確答案,可以作為一個屬性儲存在TFRecord中
labels = mnist.train.labels
# 訓練資料的影象解析度,這可以作為Example中的一個屬性
pixels = images.shape[1]
num_examples = mnist.train.num_examples

# 輸出TFRecord 檔案的地址
filename = 'path/output.tfrecords'
# 建立一個writer來寫TFRecord 檔案
writer = tf.python_io.TFRecordWriter(filename)
for index in range(num_examples):
    # 將影象矩陣轉化為一個字串
    image_raw = images[index].tostring()
    # 將一個樣例轉化為 Example Protocol Buffer,並將所有的資訊寫入這個資料結構
    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                'pixels': _int64_feature(pixels),
                'labels': _int64_feature(np.argmax(labels[index])),
                'image_raw': _bytes_feature(image_raw)
            }
        ))
    # 將一個Example寫入 TFRecord檔案
    writer.write(example.SerializeToString())
writer.close()

  上面程式碼給出了從輸入資料處理的整個流程。(但是程式可能會報錯,我們這裡主要學習思路)。從下圖中可以看出,輸入資料處理的第一步是為獲取儲存訓練資料的檔案列表。下圖的檔案列表為{A, B, C}.通過 tf.train.string_input_producer 函式可以選擇性地將檔案列表中檔案的順序打亂,並加入輸入佇列。因為是否打亂檔案的順序是可選的,所以在圖中是虛線的。tf.train.string_input_producer 函式會生成並維護一個輸入檔案佇列,不同執行緒中的檔案讀取函式可以共享這個輸入檔案佇列。在讀取樣例資料之後,需要將影象進行預處理。影象預處理的過程也會通過tf.train.shuffle_batch 提供的機制並行地跑在多個執行緒中。輸入資料處理流程的最後通過 tf.train.shuffle_batch 函式將處理好的單個樣例整理成 batch 提供給神經網路的輸入層。通過這種方式,可以有效地提高資料預處理的效率,避免資料預處理成為神經網路模型效能過程中的效能瓶頸。

 TensorFlow 資料讀取機制主要是兩種方法:

  • (1)使用檔案佇列方法,如使用 slice_input_producer 和 string_input_producer;這種方法既可以將資料轉存為 TFRecord資料格式,也可以直接讀取檔案圖片資料,當然轉存為 TFRecord 資料格式進行讀取會更高效點。而這兩者之間的區別就是前者是輸入 tensor_list ,因此可以將多個list組合成一個 tensorlist 作為輸入;而後者只能是一個 string_tensor了。
  • (2)使用TensorFlow 1.4版本後出現的 tf.data.DataSet 的資料讀取機制(pipeline機制),這是TensorFlow強烈推薦的方式,是一種更高效的讀取方式。使用 tf.data.Dataset 模組的pipeline機制,可以實現 CPU 多執行緒處理輸入的資料,如讀取圖片和圖片的一些預處理,這樣 GPU就可以專注於訓練過程,而CPU去準備資料。

  舉例如下:

image_dir ='path/to/image_dir/*.jpg'
image_list = glob.glob(image_dir)
label_list=...
image_list = tf.convert_to_tensor(image_list, dtype=tf.string)

# 可以將image_list,label_list多個list組合成一個tensor_list
image_que, label_que = tf.train.slice_input_producer([image_list,label_list], num_epochs=1)

# 只能時string_tensor,所以不能組合多個list
image = tf.train.string_input_producer(image_list, num_epochs=1)

  

tf.train.slice_input_produce() 函式的用法

  這個函式的作用就是從輸入的 tensor_list 按要求抽取一個 tensor 放入檔名佇列,下面學習各個引數:

tf.slice_input_producer(tensor_list, num_epochs=None, shuffle=True,
                         seed=None,capacity=32, shared_name=None, name=None)

  說明:

  • tensor_list 這個就是輸入,格式為tensor的列表;一般為[data, label],即由特徵和標籤組成的資料集
  • num_epochs 這個是你抽取batch的次數,如果沒有給定值,那麼將會抽取無數次batch(這會導致你訓練過程停不下來),如果給定值,那麼在到達次數之後就會報OutOfRange的錯誤
  • shuffle 是否隨機打亂,如果為False,batch是按順序抽取;如果為True,batch是隨機抽取
  • seed 隨機種子
  • capcity 佇列容量的大小,為整數
  • name 名稱

  舉個例子:我們的資料data的 shape是(4000,10),label的shape是(4000, 2),執行下面這行程式碼:

input_queue = tf.train.slice_input_producer([data, label], 
                                   num_epochs=1, shuffle=True, capacity=32 )

  結果肯定是返回值包含兩組資料的 list,每個list的shape和輸入的data和label的shape對應。

 

batch_size 的設定與影響

1,batch_size 的含義

  batch_size 可以理解為批處理引數,它的極限值為訓練集樣本總數,當資料量比較小時,可以將batch_size 值設定為全資料集(Full batch cearning)。實際上,在深度學習中所涉及到的資料都是比較多的,一般都採用小批量資料處理原則。

2,關於小批量訓練網路的優缺點

小批量訓練網路的優點:

  • 相對海量的的資料集和記憶體容量,小批量處理需要更少的記憶體就可以訓練網路。
  • 通常小批量訓練網路速度更快,例如我們將一個大樣本分成11小樣本(每個樣本100個數據),採用小批量訓練網路時,每次傳播後更新權重,就傳播了11批,在每批次後我們均更新了網路的(權重)引數;如果在傳播過程中使用了一個大樣本,我們只會對訓練網路的權重引數進行1次更新。
  • 全資料集確定的方向能夠更好地代表樣本總體,從而能夠更準確地朝著極值所在的方向;但是不同權值的梯度值差別較大,因此選取一個全域性的學習率很困難。

小批量訓練網路的缺點:

  • 批次越小,梯度的估值就越不準確,在下圖中,我們可以看到,與完整批次漸變(藍色)方向相比,小批量漸變(綠色)的方向波動更大。
  • 極端特例batch_size = 1,也成為線上學習(online learning);線性神經元在均方誤差代價函式的錯誤面是一個拋物面,橫截面是橢圓,對於多層神經元、非線性網路,在區域性依然近似是拋物面,使用online learning,每次修正方向以各自樣本的梯度方向修正,這就造成了波動較大,難以達到收斂效果。

3,為什麼需要 batch_size 的引數

  Batch 的選擇,首先決定的時下降的方向。如果資料集比較小,完全可以採用全資料集(Full  Batch Learning)的形式,這樣做有如下好處:

  • 全資料集確定的方向能夠更好的代表樣本總體,從而更準確地朝著極值所在的方向
  • 由於不同權值的梯度差別較大,因此選取一個全域性的學習率很困難

  Full  Batch Learning 可以使用 Rprop 只基於梯度符號並且針對性單獨更新各權值。但是對於非常大的資料集,上述兩個好處變成了兩個壞處:

  • 隨著資料集的海量增加和記憶體限制,一次載入所有資料不現實
  • 以Rprop的方式迭代,會由於各個 batch之間的取樣差異性,各次梯度修正值相互抵消,無法修正。這才有了後來的RMSprop的妥協方案。

4,選擇適中的 batch_size

  可不可以選擇一個適中的Batch_size 值呢?當然可以,就是批梯度下降法(Mini-batches Learning)。因為如果資料集足夠充分,那麼用一半(甚至少得多)的資料訓練算出來的梯度與用全部資料訓練出來的梯度是幾乎一樣的。

在合理的範圍內,增大Batch_size 有什麼好處?

  1. 記憶體利用率提高了,大矩陣乘法的並行化效率提高
  2. 跑完一次epoch(全資料集)所需要的迭代次數減少,對於相同資料量的處理速度進一步加快。
  3. 在一定範圍內,一般來說Batch_Size 越大,其確定的下降方向越準,引起訓練震盪越小。

盲目增大Batch_size 有什麼壞處?

記憶體利用率提高了,但是記憶體容量可能撐不住了

跑完一次epoch(全資料集)所需要的迭代次數減少,要想達到相同的精度,其所花費的時間大大的增加了,從而對引數的修正也就顯得更加緩慢。

Batch_size 增大到一定程度,其確定的下降方向已經基本不再變化。

5,調節Batch_Size 對訓練效果影響到底如何?

  這裡有一個LeNet 在MNIST 資料集上的效果。MNIST 是一個手寫體標準庫。

  執行結果如上圖所示,其中絕對時間做了標準化處理。執行結果與上文分析相印證:

  1. Batch_Size 太小,演算法在200 epochs 內不收斂。
  2. 隨著Batch_Size 增大,處理相同資料量的速度越快。
  3. 隨著Batch_Size 增大,達到相同精度所需要的epoch 數量越來越多
  4. 由於上述兩種因素的矛盾,Batch_Size 增大到某個時候,達到時間上的最優
  5. 由於最終收斂精度會陷入不同的區域性極值,因此Batch_Size 增大到某些時候,達到最終收斂精度上的最優

 

   此文是自己的學習筆記總結,學習於《TensorFlow深度學習框架》,俗話說,好記性不如爛筆頭,寫寫總是好的,所以若侵權,請聯絡我,謝