
[Deep Learning] Fitting a Sine Function with an RNN (LSTM) in Python

Deep learning framework: TensorFlow 0.8.0
Python: 2.7.6

Two ways of feeding the data into the model:

① data and label are the same variable, so the whole model is effectively autoregressive (this post demonstrates this first mode)

② data and label are different variables, so the model learns the functional relationship between data and label (a minimal sketch contrasting the two modes follows below)
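To make the distinction concrete, here is a minimal sketch of what gets paired as data and label in each mode. The helper make_window is illustrative only and is not part of the code in this post; mode ① is what BatchGenerator below implements.

import numpy as np

def make_window(t0,n):
  #illustrative helper: n consecutive samples of sin(t) starting at t0
  t=t0+np.arange(n)/float(n)
  return t,np.sin(t)

t,s=make_window(0.,5)

#mode 1 (autoregressive): data and label come from the same series,
#the label being the series shifted one step ahead
data_1,label_1=s[:-1],s[1:]

#mode 2 (function fitting): data are the time points, labels are sin(t),
#so the network learns the mapping t -> sin(t)
data_2,label_2=t[:-1],s[:-1]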

The data-generating class:

class BatchGenerator:
  def __init__(self,window_size,window_range):
    self.cursor=0.
    self.window=window_size
    self.range=window_range

  def next(self):
    x=np.zeros([self.window,1])
    y=np.zeros([self.window,1])
    d=np.arange(self.cursor,self.cursor+1.,1./self.window)
    for i in range(self.window):
      l=np.sin(d[i])
      x[i,0]=d[i]
      y[i,0]=l
    self.cursor+=1./self.window
    return y[:self.window-1],y[1:],x[:self.window-1]
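A quick usage sketch (assuming numpy is imported as np and the class above is defined; the shapes follow directly from the code, with window_size=5 being the value used later):

gen=BatchGenerator(5,10)
data,label,x=gen.next()
#data  = y[:window-1] -> shape (4,1): sin values at the first four points of the window
#label = y[1:]        -> shape (4,1): the same sine series shifted one step ahead
#x     = x[:window-1] -> shape (4,1): the corresponding time points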

Building the single-layer LSTM network:

with graph.as_default():
    #parameter initialization
    #input gate
    ix=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="ix")
    ih=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="ih")
    ib=tf.Variable(tf.zeros([1,num_nodes]),name="ib")

    #output_gate
    ox=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="ox")
    oh=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="oh")
    ob=tf.Variable(tf.zeros([1,num_nodes]),name="ob")

    #forget_gate
    fx=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="fx")
    fh=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="fh")
    fb=tf.Variable(tf.zeros([1,num_nodes]),name="fb")

    #cell
    gx=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="gx")
    gh=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="gh")
    gb=tf.Variable(tf.zeros([1,num_nodes]),name="gb")

    #variables saving state across unrollings
    saved_output=tf.Variable(tf.zeros([1,num_nodes]),trainable=False,name="saved_output")
    saved_state=tf.Variable(tf.zeros([1,num_nodes]),trainable=False,name="saved_state")

    #classifier's weights and biases
    w=tf.Variable(tf.truncated_normal([num_nodes,window_size-1],-0.1,0.1),name="w")
    b=tf.Variable(tf.zeros([window_size-1]),name="b")

    ##define the LSTM cell
    def lstm_cell(x,h,c):
        input_gate=tf.sigmoid(tf.matmul(x,ix)+tf.matmul(h,ih)+ib)
        output_gate=tf.sigmoid(tf.matmul(x,ox)+tf.matmul(h,oh)+ob)
        forget_gate=tf.sigmoid(tf.matmul(x,fx)+tf.matmul(h,fh)+fb)
        gt=tf.tanh(tf.matmul(x,gx)+tf.matmul(h,gh)+gb)
        ct=input_gate*gt+forget_gate*c
        return output_gate*tf.tanh(ct),ct

    #Input Data
    train_data=list()
    train_label=list()
    for _ in range(num_unrollings+1):
        train_data.append(\
                tf.placeholder(tf.float32,shape=[1,window_size-1]))
        train_label.append(\
                tf.placeholder(tf.float32,shape=[1,window_size-1]))
    train_inputs=train_data
    train_labels=train_label

    #unroll the LSTM
    outputs=list()
    output=saved_output
    state=saved_state
    for i in train_inputs:
        output,state=lstm_cell(i,output,state)
        outputs.append(output)

    #save the output and cell state from the last unrolled step
    with tf.control_dependencies([saved_output.assign(output),\
                                  saved_state.assign(state)]):
        #build the RNN from the single LSTM layer
        logits=tf.nn.xw_plus_b(tf.concat(0,outputs),w,b)
        loss=tf.reduce_mean(\
                tf.reduce_sum(tf.square(tf.concat(0,train_labels)-logits))
            )
    #optimizer
    global_step=tf.Variable(0)
    learning_rate=tf.train.exponential_decay(0.8,global_step,1000,0.5)
    optimizer=tf.train.GradientDescentOptimizer(learning_rate)
    gradients,v=zip(*optimizer.compute_gradients(loss))
    gradients,_=tf.clip_by_global_norm(gradients,1.25)
    optimizer=optimizer.apply_gradients(\
               zip(gradients,v),global_step=global_step)
    #Predictions
    train_prediction=logits

    #test eval:batch 1,no unrolling
    sample_input=tf.placeholder(tf.float32,shape=[1,window_size-1])
    saved_sample_output=tf.Variable(tf.zeros([1,num_nodes]))
    saved_sample_state=tf.Variable(tf.zeros([1,num_nodes]))
    reset_sample_state=tf.group(\
        saved_sample_output.assign(tf.zeros([1,num_nodes])),\
        saved_sample_state.assign(tf.zeros([1,num_nodes])))
    sample_output,sample_state=lstm_cell(\
                        sample_input,saved_sample_output,saved_sample_state)
    with tf.control_dependencies([saved_sample_output.assign(sample_output),\
                            saved_sample_state.assign(sample_state)]):
        sample_prediction=tf.nn.xw_plus_b(sample_output,w,b)

Complete code:

# -*- coding: utf-8 -*-
"""
Created on Fri Apr 14 08:52:47 2017

@author: ZMJ
"""
from __future__ import print_function
import numpy as np
import tensorflow as tf
from six.moves import range
import collections
import random
import csv
from matplotlib import pyplot as plt
class BatchGenerator:
  def __init__(self,window_size,window_range):
    self.cursor=0.
    self.window=window_size
    self.range=window_range

  def next(self):
    x=np.zeros([self.window,1])
    y=np.zeros([self.window,1])
    d=np.arange(self.cursor,self.cursor+1.,1./self.window)
    for i in range(self.window):
      l=np.sin(d[i])
      x[i,0]=d[i]
      y[i,0]=l
    self.cursor+=1./self.window
    return y[:self.window-1],y[1:],x[:self.window-1]

num_nodes=64        #number of LSTM units
window_size=5       #points per generated window; the network sees window_size-1 values at a time
window_range=10     #passed to BatchGenerator but not used by it
num_unrollings=100  #the LSTM is unrolled over num_unrollings+1 windows per training step
graph=tf.Graph()
with graph.as_default():
    #parameter initialization
    #input gate
    ix=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="ix")
    ih=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="ih")
    ib=tf.Variable(tf.zeros([1,num_nodes]),name="ib")

    #output_gate
    ox=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="ox")
    oh=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="oh")
    ob=tf.Variable(tf.zeros([1,num_nodes]),name="ob")

    #forget_gate
    fx=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="fx")
    fh=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="fh")
    fb=tf.Variable(tf.zeros([1,num_nodes]),name="fb")

    #cell
    gx=tf.Variable(tf.truncated_normal([window_size-1,num_nodes],-0.1,0.1),name="gx")
    gh=tf.Variable(tf.truncated_normal([num_nodes,num_nodes],-0.1,0.1),name="gh")
    gb=tf.Variable(tf.zeros([1,num_nodes]),name="gb")

    #variables saving state across unrollings
    saved_output=tf.Variable(tf.zeros([1,num_nodes]),trainable=False,name="saved_output")
    saved_state=tf.Variable(tf.zeros([1,num_nodes]),trainable=False,name="saved_state")

    #classifier's weights and biases
    w=tf.Variable(tf.truncated_normal([num_nodes,window_size-1],-0.1,0.1),name="w")
    b=tf.Variable(tf.zeros([window_size-1]),name="b")

    ##define the LSTM cell
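    #the cell below implements the standard LSTM update, where x is the current
    #input, h the previous output and c the previous cell state (matmul denotes
    #matrix multiplication):
    #  i_t = sigmoid(matmul(x,ix)+matmul(h,ih)+ib)   #input gate
    #  o_t = sigmoid(matmul(x,ox)+matmul(h,oh)+ob)   #output gate
    #  f_t = sigmoid(matmul(x,fx)+matmul(h,fh)+fb)   #forget gate
    #  g_t = tanh(matmul(x,gx)+matmul(h,gh)+gb)      #candidate cell state
    #  c_t = i_t*g_t + f_t*c                         #new cell state
    #  h_t = o_t*tanh(c_t)                           #new output, returned with c_t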
    def lstm_cell(x,h,c):
        input_gate=tf.sigmoid(tf.matmul(x,ix)+tf.matmul(h,ih)+ib)
        output_gate=tf.sigmoid(tf.matmul(x,ox)+tf.matmul(h,oh)+ob)
        forget_gate=tf.sigmoid(tf.matmul(x,fx)+tf.matmul(h,fh)+fb)
        gt=tf.tanh(tf.matmul(x,gx)+tf.matmul(h,gh)+gb)
        ct=input_gate*gt+forget_gate*c
        return output_gate*tf.tanh(ct),ct

    #Input Data

    train_data=list()
    train_label=list()
    for _ in range(num_unrollings+1):
        train_data.append(\
                tf.placeholder(tf.float32,shape=[1,window_size-1]))
        train_label.append(\
                tf.placeholder(tf.float32,shape=[1,window_size-1]))
    train_inputs=train_data
    train_labels=train_label

    #unroll the LSTM
    outputs=list()
    output=saved_output
    state=saved_state
    for i in train_inputs:
        output,state=lstm_cell(i,output,state)
        outputs.append(output)

    #save the output and cell state from the last unrolled step
    with tf.control_dependencies([saved_output.assign(output),\
                                  saved_state.assign(state)]):
        #build the RNN from the single LSTM layer
        logits=tf.nn.xw_plus_b(tf.concat(0,outputs),w,b)
        loss=tf.reduce_mean(\
                tf.reduce_sum(tf.square(tf.concat(0,train_labels)-logits))
            )
    #optimizer
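    #the learning rate starts at 0.8 and decays by a factor of 0.5 every 1000
    #steps (smoothly, since staircase is left at its default); gradients are
    #rescaled so their global norm never exceeds 1.25 before plain gradient
    #descent applies them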
    global_step=tf.Variable(0)
    learning_rate=tf.train.exponential_decay(0.8,global_step,1000,0.5)
    optimizer=tf.train.GradientDescentOptimizer(learning_rate)
    gradients,v=zip(*optimizer.compute_gradients(loss))
    gradients,_=tf.clip_by_global_norm(gradients,1.25)
    optimizer=optimizer.apply_gradients(\
               zip(gradients,v),global_step=global_step)
    #Predictions
    train_prediction=logits

    #test eval:batch 1,no unrolling
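    #for evaluation the cell is run one window at a time; saved_sample_output
    #and saved_sample_state carry the LSTM state from one eval call to the
    #next, and reset_sample_state can be run to zero them before a new rollout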
    sample_input=tf.placeholder(tf.float32,shape=[1,window_size-1])
    saved_sample_output=tf.Variable(tf.zeros([1,num_nodes]))
    saved_sample_state=tf.Variable(tf.zeros([1,num_nodes])) 
    reset_sample_state=tf.group(\
        saved_sample_output.assign(tf.zeros([1,num_nodes])),\
        saved_sample_state.assign(tf.zeros([1,num_nodes])))
    sample_output,sample_state=lstm_cell(\
                        sample_input,saved_sample_output,saved_sample_state)
    with tf.control_dependencies([saved_sample_output.assign(sample_output),\
                            saved_sample_state.assign(sample_state)]):
        sample_prediction=tf.nn.xw_plus_b(sample_output,w,b)   

num_steps=10001
summary_frequency=100
f=open("out.csv","a+")
writer=csv.writer(f)
with tf.Session(graph=graph) as session:
  tf.initialize_all_variables().run()
  print("Initialized!!")
  mean_loss=0
  feed_dict=dict()
  count=0
  test=BatchGenerator(window_size,window_range)

  for i in range(num_unrollings+1):
    temp1,temp2,_=test.next()
    feed_dict[train_data[i]]=temp1.reshape([1,window_size-1])
    feed_dict[train_label[i]]=temp2.reshape([1,window_size-1])
  for step in range(num_steps):
    _,l,predictions,lr=session.run([optimizer,loss,train_prediction,learning_rate],feed_dict=feed_dict)
    if step%100==0:
      print(l)
  X=list()
  Label=list()
  Prediction=list()
  for i in range(100):
    data,label,x=test.next()
    feed=data.reshape([1,window_size-1])
    prediction=sample_prediction.eval({sample_input:feed})
    X.append(x)
    Label.append(label)
    Prediction.append(prediction)
  Prediction=np.array(Prediction).reshape([1,100*(window_size-1)])
  Label=np.array(Label).reshape([1,100*(window_size-1)])
  X=np.array(X).reshape([1,100*(window_size-1)])
  print(Label,Prediction)
  plt.plot(X[0],Label[0],color="blue")
  plt.plot(X[0],Prediction[0],color="red")
  plt.savefig("out.png")  #save before show(), otherwise an empty figure is written
  plt.show()
  writer.writerows([data.T[0],label.T[0],prediction[0]])  #only the last test window is written to out.csv
f.close()

Fitting results:
(Plot: the ground-truth sine curve in blue versus the LSTM prediction in red.)