
Sequence Processing with Convolutional Neural Networks in Keras

This post introduces sequence processing with convolutional neural networks.

The sample code follows:


# coding: utf-8

# In[2]:


'''
Sequence processing with convnets:
In Keras, a 1D convnet is used via the `Conv1D` layer, which has an interface
very similar to `Conv2D`. It takes as input 3D tensors of shape
`(samples, time, features)` and returns similarly shaped 3D tensors. The
convolution window is a 1D window on the temporal axis: axis 1 of the input
tensor.
Let's build a simple two-layer 1D convnet and apply it to the IMDB sentiment
classification task. Here is the code for obtaining and preprocessing the data.
'''
from keras.datasets import imdb
from keras.preprocessing import sequence

max_features = 10000  # number of words to consider as features
maxlen = 500  # cut off texts after this many words

print('Loading data ...')


# In[5]:


(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')


# In[8]:


print('Pad sequences (samples x time)')
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape', x_train.shape)
print('x_test shape', x_test.shape)
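

# In[ ]:


'''
As a quick sanity check of the `(samples, time, features)` convention
described above, a throwaway model (`demo` is just an illustrative name)
shows how `Conv1D` maps a 3D input to a similarly shaped 3D output, with the
window sliding along the time axis:
'''
from keras.models import Sequential
from keras import layers

demo = Sequential()
demo.add(layers.Conv1D(16, 7, activation='relu', input_shape=(500, 8)))
print(demo.output_shape)  # (None, 494, 16): 500 - 7 + 1 timesteps, 16 filters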


# In[11]:


'''
The structure of 1D convnets: they consist of a stack of `Conv1D` and
`MaxPooling1D` layers, ending in either a global pooling layer or a `Flatten`
layer that turns the 3D outputs into 2D outputs, allowing one or more `Dense`
layers to be added to the model for classification or regression.
One difference, though, is that we can afford to use larger convolution
windows with 1D convnets. With a 2D convolution layer, a 3x3 convolution
window contains 3 * 3 = 9 feature vectors; with a 1D convolution layer, a
window of size 3 contains only 3 feature vectors. So we can easily afford 1D
convolution windows of size 7 or 9. This is our example 1D convnet for the
IMDB dataset:
'''
from keras.models import Sequential
from keras import layers
from keras.optimizers import RMSprop

model = Sequential()
model.add(layers.Embedding(max_features, 128, input_length=maxlen))
model.add(layers.Conv1D(32, 7, activation='relu'))
model.add(layers.MaxPooling1D(5))
model.add(layers.Conv1D(32, 7, activation='relu'))
model.add(layers.GlobalMaxPooling1D())
model.add(layers.Dense(1, activation='sigmoid'))  # sigmoid output for binary classification

model.summary()


# In[15]:


model.compile(optimizer=RMSprop(lr=1e-4),
              loss='binary_crossentropy',
              metrics=['acc'])
history = model.fit(x_train, y_train,
                    epochs=10,
                    batch_size=128,
                    validation_split=0.2)


# In[ ]:


'''
Here are our training and validation results: validation accuracy is somewhat
lower than that of an LSTM, but runtime is faster, both on CPU and GPU.
Retrain this model for the right number of epochs and run it on the test set:
this is a convincing demonstration that a 1D convnet can offer a fast, cheap
alternative to a recurrent network on a word-level sentiment classification
task.
'''
import matplotlib.pyplot as plt

acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(acc))

plt.plot(epochs, acc, 'bo', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()
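

# In[ ]:


'''
To follow up on the note above about running the model on the test set, a
minimal sketch reusing the already padded `x_test` / `y_test` (the model was
compiled with `metrics=['acc']`, so `evaluate` returns loss and accuracy):
'''
test_loss, test_acc = model.evaluate(x_test, y_test)
print('Test loss:', test_loss)
print('Test accuracy:', test_acc)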


# In[20]:


'''
Combining CNNs and RNNs to process long sequences:
Because 1D convnets process input patches independently, they are not
sensitive to the order of the timesteps (beyond a local scale, the size of
the convolution windows), unlike RNNs. Of course, to recognize longer-term
patterns one can stack many convolution and pooling layers, so that upper
layers "see" long chunks of the original input, but that is still a fairly
weak way to induce order sensitivity. One way to demonstrate this weakness
is to try a 1D convnet on the temperature forecasting problem, where order
sensitivity is key to producing good predictions.
'''
import numpy as np

fname = 'jena_climate_2009_2016.csv'

f = open(fname)
data = f.read()
f.close()

lines = data.split('\n')
header = lines[0].split(',')
lines = lines[1:]

float_data = np.zeros((len(lines), len(header) - 1))
for i, line in enumerate(lines):
    values = [float(x) for x in line.split(',')[1:]]
    float_data[i, :] = values

mean = float_data[:200000].mean(axis=0)  # normalize using statistics from the training portion only
float_data -= mean
std = float_data[:200000].std(axis=0)
float_data /= std

def generator(data, lookback, delay, min_index, max_index,
              shuffle=False, batch_size=128, step=6):
    # Yields batches of (samples, targets): `samples` covers `lookback`
    # timesteps of the past (sampled every `step` timesteps) and `targets`
    # is the temperature `delay` timesteps in the future.
    if max_index is None:
        max_index = len(data) - delay - 1
    i = min_index + lookback
    while True:
        if shuffle:
            rows = np.random.randint(
                min_index + lookback, max_index, size=batch_size)
        else:
            if i + batch_size >= max_index:
                i = min_index + lookback
            rows = np.arange(i, min(i + batch_size, max_index))
            i += len(rows)

        samples = np.zeros((len(rows),
                            lookback // step,
                            data.shape[-1]))
        targets = np.zeros((len(rows),))
        for j, row in enumerate(rows):
            indices = range(rows[j] - lookback, rows[j], step)
            samples[j] = data[indices]
            targets[j] = data[rows[j] + delay][1]
        yield samples, targets
    

lookback = 1440
step = 6
delay = 144
batch_size = 128

train_gen = generator(float_data,
                     lookback=lookback,
                     delay=delay,
                     min_index=0,
                     max_index=200000,
                     shuffle=True,
                     step=step,
                     batch_size=batch_size)
val_gen = generator(float_data,
                    lookback=lookback,
                    delay=delay,
                    min_index=200001,
                    max_index=300000,
                    step=step,
                    batch_size=batch_size)
test_gen = generator(float_data,
                    lookback=lookback,
                    delay=delay,
                    min_index=300001,
                    max_index=None,
                    step=step,
                    batch_size=batch_size)

# How many steps to draw from `val_gen` in order to see the whole validation set:
val_steps = (300000 - 200001 - lookback) // batch_size

# How many steps to draw from `test_gen` in order to see the whole test set:
test_steps = (len(float_data) - 300001 - lookback) // batch_size
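

# In[ ]:


'''
A quick sanity check of the generator's output: draw one batch from
`train_gen` and confirm it matches the `(samples, time, features)` layout
that `Conv1D` expects. With the settings above, samples should have
lookback // step = 240 timesteps and the 14 weather features of the Jena
dataset.
'''
batch_samples, batch_targets = next(train_gen)
print('samples shape:', batch_samples.shape)  # expected: (128, 240, 14)
print('targets shape:', batch_targets.shape)  # expected: (128,)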


# In[ ]:


from keras.models import Sequential
from keras import layers
from keras.optimizers import RMSprop

model = Sequential()
model.add(layers.Conv1D(32, 5, activation='relu',
                        input_shape=(None, float_data.shape[-1])))
model.add(layers.MaxPooling1D(3))
model.add(layers.Conv1D(32, 5, activation='relu'))
model.add(layers.MaxPooling1D(3))
model.add(layers.Conv1D(32, 5, activation='relu'))
model.add(layers.GlobalMaxPooling1D())
model.add(layers.Dense(1))

model.compile(optimizer=RMSprop(), loss='mae')
history = model.fit_generator(train_gen,
                              steps_per_epoch=500,
                              epochs=5,  # around 20 epochs is recommended for full training
                              validation_data=val_gen,
                              validation_steps=val_steps)


# In[ ]:


import matplotlib.pyplot as plt

loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(loss))

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()
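

# In[ ]:


'''
A small, hypothetical probe of the order-insensitivity claim above: compare
the trained convnet's predictions on a validation batch with its predictions
on the same batch with the time axis reversed. A strongly order-aware model
would react much more to the reversal.
'''
probe_samples, probe_targets = next(val_gen)  # illustrative probe, names are ad hoc
preds = model.predict(probe_samples)
preds_reversed = model.predict(probe_samples[:, ::-1, :])  # reverse the time axis
print('Mean absolute change after reversing time:',
      np.mean(np.abs(preds - preds_reversed)))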


# In[ ]:


step = 3  # previously 6 (one point per hour); now one point per 30 minutes
lookback = 720  # a 5-day window; samples still have 720 // 3 = 240 timesteps
delay = 144  # unchanged

train_gen = generator(float_data,
                      lookback=lookback,
                      delay=delay,
                      min_index=0,
                      max_index=200000,
                      shuffle=True,
                      step=step)
val_gen = generator(float_data,
                    lookback=lookback,
                    delay=delay,
                    min_index=200001,
                    max_index=300000,
                    step=step)
test_gen = generator(float_data,
                     lookback=lookback,
                     delay=delay,
                     min_index=300001,
                     max_index=None,
                     step=step)
val_steps = (300000 - 200001 - lookback) // 128  # 128 is the generator's default batch_size
test_steps = (len(float_data) - 300001 - lookback) // 128


# In[ ]:



model = Sequential()
model.add(layers.Conv1D(32, 5, activation='relu',
                        input_shape=(None, float_data.shape[-1])))
model.add(layers.MaxPooling1D(3))
model.add(layers.Conv1D(32, 5, activation='relu'))
model.add(layers.GRU(32, dropout=0.1, recurrent_dropout=0.5))
model.add(layers.Dense(1))

model.summary()

model.compile(optimizer=RMSprop(), loss='mae')
history = model.fit_generator(train_gen,
                              steps_per_epoch=500,
                              epochs=20,
                              validation_data=val_gen,
                              validation_steps=val_steps)


# In[ ]:


loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(loss))

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()