1. 程式人生 > >乾貨|python利用LSTM進行時間序列分析預測

乾貨|python利用LSTM進行時間序列分析預測

時間序列(或稱動態數列)是指將同一統計指標的數值按其發生的時間先後順序排列而成的數列。時間序列分析的主要目的是根據已有的歷史資料對未來進行預測。

時間序列構成要素:長期趨勢,季節變動,循環變動,不規則變動

  • 長期趨勢( T )現象在較長時期內受某種根本性因素作用而形成的總的變動趨勢
  • 季節變動( S )現象在一年內隨著季節的變化而發生的有規律的週期性變動
  • 循環變動( C )現象以若干年為週期所呈現出的波浪起伏形態的有規律的變動
  • 不規則變動(I )是一種無規律可循的變動,包括嚴格的隨機變動和不規則的突發性影響很大的變動兩種型別

(1)原始時間序列資料(只列出了18行)

1455.219971
1399.420044
1402.109985
1403.449951
1441.469971
1457.599976
1438.560059
1432.25
1449.680054
1465.150024
1455.140015
1455.900024
1445.569946
1441.359985
1401.530029
1410.030029
1404.089966
1398.560059

(2)處理資料使之符合LSTM的要求

為了更加直觀的瞭解資料格式,程式碼中加入了一些列印(print),並且後面加了註釋,就是輸出值

def load_data(filename, seq_len):
    """Load a one-value-per-line series and cut it into LSTM samples.

    Every sample is a window of ``seq_len + 1`` consecutive values: the first
    ``seq_len`` values are the model input, the last value is the label.
    The first 90% of the windows form the training set (shuffled in place),
    the remaining windows form the test set.

    Args:
        filename: Path of a text file holding one numeric value per line.
        seq_len: Number of time steps in each input sample.

    Returns:
        ``[x_train, y_train, x_test, y_test]``. The ``x`` arrays are reshaped
        to ``(samples, seq_len, 1)``; the ``y`` arrays are 1-D.
    """
    # Text mode inside a context manager: splitting bytes on a str separator
    # fails on Python 3, and the handle is now closed deterministically.
    with open(filename, 'r') as f:
        data = f.read().split('\n')

    print('data len:',len(data))       #4172
    print('sequence len:',seq_len)     #50

    sequence_length = seq_len + 1
    # Each window holds seq_len + 1 values; the last one is used as the label.
    # NOTE: this range stops one window short of the end of the data, exactly
    # as before, so the sample counts are unchanged; it also means the final
    # field (empty when the file ends with a newline) is never part of a window.
    result = [data[index: index + sequence_length]
              for index in range(len(data) - sequence_length)]

    print('result len:',len(result))                 #4121
    print('result shape:',np.array(result).shape)    #(4121,51)

    # The fields are still text at this point; make the array numeric.
    result = np.array(result, dtype=float)

    # Split into train and test. int() is needed because round() returns a
    # float on Python 2 and a float is not a valid slice bound.
    row = int(round(0.9 * result.shape[0]))
    train = result[:row, :]
    np.random.shuffle(train)  # shuffles the training rows in place
    x_train = train[:, :-1]
    y_train = train[:, -1]
    x_test = result[row:, :-1]
    y_test = result[row:, -1]

    # Append a trailing feature axis of size 1.
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))

    # The locals are lower-case; the old prints referenced undefined X_train / X_test.
    print('X_train shape:',x_train.shape)  #(3709L, 50L, 1L)
    print('y_train shape:',y_train.shape)  #(3709L,)
    print('X_test shape:',x_test.shape)    #(412L, 50L, 1L)
    print('y_test shape:',y_test.shape)    #(412L,)

    return [x_train, y_train, x_test, y_test]

(3)LSTM模型

本文使用的是keras深度學習框架,讀者可能用的是其他的,諸如theano、tensorflow等,大同小異。

LSTM的結構可以自己定製,Stack LSTM or Bidirectional LSTM

def build_model(layers):  #layers [1,50,100,1]
    """Assemble and compile a stacked two-layer LSTM regression model.

    Args:
        layers: Indexable of four sizes: ``layers[0]`` is the input dimension
            of the first LSTM, ``layers[1]`` and ``layers[2]`` are the output
            sizes of the first and second LSTM, and ``layers[3]`` is the
            output size of the final Dense layer.

    Returns:
        The ``Sequential`` model, compiled with MSE loss and the rmsprop
        optimizer.
    """
    # Stack LSTM: the first LSTM returns the whole sequence so that the
    # second LSTM can consume it; each LSTM is followed by 20% dropout.
    stack = [
        LSTM(input_dim=layers[0], output_dim=layers[1], return_sequences=True),
        Dropout(0.2),
        LSTM(layers[2], return_sequences=False),
        Dropout(0.2),
        Dense(output_dim=layers[3]),
        Activation("linear"),
    ]

    model = Sequential()
    for layer in stack:
        model.add(layer)

    compile_began = time.time()
    model.compile(loss="mse", optimizer="rmsprop")
    print("Compilation Time : ", time.time() - compile_began)
    return model

(4)LSTM訓練預測

1.直接預測

def predict_point_by_point(model, data):
    """Run the model once over all windows and flatten the predictions.

    Args:
        model: Object whose ``predict(data)`` returns a NumPy array.
        data: Batch of input windows, passed to ``model.predict`` unchanged.

    Returns:
        The model output reshaped to a 1-D array of ``output.size`` entries.
    """
    raw = model.predict(data)
    print('predicted shape:',np.shape(raw))  #(412L,1L)
    flat_length = raw.size
    return raw.reshape(flat_length)

2.滾動預測

def predict_sequence_full(model, data, window_size):  #data X_test
    """Roll one window forward, feeding every prediction back as input.

    Starts from ``data[0]`` and makes ``len(data)`` predictions. After each
    prediction the oldest entry of the window is dropped and the prediction
    is inserted at index ``window_size - 1``, so only the very first window
    consists purely of values taken from ``data``.

    Args:
        model: Object whose ``predict(batch)`` returns an array that can be
            indexed with ``[0, 0]``.
        data: Array of windows; ``data[0]`` must be 2-D because it is
            indexed as ``curr_frame[newaxis, :, :]``.
        window_size: Length of one window; the new value is inserted at
            index ``window_size - 1`` of the shortened window.

    Returns:
        A list with ``len(data)`` predicted values (empty for empty ``data``).
    """
    if len(data) == 0:
        # No windows means nothing to seed the rolling forecast with.
        return []

    curr_frame = data[0]  # e.g. (50, 1) in the article's run
    predicted = []
    # range() instead of the Python 2-only xrange() so this also runs on Python 3.
    for _ in range(len(data)):
        # Add a leading batch axis before calling the model: (50, 1) -> (1, 50, 1).
        predicted.append(model.predict(curr_frame[newaxis,:,:])[0,0])
        # Slide the window: drop the oldest value, insert the newest prediction.
        curr_frame = curr_frame[1:]
        curr_frame = np.insert(curr_frame, [window_size-1], predicted[-1], axis=0)   #numpy.insert(arr, obj, values, axis=None)
    return predicted

3.滑動視窗+滾動預測

def predict_sequences_multiple(model, data, window_size, prediction_len):  #window_size = seq_len
    """Predict several short rolling sequences, re-seeding from ``data`` each time.

    ``data`` is walked in steps of ``prediction_len`` windows. For each step
    the window ``data[i * prediction_len]`` is used as the seed and rolled
    forward for ``prediction_len`` predictions, each prediction being fed
    back into the window.

    Args:
        model: Object whose ``predict(batch)`` returns an array that can be
            indexed with ``[0, 0]``.
        data: Array of windows; each window must be 2-D because it is
            indexed as ``curr_frame[newaxis, :, :]``.
        window_size: Length of one window; the new value is inserted at
            index ``window_size - 1`` of the shortened window.
        prediction_len: Number of steps predicted from each seed window.

    Returns:
        A list of ``len(data) // prediction_len`` lists, each holding
        ``prediction_len`` predicted values.
    """
    prediction_seqs = []
    # Floor division and range(): on Python 3 `/` yields a float and xrange()
    # does not exist, so the previous loop header could not run there.
    for i in range(len(data) // prediction_len):
        curr_frame = data[i*prediction_len]
        predicted = []
        for _ in range(prediction_len):
            predicted.append(model.predict(curr_frame[newaxis,:,:])[0,0])
            # Slide the window: drop the oldest value, insert the newest prediction.
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(curr_frame, [window_size-1], predicted[-1], axis=0)
        prediction_seqs.append(predicted)
    return prediction_seqs

(5)完整程式碼

# -*- coding: utf-8 -*-

from __future__ import print_function

import time
import warnings
import numpy as np
import time
import matplotlib.pyplot as plt
from numpy import newaxis
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.models import Sequential

warnings.filterwarnings("ignore")

def load_data(filename, seq_len, normalise_window):
    """Load a one-value-per-line series and cut it into LSTM samples.

    Every sample is a window of ``seq_len + 1`` consecutive values: the first
    ``seq_len`` values are the model input, the last value is the label.
    The first 90% of the windows form the training set (shuffled in place),
    the remaining windows form the test set.

    Args:
        filename: Path of a text file holding one numeric value per line.
        seq_len: Number of time steps in each input sample.
        normalise_window: When true, every window is passed through
            ``normalise_windows`` before the split.

    Returns:
        ``[x_train, y_train, x_test, y_test]``. The ``x`` arrays are reshaped
        to ``(samples, seq_len, 1)``; the ``y`` arrays are 1-D.
    """
    # Text mode inside a context manager: splitting bytes on a str separator
    # fails on Python 3, and the handle is now closed deterministically.
    with open(filename, 'r') as f:
        data = f.read().split('\n')

    print('data len:',len(data))
    print('sequence len:',seq_len)

    sequence_length = seq_len + 1
    # Each window holds seq_len + 1 values; the last one is used as the label.
    # NOTE: this range stops one window short of the end of the data, exactly
    # as before, so the sample counts are unchanged; it also means the final
    # field (empty when the file ends with a newline) is never part of a window.
    result = [data[index: index + sequence_length]
              for index in range(len(data) - sequence_length)]

    print('result len:',len(result))
    print('result shape:',np.array(result).shape)
    print(result[:1])

    if normalise_window:
        result = normalise_windows(result)

    print(result[:1])
    print('normalise_windows result shape:',np.array(result).shape)

    # Without normalisation the fields are still text; force a numeric array
    # in both cases.
    result = np.array(result, dtype=float)

    # Split into train and test. int() is needed because round() returns a
    # float on Python 2 and a float is not a valid slice bound.
    row = int(round(0.9 * result.shape[0]))
    train = result[:row, :]
    np.random.shuffle(train)  # shuffles the training rows in place
    x_train = train[:, :-1]
    y_train = train[:, -1]
    x_test = result[row:, :-1]
    y_test = result[row:, -1]

    # Append a trailing feature axis of size 1.
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))

    return [x_train, y_train, x_test, y_test]

def normalise_windows(window_data):
    """Express every value of each window relative to the window's first value.

    Each entry ``p`` of a window becomes ``float(p) / float(window[0]) - 1``,
    so the first entry of every non-empty window maps to ``0.0``.

    Args:
        window_data: Iterable of windows; entries must be accepted by ``float()``.

    Returns:
        A list with one list of floats per input window.
    """
    def _relative_change(window):
        # Relative change of each entry with respect to the window's first entry.
        return [float(value) / float(window[0]) - 1 for value in window]

    return [_relative_change(window) for window in window_data]

def build_model(layers):  #layers [1,50,100,1]
    """Assemble and compile a stacked two-layer LSTM regression model.

    Args:
        layers: Indexable of four sizes: ``layers[0]`` is the input dimension
            of the first LSTM, ``layers[1]`` and ``layers[2]`` are the output
            sizes of the first and second LSTM, and ``layers[3]`` is the
            output size of the final Dense layer.

    Returns:
        The ``Sequential`` model, compiled with MSE loss and the rmsprop
        optimizer.
    """
    # The first LSTM returns the whole sequence so that the second LSTM can
    # consume it; each LSTM is followed by 20% dropout.
    stack = [
        LSTM(input_dim=layers[0], output_dim=layers[1], return_sequences=True),
        Dropout(0.2),
        LSTM(layers[2], return_sequences=False),
        Dropout(0.2),
        Dense(output_dim=layers[3]),
        Activation("linear"),
    ]

    model = Sequential()
    for layer in stack:
        model.add(layer)

    compile_began = time.time()
    model.compile(loss="mse", optimizer="rmsprop")
    print("Compilation Time : ", time.time() - compile_began)
    return model

# Predict every point directly, in a single pass over all windows
def predict_point_by_point(model, data):
    """Run the model once over all windows and flatten the predictions.

    Args:
        model: Object whose ``predict(data)`` returns a NumPy array.
        data: Batch of input windows, passed to ``model.predict`` unchanged.

    Returns:
        The model output reshaped to a 1-D array of ``output.size`` entries.
    """
    raw = model.predict(data)
    print('predicted shape:',np.shape(raw))  #(412L,1L)
    flat_length = raw.size
    return raw.reshape(flat_length)

# Rolling prediction
def predict_sequence_full(model, data, window_size):  #data X_test
    """Roll one window forward, feeding every prediction back as input.

    Starts from ``data[0]`` and makes ``len(data)`` predictions. After each
    prediction the oldest entry of the window is dropped and the prediction
    is inserted at index ``window_size - 1``, so only the very first window
    consists purely of values taken from ``data``.

    Args:
        model: Object whose ``predict(batch)`` returns an array that can be
            indexed with ``[0, 0]``.
        data: Array of windows; ``data[0]`` must be 2-D because it is
            indexed as ``curr_frame[newaxis, :, :]``.
        window_size: Length of one window; the new value is inserted at
            index ``window_size - 1`` of the shortened window.

    Returns:
        A list with ``len(data)`` predicted values (empty for empty ``data``).
    """
    if len(data) == 0:
        # No windows means nothing to seed the rolling forecast with.
        return []

    curr_frame = data[0]  # e.g. (50, 1) in the article's run
    predicted = []
    # range() instead of the Python 2-only xrange() so this also runs on Python 3.
    for _ in range(len(data)):
        # Add a leading batch axis before calling the model: (50, 1) -> (1, 50, 1).
        predicted.append(model.predict(curr_frame[newaxis,:,:])[0,0])
        # Slide the window: drop the oldest value, insert the newest prediction.
        curr_frame = curr_frame[1:]
        curr_frame = np.insert(curr_frame, [window_size-1], predicted[-1], axis=0)   #numpy.insert(arr, obj, values, axis=None)
    return predicted

def predict_sequences_multiple(model, data, window_size, prediction_len):  #window_size = seq_len
    """Predict several short rolling sequences, re-seeding from ``data`` each time.

    ``data`` is walked in steps of ``prediction_len`` windows. For each step
    the window ``data[i * prediction_len]`` is used as the seed and rolled
    forward for ``prediction_len`` predictions, each prediction being fed
    back into the window.

    Args:
        model: Object whose ``predict(batch)`` returns an array that can be
            indexed with ``[0, 0]``.
        data: Array of windows; each window must be 2-D because it is
            indexed as ``curr_frame[newaxis, :, :]``.
        window_size: Length of one window; the new value is inserted at
            index ``window_size - 1`` of the shortened window.
        prediction_len: Number of steps predicted from each seed window.

    Returns:
        A list of ``len(data) // prediction_len`` lists, each holding
        ``prediction_len`` predicted values.
    """
    prediction_seqs = []
    # Floor division and range(): on Python 3 `/` yields a float and xrange()
    # does not exist, so the previous loop header could not run there.
    for i in range(len(data) // prediction_len):
        curr_frame = data[i*prediction_len]
        predicted = []
        for _ in range(prediction_len):
            predicted.append(model.predict(curr_frame[newaxis,:,:])[0,0])
            # Slide the window: drop the oldest value, insert the newest prediction.
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(curr_frame, [window_size-1], predicted[-1], axis=0)
        prediction_seqs.append(predicted)
    return prediction_seqs

def plot_results(predicted_data, true_data, filename):
    """Plot the predictions over the true series, save the figure, then show it.

    Args:
        predicted_data: Sequence of predicted values.
        true_data: Sequence of true values, drawn on the same axes.
        filename: Output path without extension; ``'.png'`` is appended.
    """
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    ax.plot(predicted_data, label='Prediction')
    ax.legend()
    # Save BEFORE show(): once a blocking show() returns the figure has been
    # closed, and a pyplot-level savefig() afterwards writes a new, empty figure.
    fig.savefig(filename+'.png')
    plt.show()

def plot_results_multiple(predicted_data, true_data, prediction_len):
    """Plot several prediction sequences over the true series, save, then show.

    The i-th sequence is padded on the left with ``i * prediction_len``
    ``None`` entries so that it is drawn at its own offset along the x axis.
    The figure is written to ``plot_results_multiple.png``.

    Args:
        predicted_data: Iterable of prediction sequences; each must be a
            list, because it is concatenated onto the list of padding.
        true_data: Sequence of true values, drawn on the same axes.
        prediction_len: Offset, in points, between consecutive sequences.
    """
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    # Pad the list of predictions to shift it in the graph to its correct start
    for i, data in enumerate(predicted_data):
        padding = [None] * (i * prediction_len)  # no Python 2-only xrange()
        ax.plot(padding + data, label='Prediction')
    # One legend call after all lines exist, instead of one per iteration.
    ax.legend()
    # Save BEFORE show(): once a blocking show() returns the figure has been
    # closed, and a pyplot-level savefig() afterwards writes a new, empty figure.
    fig.savefig('plot_results_multiple.png')
    plt.show()

if __name__=='__main__':
    # Script entry point: load the series, train the LSTM, run the three
    # prediction modes on the test set and plot each result.
    global_start_time = time.time()
    epochs  = 1
    seq_len = 50
    prediction_len = 50  # steps predicted from each seed window in the multi-sequence mode

    print('> Loading data... ')

    # The helpers are defined in this file. They were previously called through
    # an `lstm.` prefix, but no `lstm` module is imported here, so every such
    # call raised NameError.
    X_train, y_train, X_test, y_test = load_data('sp500.csv', seq_len, True)

    print('X_train shape:',X_train.shape)  #(3709L, 50L, 1L)
    print('y_train shape:',y_train.shape)  #(3709L,)
    print('X_test shape:',X_test.shape)    #(412L, 50L, 1L)
    print('y_test shape:',y_test.shape)    #(412L,)

    print('> Data Loaded. Compiling...')

    model = build_model([1, 50, 100, 1])

    model.fit(X_train,y_train,batch_size=512,nb_epoch=epochs,validation_split=0.05)

    multiple_predictions = predict_sequences_multiple(model, X_test, seq_len, prediction_len=prediction_len)
    print('multiple_predictions shape:',np.array(multiple_predictions).shape)   #(8L,50L)

    full_predictions = predict_sequence_full(model, X_test, seq_len)
    print('full_predictions shape:',np.array(full_predictions).shape)    #(412L,)

    point_by_point_predictions = predict_point_by_point(model, X_test)
    print('point_by_point_predictions shape:',np.array(point_by_point_predictions).shape)  #(412L)

    print('Training duration (s) : ', time.time() - global_start_time)

    plot_results_multiple(multiple_predictions, y_test, prediction_len)
    plot_results(full_predictions,y_test,'full_predictions')
    plot_results(point_by_point_predictions,y_test,'point_by_point_predictions')