1. 程式人生 > >TensorFlow 佇列與多執行緒的應用

TensorFlow 佇列與多執行緒的應用

深度學習的模型訓練過程往往需要大量的資料,而將這些資料一次性的讀入和預處理需要大量的時間開銷,所以通常採用佇列與多執行緒的思想解決這個問題,而且TensorFlow為我們提供了完善的函式。

實現佇列

在Python中是沒有提供直接實現佇列的函式的,所以通常會使用列表模擬佇列。
而TensorFlow提供了整套實現佇列的函式和方法,在TensorFlow中,佇列和變數類似,都是計算圖上有狀態的節點。操作佇列的函式主要有:

FIFOQueue():建立一個先入先出(FIFO)的佇列
RandomShuffleQueue():建立一個隨機出隊的佇列
enqueue_many():初始化佇列中的元素
dequeue():出隊
enqueue():入隊

下面是一個例子:

import tensorflow as tf

q = tf.FIFOQueue(3,"int32")
init = q.enqueue_many(([0,1,2],))

x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y]) 

with tf.Session() as sess:
     init.run()
     for a in range(5):
          v,a = sess.run([x,q_inc])
          print(v)

列印結果:
0
1
2
1
2
原理如下圖:
這裡寫圖片描述

多執行緒協同

TensorFlow為我們提供了多執行緒協同操作的類—tf.Coordinator,其函式主要有:
should_stop():確定當前執行緒是否退出
request_stop():通知其他執行緒退出
join():等待所有執行緒終止
假設有五個執行緒同時在工作,每個執行緒自身會先判斷should_stop()的值,當其返回值為True時,則退出當前執行緒;如果為Flase,也繼續該執行緒。此時如果執行緒3發出了request_stop()通知,則其它4個執行緒的should_stop()將全部變為True,然後執行緒4自身的should_stop()也將變為True,則退出了所有執行緒。
下面是一段程式碼:

import tensorflow as tf
import numpy as np
import time
import threading

def MyLoop(coord,worker_id):
     while not coord.should_stop():
          if np.random.rand()<0.09:
               print('stoping from id:',worker_id)
               coord.request_stop()
          else:
               print('working from id:',worker_id)
          time.sleep(1)

coord = tf.train.Coordinator()
#宣告5個執行緒
threads=[threading.Thread(target=MyLoop,args=(coord,i,)) for i in range(5)]
#遍歷五個執行緒
for t in threads:  
     t.start()
coord.join(threads)       

列印結果:
working from id: 0
working from id: 1
working from id: 2
working from id: 3
working from id: 4
stoping from id: 0

在第一輪遍歷過程中,所有程序的should_stop()都為Flase,且隨機數都大於等於0.09,所以依次列印了working from id: 0-5,再重新回到程序0時,出現了小於0.09的隨機數,即程序0發出了request_stop()請求,程序1-4的should_stop()返回值全部為True(程序退出),也就無法進入while,程序0的should_stop()返回值也將為True(退出),五個程序全部退出。

多執行緒操作佇列

前面說到了佇列的操作,多執行緒協同的操作,在多執行緒協同的程式碼中讓每一個執行緒列印自己的id編號,下面我們說下如何用多執行緒操作一個佇列。
TensorFlow提供了佇列tf.QueueRunner類處理多個執行緒操作同一佇列,啟動的執行緒由上面提到的tf.Coordinator類統一管理,常用的操作有:
QueueRunner():啟動執行緒,第一個引數為執行緒需要操作的佇列,第二個引數為對佇列的操作,如enqueue_op,此時的enqueue_op = queue.enqueue()
add_queue_runner():在圖中的一個集合中加‘QueueRunner’,如果沒有指定的合集的話,會被新增到tf.GraphKeys.QUEUE_RUNNERS合集
start_queue_runners():啟動所有被新增到圖中的執行緒

import tensorflow as tf

#建立佇列
queue = tf.FIFOQueue(100,'float')
#入隊
enqueue_op = queue.enqueue(tf.random_normal([1]))
#啟動5個執行緒,執行enqueue_op
qr = tf.train.QueueRunner( queue,[enqueue_op] * 5)
#新增執行緒到圖
tf.train.add_queue_runner(qr)
#出隊
out_tensor = queue.dequeue()

with tf.Session() as sess:
     coord = tf.train.Coordinator()
     threads=tf.train.start_queue_runners(sess=sess,coord=coord)
     for i in range(6):
          print(sess.run(out_tensor)[0])
     coord.request_stop()
     coord.join(threads)

列印結果:
-0.543751
-0.712543
1.32066
0.2471
0.313005
-2.16349