深度有趣 | 29 Dialect Classification
For speech recognition, the model outputs a probability distribution for every MFCC frame, and these per-frame outputs are then combined with the CTC algorithm to produce the transcription.
Speech classification is much simpler by comparison, because only a single class label needs to be produced for the entire MFCC feature sequence.
The difference between speech classification and speech recognition is analogous to the difference between text classification and sequence labeling.
In the implementation, only a small change to the network architecture is required.
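To make the contrast concrete, here is a rough sketch of the two output heads. The shared convolutional feature extractor, the layer sizes, and the vocabulary size are all illustrative placeholders, not code from this article: recognition keeps one prediction per time step (for CTC), while classification pools over time to yield one prediction per utterance.

```python
from keras.layers import Input, Conv1D, Dense, GlobalMaxPooling1D, TimeDistributed
from keras.models import Model

# hypothetical shared feature extractor over MFCC frames: (batch, time, 13) -> (batch, time, 128)
inputs = Input(shape=(None, 13))
features = Conv1D(128, 7, padding='same', activation='relu')(inputs)

# speech recognition head: one distribution per frame, decoded/trained with CTC
recognition = TimeDistributed(Dense(1000, activation='softmax'))(features)

# speech classification head: pool over time, one distribution per utterance
classification = Dense(10, activation='softmax')(GlobalMaxPooling1D()(features))

print(Model(inputs, recognition).output_shape)     # (None, None, 1000)
print(Model(inputs, classification).output_shape)  # (None, 10)
```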
Data
We use the data provided by the iFLYTEK (科大訊飛) Dialect Identification AI Challenge, challenge.xfyun.cn/. The preliminary round provides 6 dialects and the final round provides 10.
Each dialect comes with 6,000 training recordings (30 speakers, 200 recordings each) and 500 validation recordings (10 speakers, 50 recordings each).
The data is provided in pcm format, which can be understood as a wav file with the header information stripped, leaving only the raw audio samples.
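As a minimal sketch of what that means, the snippet below reads a pcm file as raw samples and inspects the header parameters of an equivalent wav file. The file names are placeholders, and the format (16-bit little-endian mono at 16 kHz) is an assumption consistent with the rest of this article:

```python
import numpy as np
import wave

# a pcm file is nothing but raw samples; assume 16-bit mono at 16 kHz
pcm = np.fromfile('sample.pcm', dtype=np.int16)  # placeholder file name
print(pcm.shape, pcm.dtype)

# a wav file holding the same audio only adds a small header describing these parameters
with wave.open('sample.wav', 'rb') as fr:        # placeholder file name
    print(fr.getnchannels(), fr.getsampwidth(), fr.getframerate(), fr.getnframes())
```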
Implementation
The following uses the Changsha, Nanchang, and Shanghai dialect data as an example to show how to implement speech classification.
Load the libraries.
```python
# -*- coding:utf-8 -*-
import numpy as np
import os
from matplotlib import pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
%matplotlib inline
from sklearn.utils import shuffle
import glob
import pickle
from tqdm import tqdm

from keras.models import Model
from keras.preprocessing.sequence import pad_sequences
from keras.layers import Input, Activation, Conv1D, Add, Multiply, BatchNormalization, GlobalMaxPooling1D, Dropout
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau

from python_speech_features import mfcc
import librosa
from IPython.display import Audio
import wave
```
Load the pcm files: 18,000 training recordings and 1,500 validation recordings in total.
```python
train_files = glob.glob('data/*/train/*/*.pcm')
dev_files = glob.glob('data/*/dev/*/*/*.pcm')

print(len(train_files), len(dev_files), train_files[0])
```
Collect the classification label for each recording.
```python
labels = {'train': [], 'dev': []}

for i in tqdm(range(len(train_files))):
    path = train_files[i]
    label = path.split('/')[1]
    labels['train'].append(label)

for i in tqdm(range(len(dev_files))):
    path = dev_files[i]
    label = path.split('/')[1]
    labels['dev'].append(label)

print(len(labels['train']), len(labels['dev']))
```
Define three functions: one for loading and trimming the audio, one for converting pcm to wav, and one for visualizing a recording. Since the recordings vary in length, slices shorter than 1 s are discarded, and long recordings are split into slices of at most 3 s.
```python
mfcc_dim = 13
sr = 16000
min_length = 1 * sr    # discard slices shorter than 1 s
slice_length = 3 * sr  # split long recordings into slices of at most 3 s

def load_and_trim(path, sr=16000):
    # pcm files are raw 16-bit samples; drop the first and last 2000 samples
    audio = np.memmap(path, dtype='h', mode='r')
    audio = audio[2000:-2000]
    audio = audio.astype(np.float32)
    # trim leading/trailing silence: keep the region where frame energy is at least 1/5 of the peak
    # (librosa.feature.rmse was renamed to librosa.feature.rms in newer librosa versions)
    energy = librosa.feature.rmse(audio)
    frames = np.nonzero(energy >= np.max(energy) / 5)
    indices = librosa.core.frames_to_samples(frames)[1]
    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]

    slices = []
    for i in range(0, audio.shape[0], slice_length):
        s = audio[i: i + slice_length]
        if s.shape[0] >= min_length:
            slices.append(s)

    return audio, slices

def pcm2wav(pcm_path, wav_path, channels=1, bits=16, sample_rate=sr):
    # prepend a wav header to the raw pcm data
    data = open(pcm_path, 'rb').read()
    fw = wave.open(wav_path, 'wb')
    fw.setnchannels(channels)
    fw.setsampwidth(bits // 8)
    fw.setframerate(sample_rate)
    fw.writeframes(data)
    fw.close()

def visualize(index, source='train'):
    if source == 'train':
        path = train_files[index]
    else:
        path = dev_files[index]
    print(path)

    audio, slices = load_and_trim(path)
    print('Duration: %.2f s' % (audio.shape[0] / sr))

    plt.figure(figsize=(12, 3))
    plt.plot(np.arange(len(audio)), audio)
    plt.title('Raw Audio Signal')
    plt.xlabel('Time')
    plt.ylabel('Audio Amplitude')
    plt.show()

    feature = mfcc(audio, sr, numcep=mfcc_dim)
    print('Shape of MFCC:', feature.shape)

    fig = plt.figure(figsize=(12, 5))
    ax = fig.add_subplot(111)
    im = ax.imshow(feature, cmap=plt.cm.jet, aspect='auto')
    plt.title('Normalized MFCC')
    plt.ylabel('Time')
    plt.xlabel('MFCC Coefficient')
    plt.colorbar(im, cax=make_axes_locatable(ax).append_axes('right', size='5%', pad=0.05))
    ax.set_xticks(np.arange(0, 13, 2), minor=False)
    plt.show()

    wav_path = 'example.wav'
    pcm2wav(path, wav_path)

    return wav_path

Audio(visualize(2))
```
The waveform and MFCC features of one Changsha-dialect utterance:

Prepare the data and look at the length distribution of the recordings. This yields 18,890 training slices and 1,632 validation slices.
```python
X_train = []
X_dev = []
Y_train = []
Y_dev = []
lengths = []

for i in tqdm(range(len(train_files))):
    path = train_files[i]
    audio, slices = load_and_trim(path)
    lengths.append(audio.shape[0] / sr)
    for s in slices:
        X_train.append(mfcc(s, sr, numcep=mfcc_dim))
        Y_train.append(labels['train'][i])

for i in tqdm(range(len(dev_files))):
    path = dev_files[i]
    audio, slices = load_and_trim(path)
    lengths.append(audio.shape[0] / sr)
    for s in slices:
        X_dev.append(mfcc(s, sr, numcep=mfcc_dim))
        Y_dev.append(labels['dev'][i])

print(len(X_train), len(X_dev))

plt.hist(lengths, bins=100)
plt.show()
```

Normalize the MFCC features and pad all sequences to the same length.
```python
samples = np.vstack(X_train)
mfcc_mean = np.mean(samples, axis=0)
mfcc_std = np.std(samples, axis=0)
print(mfcc_mean)
print(mfcc_std)

X_train = [(x - mfcc_mean) / (mfcc_std + 1e-14) for x in X_train]
X_dev = [(x - mfcc_mean) / (mfcc_std + 1e-14) for x in X_dev]

maxlen = np.max([x.shape[0] for x in X_train + X_dev])
X_train = pad_sequences(X_train, maxlen, 'float32', padding='post', value=0.0)
X_dev = pad_sequences(X_dev, maxlen, 'float32', padding='post', value=0.0)
print(X_train.shape, X_dev.shape)
```
Encode the classification labels as one-hot vectors.
```python
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical

le = LabelEncoder()
Y_train = le.fit_transform(Y_train)
Y_dev = le.transform(Y_dev)
print(le.classes_)

class2id = {c: i for i, c in enumerate(le.classes_)}
id2class = {i: c for i, c in enumerate(le.classes_)}

num_class = len(le.classes_)
Y_train = to_categorical(Y_train, num_class)
Y_dev = to_categorical(Y_dev, num_class)
print(Y_train.shape, Y_dev.shape)
```
Define a generator that yields batches of data.
```python
batch_size = 16

def batch_generator(x, y, batch_size=batch_size):
    offset = 0
    while True:
        offset += batch_size

        if offset == batch_size or offset >= len(x):
            x, y = shuffle(x, y)
            offset = batch_size

        X_batch = x[offset - batch_size: offset]
        Y_batch = y[offset - batch_size: offset]

        yield (X_batch, Y_batch)
```
Define the model and train it. GlobalMaxPooling1D pools the per-timestep outputs over the entire sequence, reducing them to a single vector per utterance, which turns the problem into a standard classification task.
```python
epochs = 10
num_blocks = 3
filters = 128
drop_rate = 0.25

X = Input(shape=(None, mfcc_dim,), dtype='float32')

def conv1d(inputs, filters, kernel_size, dilation_rate):
    return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal',
                  activation=None, dilation_rate=dilation_rate)(inputs)

def batchnorm(inputs):
    return BatchNormalization()(inputs)

def activation(inputs, activation):
    return Activation(activation)(inputs)

def res_block(inputs, filters, kernel_size, dilation_rate):
    # gated activation: tanh branch multiplied by sigmoid gate
    hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')
    hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')
    h0 = Multiply()([hf, hg])

    ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')
    hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')

    # return the residual output and the skip (shortcut) output
    return Add()([ha, inputs]), hs

h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')
shortcut = []
# num_blocks stacks of dilated convolutions with dilation rates 1, 2, 4, 8, 16
for i in range(num_blocks):
    for r in [1, 2, 4, 8, 16]:
        h0, s = res_block(h0, filters, 7, r)
        shortcut.append(s)

h1 = activation(Add()(shortcut), 'relu')
h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')  # batch_size, seq_len, filters
h1 = batchnorm(conv1d(h1, num_class, 1, 1))  # batch_size, seq_len, num_class
h1 = GlobalMaxPooling1D()(h1)  # batch_size, num_class
Y = activation(h1, 'softmax')

optimizer = Adam(lr=0.01, clipnorm=5)
model = Model(inputs=X, outputs=Y)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

checkpointer = ModelCheckpoint(filepath='fangyan.h5', verbose=0)
lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)

history = model.fit_generator(
    generator=batch_generator(X_train, Y_train),
    steps_per_epoch=len(X_train) // batch_size,
    epochs=epochs,
    validation_data=batch_generator(X_dev, Y_dev),
    validation_steps=len(X_dev) // batch_size,
    callbacks=[checkpointer, lr_decay])
```
Plot the loss and accuracy curves. After 10 epochs of training, training accuracy is close to 100%, while validation accuracy is less stable, hovering around 89%.
```python
train_loss = history.history['loss']
valid_loss = history.history['val_loss']
plt.plot(train_loss, label='train')
plt.plot(valid_loss, label='valid')
plt.legend(loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

train_acc = history.history['acc']
valid_acc = history.history['val_acc']
plt.plot(train_acc, label='train')
plt.plot(valid_acc, label='valid')
plt.legend(loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.show()
```
The mediocre validation results are probably due to limited training data: although there are 18,000 training recordings, they come from only 90 speakers.
With more speakers and more diverse voices, the model should be able to learn more general features for each dialect.


Save the mapping between class ids and dialect names, along with the MFCC normalization statistics, for later use.
```python
with open('resources.pkl', 'wb') as fw:
    pickle.dump([class2id, id2class, mfcc_mean, mfcc_std], fw)
```
On a local machine, load the trained model and classify a randomly chosen recording.
```python
# -*- coding:utf-8 -*-
import numpy as np
from keras.models import load_model
from keras.preprocessing.sequence import pad_sequences
import librosa
from python_speech_features import mfcc
import pickle
import wave
import glob

with open('resources.pkl', 'rb') as fr:
    [class2id, id2class, mfcc_mean, mfcc_std] = pickle.load(fr)

model = load_model('fangyan.h5')

paths = glob.glob('data/*/dev/*/*/*.pcm')
path = np.random.choice(paths, 1)[0]
label = path.split('/')[1]
print(label, path)

mfcc_dim = 13
sr = 16000
min_length = 1 * sr
slice_length = 3 * sr

def load_and_trim(path, sr=16000):
    audio = np.memmap(path, dtype='h', mode='r')
    audio = audio[2000:-2000]
    audio = audio.astype(np.float32)
    energy = librosa.feature.rmse(audio)
    frames = np.nonzero(energy >= np.max(energy) / 5)
    indices = librosa.core.frames_to_samples(frames)[1]
    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]

    slices = []
    for i in range(0, audio.shape[0], slice_length):
        s = audio[i: i + slice_length]
        slices.append(s)

    return audio, slices

audio, slices = load_and_trim(path)
X_data = [mfcc(s, sr, numcep=mfcc_dim) for s in slices]
X_data = [(x - mfcc_mean) / (mfcc_std + 1e-14) for x in X_data]
maxlen = np.max([x.shape[0] for x in X_data])
X_data = pad_sequences(X_data, maxlen, 'float32', padding='post', value=0.0)
print(X_data.shape)

prob = model.predict(X_data)
prob = np.mean(prob, axis=0)
pred = np.argmax(prob)
prob = prob[pred]
pred = id2class[pred]
print('True:', label)
print('Pred:', pred, 'Confidence:', prob)
```
Finally, since we are classifying a three-dimensional tensor, the problem is very similar to text classification, so other models such as a BiLSTM could also be considered.
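For reference, here is a minimal sketch of such a BiLSTM classifier on the same padded MFCC input. It reuses `mfcc_dim` and `num_class` from the code above; the layer sizes and dropout rate are illustrative, not tuned values from this article:

```python
from keras.models import Model
from keras.layers import Input, Bidirectional, LSTM, Dense, Dropout

# BiLSTM alternative: same (batch, time, mfcc_dim) input, single softmax output per utterance
X_in = Input(shape=(None, mfcc_dim))
h = Bidirectional(LSTM(128, return_sequences=False))(X_in)  # final states summarize the sequence
h = Dropout(0.25)(h)
Y_out = Dense(num_class, activation='softmax')(h)

bilstm_model = Model(inputs=X_in, outputs=Y_out)
bilstm_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
bilstm_model.summary()
```

It could be trained with the same `batch_generator` and callbacks as the convolutional model above.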