1. 程式人生 > >人民幣對美元匯率的大資料分析與預測【完整程式碼】

人民幣對美元匯率的大資料分析與預測【完整程式碼】

## 匯入所需的包

import pandas as pd

import numpy as np

import matplotlib.pyplot as plt

import tensorflow as tf

 

tf.reset_default_graph()

plt.rcParams['font.sans-serif'] = 'SimHei' ##設定字型為SimHei顯示中文

plt.rcParams['axes.unicode_minus'] = False ##設定正常顯示符號

 

## 匯入所需資料

df = pd.read_csv('美元-人民幣.csv',encoding='gbk',engine='python')

df['時間'] = pd.to_datetime(df['時間'],format='%Y/%m/%d')

df = df.sort_values(by='時間')

df.head()

 

## 用折線圖展示資料

plt.figure(figsize=(12,8))

plt.title('1999年1月1日到2018年8月21日最高價資料曲線')

plt.plot(df['時間'],df['高'])

plt.show()

 

### 提取測試資料

data = df.loc[:,['時間','高']]

 

## 標準化資料

data['高'] = (data['高']-np.mean(data['高']))/np.std(data['高'])

data['高(預)'] = data['高'].shift(-1)

data = data.iloc[:data.shape[0]-1]

data.columns = ['time','x','y']

data.head()

 

#獲取最高價序列

data=np.array(df['高'])

 

normalize_data=(data-np.mean(data))/np.std(data)  #標準化

normalize_data=normalize_data[:,np.newaxis]  #增加維度

 

#———————————————形成訓練集——————————————————

#設定常量

time_step=20       #時間步

rnn_unit=10        #hidden layer units

batch_size=60      #每一批次訓練多少個樣例

input_size=1       #輸入層維度

output_size=1       #輸出層維度

lr=0.0006           #學習率

train_x,train_y=[],[]   #訓練集

for i in range(len(normalize_data)-time_step-1):

    x=normalize_data[i:i+time_step]

    y=normalize_data[i+1:i+time_step+1]

    train_x.append(x.tolist())

    train_y.append(y.tolist())

 

test_x = train_x[len(train_x)-31:len(train_x)-1]

test_y = train_y[len(train_y)-31:len(train_y)-1]

 

X=tf.placeholder(tf.float32, [None,time_step,input_size])    #每批次輸入網路的tensor

Y=tf.placeholder(tf.float32, [None,time_step,output_size])   #每批次tensor對應的標籤

 

#輸入層、輸出層權重、偏置

weights={

         'in':tf.Variable(tf.random_normal([input_size,rnn_unit])),

         'out':tf.Variable(tf.random_normal([rnn_unit,1]))

         }

biases={

        'in':tf.Variable(tf.constant(0.1,shape=[rnn_unit,])),

        'out':tf.Variable(tf.constant(0.1,shape=[1,]))

        }

 

def lstm(batch):  #引數:輸入網路批次數目

    w_in=weights['in']

    b_in=biases['in']

    input=tf.reshape(X,[-1,input_size])  #需要將tensor轉成2維進行計算,計算後的結果作為隱藏層的輸入

    input_rnn=tf.matmul(input,w_in)+b_in

    input_rnn=tf.reshape(input_rnn,[-1,time_step,rnn_unit])  #將tensor轉成3維,作為lstm cell的輸入

    cell=tf.nn.rnn_cell.BasicLSTMCell(rnn_unit)

    init_state=cell.zero_state(batch,dtype=tf.float32)

 

 

output_rnn,final_states=tf.nn.dynamic_rnn(cell,input_rnn,initial_state=init_state, dtype=tf.float32)  #output_rnn是記錄lstm每個輸出節點的結果,final_states是最後一個cell的結果

output=tf.reshape(output_rnn,[-1,rnn_unit]) #作為輸出層的輸入

    w_out=weights['out']

    b_out=biases['out']

    pred=tf.matmul(output,w_out)+b_out

   

    return pred,final_states

 

def train_lstm():

    global batch_size

pred,_=lstm(batch_size)

 

#損失函式

    loss=tf.reduce_mean(tf.square(tf.reshape(pred,[-1])-tf.reshape(Y, [-1])))

    train_op=tf.train.AdamOptimizer(lr).minimize(loss)

    saver=tf.train.Saver(tf.global_variables())

    with tf.Session() as sess:

        sess.run(tf.global_variables_initializer())

        #重複訓練100次

        for i in range(100):

            step=0

            start=0

            end=start+batch_size

              while(end<len(train_x)):                _,loss_=sess.run([train_op,loss],feed_dict={X:train_x[start:end],Y:train_y[start:end]})

                start+=batch_size

                end=start+batch_size

                #每10步儲存一次引數

                if step%10==0:

                    print(i,step,loss_)

                    print("儲存模型:",saver.save(sess,'.\stock.model'))

                step+=1

 

def prediction():

    pred,_=lstm(1)    #預測時只輸入[1,time_step,input_size]的測試資料

    saver=tf.train.Saver(tf.global_variables())

    with tf.Session() as sess:

        #引數恢復

        module_file = tf.train.latest_checkpoint('./')

        saver.restore(sess, module_file)

        #取訓練集最後一行為測試樣本。shape=[1,time_step,input_size]

        prev_seq=train_x[-31]

        predict=[]

        #得到之後100個預測結果

        for i in range(100):

            next_seq=sess.run(pred,feed_dict={X:[prev_seq]})

            predict.append(next_seq[-1])

      #每次得到最後一個時間步的預測結果,與之前的資料加在一起,形成新的測試樣本

            prev_seq=np.vstack((prev_seq[1:],next_seq[-1]))

        #以折線圖表示結果

        plt.figure()

        plt.plot(list(range(len(normalize_data))), normalize_data, color='b')

        plt.plot(list(range(len(normalize_data), len(normalize_data) + len(predict))), predict, color='r')

        plt.show()

 

with tf.variable_scope('train'):

    train_lstm()

with tf.variable_scope('train',reuse=True):

    prediction()