1. 程式人生 > >TensorFlow實現時間序列預測

TensorFlow實現時間序列預測

  常常會碰到各種各樣時間序列預測問題,如商場人流量的預測、商品價格的預測、股價的預測,等等。TensorFlow新引入了一個TensorFlow Time Series庫(以下簡稱為TFTS),它可以幫助在TensorFlow中快速搭建高效能的時間序列預測系統,並提供包括AR、LSTM在內的多個模型。

時間序列問題

  一般而言,時間序列資料抽象為兩部分:觀察的時間點和觀察的值(以商品價格為例,某年一月的價格為120元,二月的價格為130元,三月的價格為135元,四月的價格為132元。那麼觀察的時間點可以看作是1,2,3,4,而在各時間點上觀察到的資料的值為120,130,135,132)。觀察的時間點可以不連續,比如二月的資料有缺失,那麼實際的觀察時間點為1,3,4,對應的資料為120,135,132。所謂時間序列預測,是指預測某些未來的時間點上(如5,6)資料的值應該是多少。

  TFTS庫按照時間點+觀察值的方式對時間序列問題進行抽象包裝。觀察的時間點用“times”表示,對應的值用“values”表示。在訓練模型時,輸入資料需要同時具有times和values兩個欄位;在預測時,需要給定一些初始的數值,以及需要預測的時間點times。

讀入時間序列資料

  在訓練模型之前,需要將時間序列資料讀入成Tensor的形式。TFTS庫中提供了兩個方便的讀取器

  • NumpyReader:用於從Numpy陣列中讀入資料
  • CSVReader:用於從CSV檔案中讀入資料

從Numpy陣列中讀取時間序列

  匯入需要的包及函式

import numpy as np
import matplotlib

matplotlib.use('agg')
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader

  接著,利用np.sin生成一個實驗用的時間序列資料。該時間序列資料實際上是在正弦曲線上加入了上升的趨勢和一些隨機的噪聲:

x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 100) + x / 200. + noise
plt.plot(x, y)
plt.savefig('timeseries_y.jpg')

橫座標對應變數“x”,縱座標對應變數“y”,它們分別對應之前提到過的“觀察的時間點”和“觀察到的值”。TFTS讀入x和y的方式非常簡單,請看下面的程式碼:

data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}

reader = NumpyReader(data)

  首先把x和y變成Python中的字典(變數data)。上面的定義直接寫成“data={‘times':x, ‘values':y}”也是可以的。寫成比較複雜的形式是為了和原始碼中的寫法保持一致。

  得到的reader有一個read_full()方法,它的返回值是時間序列對應的Tensor,可以用下面的程式碼進行試驗:

with tf.Session() as sess:
    full_data = reader.read_full()
    # 呼叫read_full方法會生成讀取佇列
    # 要用tf.train.start_queue_runners啟動佇列才能正常進行讀取
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(full_data))
    coord.request_stop()

  在訓練時,通常不會使用整個資料集進行訓練,而是採用batch的形式。從reader出發,建立batch資料的方法也很簡單:

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=2, window_size=10)

  tf.contrib.timeseries.RandomWindowInputFn會在reader的所有資料中,隨機選取視窗長度為window_size的序列,幷包裝成batch_size大小的batch資料。換句話說,一個batch內共有batch_size個序列,每個序列的長度為window_size。

  以batch_size=2, window_size=10為例,可以打印出一個batch的資料:

with tf.Session() as sess:
    batch_data = train_input_fn.create_batch()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    one_batch = sess.run(batch_data[0])
    coord.request_stop()

print('one_batch_data:', one_batch)
# one_batch_data: {'times': array([[11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
#        [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]]), 'values': array([[[0.33901882],
#         [0.29966548],
#         [0.64006627],
#         [0.35204604],
#         [0.66049626],
#         [0.57470108],
#         [0.68309054],
#         [0.46613038],
#         [0.60309193],
#         [0.84166497]],
# 
#        [[0.77312242],
#         [0.82185951],
#         [0.71022706],
#         [0.63987861],
#         [0.7011966 ],
#         [0.84051192],
#         [1.05796465],
#         [0.92981324],
#         [1.0542786 ],
#         [0.89828743]]])}

  原先的資料長度為1000的時間序列(x=np.array(range(1000))),使用tf.contrib.timeseries.RandomWindowInputFn,並指定window_size=10, batch_size=2的功能是在這長度為1000的時間序列中,隨機選取長度為10的序列,並在每個batch裡包含兩個這樣的序列。這也可以從打印出的資料中看出來。

  使用tf.contrib.timeseries.RandomWindowInputFn返回的train_input_fn可以進行訓練了。這是在TFTS中讀入Numpy陣列時間序列的基本方式。下面介紹如何讀入CSV格式的資料。

從CSV檔案中讀取時間序列

  有時,時間序列資料是存在CSV檔案中的。當然可以將其先讀入為Numpy陣列,再使用之前的方法處理。更方便的做法是使用tf.contrib.timeseries.CSVReader讀入。資料檔案period_trend.csv

  假設CSV檔案的時間序列資料的形式為: 

1,-0.6656603714
2,-0.1164380359
3,0.7398626488
4,0.7368633029
5,0.2289480898
...

  CSV檔案的第一列為時間點,第二列為該時間點上觀察到的值。將其讀入的方法為:

import tensorflow as tf

csv_file_name = './period_trend.csv'
reader = tf.contrib.timeseries.CSVReader(csv_file_name)

  實際讀入的程式碼只有一行,直接使用函式tf.contrib.timeseries.CSVReader得到了reader。將reader中所有資料打印出來的方法和之前是一樣的:

with tf.Session() as sess:
    data = reader.read_full()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(data))
    coord.request_stop()

  從reader出發,建立batch資料的train_input_fn的方法也完全相同:

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(reader, batch_size=4, window_size=16)

  最後,可以打印出兩個batch的資料進行測試:

with tf.Session() as sess:
    data = train_input_fn.create_batch()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    batch1 = sess.run(data[0])
    batch2 = sess.run(data[0])
    coord.request_stop()

print('batch1:', batch1)
print('batch2:', batch2)

  以上是TFTS庫中資料的讀取方式。總的來說,從Numpy陣列或者CSV檔案出發構造一個reader,再利用reader生成batch資料。最後得到的Tensor為train_input_fn,這個train_input_fn會被當作訓練時的輸入。

使用AR模型預測時間序列

AR模型的訓練

  自迴歸模型(Autoregressive model,簡稱為AR模型)是統計學上處理時間序列模型的基本方法之一。TFTS中已經實現了一個自迴歸模型,我們只需要對其進行呼叫即可使用。

我們先定義出一個train_input_fn

x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 100) + x / 200. + noise
plt.plot(x, y)
plt.savefig('timeseries_y.jpg')

data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}

reader = NumpyReader(data)

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=16, window_size=40)

使用的時間序列資料如圖所示。

  定義AR模型:

ar = tf.contrib.timeseries.ARRegressor(
    periodicities=200, input_window_size=30, output_window_size=10,
    num_features=1,
    loss=tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS)

引數:

  • periodicities:序列的規律性週期。在定義資料時使用的語句是“y=np.sin(np.pi * x /100)+x /200.+noise”,因此週期為200
  • input_window_size:模型每次輸入的值
  • output_window_size:模型每次輸出的值
  • num_features:表示在一個時間點上觀察到的數的維度。這裡每一步都是一個單獨的值,所以num_features=1
  • loss:指定採取哪一種損失,NORMAL_LIKELIHOOD_LOSS 或 SQUARED_LOSS
  • model_dir:模型訓練好後儲存的地址,如果不指定的話,會隨機分配一個臨時地址

  input_window_size和output_window_size加起來必須等於train_input_fn中總的window_size。總的window_size為40, input_window_size為30,output_window_size為10;也是說,一個batch內每個序列的長度為40,其中前30個數被當作模型的輸入值,後面10個數為這些輸入對應的目標輸出值。

  使用變數ar的train方法可以直接進行訓練:

ar.train(input_fn=train_input_fn, steps=6000)

AR模型的驗證和預測

  TFTS中驗證(evaluation)的含義是:使用訓練好的模型在原先的訓練集上進行計算,由此可以觀察到模型的擬合效果,對應的程式段是:

evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
# keys of evaluation: ['covariance', 'loss', 'mean', 'observed', 'start_tuple', 'times', 'global_step']
evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=1)

  如果想要明白這裡的邏輯,首先要理解之前定義的AR模型:它每次都接收一個長度為30的輸入觀測序列,並輸出長度為10的預測序列。整個訓練集是一個長度為1000的序列,前30個數首先被當作“初始觀測序列”輸入到模型中,由此可以計算出下面10步的預測值。接著又會取30個數進行預測,這30個數中有10個數是前一步的預測值,新得到的預測值又會變成下一步的輸入,依此類推。

  最終得到970個預測值(970=1000-30,因為前30個數是沒辦法進行預測的)。970個預測值被記錄在evaluation[‘mean']中。evaluation還有其他幾個鍵值,如evaluation[‘times']表示evaluation[‘mean']對應的時間點,evaluation[‘loss']表示總的損失等等。

  evaluation[‘start_tuple']會被用於之後的預測中,它相當於最後30步的輸出值和對應的時間點。以此為起點,可以對1000步以後的值進行預測,對應的程式碼為:

(predictions,) = tuple(ar.predict(
    input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=250)))

  這裡的程式碼在1000步之後又向後預測了250個時間點。對應的值儲存在predictions[‘mean']中。可以把觀測到的值、模型擬合的值、預測值用下面的程式碼畫出來:

plt.figure(figsize=(15, 5))
plt.plot(data['times'].reshape(-1), data['values'].reshape(-1), label='origin')
plt.plot(evaluation['times'].reshape(-1), evaluation['mean'].reshape(-1), label='evaluation')
plt.plot(predictions['times'].reshape(-1), predictions['mean'].reshape(-1), label='prediction')
plt.xlabel('time_step')
plt.ylabel('values')
plt.legend(loc=4)
plt.savefig('predict_result.jpg')

  前1000步模型原始觀測值的曲線和模型擬合值非常接近,說明模型擬合得已經比較好了,1000步之後的預測也合情合理。

# coding: utf-8
from __future__ import print_function
import numpy as np
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import  NumpyReader


def main(_):
    x = np.array(range(1000))
    noise = np.random.uniform(-0.2, 0.2, 1000)
    y = np.sin(np.pi * x / 100) + x / 200. + noise
    plt.plot(x, y)
    plt.savefig('timeseries_y.jpg')

    data = {
        tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
        tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
    }

    reader = NumpyReader(data)

    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=16, window_size=40)

    ar = tf.contrib.timeseries.ARRegressor(
        periodicities=200, input_window_size=30, output_window_size=10,
        num_features=1,
        loss=tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS)

    ar.train(input_fn=train_input_fn, steps=6000)

    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    # keys of evaluation: ['covariance', 'loss', 'mean', 'observed', 'start_tuple', 'times', 'global_step']
    evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=1)

    (predictions,) = tuple(ar.predict(
        input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
            evaluation, steps=250)))

    plt.figure(figsize=(15, 5))
    plt.plot(data['times'].reshape(-1), data['values'].reshape(-1), label='origin')
    plt.plot(evaluation['times'].reshape(-1), evaluation['mean'].reshape(-1), label='evaluation')
    plt.plot(predictions['times'].reshape(-1), predictions['mean'].reshape(-1), label='prediction')
    plt.xlabel('time_step')
    plt.ylabel('values')
    plt.legend(loc=4)
    plt.savefig('predict_result.jpg')


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
示例完整程式碼

使用LSTM模型預測時間序列

  為了使用LSTM模型,需要先使用TFTS庫對其進行定義。

單變數時間序列預測

  同樣,用函式加噪聲的方法模擬生成時間序列資料:

x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 50) + np.cos(np.pi * x / 50) + np.sin(np.pi * x / 25) + noise

data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}

reader = NumpyReader(data)

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=4, window_size=100)

  得到y和x後,使用NumpyReader讀入為Tensor形式,接著用tf.contrib.timeseries.RandomWindowInputFn將其變為batch訓練資料。一個batch中有4個隨機選取的序列,每個序列的長度為100。

  接下來定義一個LSTM模型:

estimator = ts_estimators.TimeSeriesRegressor(
        model=_LSTMModel(num_features=1, num_units=128),
        optimizer=tf.train.AdamOptimizer(0.001))

  num_features=1表示單變數時間序列,即每個時間點上觀察到的量只是一個單獨的數值,num_units=128表示使用隱層為128大小的LSTM模型。

  訓練、驗證和預測的方法都和之前類似。在訓練時,在已有的1000步的觀察量的基礎上向後預測200步:

estimator.train(input_fn=train_input_fn, steps=2000)    # 訓練模型
evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)  # 測試資料
evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)  # 得到評估後的資料
# 評估後預測200步資料
(predictions,) = tuple(estimator.predict(
    input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=200)))

將驗證、預測的結果取出並畫成示意圖,畫出的影象會儲存成“predict_result.jpg”檔案:

observed_times = evaluation["times"][0]
observed = evaluation["observed"][0, :, :]
evaluated_times = evaluation["times"][0]
evaluated = evaluation["mean"][0]
predicted_times = predictions['times']
predicted = predictions["mean"]

plt.figure(figsize=(15, 5))
plt.axvline(999, linestyle="dotted", linewidth=4, color='r')
observed_lines = plt.plot(observed_times, observed, label="observation", color="k")
evaluated_lines = plt.plot(evaluated_times, evaluated, label="evaluation", color="g")
predicted_lines = plt.plot(predicted_times, predicted, label="prediction", color="r")
plt.legend(handles=[observed_lines[0], evaluated_lines[0], predicted_lines[0]],
           loc="upper left")
plt.savefig('predict_result.jpg')

預測效果如圖15-4所示,橫座標為時間軸,前1000步是訓練資料,1000~1200步是模型預測的值。

import numpy as np
import tensorflow as tf

from tensorflow.contrib.timeseries.python.timeseries import estimators as ts_estimators
from tensorflow.contrib.timeseries.python.timeseries import model as ts_model
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader

import matplotlib

matplotlib.use("agg")
import matplotlib.pyplot as plt


class _LSTMModel(ts_model.SequentialTimeSeriesModel):
    """A time series model-building example using an RNNCell."""

    def __init__(self, num_units, num_features, dtype=tf.float32):
        """Initialize/configure the model object.
        Note that we do not start graph building here. Rather, this object is a
        configurable factory for TensorFlow graphs which are run by an Estimator.
        Args:
          num_units: The number of units in the model's LSTMCell.
          num_features: The dimensionality of the time series (features per
            timestep).
          dtype: The floating point data type to use.
        """
        super(_LSTMModel, self).__init__(
            # Pre-register the metrics we'll be outputting (just a mean here).
            train_output_names=["mean"],
            predict_output_names=["mean"],
            num_features=num_features,
            dtype=dtype)
        self._num_units = num_units
        # Filled in by initialize_graph()
        self._lstm_cell = None
        self._lstm_cell_run = None
        self._predict_from_lstm_output = None

    def initialize_graph(self, input_statistics):
        """Save templates for components, which can then be used repeatedly.
        This method is called every time a new graph is created. It's safe to start
        adding ops to the current default graph here, but the graph should be
        constructed from scratch.
        Args:
          input_statistics: A math_utils.InputStatistics object.
        """
        super(_LSTMModel, self).initialize_graph(input_statistics=input_statistics)
        self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units)
        # Create templates so we don't have to worry about variable reuse.
        self._lstm_cell_run = tf.make_template(
            name_="lstm_cell",
            func_=self._lstm_cell,
            create_scope_now_=True)
        # Transforms LSTM output into mean predictions.
        self._predict_from_lstm_output = tf.make_template(
            name_="predict_from_lstm_output",
            func_=lambda inputs: tf.layers.dense(inputs=inputs, units=self.num_features),
            create_scope_now_=True)

    def get_start_state(self):
        """Return initial state for the time series model."""
        return (
            # Keeps track of the time associated with this state for error checking.
            tf.zeros([], dtype=tf.int64),
            # The previous observation or prediction.
            tf.zeros([self.num_features], dtype=self.dtype),
            # The state of the RNNCell (batch dimension removed since this parent
            # class will broadcast).
            [tf.squeeze(state_element, axis=0)
             for state_element
             in self._lstm_cell.zero_state(batch_size=1, dtype=self.dtype)])

    def _transform(self, data):
        """Normalize data based on input statistics to encourage stable training."""
        mean, variance = self._input_statistics.overall_feature_moments
        return (data - mean) / variance

    def _de_transform(self, data):
        """Transform data back to the input scale."""
        mean, variance = self._input_statistics.overall_feature_moments
        return data * variance + mean

    def _filtering_step(self, current_times, current_values, state, predictions):
        """Update model state based on observations.
        Note that we don't do much here aside from computing a loss. In this case
        it's easier to update the RNN state in _prediction_step, since that covers
        running the RNN both on observations (from this method) and our own
        predictions. This distinction can be important for probabilistic models,
        where repeatedly predicting without filtering should lead to low-confidence
        predictions.
        Args:
          current_times: A [batch size] integer Tensor.
          current_values: A [batch size, self.num_features] floating point Tensor
            with new observations.
          state: The model's state tuple.
          predictions: The output of the previous `_prediction_step`.
        Returns:
          A tuple of new state and a predictions dictionary updated to include a
          loss (note that we could also return other measures of goodness of fit,
          although only "loss" will be optimized).
        """
        state_from_time, prediction, lstm_state = state
        with tf.control_dependencies(
                [tf.assert_equal(current_times, state_from_time)]):
            transformed_values = self._transform(current_values)
            # Use mean squared error across features for the loss.
            predictions["loss"] = tf.reduce_mean(
                (prediction - transformed_values) ** 2, axis=-1)
            # Keep track of the new observation in model state. It won't be run
            # through the LSTM until the next _imputation_step.
            new_state_tuple = (current_times, transformed_values, lstm_state)
        return (new_state_tuple, predictions)

    def _prediction_step(self, current_times, state):
        """Advance the RNN state using a previous observation or prediction."""
        _, previous_observation_or_prediction, lstm_state = state
        lstm_output, new_lstm_state = self._lstm_cell_run(
            inputs=previous_observation_or_prediction, state=lstm_state)
        next_prediction = self._predict_from_lstm_output(lstm_output)
        new_state_tuple = (current_times, next_prediction, new_lstm_state)
        return new_state_tuple, {"mean": self._de_transform(next_prediction)}

    def _imputation_step(self, current_times, state):
        """Advance model state across a gap."""
        # Does not do anything special if we're jumping across a gap. More advanced
        # models, especially probabilistic ones, would want a special case that
        # depends on the gap size.
        return state

    def _exogenous_input_step(
            self, current_times, current_exogenous_regressors, state):
        """Update model state based on exogenous regressors."""
        raise NotImplementedError(
            "Exogenous inputs are not implemented for this example.")


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    x = np.array(range(1000))
    noise = np.random.uniform(-0.2, 0.2, 1000)
    y = np.sin(np.pi * x / 50) + np.cos(np.pi * x / 50) + np.sin(np.pi * x / 25) + noise

    data = {
        tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
        tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
    }

    reader = NumpyReader(data)

    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=4, window_size=100)

    estimator = ts_estimators.TimeSeriesRegressor(
        model=_LSTMModel(num_features=1, num_units=128),
        optimizer=tf.train.AdamOptimizer(0.001))

    estimator.train(input_fn=train_input_fn, steps=2000)
    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
    # Predict starting after the evaluation
    (predictions,) = tuple(estimator.predict(
        input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
            evaluation, steps=200)))

    observed_times = evaluation["times"][0]
    observed = evaluation["observed"][0, :, :]
    evaluated_times = evaluation["times"][0]
    evaluated = evaluation["mean"][0]
    predicted_times = predictions['times']
    predicted = predictions["mean"]

    plt.figure(figsize=(15, 5))
    plt.axvline(999, linestyle="dotted", linewidth=4, color='r')
    observed_lines = plt.plot(observed_times, observed, label="observation", color="k")
    evaluated_lines = plt.plot(evaluated_times, evaluated, label="evaluation", color="g")
    predicted_lines = plt.plot(predicted_times, predicted, label="prediction", color="r")
    plt.legend(handles=[observed_lines[0], evaluated_lines[0], predicted_lines[0]],
               loc="upper left")
    plt.savefig('predict_result.jpg')
LSTM單變數完整程式碼

多變數時間序列預測

  所謂多變數時間序列,是指在每個時間點上的觀測量有多個值。在multivariate_periods.csv檔案中,儲存了一個多變數時間序列的資料:

0    0.926906299771    1.99107237682    2.56546245685    3.07914768197    4.04839057867
1    0.108010001864    1.41645361423    2.1686839775    2.94963962176    4.1263503303
2    -0.800567600028    1.0172132907    1.96434754116    2.99885333086    4.04300485864
3    0.0607042871898    0.719540073421    1.9765012584    2.89265588817    4.0951014426
...
99    0.987764008058    1.85581989607    2.84685706149    2.94760204892    6.0212151724

  這個CSV檔案的第一列是觀察時間點,除此之外,每一行還有5個數,表示在這個時間點上觀察到的資料。換句話說,時間序列上每一步都是一個5維的向量。

  使用TFTS讀入該CSV檔案的方法為:

csv_file_name = path.join("./data/multivariate_periods.csv")
reader = tf.contrib.timeseries.CSVReader(
    csv_file_name,
    column_names=((tf.contrib.timeseries.TrainEvalFeatures.TIMES,)
                  + (tf.contrib.timeseries.TrainEvalFeatures.VALUES,) * 5))
train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=4, window_size=32)

  與之前的讀入相比,唯一的區別是column_names引數。它告訴TFTS在CSV檔案中,哪些列表示時間,哪些列表示觀測量。

  接下來定義LSTM模型:

estimator = ts_estimators.TimeSeriesRegressor(
    model=_LSTMModel(num_features=5, num_units=128),
    optimizer=tf.train.AdamOptimizer(0.001))

  區別在於使用num_features=5而不是1,原因在於每個時間點上的觀測量是一個5維向量。

  訓練、驗證、預測及畫圖的程式碼與之前比較類似,最後的執行結果圖所示

使用LSTM預測多變數時間序列

  前100步是訓練資料,一條線代表觀測量在一個維度上的取值。100步之後為預測值。

from os import path
import tensorflow as tf

from tensorflow.contrib.timeseries.python.timeseries import estimators as ts_estimators
from tensorflow.contrib.timeseries.python.timeseries import model as ts_model
import matplotlib

matplotlib.use("agg")
import matplotlib.pyplot as plt


class _LSTMModel(ts_model.SequentialTimeSeriesModel):
    """A time series model-building example using an RNNCell."""

    def __init__(self, num_units, num_features, dtype=tf.float32):
        """Initialize/configure the model object.
        Note that we do not start graph building here. Rather, this object is a
        configurable factory for TensorFlow graphs which are run by an Estimator.
        Args:
          num_units: The number of units in the model's LSTMCell.
          num_features: The dimensionality of the time series (features per
            timestep).
          dtype: The floating point data type to use.
        """
        super(_LSTMModel, self).__init__(
            # Pre-register the metrics we'll be outputting (just a mean here).
            train_output_names=["mean"],
            predict_output_names=["mean"],
            num_features=num_features,
            dtype=dtype)
        self._num_units = num_units
        # Filled in by initialize_graph()
        self._lstm_cell = None
        self._lstm_cell_run = None
        self._predict_from_lstm_output = None

    def initialize_graph(self, input_statistics):
        """Save templates for components, which can then be used repeatedly.
        This method is called every time a new graph is created. It's safe to start
        adding ops to the current default graph here, but the graph should be
        constructed from scratch.
        Args:
          input_statistics: A math_utils.InputStatistics object.
        """
        super(_LSTMModel, self).initialize_graph(input_statistics=input_statistics)
        self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units)
        # Create templates so we don't have to worry about variable reuse.
        self._lstm_cell_run = tf.make_template(
            name_="lstm_cell",
            func_=self._lstm_cell,
            create_scope_now_=True)
        # Transforms LSTM output into mean predictions.
        self._predict_from_lstm_output = tf.make_template(
            name_="predict_from_lstm_output",
            func_=lambda inputs: tf.layers.dense(inputs=inputs, units=self.num_features),
            create_scope_now_=True)

    def get_start_state(self):
        """Return initial state for the time series model."""
        return (
            # Keeps track of the time associated with this state for error checking.
            tf.zeros([], dtype=tf.int64),
            # The previous observation or prediction.
            tf.zeros([self.num_features], dtype=self.dtype),
            # The state of the RNNCell (batch dimension removed since this parent
            # class will broadcast).
            [tf.squeeze(state_element, axis=0)
             for state_element
             in self._lstm_cell.zero_state(batch_size=1, dtype=self.dtype)])

    def _transform(self, data):
        """Normalize data based on input statistics to encourage stable training."""
        mean, variance = self._input_statistics.overall_feature_moments
        return (data - mean) / variance

    def _de_transform(self, data):
        """Transform data back to the input scale."""
        mean, variance = self._input_statistics.overall_feature_moments
        return data * variance + mean

    def _filtering_step(self, current_times, current_values, state, predictions):
        """Update model state based on observations.
        Note that we don't do much here aside from computing a loss. In this case
        it's easier to update the RNN state in _prediction_step, since that covers
        running the RNN both on observations (from this method) and our own
        predictions. This distinction can be important for probabilistic models,
        where repeatedly predicting without filtering should lead to low-confidence
        predictions.
        Args:
          current_times: A [batch size] integer Tensor.
          current_values: A [batch size, self.num_features] floating point Tensor
            with new observations.
          state: The model's state tuple.
          predictions: The output of the previous `_prediction_step`.
        Returns:
          A tuple of new state and a predictions dictionary updated to include a
          loss (note that we could also return other measures of goodness of fit,
          although only "loss" will be optimized).
        """
        state_from_time, prediction, lstm_state = state
        with tf.control_dependencies(
                [tf.assert_equal(current_times, state_from_time)]):
            transformed_values = self._transform(current_values)
            # Use mean squared error across features for the loss.
            predictions["loss"] = tf.reduce_mean(
                (prediction - transformed_values) ** 2, axis=-1)
            # Keep track of the new observation in model state. It won't be run
            # through the LSTM until the next _imputation_step.
            new_state_tuple = (current_times, transformed_values, lstm_state)
        return (new_state_tuple, predictions)

    def _prediction_step(self, current_times, state):
        """Advance the RNN state using a previous observation or prediction."""
        _, previous_observation_or_prediction, lstm_state = state
        lstm_output, new_lstm_state = self._lstm_cell_run(
            inputs=previous_observation_or_prediction, state=lstm_state)
        next_prediction = self._predict_from_lstm_output(lstm_output)
        new_state_tuple = (current_times, next_prediction, new_lstm_state)
        return new_state_tuple, {"mean": self._de_transform(next_prediction)}

    def _imputation_step(self, current_times, state):
        """Advance model state across a gap."""
        # Does not do anything special if we're jumping across a gap. More advanced
        # models, especially probabilistic ones, would want a special case that
        # depends on the gap size.
        return state

    def _exogenous_input_step(
            self, current_times, current_exogenous_regressors, state):
        """Update model state based on exogenous regressors."""
        raise NotImplementedError(
            "Exogenous inputs are not implemented for this example.")


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    csv_file_name = path.join("./data/multivariate_periods.csv")
    reader = tf.contrib.timeseries.CSVReader(
        csv_file_name,
        column_names=((tf.contrib.timeseries.TrainEvalFeatures.TIMES,)
                      + (tf.contrib.timeseries.TrainEvalFeatures.VALUES,) * 5))
    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=4, window_size=32)

    estimator = ts_estimators.TimeSeriesRegressor(
        model=_LSTMModel(num_features=5, num_units=128),
        optimizer=tf.train.AdamOptimizer(0.001))

    estimator.train(input_fn=train_input_fn, steps=200)
    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
    # Predict starting after the evaluation
    (predictions,) = tuple(estimator.predict(
        input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
            evaluation, steps=100)))

    observed_times = evaluation["times"][0]
    observed = evaluation["observed"][0, :, :]
    evaluated_times = evaluation["times"][0]
    evaluated = evaluation["mean"][0]
    predicted_times = predictions['times']
    predicted = predictions["mean"]

    plt.figure(figsize=(15, 5))
    plt.axvline(99, linestyle="dotted", linewidth=4, color='r')
    observed_lines = plt.plot(observed_times, observed, label="observation", color="k")
    evaluated_lines = plt.plot(evaluated_times, evaluated, label="evaluation", color="g")
    predicted_lines = plt.plot(predicted_times, predicted, label="prediction", color="r")
    plt.legend(handles=[observed_lines[0], evaluated_lines[0], predicted_lines[0]],
               loc="upper left")
    plt.savefig('predict_result.jpg')
LSTM多變數完整程式碼

&n