用Tensorflow來預處理Imagenet資料
最近想以Imagenet 2012影象分類大賽的資料來進行訓練和測試,看看如何能利用這麼大量的影象資料來完善卷積神經網路模型。之前做的基於Cifar10的資料量還是大小了,類別也不夠多。Imagenet的資料總共有146G,共包含了1000個類別的影象,總共120萬張圖片。Tensorlfow的官方模型庫中的很多模型也是以Imagenet來訓練的。
首先要去Imagenet的官網下載資料集,在下載前要用字尾為edu的郵箱註冊並審批通過之後才能獲取下載地址。不過也可以在網上搜索一下直接查下載地址即可,不用註冊。
資料下載之後,是一個大的Tar包,裡面包含了1000個影象類別的Tar包。把他們全部解壓之後,就可以編寫程式來處理了。
我的思路是,先把這1000個資料夾的所有檔案的檔名都讀取出來,並構建一個字典來表示資料夾名稱和影象類別的對應關係,把這些檔名和類別儲存在一個CSV檔案中。程式程式碼如下:
#Imagenet圖片都儲存在/data目錄下,裡面有1000個子目錄,獲取這些子目錄的名字 classes = os.listdir('data/') #構建一個字典,Key是目錄名,value是類名0-999 labels_dict = {} for i in range(len(classes)): labels_dict[classes[i]]=i #構建一個列表,裡面的每個元素是圖片檔名+類名 images_labels_list = [] for i in range(len(classes)): path = 'data/'+classes[i]+'/' images_files = os.listdir(path) label = str(labels_dict[classes[i]]) for image_file in images_files: images_labels_list.append(path+image_file+','+label+'\n') #把列表進行隨機排序,然後取其中80%的資料作為訓練集,10%作為驗證集,10%作為測試集 random.shuffle(images_labels_list) num = len(images_labels_list) with open('imagenet_train.csv', 'w') as file: file.writelines(images_labels_list[:int(num*0.8)]) with open('imagenet_valid.csv', 'w') as file: file.writelines(images_labels_list[int(num*0.8):int(num*0.9)]) with open('imagenet_test.csv', 'w') as file: file.writelines(images_labels_list[int(num*0.9):])
程式執行後,我們有3個CSV檔案,分別對應訓練集,驗證集和測試集。CSV檔案有兩列,一列是圖片的檔名,另一列是類名。
之後我們就可以在Tensorflow裡面構建輸入資料了。有兩種方法,一種是直接讀取CSV的資料來構建Dataset,然後對每一條資料用Map函式來進行影象的解碼。這種方法最簡單,但是在實際訓練中,因為涉及到大量小檔案的讀取和解碼,效率並不太高。程式程式碼如下:
#定義對Dataset每條資料進行處理的map函式 def _parse_function(filename, label): image_string = tf.read_file(filename) image_decoded = tf.image.decode_jpeg(image_string, channels=3) image_height = tf.shape(image_decoded)[0] image_width = tf.shape(image_decoded)[1] #按照RESNET論文的訓練影象的處理方式,對圖片的短邊隨機縮放到256-481之間的數值,然後在隨機 #剪下224×224大小的圖片。 random_s = tf.random_uniform([1], minval=256, maxval=481, dtype=tf.int32)[0] resized_height, resized_width = tf.cond(image_height<image_width, lambda: (random_s, tf.cast(tf.multiply(tf.cast(image_width, tf.float64),tf.divide(random_s,image_height)), tf.int32)), lambda: (tf.cast(tf.multiply(tf.cast(image_height, tf.float64),tf.divide(random_s,image_width)), tf.int32), random_s)) image_float = tf.image.convert_image_dtype(image_decoded, tf.float32) image_resized = tf.image.resize_images(image_float, [resized_height, resized_width]) image_flipped = tf.image.random_flip_left_right(image_resized) image_cropped = tf.random_crop(image_flipped, [imageCropHeight, imageCropWidth, imageDepth]) image_distorted = tf.image.random_brightness(image_cropped, max_delta=63) image_distorted = tf.image.random_contrast(image_distorted, lower=0.2, upper=1.8) image_distorted = tf.image.per_image_standardization(image_distorted) image_distorted = tf.transpose(image_distorted, perm=[2, 0, 1]) return image_distorted, label #構建Dataset with tf.device('/cpu:0'): filename_train = ["imagenet_train.csv"] filename_valid = ["imagenet_valid.csv"] #filename_test = ["imagenet_test.csv"] record_defaults = [tf.string, tf.int32] dataset_train = tf.contrib.data.CsvDataset(filename_train, record_defaults) dataset_valid = tf.contrib.data.CsvDataset(filename_valid, record_defaults) #dataset_test = tf.contrib.data.CsvDataset(filename_test, record_defaults) dataset_train = dataset_train.map(_parse_function, num_parallel_calls=4) dataset_valid = dataset_valid.map(_parse_test_function, num_parallel_calls=2) #dataset_test = dataset_test.map(_parse_function, num_parallel_calls=2) dataset_train = dataset_train.repeat(10) dataset_train = dataset_train.batch(batch_size) dataset_train = dataset_train.prefetch(batch_size) dataset_valid = dataset_valid.batch(batch_size) #dataset_test = dataset_test.batch(batch_size) iterator = tf.data.Iterator.from_structure(dataset_train.output_types, dataset_train.output_shapes) next_images, next_labels = iterator.get_next() train_init_op = iterator.make_initializer(dataset_train) valid_init_op = iterator.make_initializer(dataset_valid) #test_init_op = iterator.make_initializer(dataset_test)
第二種方法是把這些小圖片轉換為TFRECORD格式,大概每1000張小圖片合併為一個TFRECORD檔案,這樣可以減少頻繁讀取小檔案帶來的開銷,提高之後訓練的速度。另外,為了提高效率,可以開啟多個程序來同時進行TFRECORD檔案的生成。我的CPU有4個核心,因此起了4個程序來同時處理。程式碼如下:
import tensorflow as tf
from multiprocessing import Process, Queue
#把影象資料和標籤轉換為TRRECORD的格式
def make_example(image, label):
return tf.train.Example(features=tf.train.Features(feature={
'image' : tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
'label' : tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
}))
#這個函式用來生成TFRECORD檔案,第一個引數是列表,每個元素是圖片檔名加類名,第二個引數是佇列名稱,用於和父程序傳送訊息
def gen_tfrecord(trainrecords, queue):
tfrecords_file_num = 1
file_num = 0
total_num = len(trainrecords)
writer = tf.python_io.TFRecordWriter("tfdata/"+str(os.getpid())+"_"+str(tfrecords_file_num)+".tfrecord")
pid = os.getpid()
for record in trainrecords:
file_num += 1
fields = record.strip('\n').split(',')
with open(fields[0], 'rb') as jpgfile:
img = jpgfile.read()
label = np.array(int(fields[1]))
ex = make_example(img, label)
writer.write(ex.SerializeToString())
#每寫入100條記錄,向父程序傳送訊息,報告進度
if file_num%100==0:
queue.put((pid, file_num))
if file_num%max_num==0 and file_num<total_num:
writer.close()
tfrecords_file_num += 1
writer = tf.python_io.TFRecordWriter("tfdata/"+str(os.getpid())+"_"+str(tfrecords_file_num)+".tfrecord")
writer.close()
max_num = 1000 #max record number in one file
tfrecords_file_num = 1
#讀取之前生成的訓練集的圖片檔名和類名的CSV檔案
with open('imagenet_train.csv', 'r') as trainfile:
trainrecords = trainfile.readlines()
total_files_num = len(trainrecords)
#CPU有4個核心,因此每個核心處理1/4的資料,把trainrecords列表拆分為4份
each_process_files_num = int(total_files_num/4.0)
list1 = trainrecords[:each_process_files_num]
list2 = trainrecords[each_process_files_num:2*each_process_files_num]
list3 = trainrecords[2*each_process_files_num:3*each_process_files_num]
list4 = trainrecords[3*each_process_files_num:]
#設定4個佇列,和4個子程序
q1 = Queue()
q2 = Queue()
q3 = Queue()
q4 = Queue()
p1=Process(target=gen_tfrecord, args=(list1,q1,))
p2=Process(target=gen_tfrecord, args=(list2,q2,))
p3=Process(target=gen_tfrecord, args=(list3,q3,))
p4=Process(target=gen_tfrecord, args=(list4,q4,))
p_list=[p1, p2, p3, p4]
_ = map(Process.start, p_list)
#父程序迴圈查詢佇列的訊息,並且每10秒更新一次
progress_str = 'PID:%i Processing:%i/%i | PID:%i Processing:%i/%i | PID:%i Processing:%i/%i | PID:%i Processing:%i/%i \r'
while(True):
try:
msg1 = q1.get()
msg2 = q2.get()
msg3 = q3.get()
msg4 = q4.get()
print progress_str % (msg1[0],msg1[1],len(list1),msg2[0],msg2[1],len(list2),msg3[0],msg3[1],len(list3),msg4[0],msg4[1],len(list4)),
time.sleep(10)
except:
break
#構建Dataset
with tf.device('/cpu:0'):
train_files_names = os.listdir('tfdata/')
train_files = ['tfdata/'+item for item in train_files_names]
#train_files = ['testrecords.tfrecord']
dataset_train = tf.data.TFRecordDataset(train_files)
dataset_train = dataset_train.map(_parse_function, num_parallel_calls=4)
dataset_train = dataset_train.repeat(10)
dataset_train = dataset_train.batch(batch_size)
dataset_train = dataset_train.prefetch(batch_size)
iterator = tf.data.Iterator.from_structure(dataset_train.output_types, dataset_train.output_shapes)
next_images, next_labels = iterator.get_next()
train_init_op = iterator.make_initializer(dataset_train)
2種方法的效能差異還是很大的。在我的Intel I3 CPU+1070Ti 8G+16G RAM的配置下,Batch Size是64, 採用方法一,每個Batch的訓練時間是80秒,方法二的訓練時間是40秒,因此方法二對效能的提高還是很大的。