1. 程式人生 > >手擼RNN分類和迴歸,1024!

手擼RNN分類和迴歸,1024!

記錄下最近看莫凡大佬的python教程中,如何手擼RNN分類和迴歸兩種實現方式。

分類實現(具體需要注意的細節已經在程式碼中備註):

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

tf.set_random_seed(1)   # fix the graph-level seed so every run draws the same random numbers

mnist = input_data.read_data_sets('MNIST_data',one_hot = True) # load the MNIST data set with one-hot labels

# Hyperparameters
# lr is the learning rate handed to the Adam optimizer; training stops once
# step * batch_size reaches training_iters.
lr = 0.001
training_iters = 100000
batch_size = 128

# Network parameters
n_inputs = 28   # size of the input vector fed at each time step
n_steps = 28   # number of time steps per sample
n_hidden_units = 128  # number of hidden units
n_classes = 10

# Placeholders for a batch of inputs and the matching one-hot labels.
x = tf.placeholder(tf.float32,[None,n_steps,n_inputs])
y = tf.placeholder(tf.float32,[None,n_classes])

weights = {'in':tf.Variable(tf.random_normal([n_inputs, n_hidden_units])), # input projection, shape (28,128)
          'out':tf.Variable(tf.random_normal([n_hidden_units,n_classes]))}# output projection, shape (128,10)
biases = {'in': tf.Variable(tf.constant(0.1,shape=[n_hidden_units,])),# input bias, shape (128,)
         'out':tf.Variable(tf.constant(0.1,shape=[n_classes,]))}# output bias, shape (10,)


def RNN(X,weights,biases):
    """Build the LSTM classifier graph and return the class logits.

    Args:
        X: input tensor of shape [batch, n_steps, n_inputs].
        weights: dict holding the 'in' (n_inputs, n_hidden_units) and
            'out' (n_hidden_units, n_classes) projection variables.
        biases: dict holding the matching 'in' and 'out' bias variables.

    Returns:
        Logits tensor computed from the LSTM output of the last time step.

    Note:
        The initial state is created with the module-level ``batch_size``,
        so the resulting graph expects batches of exactly that size.
    """
    # TensorFlow releases older than 0.12 expose some ops under legacy names.
    legacy_tf = int((tf.__version__).split('.')[1])<12 and int((tf.__version__).split('.')[0])<1

    # Flatten to 2-D so one matmul projects every time step of every sample.
    X = tf.reshape(X,[-1,n_inputs])
    # The bias must be added to the projection result, not to the weight matrix.
    X_in = tf.matmul(X,weights['in'])+biases['in']
    # Restore the 3-D layout [batch, n_steps, n_hidden_units] for the RNN.
    X_in = tf.reshape(X_in,[-1,n_steps,n_hidden_units])

    if legacy_tf:
        # state_is_tuple keeps the LSTM state as a (c_state, m_state) tuple.
        cell = tf.nn.rnn_cell.BasicLSTMCell(n_hidden_units,forget_bias=1.0,state_is_tuple=True)
    else:
        cell = tf.contrib.rnn.BasicLSTMCell(n_hidden_units)
    # Zero initial state for a batch of batch_size samples.
    init_state = cell.zero_state(batch_size,dtype=tf.float32)

    # time_major=False because the time axis is the second dimension of X_in.
    outputs,final_state = tf.nn.dynamic_rnn(cell,X_in,initial_state=init_state,time_major = False)

    # Move the time axis first ([n_steps, batch, n_hidden_units]) and split it
    # into a list with one [batch, n_hidden_units] tensor per time step.
    if legacy_tf:
        outputs = tf.unpack(tf.transpose(outputs,[1,0,2]))
    else:
        outputs = tf.unstack(tf.transpose(outputs, [1,0,2]))
    # Classify from the output of the last time step only.
    results = tf.matmul(outputs[-1],weights['out'])+biases['out']
    return results

# Build the graph: logits, mean cross-entropy loss and the Adam update step.
pred = RNN(x,weights,biases)
cost = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits_v2(logits=pred,labels=y))
train_op = tf.train.AdamOptimizer(lr).minimize(cost)

# A prediction is correct when the arg-max of the logits matches the arg-max
# of the one-hot label; accuracy is the mean of those element-wise matches.
correct_pred = tf.equal(tf.argmax(pred,1),tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred,tf.float32))

with tf.Session() as sess:
    # Pick the initializer name that exists in the running TensorFlow release.
    version_parts = (tf.__version__).split('.')
    if int(version_parts[1]) < 12 and int(version_parts[0]) < 1:
        init = tf.initialize_all_variables()
    else:
        init = tf.global_variables_initializer()
    sess.run(init)

    step = 0
    # Stop once the number of consumed samples reaches training_iters.
    while step*batch_size < training_iters:
        batch_xs,batch_ys = mnist.train.next_batch(batch_size)
        # Reshape the batch to [batch_size, n_steps, n_inputs] for the RNN.
        batch_xs = batch_xs.reshape([batch_size,n_steps,n_inputs])
        feed = {x:batch_xs,y:batch_ys}
        sess.run([train_op],feed_dict=feed)
        # Report the accuracy on the current training batch every 20 steps.
        if step % 20 == 0:
            print(sess.run(accuracy,feed_dict=feed))
        step += 1
            


輸出結果:

 

 

 

 

 

迴歸實現:

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

# Offset of the next batch; get_batch() advances it on every call.
BATCH_START = 0
TIME_STEPS = 20  # number of time steps per sequence (passed to LSTMRNN as n_steps)
BATCH_SIZE = 50  # number of sequences per batch
INPUT_SIZE = 1
OUTPUT_SIZE = 1
CELL_SIZE = 10  # number of hidden units inside the LSTM cell
# Learning rate handed to the Adam optimizer.
LR = 0.006

def get_batch():
    """Return the next batch of sine inputs and cosine targets.

    Each call covers TIME_STEPS * BATCH_SIZE consecutive integers starting
    at BATCH_START, scaled by 1 / (10 * pi), and then advances BATCH_START
    by TIME_STEPS.

    Returns:
        A list ``[seq, res, xs]`` where ``xs`` has shape
        (BATCH_SIZE, TIME_STEPS) and ``seq = sin(xs)``, ``res = cos(xs)``
        each carry an extra trailing axis of size 1.
    """
    # Only BATCH_START is rebound here; TIME_STEPS and BATCH_SIZE are just read,
    # so they need no global declaration.
    global BATCH_START
    xs = np.arange(BATCH_START,BATCH_START+TIME_STEPS*BATCH_SIZE).reshape((BATCH_SIZE,TIME_STEPS))/(10*np.pi)
    seq = np.sin(xs)
    res = np.cos(xs)
    BATCH_START += TIME_STEPS
    # Add a trailing feature axis so inputs/targets are (batch, steps, 1).
    return [seq[:,:,np.newaxis],res[:,:,np.newaxis],xs]

class LSTMRNN(object):
    """LSTM sequence-regression model assembled as a TensorFlow graph.

    Constructing an instance creates the ``xs``/``ys`` placeholders, a linear
    input projection, one BasicLSTMCell unrolled with ``dynamic_rnn``, a
    linear output projection, a squared-error cost and an Adam training op
    that uses the module-level learning rate ``LR``.
    """

    def __init__(self,n_steps,input_size,output_size,cell_size,batch_size):
        """Store the layer sizes and build the whole graph.

        Args:
            n_steps: number of time steps per sequence.
            input_size: feature size of every input step.
            output_size: feature size of every target step.
            cell_size: number of hidden units in the LSTM cell.
            batch_size: batch size used for the zero initial state and for
                averaging the cost.
        """
        self.n_steps = n_steps
        self.input_size = input_size
        self.output_size = output_size
        self.cell_size = cell_size
        self.batch_size = batch_size

        # Placeholders for the input sequences and their targets.
        with tf.name_scope('inputs'):
            self.xs = tf.placeholder(tf.float32,[None,n_steps,input_size],name='xs')
            self.ys = tf.placeholder(tf.float32,[None,n_steps,output_size],name='ys')

        # Each stage is built inside its own variable scope, so the
        # 'weights'/'biases' variables of the two projections do not collide.
        stages = (
            ('in_hidden',self.add_input_layer),
            ('LSTM_cell',self.add_cell),
            ('out_hidden',self.add_output_layer),
            ('cost',self.compute_cost),
        )
        for scope_name,build_stage in stages:
            with tf.variable_scope(scope_name):
                build_stage()

        with tf.name_scope('train'):
            self.train_op = tf.train.AdamOptimizer(LR).minimize(self.cost)

    def add_input_layer(self):
        """Project every input step from input_size to cell_size features."""
        # Flatten to (batch*n_steps, input_size) so one matmul covers all steps.
        flat_inputs = tf.reshape(self.xs,[-1,self.input_size],name='2_2D')
        in_weights = self._weight_variable([self.input_size,self.cell_size])
        in_biases = self._bias_variable([self.cell_size])
        with tf.name_scope('Wx_plus_b'):
            # Shape: (batch*n_steps, cell_size)
            projected = tf.matmul(flat_inputs,in_weights)+in_biases
        # Back to (batch, n_steps, cell_size) for the recurrent layer.
        self.l_in_y = tf.reshape(projected,[-1,self.n_steps,self.cell_size],name='2_3D')

    def add_cell(self):
        """Create the LSTM cell, its zero initial state and the unrolled RNN."""
        lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.cell_size,forget_bias=1.0,state_is_tuple=True)
        with tf.name_scope('initial_state'):
            self.cell_init_state = lstm_cell.zero_state(self.batch_size,dtype=tf.float32)
        # dynamic_rnn yields the per-step outputs and the final cell state.
        rnn_result = tf.nn.dynamic_rnn(
            lstm_cell,
            self.l_in_y,
            initial_state=self.cell_init_state,
            time_major=False)
        self.cell_outputs,self.cell_final_state = rnn_result

    def add_output_layer(self):
        """Project every LSTM output step from cell_size to output_size."""
        # Flatten to (batch*n_steps, cell_size).
        flat_outputs = tf.reshape(self.cell_outputs,[-1,self.cell_size],name='2_2D')
        out_weights = self._weight_variable([self.cell_size,self.output_size])
        out_biases = self._bias_variable([self.output_size,])
        with tf.name_scope('Wx_plus_b'):
            # Shape: (batch*n_steps, output_size)
            self.pred = tf.matmul(flat_outputs,out_weights)+out_biases

    def compute_cost(self):
        """Define ``self.cost`` as the summed squared error divided by batch_size."""
        flat_pred = tf.reshape(self.pred,[-1],name='reshape_pred')
        flat_target = tf.reshape(self.ys,[-1],name='reshape_target')
        # Every element gets the same weight of 1.
        unit_weights = tf.ones([self.batch_size*self.n_steps],dtype=tf.float32)
        # sequence_loss_by_example applies ms_error in place of its default
        # cross-entropy loss.
        losses = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
            [flat_pred],
            [flat_target],
            [unit_weights],
            average_across_timesteps=True,
            softmax_loss_function=self.ms_error,
            name='losses')
        with tf.name_scope('average_cost'):
            total_loss = tf.reduce_sum(losses,name='lossses_sum')
            self.cost = tf.div(total_loss,self.batch_size,name='average_cost')
            # Expose the cost as a scalar summary.
            tf.summary.scalar('cost',self.cost)

    @staticmethod
    def ms_error(labels,logits):
        """Return the element-wise squared difference of labels and logits."""
        difference = tf.subtract(labels,logits)
        return tf.square(difference)

    def _weight_variable(self,shape,name='weights'):
        """Get a variable of ``shape`` initialised from a normal(0, 1) distribution."""
        normal_init = tf.random_normal_initializer(mean=0,stddev=1.0)
        return tf.get_variable(name=name,shape=shape,initializer=normal_init)

    def _bias_variable(self,shape,name='biases'):
        """Get a variable of ``shape`` initialised to the constant 0.1."""
        constant_init = tf.constant_initializer(0.1)
        return tf.get_variable(name=name,shape=shape,initializer=constant_init)

if __name__ =='__main__':
    # Train the LSTM regressor for 1000 batches, plotting target vs. prediction
    # live and logging the cost summary every 20 steps.
    model = LSTMRNN(TIME_STEPS,INPUT_SIZE,OUTPUT_SIZE,CELL_SIZE,BATCH_SIZE)
    # The context manager releases the session even if training fails.
    with tf.Session() as sess:
        merged = tf.summary.merge_all()
        # Writes the graph and the summary protocol buffers to event files.
        writer = tf.summary.FileWriter('logs',sess.graph)
        try:
            if int((tf.__version__).split('.')[1])<12 and int((tf.__version__).split('.')[0]) <1:
                init = tf.initialize_all_variables()
            else:
                init = tf.global_variables_initializer()
            sess.run(init)

            plt.ion()  # interactive mode so the plot refreshes while training
            plt.show()

            state = None  # no previous LSTM state before the first batch
            for i in range(1000):
                seq,res,xs = get_batch()
                feed_dict = {
                    model.xs:seq,
                    model.ys:res,
                }
                if state is not None:
                    # Carry the previous batch's final state over as the
                    # initial state of this batch.
                    feed_dict[model.cell_init_state] = state
                _,cost,state,pred = sess.run(
                    [model.train_op,model.cost,model.cell_final_state,model.pred],
                    feed_dict=feed_dict)

                # Plot target (red) against prediction (blue dashed) for the
                # first sequence of the batch.
                plt.plot(xs[0,:],res[0].flatten(),'r',xs[0,:],pred.flatten()[:TIME_STEPS],'b--')
                plt.ylim((-1.2,1.2))
                plt.draw()
                plt.pause(0.3)
                if i% 20==0:
                    print('cost:',round(cost,4))  # cost rounded to four decimals
                    result = sess.run(merged, feed_dict)
                    writer.add_summary(result,i)
        finally:
            # Flush pending events and release the log file handle.
            writer.close()
                

輸出結果:

在訓練了1000輪後基本吻合了。