1. 程式人生 > >tensorflow 程式掛起的原因,即整個程序不報錯又不執行的原因

tensorflow 程式掛起的原因,即整個程序不報錯又不執行的原因

一、說明:在使用tensorflow的過程中,出現過程式不報錯又不接下去執行的錯誤,後來分析了原因是tf的資料執行緒沒有啟動,導致資料流圖沒辦法計算,整個程式就卡在哪裡。

更深層次的原因是tensorflow的計算和資料讀入是非同步的,合理的方式是主執行緒進行模型的訓練,然後開一個數據讀入執行緒非同步讀入資料.tensorflow會在記憶體中維護一個佇列,然後資料執行緒非同步從磁碟中將樣本推入隊列當中。並且,因為tensorflow的訓練和讀資料是非同步的,故即使當前沒有資料進來,tensorflow也沒辦法報錯,因為可能接下來會有資料進佇列,所以,tensorflow就一直處於等待的狀態

說明:我是在修改Tensorflow的原始碼ptb_word_lm.py的時候遇到上述的問題的。下面就該原始碼來解釋說明這個問題:

tensorflow的reader.py檔案:

"""Utilities for parsing PTB text files."""
#-*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os

import tensorflow as tf

#將檔案中所有的word收集起來
def _read_words(filename):
  with tf.gfile.GFile(filename, "r") as f:
    return f.read().decode("utf-8").replace("\n", "<eos>").split()

#將收集到的word對映到id
def _build_vocab(filename):
  data = _read_words(filename)

  counter = collections.Counter(data)
  count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0]))

  words, _ = list(zip(*count_pairs))
  word_to_id = dict(zip(words, range(len(words))))

  return word_to_id

#使用訓練集的word建立word的對映表
#
def _file_to_word_ids(filename, word_to_id):
  data = _read_words(filename)
  return [word_to_id[word] for word in data if word in word_to_id]


def ptb_raw_data(data_path=None):
  """Load PTB raw data from data directory "data_path".

  Reads PTB text files, converts strings to integer ids,
  and performs mini-batching of the inputs.

  The PTB dataset comes from Tomas Mikolov's webpage:

  http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz

  Args:
    data_path: string path to the directory where simple-examples.tgz has
      been extracted.

  Returns:
    tuple (train_data, valid_data, test_data, vocabulary)
    where each of the data objects can be passed to PTBIterator.
  """

  train_path = os.path.join(data_path, "ptb.train.txt")
  valid_path = os.path.join(data_path, "ptb.valid.txt")
  test_path = os.path.join(data_path, "ptb.test.txt")

  word_to_id = _build_vocab(train_path)
  train_data = _file_to_word_ids(train_path, word_to_id)
  valid_data = _file_to_word_ids(valid_path, word_to_id)
  test_data = _file_to_word_ids(test_path, word_to_id)
  vocabulary = len(word_to_id)
  return train_data, valid_data, test_data, vocabulary


def ptb_producer(raw_data, batch_size, num_steps, name=None):
  """Iterate on the raw PTB data.

  This chunks up raw_data into batches of examples and returns Tensors that
  are drawn from these batches.

  Args:
    raw_data: one of the raw data outputs from ptb_raw_data.
    batch_size: int, the batch size.
    num_steps: int, the number of unrolls.
    name: the name of this operation (optional).

  Returns:
    A pair of Tensors, each shaped [batch_size, num_steps]. The second element
    of the tuple is the same data time-shifted to the right by one.

  Raises:
    tf.errors.InvalidArgumentError: if batch_size or num_steps are too high.
  """
  with tf.name_scope(name, "PTBProducer", [raw_data, batch_size, num_steps]):
    raw_data = tf.convert_to_tensor(raw_data, name="raw_data", dtype=tf.int32)

    data_len = tf.size(raw_data)
    batch_len = data_len // batch_size
    data = tf.reshape(raw_data[0 : batch_size * batch_len],
                      [batch_size, batch_len])

    epoch_size = (batch_len - 1) // num_steps
    assertion = tf.assert_positive(
        epoch_size,
        message="epoch_size == 0, decrease batch_size or num_steps")
    with tf.control_dependencies([assertion]):
      epoch_size = tf.identity(epoch_size, name="epoch_size")

    i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
    x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
    y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
    return x, y

說明:詳解這個reader.py檔案:

1、產生一個佇列,裡面的數是0到epoch_size-1.然後定義了一個出隊操作,說明佇列也是資料流圖中的一個結點.使用了range_input_producer之後,會自動產生一個QueueRunner. QueueRunner for the Queue is added to the current Graph'sQUEUE_RUNNER collection.

i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
2、定義了切片操作,返回訓練樣本的x和y
    x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
    y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
3、具體使用說明:

    在使用的過程中,只要每次迭代的時候,我們取一下x,y。那麼,就會觸發跟x,y相關聯的操作,也即出隊操作和切片操作,為我們生成資料.但是,通過佇列的方式來讀入資料都是一種多執行緒讀入資料的方式,要在session當中將該執行緒開啟,不然就會掛起。

二、分析錯誤的情況&相應的修改辦法

1、錯誤的情況

#-*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader

class PTBInput(object):
  """The input data."""
  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    #為何要進行-1操作
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

if __name__ == '__main__':
	config = SmallConfig()
        data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'       
	raw_data = reader.ptb_raw_data(data_path)
	train_data, valid_data, test_data, _ = raw_data
	train_input = PTBInput(config=config, data=train_data, name="TrainInput")
        print "end--------------------------------"
        
	#wrong,使用session就會出現讀不出資料的錯誤,讀不出資料,整個資料流圖就無法計算,整個程式就處於掛起的狀態
	#使用session會出錯
	with tf.Session() as sess:
		for step in range(1):
			print sess.run(train_input.input_data)	
說明:在Session當中,沒有啟動資料讀入執行緒。所以,sess.run(train_input.input_data)就是無資料可取,程式就處於一種掛起的狀態。

2、解決方案

#-*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader

class PTBInput(object):
  """The input data."""
  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    #為何要進行-1操作
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

if __name__ == '__main__':
	config = SmallConfig()
        data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'       
	raw_data = reader.ptb_raw_data(data_path)
	train_data, valid_data, test_data, _ = raw_data
	train_input = PTBInput(config=config, data=train_data, name="TrainInput")
        print "end--------------------------------"
        

	#right,使用Supervisor()
	#sv = tf.train.Supervisor()
        #with sv.managed_session() as sess:
	#	for step in range(1):
	#		print sess.run(train_input.input_data)	
        
	#right
	# Create a session for running operations in the Graph.
	sess = tf.Session()
	# Start input enqueue threads.
	coord = tf.train.Coordinator()
	threads = tf.train.start_queue_runners(sess=sess, coord=coord)
	# Run training steps or whatever
	try:
		for step in range(2):
			print sess.run(train_input.input_data)
	except Exception,e:
		#Report exceptions to the coordinator
		coord.request_stop(e)
	coord.request_stop()
	# Terminate as usual.  It is innocuous to request stop twice.
	coord.join(threads)
	sess.close()

說明:使用tf.train.range_input_producer(epoch_size, shuffle=False),會預設將QueueRunner新增到全域性圖中,我們必須使用tf.train.start_queue_runners(sess=sess),去啟動該執行緒。然後使用coord = tf.train.Coordinator()去做一些執行緒的同步工作。

3、解決方案:

#-*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader

class PTBInput(object):
  """The input data."""
  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    #為何要進行-1操作
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

if __name__ == '__main__':
	config = SmallConfig()
        data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'       
	raw_data = reader.ptb_raw_data(data_path)
	train_data, valid_data, test_data, _ = raw_data
	train_input = PTBInput(config=config, data=train_data, name="TrainInput")
        print "end--------------------------------"
        

	#right,使用Supervisor()
	sv = tf.train.Supervisor()
        with sv.managed_session() as sess:
		for step in range(1):
			print sess.run(train_input.input_data)

說明:使用sv = tf.train.Supervisor()會比較方便,文件上說,The Supervisor is a small wrapper around a Coordinator, a Saver, and a SessionManager

也即使用了Supervisor(),那麼儲存模型,執行緒同步的事情都不用我們去幹涉了。