CNTK API文件翻譯(10)——使用LSTM預測時間序列資料
本篇教程展示如何用CNTK構建LSTM來進行時間序列資料的數值預測。
目標
我們使用一個連續函式的模擬資料集(本例使用正弦曲線)。對於函式y=sin(t),我們使用符合這個函式的N個值來預測之後的M個值。
在本教程中我們將使用基於LSTM的模型。LSTM比較擅長從以往的資料中學習,因此比較適合我們的教程。
本教程分為三個部分:
- 生成模擬資料
- 構建LSTM網路模型
- 模型訓練和評估
LSTM資料已經在很多真實的資料上得到過實踐,不過在本教程我們使用簡單的模擬資料,下一期我們會使用從一些物聯網裝置上採集的資料預測太陽能電池板每天的輸出電量。
使用CNTK我們能很容易的實現模型。
import math
from matplotlib import pyplot as plt
import numpy as np
import os
import pandas as pd
import time
import cntk as C
在下面的程式碼中,我們通過檢查在CNTK內部定義的環境變數來選擇正確的裝置(GPU或者CPU)來執行程式碼,如果不檢查的話,會使用CNTK的預設策略來使用最好的裝置(如果GPU可用的話就使用GPU,否則使用CPU)
# Select the right target device when this notebook is being tested:
if 'TEST_DEVICE' in os.environ:
if os.environ['TEST_DEVICE'] == 'cpu':
C.device.try_set_default_device(C.device.cpu())
else:
C.device.try_set_default_device(C.device.gpu(0))
我們設定了兩種執行模式:
- 快速模式:isFast變數設定成True。這是我們的預設模式,在這個模式下我們會訓練更少的次數,也會使用更少的資料,這個模式保證功能的正確性,但訓練的結果還遠遠達不到可用的要求。
- 慢速模式:我們建議學習者在學習的時候試試將isFast變數設定成False,這會讓學習者更加了解本教程的內容。
資料生成
我們需要幾個輔助函式來生成模擬正弦波資料。也就是讓上文說到的N和M分別是正弦波資料的有序集合。
generate_data()
在本教程中,我們會在正弦波上取N個連續的樣本,輸入模型,試圖預測與最後一次觀測值M步之後的資料。在我們的訓練中,我們會生成多個這樣的資料,並且預測與之相對應的值。假設一個取樣包的大小是k,那麼generate_data函式生成的資料如資料是:
X=[{y[1][1],y[1][2]…y[2][N]},{y[2][1],y[2][2]…y[2][N]}…{y[k][1],y[k][2]…y[k][N]}]
那麼他的預測值我們設為X
L=[{y[1][N+M]},{y[2][N+M]}…{y[k][N+M]}]split_data()
如函式名所訴,split_data會把資料分成訓練資料集、驗證資料集和測試資料集。
def split_data(data, val_size=0.1, test_size=0.1):
"""
splits np.array into training, validation and test
"""
pos_test = int(len(data) * (1 - test_size))
pos_val = int(len(data[:pos_test]) * (1 - val_size))
train, val, test = data[:pos_val], data[pos_val:pos_test], data[pos_test:]
return {"train": train, "val": val, "test": test}
def generate_data(fct, x, time_steps, time_shift):
"""
generate sequences to feed to rnn for fct(x)
"""
data = fct(x)
if not isinstance(data, pd.DataFrame):
data = pd.DataFrame(dict(a = data[0:len(data) - time_shift],
b = data[time_shift:]))
rnn_x = []
for i in range(len(data) - time_steps):
rnn_x.append(data['a'].iloc[i: i + time_steps].as_matrix())
rnn_x = np.array(rnn_x)
# Reshape or rearrange the data from row to columns
# to be compatible with the input needed by the LSTM model
# which expects 1 float per time point in a given batch
rnn_x = rnn_x.reshape(rnn_x.shape + (1,))
rnn_y = data['b'].values
# Reshape or rearrange the data from row to columns
# to match the input shape
rnn_y = rnn_y.reshape(rnn_y.shape + (1,))
return split_data(rnn_x), split_data(rnn_y)
N = 5 # input: N subsequent values
M = 5 # output: predict 1 value M steps ahead
X, Y = generate_data(np.sin, np.linspace(0, 100, 10000, dtype=np.float32), N, M)
f, a = plt.subplots(3, 1, figsize=(12, 8))
for j, ds in enumerate(["train", "val", "test"]):
a[j].plot(Y[ds], label=ds + ' raw');
[i.legend() for i in a];
生成的資料展示出來如下:
網路模型
我們對每一個輸入使用一個LSTM單元。我們有N個輸入資料,每個輸入資料都是正弦函式上的一個值。從LSTM單元中數出來的值是全連線網路層的輸入值,從而生成一個輸出值。在LSTM和全連線網路層之間我們加入一個捨棄層(DropoutLayer,關於Dropout的詳情可看我的Python與人工神經網路的第七期),捨棄層會隨機丟掉從LSTM中出來的百分之二十的資料,從而避免過度擬合。需要注意的是過度擬合實在訓練時出現,因此捨棄層只有在訓練時才有,預測的時候就不用了。
def create_model(x):
"""Create the model for time series prediction"""
with C.layers.default_options(initial_state = 0.1):
m = C.layers.Recurrence(C.layers.LSTM(N))(x)
m = C.sequence.last(m)
m = C.layers.Dropout(0.2, seed=1)(m)
m = C.layers.Dense(1)(m)
return m
訓練網路
我們定義了一個叫next_batch的迭代器,用來生成資料包,提供給訓練函式。注意,因為CNTK支援邊長序列,我們的資料包必須是一個序列列表。這樣也比較方便的生成小一點的資料包,也就是我在前面的文章裡說過的取樣包。
def next_batch(x, y, ds):
"""get the next batch to process"""
def as_batch(data, start, count):
part = []
for i in range(start, start + count):
part.append(data[i])
return np.array(part)
for i in range(0, len(x[ds])-BATCH_SIZE, BATCH_SIZE):
yield as_batch(x[ds], i, BATCH_SIZE), as_batch(y[ds], i, BATCH_SIZE)
下面的程式碼設定了一些訓練需要的引數
# Training parameters
TRAINING_STEPS = 10000
BATCH_SIZE = 100
EPOCHS = 20 if isFast else 100
關鍵知識
在訓練之前我們需要說一些在LSTM神經網路中使用序列的關鍵知識,簡單來說:
NTK的輸入資料、輸出資料和引數全部都用張量表示。每個張量有一個階數,一個標量是一個0階張量,一個向量是一個1階張量,一個矩陣是一個2階張量等等。我們經常用座標軸來表示張量的不同維度。
每個CNTK張量有一些固定維度和一些動態的維度。張量在神經網路執行的整個生命週期在固定維度上的長度保持不變,動態維度在定義時與固定維度類似,但有一些不同:
- 其長度可能根據實力的不同而變化
- 在訓練的取樣確定之前,其長度通常是不確定的
- 他們可能是按順序列好的
在CNTK裡面,如果要執行一個重複迭代的工作,那麼他的資料維度是變化的,因此在定義神經網路時我們也是不知道的。所以輸入變數就只定義固定的那個維度。打個比方,如果我們的輸入資料是一維的,那我們如下定義:
C.sequence.input_variable(1)
在本例中,首先我們有N個已經觀測到了的資料,還有持續生成的資料,因此我們會先定義一個預設值:
x_axes = [C.Axis.default_batch_axis(), C.Axis.default_dynamic_axis()]
C.input_variable(1, dynamic_axes=x_axes)
讀者應該意識到預設引數的意義,特別是迴圈贏球在動態維度上按語氣的順序處理時間序列資料。
# input sequences
x = C.sequence.input_variable(1)
# create the model
z = create_model(x)
# expected output (label), also the dynamic axes of the model output
# is specified as the model of the label input
l = C.input_variable(1, dynamic_axes=z.dynamic_axes, name="y")
# the learning rate
learning_rate = 0.001
lr_schedule = C.learning_rate_schedule(learning_rate, C.UnitType.minibatch)
# loss function
loss = C.squared_error(z, l)
# use squared error to determine error for now
error = C.squared_error(z, l)
# use adam optimizer
momentum_time_constant = C.momentum_as_time_constant_schedule(BATCH_SIZE / -math.log(0.9))
learner = C.fsadagrad(z.parameters,
lr = lr_schedule,
momentum = momentum_time_constant,
unit_gain = True)
trainer = C.Trainer(z, (loss, error), [learner])
訓練一百個週期應該能得到可以接受的結果
# train
loss_summary = []
start = time.time()
for epoch in range(0, EPOCHS):
for x1, y1 in next_batch(X, Y, "train"):
trainer.train_minibatch({x: x1, l: y1})
if epoch % (EPOCHS / 10) == 0:
training_loss = trainer.previous_minibatch_loss_average
loss_summary.append(training_loss)
print("epoch: {}, loss: {:.5f}".format(epoch, training_loss))
print("training took {0:.1f} sec".format(time.time() - start))
輸出:
epoch: 0, loss: 0.22282
epoch: 2, loss: 0.19430
epoch: 4, loss: 0.16113
epoch: 6, loss: 0.15103
epoch: 8, loss: 0.11810
epoch: 10, loss: 0.07352
epoch: 12, loss: 0.06572
epoch: 14, loss: 0.07864
epoch: 16, loss: 0.09846
epoch: 18, loss: 0.06972
training took 12.9 sec
通常來說我們需要使用我們之前分離出來的驗證資料就來做驗證,不過因為輸入資料的量比較小,我們可以使用所有資料來做驗證。
# validate
def get_mse(X,Y,labeltxt):
result = 0.0
for x1, y1 in next_batch(X, Y, labeltxt):
eval_error = trainer.test_minibatch({x : x1, l : y1})
result += eval_error
return result/len(X[labeltxt])
# Print the train and validation errors
for labeltxt in ["train", "val"]:
print("mse for {}: {:.6f}".format(labeltxt, get_mse(X, Y, labeltxt)))
# Print validate and test error
labeltxt = "test"
print("mse for {}: {:.6f}".format(labeltxt, get_mse(X, Y, labeltxt)))
因為我們使用的是非常簡單的正弦資料,因此理論上來說對於訓練資料集、驗證資料集和測試資料集他們的差值應該是差不多的,但是在真實資料中這種情況一般不會出現。當然我們也把預測的資料展示出來看看和正弦波相差多遠。
# predict
f, a = plt.subplots(3, 1, figsize = (12, 8))
for j, ds in enumerate(["train", "val", "test"]):
results = []
for x1, y1 in next_batch(X, Y, ds):
pred = z.eval({x: x1})
results.extend(pred[:, 0])
a[j].plot(Y[ds], label = ds + ' raw');
a[j].plot(results, label = ds + ' predicted');
[i.legend() for i in a];
看起來不是完美擬合,不過也差不多了
歡迎掃碼關注我的微信公眾號獲取最新文章