1. 程式人生 > >用Tensorflow來預處理Imagenet資料

用Tensorflow來預處理Imagenet資料

最近想以Imagenet 2012影象分類大賽的資料來進行訓練和測試,看看如何能利用這麼大量的影象資料來完善卷積神經網路模型。之前做的基於Cifar10的資料量還是大小了,類別也不夠多。Imagenet的資料總共有146G,共包含了1000個類別的影象,總共120萬張圖片。Tensorlfow的官方模型庫中的很多模型也是以Imagenet來訓練的。

首先要去Imagenet的官網下載資料集,在下載前要用字尾為edu的郵箱註冊並審批通過之後才能獲取下載地址。不過也可以在網上搜索一下直接查下載地址即可,不用註冊。

資料下載之後,是一個大的Tar包,裡面包含了1000個影象類別的Tar包。把他們全部解壓之後,就可以編寫程式來處理了。

我的思路是,先把這1000個資料夾的所有檔案的檔名都讀取出來,並構建一個字典來表示資料夾名稱和影象類別的對應關係,把這些檔名和類別儲存在一個CSV檔案中。程式程式碼如下:

#Imagenet圖片都儲存在/data目錄下,裡面有1000個子目錄,獲取這些子目錄的名字
classes = os.listdir('data/')

#構建一個字典,Key是目錄名,value是類名0-999
labels_dict = {}
for i in range(len(classes)):
    labels_dict[classes[i]]=i

#構建一個列表,裡面的每個元素是圖片檔名+類名
images_labels_list = []
for i in range(len(classes)):
    path = 'data/'+classes[i]+'/'
    images_files = os.listdir(path)
    label = str(labels_dict[classes[i]])
    for image_file in images_files:
        images_labels_list.append(path+image_file+','+label+'\n')

#把列表進行隨機排序,然後取其中80%的資料作為訓練集,10%作為驗證集,10%作為測試集
random.shuffle(images_labels_list)
num = len(images_labels_list)
with open('imagenet_train.csv', 'w') as file:
    file.writelines(images_labels_list[:int(num*0.8)])
with open('imagenet_valid.csv', 'w') as file:
    file.writelines(images_labels_list[int(num*0.8):int(num*0.9)])
with open('imagenet_test.csv', 'w') as file:
    file.writelines(images_labels_list[int(num*0.9):])

程式執行後,我們有3個CSV檔案,分別對應訓練集,驗證集和測試集。CSV檔案有兩列,一列是圖片的檔名,另一列是類名。

之後我們就可以在Tensorflow裡面構建輸入資料了。有兩種方法,一種是直接讀取CSV的資料來構建Dataset,然後對每一條資料用Map函式來進行影象的解碼。這種方法最簡單,但是在實際訓練中,因為涉及到大量小檔案的讀取和解碼,效率並不太高。程式程式碼如下:

#定義對Dataset每條資料進行處理的map函式
def _parse_function(filename, label):
    image_string = tf.read_file(filename)
    image_decoded = tf.image.decode_jpeg(image_string, channels=3)
    image_height = tf.shape(image_decoded)[0]
    image_width = tf.shape(image_decoded)[1]
    #按照RESNET論文的訓練影象的處理方式,對圖片的短邊隨機縮放到256-481之間的數值,然後在隨機
    #剪下224×224大小的圖片。
    random_s = tf.random_uniform([1], minval=256, maxval=481, dtype=tf.int32)[0]
    resized_height, resized_width = tf.cond(image_height<image_width, 
                lambda: (random_s, tf.cast(tf.multiply(tf.cast(image_width, tf.float64),tf.divide(random_s,image_height)), tf.int32)), 
                lambda: (tf.cast(tf.multiply(tf.cast(image_height, tf.float64),tf.divide(random_s,image_width)), tf.int32), random_s))
    image_float = tf.image.convert_image_dtype(image_decoded, tf.float32)
    image_resized = tf.image.resize_images(image_float, [resized_height, resized_width])
    image_flipped = tf.image.random_flip_left_right(image_resized)
    image_cropped = tf.random_crop(image_flipped, [imageCropHeight, imageCropWidth, imageDepth])
    image_distorted = tf.image.random_brightness(image_cropped, max_delta=63)
    image_distorted = tf.image.random_contrast(image_distorted, lower=0.2, upper=1.8)
    image_distorted = tf.image.per_image_standardization(image_distorted)
    image_distorted = tf.transpose(image_distorted, perm=[2, 0, 1])
    return image_distorted, label

#構建Dataset
with tf.device('/cpu:0'):
    filename_train = ["imagenet_train.csv"]
    filename_valid = ["imagenet_valid.csv"]
    #filename_test = ["imagenet_test.csv"]
    record_defaults = [tf.string, tf.int32]
    dataset_train = tf.contrib.data.CsvDataset(filename_train, record_defaults)
    dataset_valid = tf.contrib.data.CsvDataset(filename_valid, record_defaults)
    #dataset_test = tf.contrib.data.CsvDataset(filename_test, record_defaults)
    dataset_train = dataset_train.map(_parse_function, num_parallel_calls=4)
    dataset_valid = dataset_valid.map(_parse_test_function, num_parallel_calls=2)
    #dataset_test = dataset_test.map(_parse_function, num_parallel_calls=2)

    dataset_train = dataset_train.repeat(10)
    
    dataset_train = dataset_train.batch(batch_size)
    dataset_train = dataset_train.prefetch(batch_size)
    dataset_valid = dataset_valid.batch(batch_size)
    #dataset_test = dataset_test.batch(batch_size)

    iterator = tf.data.Iterator.from_structure(dataset_train.output_types, dataset_train.output_shapes)
    next_images, next_labels = iterator.get_next()
    train_init_op = iterator.make_initializer(dataset_train)
    valid_init_op = iterator.make_initializer(dataset_valid)
    #test_init_op = iterator.make_initializer(dataset_test)

第二種方法是把這些小圖片轉換為TFRECORD格式,大概每1000張小圖片合併為一個TFRECORD檔案,這樣可以減少頻繁讀取小檔案帶來的開銷,提高之後訓練的速度。另外,為了提高效率,可以開啟多個程序來同時進行TFRECORD檔案的生成。我的CPU有4個核心,因此起了4個程序來同時處理。程式碼如下:

import tensorflow as tf
from multiprocessing import Process, Queue

#把影象資料和標籤轉換為TRRECORD的格式
def make_example(image, label):
    return tf.train.Example(features=tf.train.Features(feature={
        'image' : tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
        'label' : tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
    }))

#這個函式用來生成TFRECORD檔案,第一個引數是列表,每個元素是圖片檔名加類名,第二個引數是佇列名稱,用於和父程序傳送訊息
def gen_tfrecord(trainrecords, queue):
    tfrecords_file_num = 1
    file_num = 0
    total_num = len(trainrecords)
    writer = tf.python_io.TFRecordWriter("tfdata/"+str(os.getpid())+"_"+str(tfrecords_file_num)+".tfrecord")
    pid = os.getpid()

    for record in trainrecords:
        file_num += 1
        fields = record.strip('\n').split(',')
        with open(fields[0], 'rb') as jpgfile:
            img = jpgfile.read()
        label = np.array(int(fields[1]))
        ex = make_example(img, label)
        writer.write(ex.SerializeToString())
        #每寫入100條記錄,向父程序傳送訊息,報告進度
        if file_num%100==0:
            queue.put((pid, file_num))
        if file_num%max_num==0 and file_num<total_num:
            writer.close()
            tfrecords_file_num += 1
            writer = tf.python_io.TFRecordWriter("tfdata/"+str(os.getpid())+"_"+str(tfrecords_file_num)+".tfrecord")
    writer.close()

max_num = 1000  #max record number in one file
tfrecords_file_num = 1 

#讀取之前生成的訓練集的圖片檔名和類名的CSV檔案
with open('imagenet_train.csv', 'r') as trainfile:
    trainrecords = trainfile.readlines()
total_files_num = len(trainrecords)

#CPU有4個核心,因此每個核心處理1/4的資料,把trainrecords列表拆分為4份
each_process_files_num = int(total_files_num/4.0)
list1 = trainrecords[:each_process_files_num]
list2 = trainrecords[each_process_files_num:2*each_process_files_num]
list3 = trainrecords[2*each_process_files_num:3*each_process_files_num]
list4 = trainrecords[3*each_process_files_num:]

#設定4個佇列,和4個子程序
q1 = Queue()
q2 = Queue()
q3 = Queue()
q4 = Queue()
p1=Process(target=gen_tfrecord, args=(list1,q1,))
p2=Process(target=gen_tfrecord, args=(list2,q2,))
p3=Process(target=gen_tfrecord, args=(list3,q3,))
p4=Process(target=gen_tfrecord, args=(list4,q4,))
p_list=[p1, p2, p3, p4]
_ = map(Process.start, p_list)

#父程序迴圈查詢佇列的訊息,並且每10秒更新一次
progress_str = 'PID:%i Processing:%i/%i | PID:%i Processing:%i/%i | PID:%i Processing:%i/%i | PID:%i Processing:%i/%i \r'
while(True):
    try:
        msg1 = q1.get()
        msg2 = q2.get()
        msg3 = q3.get()
        msg4 = q4.get()
        print progress_str % (msg1[0],msg1[1],len(list1),msg2[0],msg2[1],len(list2),msg3[0],msg3[1],len(list3),msg4[0],msg4[1],len(list4)),
        time.sleep(10)
    except:
        break

#構建Dataset
with tf.device('/cpu:0'):
    train_files_names = os.listdir('tfdata/')
    train_files = ['tfdata/'+item for item in train_files_names]
    #train_files = ['testrecords.tfrecord']
    dataset_train = tf.data.TFRecordDataset(train_files)
    dataset_train = dataset_train.map(_parse_function, num_parallel_calls=4)
    dataset_train = dataset_train.repeat(10)
    dataset_train = dataset_train.batch(batch_size)
    dataset_train = dataset_train.prefetch(batch_size)
    iterator = tf.data.Iterator.from_structure(dataset_train.output_types, dataset_train.output_shapes)
    next_images, next_labels = iterator.get_next()
    train_init_op = iterator.make_initializer(dataset_train)

2種方法的效能差異還是很大的。在我的Intel I3 CPU+1070Ti 8G+16G RAM的配置下,Batch Size是64, 採用方法一,每個Batch的訓練時間是80秒,方法二的訓練時間是40秒,因此方法二對效能的提高還是很大的。