1. 程式人生 > >獲取Keras模型中間層輸出

獲取Keras模型中間層輸出

使用Keras可以比較方便地搭建一些深度學習網路,獲取中間層輸出可以幫助理解它是如何執行的。這裡使用一個小型的Keras網路,對Caltech101資料集進行影象分類,並獲取中間層輸出結果,以及手工計算卷積層的輸出結果。

修正:表示當前是訓練模式還是測試模式的引數K.learning_phase()文中表述和使用有誤,在該函式說明中可以看到:

The learning phase flag is a bool tensor (0 = test, 1 = train),所以0是測試模式,1是訓練模式,部分網路結構下兩者有差別。

 

我使用的版本:

1.      WIN7系統

2.      Python 2.7.10.2

3.      Keras 2.0.8

4.      Theano 0.9.0

5.      numpy 1.13.1

6.      h5py 2.5.0

7.      OpenCV 2.4.13

在ubuntu中也可執行,需要修改檔案路徑,並且如果在ubuntu中使用的後端是tensorflow,後面的kernel不需要翻轉。我的Keras設定中總是:"image_data_format": "channels_last"。

參考資料:

2.      cs231n_2017_lecture5

從資料夾中提取影象資料的方式:

函式:

def eachFile(filepath):
    """Return the names of all entries directly inside *filepath*.

    Both files and sub-directories are listed, in the order produced by
    ``os.listdir``.  Names that come back as byte strings are decoded from
    GBK so that Chinese file names display correctly; names that are already
    text are returned unchanged.
    """
    out = []
    for name in os.listdir(filepath):
        # Only byte strings need decoding.  Calling .decode() on text that is
        # already unicode fails for non-ASCII names on Python 2 and is not
        # available at all on Python 3.
        if isinstance(name, bytes):
            name = name.decode('gbk')
        out.append(name)
    return out
def get_data(data_name,train_left=0.0,train_right=0.7,train_all=0.7,resize=True,data_format=None,t=''):
    """Load one split of the image dataset, caching the result in an HDF5 file.

    Reads the module-level globals ``pic_dir_out`` (cache directory),
    ``pic_dir_data`` (dataset root, one sub-directory per class), ``Width``
    and ``Height``.

    Args:
        data_name: prefix of the cache file name.
        train_left, train_right: fractions of each class directory; images
            whose running index lies in [train_left, train_right) form the
            'train' split.
        train_all: fraction of each class directory reserved for training;
            images whose running index is at or beyond it form the 'test'
            split.
        resize: when true, every image is resized with
            ``cv2.resize(img, (Width, Height))``.
        data_format: 'channels_last', 'channels_first' or None; normalised by
            ``conv_utils.normalize_data_format``.
        t: 'train' or 'test'.

    Returns:
        ``(X, y)`` - the stacked image array and the integer label array - or
        ``None`` when *t* is neither 'train' nor 'test'.

    Raises:
        ValueError: if no image was selected for the requested split.
    """
    file_name = os.path.join(pic_dir_out,data_name+t+'_'+str(train_left)+'_'+str(train_right)+'_'+str(Width)+"X"+str(Height)+".h5")
    print(file_name)
    if t not in ('train', 'test'):
        # Nothing can be loaded or stored for an unknown split; return before
        # scanning the dataset or creating an empty cache file.
        return None
    x_key = 'X_' + t
    y_key = 'y_' + t

    if os.path.exists(file_name):           # a previous run already cached this split
        with h5py.File(file_name,'r') as f:
            return (f[x_key][:], f[y_key][:])

    data_format = conv_utils.normalize_data_format(data_format)
    images = []
    labels = []
    label = 0
    for pic_dir in eachFile(pic_dir_data):
        print(pic_dir_data+pic_dir)
        class_dir = os.path.join(pic_dir_data,pic_dir)
        if not os.path.isdir(class_dir):
            continue
        pic_set = eachFile(class_dir)
        train_count = int(len(pic_set)*train_all)
        train_l = int(len(pic_set)*train_left)
        train_r = int(len(pic_set)*train_right)
        pic_index = 0                       # counts only the images that could be read
        for pic_name in pic_set:
            pic_path = os.path.join(class_dir,pic_name)
            if not os.path.isfile(pic_path):
                continue
            img = cv2.imread(pic_path)
            if img is None:                 # unreadable or not an image
                continue
            if resize:
                img = cv2.resize(img,(Width,Height))
            # NOTE(review): cv2.resize takes (width, height) and presumably
            # returns an array shaped (Height, Width, 3), so this reshape only
            # adds a batch axis cleanly when Width == Height - confirm before
            # using non-square sizes.
            img = img.reshape(-1,Width,Height,3)
            if data_format == 'channels_first':
                img = img.transpose(0, 3, 1, 2)
            if pic_index < train_count:
                selected = (t == 'train' and train_l <= pic_index < train_r)
            else:
                selected = (t == 'test')
            if selected:
                images.append(img)
                labels.append(label)
            pic_index += 1
        if len(pic_set) != 0:               # empty class directories do not consume a label
            label += 1

    if not images:
        raise ValueError('no images found for split %r under %s' % (t, pic_dir_data))
    # Build the arrays before opening the cache for writing, so that a failure
    # here cannot leave an empty cache file that would break the next call.
    X = np.concatenate(images,axis=0)
    y = np.array(labels)
    with h5py.File(file_name,'w') as f:
        f.create_dataset(x_key, data = X)
        f.create_dataset(y_key, data = y)
    return (X, y)

呼叫:

    # Excerpt from main(): configure the globals that get_data() reads, load
    # both splits and prepare them for training.
    global Width, Height, pic_dir_out, pic_dir_data
    Width = 32
    Height = 32
    num_classes = 102                   
    pic_dir_out = 'E:/pic_cnn/pic_out/'  
    pic_dir_data = 'E:/pic_cnn/pic_dataset/Caltech101/'  
    pic_dir_txt = 'E:/pic_cnn/txt/'  
    # Both calls use the same 0.0-0.7 range; t selects which split is returned.
    (X_train, y_train) = get_data("Caltech101_color_data_",0.0,0.7,data_format='channels_last',t='train')
    (X_test, y_test) = get_data("Caltech101_color_data_",0.0,0.7,data_format='channels_last',t='test')    
    
    # Divide the pixel values by 255.
    X_train = X_train/255.           
    X_test = X_test/255.
    print X_train.shape
    print X_test.shape
    # Convert the integer class labels to one-hot rows of length num_classes.
    y_train = np_utils.to_categorical(y_train, num_classes)
    y_test = np_utils.to_categorical(y_test, num_classes)

設計的小型卷積神經網路:

    # Excerpt from main(): build, compile, train and save the small CNN.
    # Layers: 3x3 convolution with 8 filters -> ReLU -> 2x2 max pooling ->
    # flatten -> softmax over num_classes.
    model = Sequential()                
    model.add(Convolution2D(
        input_shape=(Width, Height, 3),
        filters=8,
        kernel_size=3,
        strides=1,
        padding='same',     
        data_format='channels_last',
    ))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(
        pool_size=2,
        strides=2,
        data_format='channels_last',
    ))
    model.add(Flatten())
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(optimizer=Adam(),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    print('\nTraining ------------')    # load weights from a file, train, then save to a new file
    cm = 0                              # change this value to continue training over several runs
    # Run cm loads the weights file suffixed with cm (only when cm >= 1) and
    # saves its result under the suffix cm+1.
    cm_str = '' if cm==0 else str(cm)
    cm2_str = '' if (cm+1)==0 else str(cm+1)  
    if cm >= 1:
        model.load_weights(os.path.join(pic_dir_out,'cnn_model_Caltech101_32f8_'+cm_str+'.h5'))
    model.fit(X_train, y_train, epochs=10,batch_size=128,)   
    model.save_weights(os.path.join(pic_dir_out,'cnn_model_Caltech101_32f8_'+cm2_str+'.h5'))

提取卷積層輸出結果並儲存到檔案:

    # Excerpt from main(): evaluate the first layer on a few training images
    # and save the input image and every output channel as JPEG files.
    # The backend function maps (layer-0 input, learning phase) to the
    # layer-0 output.
    get_1_layer_output = K.function([model.layers[0].input, K.learning_phase()],
                              [model.layers[0].output])
    pic_len = 1
    # The second list element is the learning-phase flag (0 = test, 1 = train).
    p_32 = get_1_layer_output([X_train[0:pic_len], 0])[0]           # output of the first layer
    image_array = deprocess_image(p_32)
    pic_mid = 'mid'
    if not os.path.isdir(os.path.join(pic_dir_out,pic_mid)):
        os.mkdir(os.path.join(pic_dir_out,pic_mid))
    for n_p in xrange(pic_len):
        # One sub-directory per input image, named by its index.
        if not os.path.isdir(os.path.join(pic_dir_out,pic_mid,str(n_p))):
            os.mkdir(os.path.join(pic_dir_out,pic_mid,str(n_p)))
        # o_0.jpg: the input image, scaled back by 255 and enlarged to 512x512.
        cv2.imwrite(os.path.join(pic_dir_out,pic_mid,str(n_p),'o_'+str(0)+'.jpg'),
                    cv2.resize(X_train[n_p]*255.,(512,512)))    
        # p_<i>.jpg: image_array[i,:,:,n_p], enlarged to 512x512.
        for i in xrange(len(image_array)):  
            cv2.imwrite(os.path.join(pic_dir_out,pic_mid,str(n_p),'p_'+str(i)+'.jpg'),
                        cv2.resize(image_array[i,:,:,n_p],(512,512)))

其中的deprocess_image函式將0到1的浮點數轉換到0-255的影象值:

def deprocess_image(x):
    """Convert a batch of float feature maps into uint8 image data.

    The first axis is moved to the end: for 'channels_first' the axes are
    reordered with ``transpose(1, 2, 3, 0)``, for 'channels_last' with
    ``transpose(3, 1, 2, 0)``.  The values are then multiplied by 255,
    clipped to [0, 255] and cast to uint8.

    The input array is left untouched.
    """
    if K.image_data_format() == 'channels_first':
        x = x.transpose(1, 2, 3, 0)
    elif K.image_data_format() == 'channels_last':
        x = x.transpose(3, 1, 2, 0)
    # transpose() returns a view, so an in-place ``x *= 255.`` would also
    # rescale the caller's array (and raises for integer input).  Multiply
    # into a new array instead.
    x = x * 255.
    return np.clip(x, 0, 255).astype('uint8')

其中的“K.learning_phase()”表示當前處於訓練狀態還是測試狀態,0為測試,1為訓練。例如當網路有Dropout層時,訓練和測試要採取不同的策略,修改該值可以得到想要的結果。

手工計算卷積層結果:(運算效率暫時不在考慮的範圍內,若在tensorflow下不需要翻轉核)

    # Excerpt from main(): recompute the first convolution layer by hand and
    # compare it with the output obtained from Keras (image_array).
    pic_len = 1
    pic_array = X_train[0:pic_len]                                  # the first pic_len training images
    weights = model.layers[0].get_weights()                         # weights[0]: kernels, weights[1]: biases
    filters = 8
    channels = 3
    conv_pic = np.zeros((pic_len,Width,Height,filters))             # manually computed feature maps
    border_pic = np.zeros((pic_len,Width+2,Height+2,channels))      # inputs with a one-pixel zero border
    for n_p in range(pic_len):
        # Pass the border colour by keyword: the 7th positional parameter of
        # the Python binding is dst, not value.
        border_pic[n_p] = cv2.copyMakeBorder(pic_array[n_p], 1,1,1,1, cv2.BORDER_CONSTANT, value=(0,0,0))
    for n_filters in range(filters):
        for n_channels in range(channels):
            kernel = weights[0][:,:,n_channels,n_filters]           # one 3x3 kernel slice
            kernel = np.flipud(np.fliplr(kernel))                   # flip left-right, then up-down
            for n_p in range(pic_len):
                for i in range(1,Width+1):
                    for j in range(1,Height+1):
                        conv_pic[n_p,i-1,j-1,n_filters] += np.sum((border_pic[n_p,i-1:i+2,j-1:j+2,n_channels])*kernel)
        conv_pic[:,:,:,n_filters] += weights[1][n_filters]          # add the bias

    image_array2 = deprocess_image(conv_pic)
    pic_mid = 'mid'
    for n_p in range(pic_len):
        for i in range(len(image_array2)):
            cv2.imwrite(os.path.join(pic_dir_out,pic_mid,str(n_p),'m_'+str(i)+'.jpg'),
                        cv2.resize(image_array2[i,:,:,n_p],(512,512)))
    # Both arrays are uint8: subtracting them directly wraps around (3 - 4 ==
    # 255) and signed errors could cancel.  Compare in a wider signed type and
    # sum the absolute differences.
    abs_diff = np.abs(image_array.astype(np.int64) - image_array2.astype(np.int64))
    print('difference: ' + str(np.sum(abs_diff)))

便於理解的手工計算方式:

    # Excerpt (kept commented out in the complete listing): the same manual
    # convolution for a single image, written for readability.
    filters = 8
    channels = 3
    one_pic = X_train[0]
    weights = model.layers[0].get_weights()
    # Add a one-pixel border on every side.
    # NOTE(review): the 7th positional argument of the Python binding appears
    # to be dst rather than value - confirm; the border is presumably zero
    # either way because value defaults to 0.
    one_pic = cv2.copyMakeBorder(one_pic, 1,1,1,1, cv2.BORDER_CONSTANT,(0,0,0))
    conv_pic = np.zeros((Width,Height,filters))
    for f in xrange(filters):
        for i in range(1,Width+1):
            for j in range(1,Height+1):
                # Accumulate the 3x3 window response over all input channels.
                for c in xrange(channels):
                    kernel = weights[0][:,:,c,f]  
                    kernel = np.flipud(np.fliplr(kernel)) 
                    conv_pic[i-1,j-1,f] += np.sum((one_pic[i-1:i+2,j-1:j+2,c])*kernel)
          
                # Add the bias once per output position.
                conv_pic[i-1,j-1,f] += weights[1][f]
    # Add a leading axis of size 1.
    conv_pic = conv_pic.reshape((-1,Width,Height,8))

其中獲取到的weights[0]是引數矩陣,weights[1]是bias。其中的“kernel = np.flipud(np.fliplr(kernel))”的意思是將kernel左右翻轉後再上下翻轉,這樣計算出來的結果才與呼叫介面產生的結果相同。計算方式的來源如下(來自cs231n_2017_lecture5.pdf):

 

在這附近有幾張幫助理解卷積神經網路的圖(來自cs231n_2017_lecture5.pdf):

 




完整程式碼:

# -*- coding: utf-8 -*-
import cv2
import numpy as np
import os
import h5py

from keras.utils import np_utils, conv_utils
from keras.models import Sequential
from keras.layers import Convolution2D, MaxPooling2D, Flatten, Dense, Activation
from keras.optimizers import Adam
from keras import backend as K

def get_name_list(filepath):
    """Return the names of the sub-directories of *filepath* (one per class).

    Entries that are not directories are skipped.  Names that come back from
    ``os.listdir`` as byte strings are decoded from GBK so that Chinese names
    display correctly; names that are already text are returned unchanged.
    """
    out = []
    for name in os.listdir(filepath):
        if not os.path.isdir(os.path.join(filepath,name)):
            continue
        # Only byte strings need decoding.  Calling .decode() on text that is
        # already unicode fails for non-ASCII names on Python 2 and is not
        # available at all on Python 3.
        if isinstance(name, bytes):
            name = name.decode('gbk')
        out.append(name)
    return out
    
def eachFile(filepath):
    """Return the names of all entries directly inside *filepath*.

    Both files and sub-directories are listed, in the order produced by
    ``os.listdir``.  Names that come back as byte strings are decoded from
    GBK so that Chinese file names display correctly; names that are already
    text are returned unchanged.
    """
    out = []
    for name in os.listdir(filepath):
        # Only byte strings need decoding.  Calling .decode() on text that is
        # already unicode fails for non-ASCII names on Python 2 and is not
        # available at all on Python 3.
        if isinstance(name, bytes):
            name = name.decode('gbk')
        out.append(name)
    return out
def get_data(data_name,train_left=0.0,train_right=0.7,train_all=0.7,resize=True,data_format=None,t=''):
    """Load one split of the image dataset, caching the result in an HDF5 file.

    Reads the module-level globals ``pic_dir_out`` (cache directory),
    ``pic_dir_data`` (dataset root, one sub-directory per class), ``Width``
    and ``Height``.

    Args:
        data_name: prefix of the cache file name.
        train_left, train_right: fractions of each class directory; images
            whose running index lies in [train_left, train_right) form the
            'train' split.
        train_all: fraction of each class directory reserved for training;
            images whose running index is at or beyond it form the 'test'
            split.
        resize: when true, every image is resized with
            ``cv2.resize(img, (Width, Height))``.
        data_format: 'channels_last', 'channels_first' or None; normalised by
            ``conv_utils.normalize_data_format``.
        t: 'train' or 'test'.

    Returns:
        ``(X, y)`` - the stacked image array and the integer label array - or
        ``None`` when *t* is neither 'train' nor 'test'.

    Raises:
        ValueError: if no image was selected for the requested split.
    """
    file_name = os.path.join(pic_dir_out,data_name+t+'_'+str(train_left)+'_'+str(train_right)+'_'+str(Width)+"X"+str(Height)+".h5")
    print(file_name)
    if t not in ('train', 'test'):
        # Nothing can be loaded or stored for an unknown split; return before
        # scanning the dataset or creating an empty cache file.
        return None
    x_key = 'X_' + t
    y_key = 'y_' + t

    if os.path.exists(file_name):           # a previous run already cached this split
        with h5py.File(file_name,'r') as f:
            return (f[x_key][:], f[y_key][:])

    data_format = conv_utils.normalize_data_format(data_format)
    images = []
    labels = []
    label = 0
    for pic_dir in eachFile(pic_dir_data):
        print(pic_dir_data+pic_dir)
        class_dir = os.path.join(pic_dir_data,pic_dir)
        if not os.path.isdir(class_dir):
            continue
        pic_set = eachFile(class_dir)
        train_count = int(len(pic_set)*train_all)
        train_l = int(len(pic_set)*train_left)
        train_r = int(len(pic_set)*train_right)
        pic_index = 0                       # counts only the images that could be read
        for pic_name in pic_set:
            pic_path = os.path.join(class_dir,pic_name)
            if not os.path.isfile(pic_path):
                continue
            img = cv2.imread(pic_path)
            if img is None:                 # unreadable or not an image
                continue
            if resize:
                img = cv2.resize(img,(Width,Height))
            # NOTE(review): cv2.resize takes (width, height) and presumably
            # returns an array shaped (Height, Width, 3), so this reshape only
            # adds a batch axis cleanly when Width == Height - confirm before
            # using non-square sizes.
            img = img.reshape(-1,Width,Height,3)
            if data_format == 'channels_first':
                img = img.transpose(0, 3, 1, 2)
            if pic_index < train_count:
                selected = (t == 'train' and train_l <= pic_index < train_r)
            else:
                selected = (t == 'test')
            if selected:
                images.append(img)
                labels.append(label)
            pic_index += 1
        if len(pic_set) != 0:               # empty class directories do not consume a label
            label += 1

    if not images:
        raise ValueError('no images found for split %r under %s' % (t, pic_dir_data))
    # Build the arrays before opening the cache for writing, so that a failure
    # here cannot leave an empty cache file that would break the next call.
    X = np.concatenate(images,axis=0)
    y = np.array(labels)
    with h5py.File(file_name,'w') as f:
        f.create_dataset(x_key, data = X)
        f.create_dataset(y_key, data = y)
    return (X, y)

def deprocess_image(x):
    """Convert a batch of float feature maps into uint8 image data.

    The first axis is moved to the end: for 'channels_first' the axes are
    reordered with ``transpose(1, 2, 3, 0)``, for 'channels_last' with
    ``transpose(3, 1, 2, 0)``.  The values are then multiplied by 255,
    clipped to [0, 255] and cast to uint8.

    The input array is left untouched.
    """
    if K.image_data_format() == 'channels_first':
        x = x.transpose(1, 2, 3, 0)
    elif K.image_data_format() == 'channels_last':
        x = x.transpose(3, 1, 2, 0)
    # transpose() returns a view, so an in-place ``x *= 255.`` would also
    # rescale the caller's array (and raises for integer input).  Multiply
    # into a new array instead.
    x = x * 255.
    return np.clip(x, 0, 255).astype('uint8')
    
def main():
    """Train a small CNN on Caltech101 and check its first layer by hand.

    Steps:
      1. Set the globals read by get_data() and load the train/test splits.
      2. Build, train and save a one-convolution-layer classifier.
      3. Evaluate the first layer through a Keras backend function and save
         the input image and every output channel as JPEG files.
      4. Recompute the same convolution with explicit loops, save those
         channels too, and print how far the two results differ.

    All paths are hard-coded to ``E:/pic_cnn/...``; the output directory must
    already exist.
    """
    global Width, Height, pic_dir_out, pic_dir_data
    Width = 32
    Height = 32
    num_classes = 102                   # 102 for Caltech101, 10 for cifar10
    pic_dir_out = 'E:/pic_cnn/pic_out/'
    pic_dir_data = 'E:/pic_cnn/pic_dataset/Caltech101/'
    (X_train, y_train) = get_data("Caltech101_color_data_",0.0,0.7,data_format='channels_last',t='train')
    (X_test, y_test) = get_data("Caltech101_color_data_",0.0,0.7,data_format='channels_last',t='test')

    # Divide the pixel values by 255 and one-hot encode the labels.
    X_train = X_train/255.
    X_test = X_test/255.
    print(X_train.shape)
    print(X_test.shape)
    y_train = np_utils.to_categorical(y_train, num_classes)
    y_test = np_utils.to_categorical(y_test, num_classes)

    # 3x3 convolution (8 filters) -> ReLU -> 2x2 max pooling -> softmax.
    model = Sequential()
    model.add(Convolution2D(
        input_shape=(Width, Height, 3),
        filters=8,
        kernel_size=3,
        strides=1,
        padding='same',
        data_format='channels_last',
    ))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(
        pool_size=2,
        strides=2,
        data_format='channels_last',
    ))
    model.add(Flatten())
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(optimizer=Adam(),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    print('\nTraining ------------')    # load weights from a file, train, then save to a new file
    cm = 0                              # change this value to continue training over several runs
    cm_str = '' if cm==0 else str(cm)
    cm2_str = '' if (cm+1)==0 else str(cm+1)
    if cm >= 1:
        model.load_weights(os.path.join(pic_dir_out,'cnn_model_Caltech101_32f8_'+cm_str+'.h5'))
    model.fit(X_train, y_train, epochs=10,batch_size=128,)
    model.save_weights(os.path.join(pic_dir_out,'cnn_model_Caltech101_32f8_'+cm2_str+'.h5'))

    # --- first-layer output as computed by Keras ---------------------------
    get_1_layer_output = K.function([model.layers[0].input, K.learning_phase()],
                              [model.layers[0].output])
    pic_len = 1
    # The second list element is the learning-phase flag (0 = test, 1 = train).
    p_32 = get_1_layer_output([X_train[0:pic_len], 0])[0]
    image_array = deprocess_image(p_32)
    pic_mid = 'mid'
    mid_dir = os.path.join(pic_dir_out,pic_mid)
    if not os.path.isdir(mid_dir):
        os.mkdir(mid_dir)
    for n_p in range(pic_len):
        sample_dir = os.path.join(mid_dir,str(n_p))
        if not os.path.isdir(sample_dir):
            os.mkdir(sample_dir)
        # o_0.jpg: the input image; p_<i>.jpg: output channel i from Keras.
        cv2.imwrite(os.path.join(sample_dir,'o_'+str(0)+'.jpg'),
                    cv2.resize(X_train[n_p]*255.,(512,512)))
        for i in range(len(image_array)):
            cv2.imwrite(os.path.join(sample_dir,'p_'+str(i)+'.jpg'),
                        cv2.resize(image_array[i,:,:,n_p],(512,512)))

    # --- the same convolution computed by hand -----------------------------
    pic_array = X_train[0:pic_len]                                  # the first pic_len training images
    weights = model.layers[0].get_weights()                         # weights[0]: kernels, weights[1]: biases
    filters = 8
    channels = 3
    conv_pic = np.zeros((pic_len,Width,Height,filters))             # manually computed feature maps
    border_pic = np.zeros((pic_len,Width+2,Height+2,channels))      # inputs with a one-pixel zero border
    for n_p in range(pic_len):
        # Pass the border colour by keyword: the 7th positional parameter of
        # the Python binding is dst, not value.
        border_pic[n_p] = cv2.copyMakeBorder(pic_array[n_p], 1,1,1,1, cv2.BORDER_CONSTANT, value=(0,0,0))
    for n_filters in range(filters):
        for n_channels in range(channels):
            kernel = weights[0][:,:,n_channels,n_filters]           # one 3x3 kernel slice
            kernel = np.flipud(np.fliplr(kernel))                   # flip left-right, then up-down
            for n_p in range(pic_len):
                for i in range(1,Width+1):
                    for j in range(1,Height+1):
                        conv_pic[n_p,i-1,j-1,n_filters] += np.sum((border_pic[n_p,i-1:i+2,j-1:j+2,n_channels])*kernel)
        conv_pic[:,:,:,n_filters] += weights[1][n_filters]          # add the bias

    image_array2 = deprocess_image(conv_pic)
    for n_p in range(pic_len):
        # m_<i>.jpg: output channel i from the manual computation.
        for i in range(len(image_array2)):
            cv2.imwrite(os.path.join(pic_dir_out,pic_mid,str(n_p),'m_'+str(i)+'.jpg'),
                        cv2.resize(image_array2[i,:,:,n_p],(512,512)))
    # Both arrays are uint8: subtracting them directly wraps around (3 - 4 ==
    # 255) and signed errors could cancel.  Compare in a wider signed type and
    # sum the absolute differences.
    abs_diff = np.abs(image_array.astype(np.int64) - image_array2.astype(np.int64))
    print('difference: ' + str(np.sum(abs_diff)))

if __name__ == '__main__':
    main()
輸出影象示例: