1. 程式人生 > >Tensorflow高階讀寫教程

Tensorflow高階讀寫教程

前言

tensorflow提供了多種讀寫方式,我們最常見的就是使用tf.placeholder()這種方法,使用這個方法需要我們提前處理好資料格式,不過這種處理方法也有缺陷:不便於儲存和不利於分散式處理,因此,TensorFlow提供了一個標準的讀寫格式和儲存協議,不僅如此,TensorFlow也提供了基於多執行緒佇列的讀取方式,高效而簡潔,讀取速度也更快,據一個博主說速度能提高10倍,相當的誘人.【下面的實驗均是在tensorflow1.0的環境下進行】

tensorflow的example解析

example協議

在TensorFlow官方github文件裡面,有個example.proto

的檔案,這個檔案詳細說明了TensorFlow裡面的example協議,下面我將簡要敘述一下。

tensorflow的example包含的是基於key-value對的儲存方法,其中key是一個字串,其對映到的是feature資訊,feature包含三種類型:

  1. BytesList:字串列表
  2. FloatList:浮點數列表
  3. Int64List:64位整數列表

以上三種類型都是列表型別,意味著都能夠進行拓展,但是也是因為這種彈性格式,所以在解析的時候,需要制定解析引數,這個稍後會講。

在TensorFlow中,example是按照行讀的,這個需要時刻記住,比如儲存M×N矩陣,使用ByteList儲存的話,需要M

×N大小的列表,按照每一行的讀取方式存放。

tf.tain.example

官方給了一個example的例子:

An Example for a movie recommendation application:
   features {
     feature {
       key: "age"
       value { float_list {
         value: 29.0
       }}
     }
     feature {
       key: "movie"
       value { bytes_list {
         value
: "The Shawshank Redemption" value: "Fight Club" }} } feature { key: "movie_ratings" value { float_list { value: 9.0 value: 9.7 }} } feature { key: "suggestion" value { bytes_list { value: "Inception" }} }

上面的例子中包含一個features,features裡面包含一些feature,和之前說的一樣,每個feature都是由鍵值對組成的,其key是一個字串,其value是上面提到的三種類型之一。

Example中有幾個一致性規則需要注意:

  1. 如果一個example的feature K 的資料型別是 T,那麼所有其他的所有feature K都應該是這個資料型別
  2. feature K 的value list的item個數可能在不同的example中是不一樣多的,這個取決於你的需求
  3. 如果在一個example中沒有feature k,那麼如果在解析的時候指定一個預設值的話,那麼將會返回一個預設值
  4. 如果一個feature k 不包含任何的value值,那麼將會返回一個空的tensor而不是預設值

tf.train.SequenceExample

sequence_example表示的是一個或者多個sequences,同時還包括上下文context,其中,context表示的是feature_lists的總體特徵,如資料集的長度等,feature_list包含一個key,一個value,value表示的是features集合(feature_lists),同樣,官方原始碼也給出了sequence_example的例子:

//ontext: {
   feature: {
     key  : "locale"
     value: {
       bytes_list: {
         value: [ "pt_BR" ]
       }
     }
   }
   feature: {
     key  : "age"
     value: {
       float_list: {
         value: [ 19.0 ]
       }
     }
   }
   feature: {
     key  : "favorites"
     value: {
       bytes_list: {
         value: [ "Majesty Rose", "Savannah Outen", "One Direction" ]
       }
     }
   }
 }
 feature_lists: {
   feature_list: {
     key  : "movie_ratings"
     value: {
       feature: {
         float_list: {
           value: [ 4.5 ]
         }
       }
       feature: {
         float_list: {
           value: [ 5.0 ]
         }
       }
     }
   }
   feature_list: {
     key  : "movie_names"
     value: {
       feature: {
         bytes_list: {
           value: [ "The Shawshank Redemption" ]
         }
       }
       feature: {
         bytes_list: {
           value: [ "Fight Club" ]
         }
       }
     }
   }
   feature_list: {
     key  : "actors"
     value: {
       feature: {
         bytes_list: {
           value: [ "Tim Robbins", "Morgan Freeman" ]
         }
       }
       feature: {
         bytes_list: {
           value: [ "Brad Pitt", "Edward Norton", "Helena Bonham Carter" ]
         }
       }
     }
   }
 }

一致性的sequence_example遵循以下規則:

  1. context中,所有feature k要保持資料型別一致性
  2. 一些example中的某些feature_lists L可能會丟失,如果在解析的時候允許為空的話,那麼在解析的時候回返回一個空的list
  3. feature_lists可能是空的
  4. 如果一個feature_list是非空的,那麼其裡面的所有feature都必須是一個數據型別
  5. 如果一個feature_list是非空的,那麼對於裡面的feature的長度是不是需要一樣的,這個取決於解析時候的引數

tensorflow 的parse example解析

在官方程式碼*[parsing_ops.py](https://github.com/tensorflow/tensorflow/blob/
master/tensorflow/python/ops/parsing_ops.py)*中有關於parse example的詳細介紹,我在這裡再敘述一下。

tf.parse_example

來看tf.parse_example的方法定義:

def parse_example(serialized, features, name=None, example_names=None)

parse_example是把example解析為詞典型的tensor
引數含義:
serialized:一個batch的序列化的example
features:解析example的規則
name:當前操作的名字
example_name:當前解析example的proto名稱

這裡重點要說的是第二個引數,也就是features,features是把serialized的example中按照鍵值對映到三種tensor: 1,VarlenFeature 2, SparseFeature 3,FixedLenFeature
下面對這三種對映方式做一個簡要的敘述:

VarlenFeature

是按照鍵值把example的value對映到SpareTensor物件,假設我們有如下的serialized資料:

 serialized = [
    features
      { feature { key: "ft" value { float_list { value: [1.0, 2.0] } } } },
    features
      { feature []},
    features
      { feature { key: "ft" value { float_list { value: [3.0] } } }
  ]

使用VarLenFeatures方法:

features={
    "ft":tf.VarLenFeature(tf.float32)
}

那麼我們將得到的是:

{"ft": SparseTensor(indices=[[0, 0], [0, 1], [2, 0]],
                      values=[1.0, 2.0, 3.0],
                      dense_shape=(3, 2)) }

可見,顯示的indices是ft值的索引,values是值,dense_shape是indices的shape

FixedLenFeature

而FixedLenFeature是按照鍵值對將features對映到大小為[serilized.size(),df.shape]的矩陣,這裡的FixLenFeature指的是每個鍵值對應的feature的size是一樣的。對於上面的例子,如果使用:

features: {
      "ft": FixedLenFeature([2], dtype=tf.float32, default_value=-1),
  }

那麼我們將得到:

{"ft": [[1.0, 2.0], [3.0, -1.0]]}

可見返回的值是一個[2,2]的矩陣,如果返回的長度不足給定的長度,那麼將會使用預設值去填充。
【注意:】

事實上,在TensorFlow1.0環境下,根據官方文件上的內容,我們是能夠得到VarLenFeature的值,但是得不到FixLenFeature的值,因此建議如果使用定長的FixedLenFeature,一定要保證對應的資料是等長的。

做個試驗來說明:

#coding=utf-8

import tensorflow as tf
import os
keys=[[1.0],[],[2.0,3.0]]
sess=tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

def make_example(key):
    example = tf.train.Example(features=tf.train.Features(
        feature={
            'ft':tf.train.Feature(float_list=tf.train.FloatList(value=key))
        }
    ))
    return example

filename="tmp.tfrecords"
if os.path.exists(filename):
    os.remove(filename)
writer = tf.python_io.TFRecordWriter(filename)
for key in keys:
    ex = make_example(key)
    writer.write(ex.SerializeToString())
writer.close()

reader = tf.TFRecordReader()
filename_queue = tf.train.string_input_producer(["tmp.tfrecords"],num_epochs=1)
_,serialized_example =reader.read(filename_queue)

# coord = tf.train.Coordinator()
# threads = tf.train.start_queue_runners(sess=sess,coord=coord)

batch = tf.train.batch(tensors=[serialized_example],batch_size=3)

features={
    "ft":tf.VarLenFeature(tf.float32)
}
#key_parsed = tf.parse_single_example(make_example([1,2,3]).SerializeToString(),features)
key_parsed = tf.parse_example(batch,features)
#start the queue
print tf.contrib.learn.run_n(key_parsed)

#[]means scalar

features={
    "ft":tf.FixedLenFeature(shape=[2],dtype=tf.float32)
}

key_parsed = tf.parse_example(batch,features)

print tf.contrib.learn.run_n(key_parsed)

結果返回如下:

[{'ft': SparseTensorValue(indices=array([[0, 0],
       [2, 0],
       [2, 1]]), values=array([ 1.,  2.,  3.], dtype=float32), dense_shape=array([3, 2]))}]

InvalidArgumentError (see above for traceback): Name: <unknown>, Key: ft, Index: 0.  Number of float values != expected.  Values size: 1 but output shape: [2]

可見,對於VarLenFeature,是能返回正常結果的,但是對於FixedLenFeature則返回size不對,可見如果對於邊長的資料還是不要使用FixedLenFeature為好。

如果把資料設定為[[1.0,2.0],[2.0,3.0]],那麼FixedLenFeature返回的是:

[{'ft': array([[ 1.,  2.],
       [ 2.,  3.]], dtype=float32)}]

這是正確的結果。

SparseFeature可以從下面的例子來說明:

`serialized`:
  ```
  [
    features {
      feature { key: "val" value { float_list { value: [ 0.5, -1.0 ] } } }
      feature { key: "ix" value { int64_list { value: [ 3, 20 ] } } }
    },
    features {
      feature { key: "val" value { float_list { value: [ 0.0 ] } } }
      feature { key: "ix" value { int64_list { value: [ 42 ] } } }
    }
  ]
  ```
  And arguments
  ```
  example_names: ["input0", "input1"],
  features: {
      "sparse": SparseFeature(
          index_key="ix", value_key="val", dtype=tf.float32, size=100),
  }
  ```
  Then the output is a dictionary:
  ```python
  {
    "sparse": SparseTensor(
        indices=[[0, 3], [0, 20], [1, 42]],
        values=[0.5, -1.0, 0.0]
        dense_shape=[2, 100]),
  }
  ```

現在明白了Example的協議和tf.parse_example的方法之後,我們再看看看幾個簡單的parse_example

tf.parse_single_example

區別於tf.parse_example,tf.parse_single_example只是少了一個batch而已,其餘的都是一樣的,我們看程式碼:

#coding=utf-8

import tensorflow as tf
import os

sess=tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

def make_example(key):
    example = tf.train.Example(features=tf.train.Features(
        feature={
            'ft':tf.train.Feature(float_list=tf.train.FloatList(value=key))
        }
    ))
    return example

features={
    "ft":tf.FixedLenFeature(shape=[3],dtype=tf.float32)
}

key_parsed = tf.parse_single_example(make_example([1.0,2.0,3.0]).SerializeToString(),features)

print tf.contrib.learn.run_n(key_parsed)

結果返回為:

[{'ft': array([ 1.,  2.,  3.], dtype=float32)}]

tf.parse_single_sequence_example

tf.parse_single_sequence_example對應的是tf.train,SequenceExample,我們以下面程式碼說明,single_sequence_example的用法:

#coding=utf-8

import tensorflow as tf
import os
keys=[[1.0,2.0],[2.0,3.0]]
sess=tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

def make_example(locale,age,score,times):

    example = tf.train.SequenceExample(
        context=tf.train.Features(
            feature={
            "locale":tf.train.Feature(bytes_list=tf.train.BytesList(value=[locale])),
            "age":tf.train.Feature(int64_list=tf.train.Int64List(value=[age]))
        }),
        feature_lists=tf.train.FeatureLists(
            feature_list={
            "movie_rating":tf.train.FeatureList(feature=[tf.train.Feature(float_list=tf.train.FloatList(value=score)) for i in range(times)])
            }
        )
    )
    return example.SerializeToString()

context_features = {
    "locale": tf.FixedLenFeature([],dtype=tf.string),
    "age": tf.FixedLenFeature([],dtype=tf.int64)
}
sequence_features = {
    "movie_rating": tf.FixedLenSequenceFeature([3], dtype=tf.float32,allow_missing=True)
}

context_parsed, sequence_parsed  = tf.parse_single_sequence_example(make_example("china",24,[1.0,3.5,4.0],2),context_features=context_features,sequence_features=sequence_features)

print tf.contrib.learn.run_n(context_parsed)
print tf.contrib.learn.run_n(sequence_parsed)

結果列印為:

[{'locale': 'china', 'age': 24}]

[{'movie_rating': array([[ 1. ,  3.5,  4. ],
       [ 1. ,  3.5,  4. ]], dtype=float32)}]

tf.parse_single_sequence_example的自動補齊

在常用的文字處理方面,由於文字經常是非定長的,因此需要經常補齊操作,例如使用CNN進行文字分類的時候就需要進行padding操作,通常我們把padding的索引設定為0,而且在文字預處理的時候也需要額外的程式碼進行處理,而TensorFlow提供了一個比較好的自動補齊工具,就是在tf.train.batch裡面把引數dynamic_pad設定成True,樣例如下:

#coding=utf-8

import tensorflow as tf
import os
keys=[[1,2],[2]]
sess=tf.InteractiveSession()
sess.run(tf.global_variables_initializer())



def make_example(key):

    example = tf.train.SequenceExample(
        context=tf.train.Features(
            feature={
            "length":tf.train.Feature(int64_list=tf.train.Int64List(value=[len(key)]))
        }),
        feature_lists=tf.train.FeatureLists(
            feature_list={
            "index":tf.train.FeatureList(feature=[tf.train.Feature(int64_list=tf.train.Int64List(value=[key[i]])) for i in range(len(key))])
            }
        )
    )
    return example.SerializeToString()


filename="tmp.tfrecords"
if os.path.exists(filename):
    os.remove(filename)
writer = tf.python_io.TFRecordWriter(filename)
for key in keys:
    ex = make_example(key)
    writer.write(ex)
writer.close()

reader = tf.TFRecordReader()
filename_queue = tf.train.string_input_producer(["tmp.tfrecords"],num_epochs=1)
_,serialized_example =reader.read(filename_queue)

# coord = tf.train.Coordinator()
# threads = tf.train.start_queue_runners(sess=sess,coord=coord)

context_features={
    "length":tf.FixedLenFeature([],dtype=tf.int64)
}
sequence_features={
    "index":tf.FixedLenSequenceFeature([],dtype=tf.int64)
}

context_parsed, sequence_parsed = tf.parse_single_sequence_example(
    serialized=serialized_example,
    context_features=context_features,
    sequence_features=sequence_features
)

batch_data = tf.train.batch(tensors=[sequence_parsed['index']],batch_size=2,dynamic_pad=True)
result = tf.contrib.learn.run_n({"index":batch_data})

print result

列印結果如下:

[{'index': array([[1, 2],
       [2, 0]])}]

可見還是比較好用的功能

tensorflow的TFRecords讀取

在上面的部分,我們展示了關於tensorflow的example的用法和解析過程,那麼我們該如何使用它們呢?其實在上面的幾段程式碼裡面也有體現,就是TFRecords進行讀寫,TFRecords讀寫其實很簡單,tensorflow提供了兩個方法:

  1. tf.TFRecordReader
  2. tf.TFRecordWriter

首先我們看下第二個,也就是tf.TFRecordWritre,之所以先看第二個的原因是第一個Reader將和batch一起在下一節講述。
關於TFRecordWriter,可以用下面程式碼說明,假設serilized_object是一個已經序列化好的example,那麼其寫的過程如下:

writer = tf.python_io.TFRecordWriter(filename)
writer.write(serilized_object)
writer.close()

tensorflow的多執行緒batch讀取

這一節主要關注的是基於TFRecords的讀取的方法和batch操作,我們可以回看一下之前的部落格的batch操作:

Batching

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue defines how big a buffer we will randomly sample
  #   from -- bigger means better shuffling but slower start up and more
  #   memory used.
  # capacity must be larger than min_after_dequeue and the amount larger
  #   determines the maximum we will prefetch.  Recommendation:
  #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

這裡我們把tf.SomeReader()換成tf.TFRecordReader()即可,然後再把tf.some_decoder換成我們自定義的decoder,當然在decoder裡面我們可以自己指定parser(也就是上文提到的內容),然後我們使用tf.train.batch或者tf.train.shuffle_batch等操作獲取到我們需要送入網路訓練的batch引數即可。

多執行緒讀取batch例項

我使用了softmax迴歸做一個簡單的示例,下面是一個多執行緒讀取batch的例項主要程式碼:

#coding=utf-8
"""
author:luchi
date:24/4/2017
desc:training logistic regression
"""
import tensorflow as tf
from model import Logistic

def read_my_file_format(filename_queue):
    reader = tf.TFRecordReader()
    _,serilized_example = reader.read(filename_queue)

    #parsing example
    features = tf.parse_single_example(serilized_example,
        features={
            "data":tf.FixedLenFeature([2],tf.float32),
            "label":tf.FixedLenFeature([],tf.int64)
        }

    )

    #decode from raw data,there indeed do not to change ,but to show common step , i write a case here

    # data = tf.cast(features['data'],tf.float32)
    # label = tf.cast(features['label'],tf.int64)

    return features['data'],features['label']


def input_pipeline(filenames, batch_size, num_epochs=100):


    filename_queue = tf.train.string_input_producer([filenames],num_epochs=num_epochs)
    data,label=read_my_file_format(filename_queue)

    datas,labels = tf.train.shuffle_batch([data,label],batch_size=batch_size,num_threads=5,
                                          capacity=1000+3*batch_size,min_after_dequeue=1000)
    return datas,labels

class config():
    data_dim=2
    label_num=2
    learining_rate=0.1
    init_scale=0.01

def run_training():

    with tf.Graph().as_default(), tf.Session() as sess:

        datas,labels = input_pipeline("reg.tfrecords",32)

        c = config()
        initializer = tf.random_uniform_initializer(-1*c.init_scale,1*c.init_scale)

        with tf.variable_scope("model",initializer=initializer):
            model = Logistic(config=c,data=datas,label=labels)

        fetches = [model.train_op,model.accuracy,model.loss]
        feed_dict={}

        #init
        init_op = tf.group(tf.global_variables_initializer(),
                       tf.local_variables_initializer())
        sess.run(init_op)

        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess,coord=coord)
        try:
            while not coord.should_stop():

                # fetches = [model.train_op,model.accuracy,model.loss]
                # feed_dict={}
                # feed_dict[model.data]=sess.run(datas)
                # feed_dict[model.label]=sess.run(labels)
                # _,accuracy,loss= sess.run(fetches,feed_dict)
                _,accuracy,loss= sess.run(fetches,feed_dict)
                print("the loss is %f and the accuracy is %f"%(loss,accuracy))
        except tf.errors.OutOfRangeError:
            print("done training")
        finally:
            coord.request_stop()
        coord.join(threads)
        sess.close()

def main():
    run_training()

if __name__=='__main__':
    main()

這裡有幾個坑需要說明一下:

  1. 使用了string_input_producer指定num_epochs之後,在初始化的時候需要使用:
init_op = tf.group(tf.global_variables_initializer(),
                       tf.local_variables_initializer())
sess.run(init_op)

要不然會報錯
2. 使用了從檔案讀取batch之後,就不需要設定tf.placeholder了【非常重要】,我在這個坑裡呆了好久,如果使用了tf.placeholder一是會報錯為tensor物件能送入到tf.placeholder中,另外一個是就算使用sess.run(batch_data),也會存在模型不能收斂的問題,所以切記切記

結果顯示如下:

the loss is 0.156685 and the accuracy is 0.937500
the loss is 0.185438 and the accuracy is 0.968750
the loss is 0.092628 and the accuracy is 0.968750
the loss is 0.059271 and the accuracy is 1.000000
the loss is 0.088685 and the accuracy is 0.968750
the loss is 0.271341 and the accuracy is 0.968750
the loss is 0.244190 and the accuracy is 0.968750
the loss is 0.136841 and the accuracy is 0.968750
the loss is 0.115607 and the accuracy is 0.937500
the loss is 0.080254 and the accuracy is 1.000000

後記

關於tfrecords的batch操作,我一共折騰了好幾天,中間也是磕磕絆絆,走過各種坑,現在也終於算是明白了一些,也算是沒白費功夫,不過使用這種特性之後,一是提高了速度,而是減少了寫程式碼的量,當然因為資料成了序列化的,也容易檢查資料是否有誤,最後,希望明天面試順利吧,2017年4月24,於北京