1. 程式人生 > >基於tensorflow的棧式自編碼器實現

基於tensorflow的棧式自編碼器實現

這周完全沒有想法要看棧式編碼器的,誰知誤入桃花源,就暫且把棧式自編碼器看了吧。由於手上有很多數值型的資料,僅僅是資料,沒有標籤,所以,迫切需要通過聚類抽出特徵。無意間看到別人家公司的推薦系統裡面用到sdae,於是,找了個ae程式,建了個sdae,跑一下自己的資料。希望sdae在後面的推薦系統中能有用。

斯坦福的這篇文章原理講的很到位了。

一.基本原理

AE的原理是先通過一個encode層對輸入進行編碼,這個編碼就是特徵,然後利用encode乘第2層引數(也可以是encode層的引數的轉置乘特徵並加偏執),重構(解碼)輸入,然後用重構的輸入和實際輸入的損失訓練引數。

對於我的應用來說,我要做的首先是抽取特徵。AE的encode的過程很簡單,是這樣的:


SAE是這樣的:


訓練過程中,兩個SAE分別訓練,第一個SAE訓練完之後,其encode的輸出作為第二個SAE的輸入,接著訓練。訓練完後,用第二層的特徵,通過softmax分類器(由於分類器 還是得要帶標籤的資料來訓練,所以,實際應用總,提取特徵後不一定是拿來分類),將所有特徵分為n類,得到n類標籤。訓練網路圖


          step 1


              step 2


                      step 3    

 按照UFLDL的說明,在各自訓練到快要收斂的時候,要對整個網路通過反向傳播做微調,但是不能從一開始就整個網路調節。

兩個SAE訓練完後,每個encode的輸出作為兩個隱層的特徵,然後重新構建網路,新網路不做訓練,只做預測。網路如下:


二.程式

1.      首先建立自編碼器的網路

採用新增高斯噪聲的dae,這是tensorflow 的models-master裡面提供的一種自編碼器。

#coding: utf-8
import tensorflow as tf
import numpy as npimport Utilsclass AdditiveGaussianNoiseAutoencoder(object): def __init__(self, n_input, n_hidden, transfer_function = tf.nn.softplus, optimizer = tf.train.AdamOptimizer(), scale = 0.1): self.n_input = n_input self.n_hidden = n_hidden self.transfer = transfer_function self.scale = tf.placeholder(tf.float32) self.training_scale = scale network_weights = self._initialize_weights() self.weights = network_weights # model self.x = tf.placeholder(tf.float32, [None, self.n_input])
        #編碼
        self.hidden = self.transfer(tf.add(tf.matmul(self.x + scale * tf.random_normal((n_input,)),
                self.weights['w1']),
                self.weights['b1']))
        #解碼
        self.reconstruction = tf.add(tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])

        # cost
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.sub(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)

        init = tf.initialize_all_variables()
        self.sess = tf.Session()
        self.sess.run(init)

    def _initialize_weights(self):
        all_weights = dict()
        all_weights['w1'] = tf.Variable(Utils.xavier_init(self.n_input, self.n_hidden))
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype = tf.float32))
        all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden, self.n_input], dtype = tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype = tf.float32))
        return all_weights
    #優化引數
    def partial_fit(self, X):
        cost, opt = self.sess.run((self.cost, self.optimizer), feed_dict = {self.x: X,
                                                                            self.scale: self.training_scale
                                                                            })
        return cost

    def calc_total_cost(self, X):
        return self.sess.run(self.cost, feed_dict = {self.x: X,
                                                     self.scale: self.training_scale
                                                     })

    def transform(self, X):
        return self.sess.run(self.hidden, feed_dict = {self.x: X,
                                                       self.scale: self.training_scale
                                                       })

    def generate(self, hidden = None):
        if hidden is None:
            hidden = np.random.normal(size = self.weights["b1"])
        return self.sess.run(self.reconstruction, feed_dict = {self.hidden: hidden})

    def reconstruct(self, X):
        return self.sess.run(self.reconstruction, feed_dict = {self.x: X,
                                                               self.scale: self.training_scale
                                                               })

    def getWeights(self):
        return self.sess.run(self.weights['w1'])

    def getBiases(self):
        return self.sess.run(self.weights['b1'])

2. sdae建立

#coding: utf-8
from __future__ import division, print_function, absolute_import

import numpy as np
import tensorflow as tf
from autoencoder_models.DenoisingAutoencoder import AdditiveGaussianNoiseAutoencoder
import prepare_data

#prepare_data.py是自己寫的資料整理檔案,通過這個檔案清理資料,並將其分成M-1份測試資料,1份測試資料
pd = prepare_data.TidyData(file1='data/dim_tv_mall_lng_lat.txt',
                           file2='data/ali-hdfs_2017-03-22_18-24-37.log',
                           M=8,
                           seed=12,
                           k=5)
pd.read_mall_location_np()
pd.calc_min_distance()
print ('data read finished!')

#定義訓練引數
training_epochs = 5
batch_size = 1000
display_step = 1
stack_size = 3  #棧中包含3個ae
hidden_size = [20, 20, 20]
input_n_size = [3, 200, 200]

def get_random_block_from_data(data, batch_size):
    start_index = np.random.randint(0, len(data) - batch_size)
    return data[start_index:(start_index + batch_size)]

#建立sdae圖
sdae = []
for i in xrange(stack_size):
    if i == 0:
        ae = AdditiveGaussianNoiseAutoencoder(n_input = 2,
                                               n_hidden = hidden_size[i],
                                               transfer_function = tf.nn.softplus,
                                               optimizer = tf.train.AdamOptimizer(learning_rate = 0.001),
                                               scale = 0.01)
        ae._initialize_weights()
        sdae.append(ae)
    else:
        ae = AdditiveGaussianNoiseAutoencoder(n_input=hidden_size[i-1],
                                              n_hidden=hidden_size[i],
                                              transfer_function=tf.nn.softplus,
                                              optimizer=tf.train.AdamOptimizer(learning_rate=0.001),
                                              scale=0.01)
        ae._initialize_weights()
        sdae.append(ae)

		
W = []
b = []
Hidden_feature = [] #儲存每個ae的特徵
X_train = np.array([0])
for j in xrange(stack_size):
    #輸入
    if j == 0:
        X_train = np.array(pd.train_set)
        X_test = np.array(pd.test_set)
    else:
        X_train_pre = X_train
        X_train = sdae[j-1].transform(X_train_pre)
        print (X_train.shape)
        Hidden_feature.append(X_train)
	
	#訓練
    for epoch in range(training_epochs):
        avg_cost = 0.
        total_batch = int(X_train.shape[1] / batch_size)
        # Loop over all batches
        for k in range(total_batch):
            batch_xs = get_random_block_from_data(X_train, batch_size)

            # Fit training using batch data
            cost = sdae[j].partial_fit(batch_xs)
            # Compute average loss
            avg_cost += cost / X_train.shape[1] * batch_size

        # Display logs per epoch step
        #if epoch % display_step == 0:
        print ("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(avg_cost))
	
	#儲存每個ae的引數
    weight = sdae[j].getWeights()
    #print (weight)
    W.append(weight)
    b.append(sdae[j].getBiases())