1. 程式人生 > >tensorflow之佇列,執行緒,輸入

tensorflow之佇列,執行緒,輸入

本部落格純屬轉載

一、佇列和執行緒 

1、佇列:

  1)、tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 建立一個以先進先出的順序對元素進行排隊的佇列

    引數:

      capacity:整數。可能儲存在此佇列中的元素數量的上限

      dtypes:DType物件列表。長度dtypes必須等於每個佇列元 素中的張量數,dtype的型別形狀,決定了後面進佇列元素形狀

    方法:

      q.dequeue()獲取佇列的資料

      q.enqueue(值)將一個數據新增進佇列

      q.enqueue_many(列表或者元組)將多個數據新增進佇列

      q.size() 返回佇列的大小

  2)、tf.RandomShuffleQueue() 隨機出的佇列

2、佇列管理器

  tf.train.QueueRunner(queue, enqueue_ops=None)

  引數:

    queue:A Queue

    enqueue_ops:新增執行緒的佇列操作列表,[]*2,指定兩個執行緒

    create_threads(sess, coord=None,start=False) 建立執行緒來執行給定會話的入隊操作

    start:布林值,如果True啟動執行緒;如果為False呼叫者 必須呼叫start()啟動執行緒

    coord:執行緒協調器  用於執行緒的管理

3、執行緒協調器

  tf.train.Coordinator() 執行緒協調員,實現一個簡單的機制來協調一 組執行緒的終止

  方法:    返回的是執行緒協調例項

    request_stop()  請求停止

    join(threads=None, stop_grace_period_secs=120) 等待執行緒終止

結合佇列、佇列管理器和執行緒協調器實現非同步的小例:

複製程式碼

import tensorflow as tf

# 1.建立佇列
Q = tf.FIFOQueue(2000, tf.float32)

# 2.新增資料進佇列  
# 2.1建立一個數據(變數)
var = tf.Variable(0.0, tf.float32)
# 2.2資料自增
plus = tf.assign_add(var, 1)
# 2.3將資料新增進佇列
en_q = Q.enqueue(plus)

# 3.建立佇列管理器
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)

# 4.變數初始化
init = tf.global_variables_initializer()

# 5.建立會話
with tf.Session() as sess:
    # 6.執行初始化
    sess.run(init)

    # 7.建立執行緒協調器
    coord = tf.train.Coordinator()

    # 8.開啟子執行緒
    threads = qr.create_threads(sess, coord=coord, start=True)

    # 9.主執行緒 從佇列中取資料
    for i in range(200):
        print(sess.run(Q.dequeue()))
        
    # 10.執行緒回收
    coord.request_stop()
    coord.join(threads)

複製程式碼

二、檔案讀取

1、檔案讀取流程

2、檔案讀取API

  1)檔案佇列

  tf.train.string_input_producer(string_tensor, ,shuffle=True) 將輸出字串(例如檔名)輸入到管道佇列

  引數:   

    string_tensor 含有檔名的1階張量

    num_epochs:過幾遍資料,預設無限過資料

    返回:具有輸出字串的佇列

  2)檔案閱讀器(根據檔案格式,選擇對應的檔案閱讀器)

    csv檔案:  class tf.TextLineReader       預設按行讀取      返回:讀取器例項

    二進位制檔案:  tf.FixedLengthRecordReader(record_bytes)           record_bytes:整型,指定每次讀取的位元組數      返回:讀取器例項

    TfRecords檔案:  tf.TFRecordReader     返回:讀取器例項

    以上3個閱讀器有一個相同的方法:

    read(file_queue):從佇列中指定數量內容 返回一個Tensors元組(key, value)  其中key是檔名字,value是預設的內容(行,位元組)

  3)檔案內容解碼器(由於從檔案中讀取的是字串,需要函式去解析這些字串到張量)

    ①tf.decode_csv(records,record_defaults=None,field_delim = None,name = None)   將CSV轉換為張量,與tf.TextLineReader搭配使用

      引數:

        records:tensor型字串,每個字串是csv中的記錄行

        field_delim:預設分割符”,”

        record_defaults:引數決定了所得張量的型別,並設定一個值在輸入字串中缺少使用預設值

    ②tf.decode_raw(bytes,out_type,little_endian = None,name = None)   將位元組轉換為一個數字向量表示,位元組為一字串型別的張量,與函式tf.FixedLengthRecordReader搭配使用,二進位制讀取為uint8格式

  4)開啟執行緒操作

    tf.train.start_queue_runners(sess=None,coord=None) 收集所有圖中的佇列執行緒,並啟動執行緒 sess:所在的會話中 coord:執行緒協調器 return:返回所有執行緒佇列

  5)管道讀端批處理

    ①tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 讀取指定大小(個數)的張量

     引數:

      tensors:可以是包含張量的列表

      batch_size:從佇列中讀取的批處理大小

      num_threads:進入佇列的執行緒數

      capacity:整數,佇列中元素的最大數量

      返回:tensors

    ②tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,    num_threads=1,)  亂序讀取指定大小(個數)的張量

      引數:

        min_after_dequeue:留下佇列裡的張量個數,能夠保持隨機打亂

3、檔案讀取案例

複製程式碼

import tensorflow as tf
import os


def csv_read(filelist):
    # 構建檔案佇列
    Q = tf.train.string_input_producer(filelist)
    # 構建讀取器
    reader = tf.TextLineReader()
    # 讀取佇列
    key, value = reader.read(Q)
    # 構建解碼器
    x1, y = tf.decode_csv(value, record_defaults=[["None"], ["None"]])
    # 進行管道批處理
    x1_batch, y_batch = tf.train.batch([x1, y], batch_size=12, num_threads=1, capacity=12)
    # 開啟會話
    with tf.Session() as sess:
        # 建立執行緒協調器
        coord = tf.train.Coordinator()
        # 開啟執行緒
        threads = tf.train.start_queue_runners(sess, coord=coord)
        # 執行任務
        print(sess.run([x1_batch, y_batch]))
        # 執行緒回收
        coord.request_stop()
        coord.join(threads)


if __name__ == "__main__":
    filename = os.listdir("./data/")   #  檔案目錄自己指定
    filelist = [os.path.join("./data/", file) for file in filename]
    csv_read(filelist)

複製程式碼

三、圖片讀取與儲存

    1   影象數字化三要素:長度,寬度,通道數(一通道 : 灰度值    三通道 : RGB)

    2   縮小圖片大小:

      tf.image.resize_images(images, size) 縮小圖片

      目的:

         1、增加圖片資料的統一性

         2、所有圖片轉換成指定大小

         3、縮小圖片資料量,防止增加開銷

    3  影象讀取API

      1)影象讀取器

        tf.WholeFileReader 將檔案的全部內容作為值輸出的讀取器

          return:讀取器例項 read(file_queue):輸出將是一個檔名(key)和該檔案的內容 (值)

      2)影象解碼器

        tf.image.decode_jpeg(contents) 將JPEG編碼的影象解碼為uint8張量

          return:uint8張量,3-D形狀[height, width, channels]

        tf.image.decode_png(contents) 將PNG編碼的影象解碼為uint8或uint16張量

          return:張量型別,3-D形狀[height, width, channels]

圖片讀取案的簡單demo:

複製程式碼

import tensorflow as tf
import os

flags = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("data_home", "./data/dog/", "狗的圖片目錄")   # 檔案路徑自己指定

def picread(filelist):
    # 構建檔名佇列
    file_q = tf.train.string_input_producer(filelist)
    # 構建讀取器
    reader = tf.WholeFileReader()
    # 讀取內容
    key, value = reader.read(file_q)
    print(value)
    # 構建解碼器
    image = tf.image.decode_jpeg(value)
    print(image)
    # 統一圖片大小   設定長寬
    resize_image = tf.image.resize_images(image, [256,256])
    print(resize_image)
    # 指定通道大小
    resize_image.set_shape([256,256,3])
    # 構建批量處理管道
    image_batch = tf.train.batch([resize_image], batch_size=100,num_threads=1, capacity=100)

    return image_batch

if __name__ == "__main__":
    filename = os.listdir(flags.data_home)
    filelist = [os.path.join(flags.data_home, file) for file in filename]
    image_batch = picread(filelist)

    with tf.Session() as sess:
        # 構建執行緒協調器
        coord = tf.train.Coordinator()
        # 開啟執行緒
        threads = tf.train.start_queue_runners(sess,coord=coord)
        # 訓練資料
        print(sess.run(image_batch))
        # 回收執行緒
        coord.request_stop()
        coord.join(threads)

複製程式碼

四、TFRecords分析、存取

  1 概念

   TFRecords是Tensorflow設計的一種內建檔案格式,是一種二進位制檔案, 它能更好的利用記憶體,更方便複製和移動 (將二進位制資料和標籤(訓練的類別標籤)資料儲存在同一個檔案中)

  2 TFRecords檔案分析

    1)檔案格式:*.tfrecords

    2)寫入檔案內容:Example協議塊

  3 TFRecords儲存

    1)建立TFRecord儲存器

       tf.python_io.TFRecordWriter(path) 寫入tfrecords檔案

      引數: 

        path: TFRecords檔案的路徑

      return:無,  執行寫檔案操作

      方法:

        write(record):向檔案中寫入一個字串記錄     # 一個序列化的Example,Example.SerializeToString()

        close():關閉檔案寫入器

    2)構造每個樣本的Example協議塊

    tf.train.Example(features=None) 寫入tfrecords檔案

      引數:

        features:tf.train.Features型別的特徵例項

        return:example格式協議塊

    tf.train.Features(feature=None) 構建每個樣本的資訊鍵值對

      引數:

         feature:字典資料,key為要儲存的名字

        value為tf.train.Feature例項

        return:Features型別

    tf.train.Feature(**options)

      引數:

        **options:例如 bytes_list=tf.train. BytesList(value=[Bytes])

               int64_list=tf.train. Int64List(value=[Value])

               float_list = tf.train. FloatList(value=[value])

  4  TFRecords讀取方法

     1)構建檔案佇列

        tf.train.string_input_producer(string_tensor, ,shuffle=True)  

     2)構建檔案讀取器,讀取佇列的資料

        tf.TFRecordReader     返回:讀取器例項

        read(file_queue)

     3)解析TFRecords的example協議記憶體塊

        ①tf.parse_single_example(serialized,features=None,name=None) 解析一個單一的Example原型

          引數:

            serialized:標量字串Tensor,一個序列化的Example

            features:dict字典資料,鍵為讀取的名字,值為FixedLenFeature

            return:一個鍵值對組成的字典,鍵為讀取的名字

        ②tf.FixedLenFeature(shape,dtype)

          引數:

            shape:輸入資料的形狀,一般不指定,為空列表

             dtype:輸入資料型別,與儲存進檔案的型別要一致 型別只能是float32,int64,string

     4)解碼

        tf.decode_raw(bytes,out_type,little_endian = None,name = None)   將位元組轉換為一個數字向量表示,位元組為一字串型別的張量,與函式tf.FixedLengthRecordReader搭配使用,二進位制讀取為uint8格式

以下是從二進位制檔案中讀取資料,寫入tfrecords檔案,再從tfrecords檔案讀取的小案例:

複製程式碼

import tensorflow as tf
import os

flags = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("data_home", "./data/cifar10/cifar-10-batches-bin/", "二進位制檔案目錄")
tf.app.flags.DEFINE_string("data_tfrecords", "./data/temp/tfrecords", "tfrecords檔案路徑")


class cifarread(object):
    def __init__(self, filelist):
        self.filelist = filelist
        # 構建圖的一些資料
        self.height = 32
        self.width = 32
        self.channel = 3
        self.label_bytes = 1
        self.image_bytes = self.height * self.width*self.channel
        self.bytes = self.label_bytes + self.image_bytes

    def read_decode(self):
        """
        讀取二進位制檔案
        :return: image_batch, label_batch
        """
        # 構建檔名佇列
        file_q = tf.train.string_input_producer(self.filelist)

        # 構建閱讀器
        reader = tf.FixedLengthRecordReader(record_bytes=self.bytes)

        # 讀取資料
        key, value = reader.read(file_q)

        # 解碼
        label_image = tf.decode_raw(value, tf.uint8)

        # 分割資料集
        label = tf.cast(tf.slice(label_image, [0], [self.label_bytes]), tf.int32)
        image = tf.slice(label_image, [self.label_bytes], [self.image_bytes])

        # 改變形狀
        image_tensor = tf.reshape(image, [self.height, self.width, self.channel])

        # 批量處理
        image_batch, label_batch = tf.train.batch([image_tensor, label], batch_size=10, num_threads=1, capacity=10)

        return image_batch, label_batch

    def write2tfrecords(self, image_batch, label_batch):
        """
        將從二進位制檔案中讀取的內容寫入tfrecords檔案
        :param image_batch:
        :param label_batch:
        :return:
        """
        # 構建一個tfrecords檔案儲存器
        writer = tf.python_io.TFRecordWriter(flags.data_tfrecords)
        # 對於每一個樣本,都要構造example寫入
        for i in range(10):
            # 取出特徵值,轉換成字串
            image_string = image_batch[i].eval().tostring()

            # 取出目標值
            label_int = int(label_batch[i].eval()[0])

            example = tf.train.Example(features=tf.train.Features(feature={
                "image":tf.train.Feature(bytes_list = tf.train.BytesList(value=[image_string])),
                "label":tf.train.Feature(int64_list = tf.train.Int64List(value=[label_int]))
            }))
            # 寫入檔案中,要先把協議序列化值之後才能儲存
            writer.write(example.SerializeToString())

        writer.close()
        return None

    def read_tfrecords(self):
        """
        從tfrecords檔案讀取內容
        :return: image_batch, label_batch
        """
        # 構造檔案佇列
        file_q = tf.train.string_input_producer([flags.data_tfrecords])
        # 構造閱讀器,讀取資料
        reader = tf.TFRecordReader()
        # 一次只讀取一個樣本
        key, value = reader.read(file_q)
        # 解析內容 解析example協議
        feature = tf.parse_single_example(value, features={
            "image":tf.FixedLenFeature([], tf.string),
            "label":tf.FixedLenFeature([], tf.int64)
        })

        # 解碼     字串需要解碼, 整形不用
        image = tf.decode_raw(feature["image"], tf.uint8)

        # 設定圖片的形狀,以便批處理
        image_reshape = tf.reshape(image, [self.height, self.width])
        label = tf.cast(feature["label"], tf.int32)

        # 批處理
        image_batch, label_batch = tf.train.batch([image_reshape, label],batch_size=10 ,num_threads=1, capacity=10)

        return image_batch, label_batch

if __name__ == "__main__":
    filename = os.listdir(flags.data_home)
    filelist = [os.path.join(flags.data_home, file) for file in filename if file[-3:] == "bin"]
    cif = cifarread(filelist)
    # 讀取二進位制檔案
    image_batch, label_batch = cif.read_decode()
    # 讀取tfrecords檔案
    # cif.read_tfrecords()

    with tf.Session() as sess:
        # 構建執行緒協調器
        coord = tf.train.Coordinator()
        # 開啟執行緒
        threads = tf.train.start_queue_runners(sess, coord=coord)
        # 執行任務
        print(sess.run([image_batch, label_batch]))
        # 儲存tfrecords檔案
        # cif.write2tfrecords(image_batch, label_batch)
        # 回收執行緒
        coord.request_stop()
        coord.join(threads)

複製程式碼