Implementing CNNs, RNNs (attention-based bidirectional RNN), and their fusion in Keras
This post uses a CNN and an RNN (and their combination) for binary classification of time-series data.
Binary classification of time-series data with a CNN
from keras import metrics
from keras.models import Sequential
from keras.layers import Conv1D, BatchNormalization, Activation, GlobalAveragePooling1D, Dropout, Dense

# max_lenth (sequence length) and max_features (features per timestep)
# are defined in the RNN section below.
model = Sequential()
model.add(Conv1D(128, 3, padding='same', input_shape=(max_lenth, max_features)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Conv1D(256, 3))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Conv1D(128, 3))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(GlobalAveragePooling1D())  # global average pooling over the time dimension
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=[metrics.binary_crossentropy])
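For reference, a minimal smoke test of this model on random data; the sample count and batch size are illustrative, and max_lenth/max_features are assumed to be set beforehand (23 and 12, the values used later in the post):

import numpy as np

x = np.random.rand(32, max_lenth, max_features)  # (samples, steps, features)
y = np.random.randint(0, 2, size=(32, 1))        # binary labels
model.fit(x, y, batch_size=8, epochs=1, verbose=0)
print(model.predict(x).shape)  # (32, 1): one sigmoid probability per sample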
Binary classification of time-series data with a two-layer RNN
import numpy as np
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import GRU
import keras
from keras import metrics
from keras import regularizers
from keras.callbacks import EarlyStopping
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold  # sklearn.cross_validation was removed in scikit-learn 0.20
from keras import backend as K
import my_callbacks  # the author's own module; a sketch of RocAucMetricCallback follows below
from keras.layers.normalization import BatchNormalization
import keras.backend.tensorflow_backend as KTF

max_lenth = 23
max_features = 12
training_iters = 2000
train_batch_size = 800
test_batch_size = 800
n_hidden_units = 64
lr = 0.0003  # unused here: the 'adam' string below uses the optimizer's default learning rate

cb = [
    my_callbacks.RocAucMetricCallback(),  # include it before EarlyStopping!
    EarlyStopping(monitor='roc_auc_val', patience=200, verbose=2, mode='max')
]

model = Sequential()
model.add(keras.layers.core.Masking(mask_value=0., input_shape=(max_lenth, max_features)))
model.add(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
              bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
              bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
              bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1,
              return_sequences=True,  # must be True when stacking recurrent layers
              return_state=False, go_backwards=False, stateful=False, unroll=False))
model.add(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
              bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
              bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
              bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1,
              return_sequences=False,  # the last recurrent layer returns only its final state
              return_state=False, go_backwards=False, stateful=False, unroll=False))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(BatchNormalization())
model.add(keras.layers.core.Activation('sigmoid'))
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=[metrics.binary_crossentropy])

# x_train, y_train, x_test, y_test and class_weight are assumed to be prepared elsewhere
model.fit(x_train, y_train, batch_size=train_batch_size, epochs=training_iters, verbose=2,
          callbacks=cb, validation_split=0.2,
          shuffle=True, class_weight=class_weight, sample_weight=None, initial_epoch=0)
pred_y = model.predict(x_test, batch_size=test_batch_size)
score = roc_auc_score(y_test, pred_y)
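The my_callbacks module is not shown in the post. All the code above requires is that the callback write a 'roc_auc_val' entry into the logs dict at the end of each epoch so that EarlyStopping(monitor='roc_auc_val', mode='max') can see it. A minimal sketch under that assumption (it relies on self.validation_data, which Keras populates when fit is called with validation_split; everything beyond the class name and log key is an assumption, not the author's code):

from keras.callbacks import Callback
from sklearn.metrics import roc_auc_score

class RocAucMetricCallback(Callback):
    """Adds 'roc_auc_val' to the logs dict so EarlyStopping can monitor it."""
    def on_epoch_end(self, epoch, logs=None):
        logs = logs if logs is not None else {}
        # self.validation_data is filled in by fit(..., validation_split=0.2)
        x_val, y_val = self.validation_data[0], self.validation_data[1]
        logs['roc_auc_val'] = roc_auc_score(y_val, self.model.predict(x_val))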
Bidirectional RNN with an attention mechanism (attention here means the outputs of all timesteps are multiplied by learned weights and summed to form the final output)
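Concretely, following Yang et al.'s formulation (which the AttentionWithContext layer below implements), each hidden state $h_t$ is scored against a learned context vector $u$, the scores are softmax-normalized into weights, and the final representation is the weighted sum over timesteps:

$u_t = \tanh(W h_t + b), \qquad \alpha_t = \frac{\exp(u_t^\top u)}{\sum_{t'} \exp(u_{t'}^\top u)}, \qquad s = \sum_t \alpha_t h_t$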
from keras import backend as K
from keras.layers import Layer
from keras import initializers, regularizers, constraints


def dot_product(x, kernel):
    """
    Wrapper for dot product operation, in order to be compatible with
    both Theano and TensorFlow.
    Args:
        x: input
        kernel: weights
    """
    if K.backend() == 'tensorflow':
        return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
    else:
        return K.dot(x, kernel)


class AttentionWithContext(Layer):
    """
    Attention operation, with a context/query vector, for temporal data.
    Supports masking.
    Follows the work of Yang et al. [https://www.cs.cmu.edu/~diyiy/docs/naacl16.pdf]
    "Hierarchical Attention Networks for Document Classification"
    by using a context vector to assist the attention.

    # Input shape
        3D tensor with shape: `(samples, steps, features)`.
    # Output shape
        2D tensor with shape: `(samples, features)`.

    How to use:
    Just put it on top of an RNN layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
    The dimensions are inferred based on the output shape of the RNN.
    Note: The layer has been tested with Keras 2.0.6

    Example:
        model.add(LSTM(64, return_sequences=True))
        model.add(AttentionWithContext())
        # next add a Dense layer (for classification/regression) or whatever...
    """

    def __init__(self,
                 W_regularizer=None, u_regularizer=None, b_regularizer=None,
                 W_constraint=None, u_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.u_regularizer = regularizers.get(u_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.u_constraint = constraints.get(u_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        super(AttentionWithContext, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight((input_shape[-1], input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight((input_shape[-1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)

        self.u = self.add_weight((input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_u'.format(self.name),
                                 regularizer=self.u_regularizer,
                                 constraint=self.u_constraint)

        super(AttentionWithContext, self).build(input_shape)

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        uit = dot_product(x, self.W)

        if self.bias:
            uit += self.b

        uit = K.tanh(uit)
        ait = dot_product(uit, self.u)

        a = K.exp(ait)

        # apply mask after the exp; will be re-normalized next
        if mask is not None:
            # cast the mask to floatX to avoid float64 upcasting in Theano
            a *= K.cast(mask, K.floatx())

        # in some cases, especially early in training, the sum may be almost zero,
        # which produces NaNs. A workaround is to add a very small positive epsilon to the sum.
        # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        return input_shape[0], input_shape[-1]
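A quick sanity check of the layer's output shape (sizes are illustrative, assuming the class above has been defined):

from keras.models import Sequential
from keras.layers import GRU

m = Sequential()
m.add(GRU(64, return_sequences=True, input_shape=(23, 12)))
m.add(AttentionWithContext())
print(m.output_shape)  # (None, 64): the steps dimension has been collapsed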
from keras.layers import Bidirectional

model = Sequential()
model.add(keras.layers.core.Masking(mask_value=0., input_shape=(max_lenth, max_features)))
model.add(Bidirectional(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                            bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                            bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                            bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1,
                            return_sequences=True,  # must be True when stacking recurrent layers
                            return_state=False, go_backwards=False, stateful=False, unroll=False),
                        merge_mode='concat'))
model.add(Bidirectional(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                            bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                            bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                            bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1,
                            return_sequences=True,  # AttentionWithContext consumes the full sequence
                            return_state=False, go_backwards=False, stateful=False, unroll=False),
                        merge_mode='concat'))
model.add(Dropout(0.5))
model.add(AttentionWithContext())
model.add(Dense(1))
model.add(BatchNormalization())
model.add(keras.layers.core.Activation('sigmoid'))
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=[metrics.binary_crossentropy])
CNN-RNN fusion
class NonMasking(Layer):
    """Identity layer that drops the incoming mask, so that mask-incompatible
    layers such as Flatten can follow a Masking/RNN stack."""
    def __init__(self, **kwargs):
        self.supports_masking = True
        super(NonMasking, self).__init__(**kwargs)

    def build(self, input_shape):
        pass

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        return x

    def compute_output_shape(self, input_shape):  # named get_output_shape_for in Keras 1
        return input_shape
model_left = Sequential()
model_left.add(keras.layers.core.Masking(mask_value=0., input_shape=(max_lenth, max_features)))  # handles variable-length (zero-padded) sequences
model_left.add(GRU(units=left_hidden_units, activation='relu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                   bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                   bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                   bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1,
                   return_sequences=True,  # must be True when stacking recurrent layers
                   return_state=False, go_backwards=False, stateful=False, unroll=False))
model_left.add(GRU(units=left_hidden_units, activation='relu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                   bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                   bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                   bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1,
                   return_sequences=True,
                   return_state=False, go_backwards=False, stateful=False, unroll=False))
model_left.add(NonMasking())  # Flatten() does not support masking, so drop the mask here
model_left.add(Flatten())     # output: (samples, max_lenth * left_hidden_units)
## FCN
model_right = Sequential()
model_right.add(Conv1D(128, 3, padding='same', input_shape=(max_lenth, max_features)))
model_right.add(BatchNormalization())
model_right.add(Activation('relu'))
model_right.add(Conv1D(256, 3))
model_right.add(BatchNormalization())
model_right.add(Activation('relu'))
model_right.add(Conv1D(128, 3))
model_right.add(BatchNormalization())
model_right.add(Activation('relu'))
model_right.add(GlobalAveragePooling1D())
model_right.add(Reshape((1, 1, -1)))  # Reshape followed by Flatten is effectively a no-op here,
model_right.add(Flatten())            # since GlobalAveragePooling1D already outputs (samples, features)
model = Sequential()
model.add(Merge([model_left, model_right], mode='concat'))  # legacy Keras 1 Merge layer, removed in later Keras 2 releases; see the functional-API sketch below
model.add(Dense(128))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(BatchNormalization())
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy'])
model.fit([left_x_train, right_x_train], y_train, batch_size=train_batch_size, epochs=training_iters, verbose=2,
          callbacks=cb, validation_split=0.2,  # cb is already a list; wrapping it as [cb] would nest lists and fail
          shuffle=True, class_weight=None, sample_weight=None, initial_epoch=0)
pred_y = model.predict([left_x_test, right_x_test], batch_size=test_batch_size)
score = roc_auc_score(y_test, pred_y)
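Note that the Merge layer used above only exists in the legacy Keras 1 API (it survived into early 2.x releases as a deprecated alias before being removed). On current Keras 2 the same two-branch fusion is normally written with the functional API. A minimal sketch, assuming the same model_left and model_right branches as above:

from keras.models import Model
from keras.layers import Input, concatenate

# Functional-API rewrite of the fusion head; the Sequential branches
# are reused as callable sub-models.
left_in = Input(shape=(max_lenth, max_features))
right_in = Input(shape=(max_lenth, max_features))
merged = concatenate([model_left(left_in), model_right(right_in)])
x = Dense(128)(merged)
x = Dropout(0.5)(x)
x = Dense(1)(x)
x = BatchNormalization()(x)
out = Activation('sigmoid')(x)
model = Model(inputs=[left_in, right_in], outputs=out)
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])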