深度有趣 | 28 Automatic Speech Recognition
This article introduces the principles of Automatic Speech Recognition (ASR) and implements it with WaveNet.
Principles
The input to ASR is an audio clip, and the output is the corresponding text.
The general pipeline for implementing ASR with Deep Neural Networks (DNN) is as follows:

- Convert the raw audio into acoustic features
- Feed the acoustic features into a neural network, which outputs the corresponding probabilities
- Decode the probabilities into a text sequence
A commonly used acoustic feature is the Mel Frequency Cepstral Coefficients (MFCC), see www.practicalcryptography.com/miscellaneo…
After splitting the raw audio into short frames and computing the MFCC features of each frame, we obtain a 2-D array: the first dimension is the number of frames (the longer the audio, the larger this dimension), and the second dimension is the dimensionality of the MFCC features.
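As a quick, illustrative sketch (the filename `example.wav` is a hypothetical placeholder, not part of the dataset used below), the shape of this 2-D array can be inspected with the same `python_speech_features` call used later in this article:

```python
# Minimal sketch: compute MFCC features for one audio file and inspect the 2-D array.
# 'example.wav' is a hypothetical placeholder path.
import librosa
from python_speech_features import mfcc

audio, sr = librosa.load('example.wav')          # waveform and sample rate
feature = mfcc(audio, sr, numcep=13, nfft=551)   # one 13-dim MFCC vector per frame
print(feature.shape)                             # (number of frames, 13)
```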
Once we have this numerical representation of the raw audio, we can use WaveNet to implement ASR.
The WaveNet model mainly uses stacked causal dilated convolutions (Causal Dilated Convolution) and skip connections.

Since the MFCC features form a one-dimensional sequence, Conv1D is used for the convolutions.
Causal means that each output of the convolution depends only on the current and earlier positions of the input, i.e. no future features are used; it can be understood as shifting the convolution window so that it never looks ahead.

Dilated means the convolution skips over inputs at a fixed interval; stacking several dilated layers effectively enlarges the receptive field, allowing the model to learn long-range dependencies in the sequence.
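A minimal sketch of both ideas in Keras (the filter count, kernel size, and dilation rate here are illustrative values, not those of the full model defined later): `padding='causal'` makes the convolution causal, and `dilation_rate` controls the dilation.

```python
# Minimal sketch: a causal dilated Conv1D applied to a dummy MFCC-like sequence.
# filters=32, kernel_size=7 and dilation_rate=2 are illustrative values only.
from keras.models import Model
from keras.layers import Input, Conv1D

x = Input(shape=(None, 13))   # (time steps, MFCC dimension)
h = Conv1D(filters=32, kernel_size=7, padding='causal', dilation_rate=2)(x)
print(Model(x, h).output_shape)   # (None, None, 32): the time dimension is preserved
```

With kernel size 7 and dilation rates 1, 2, 4, 8, 16 stacked (as in the model below), the receptive field grows roughly geometrically with depth while the output length stays aligned with the input.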
The number of feature maps in the final convolution layer matches the vocabulary size (plus one blank class for CTC, as in the code below), so after a softmax, each frame's MFCC vector yields a probability distribution over the whole vocabulary.
However, the number of frames is generally larger than the number of characters in the transcription: even for the same sentence, the duration and stress of each character, and the pauses between characters, can vary in countless ways.
In the earlier article on Chinese word segmentation, the probability sequence output by the model had the same length as the label sequence; in ASR this is not the case. Optical Character Recognition (OCR) and similar tasks face the same problem.
Here the CTC (Connectionist Temporal Classification) algorithm is used to compute the loss function, see zhuanlan.zhihu.com/p/36488476
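As a minimal, purely illustrative sketch of this loss (dummy shapes and random values, not real data): `K.ctc_batch_cost` takes the label sequence, the per-frame softmax output, and both sequence lengths, which is exactly how it is wired into the training model below.

```python
# Minimal sketch: CTC loss on dummy data where the input has more frames (20)
# than the label has characters (5). Vocabulary of 10 characters + 1 CTC blank.
import numpy as np
from keras import backend as K

y_true = np.random.randint(0, 10, size=(2, 5))            # (batch, label length)
y_pred = np.random.uniform(size=(2, 20, 11))              # (batch, frames, vocab + blank)
y_pred = y_pred / y_pred.sum(axis=-1, keepdims=True)      # stand-in for softmax output
input_length = np.array([[20], [20]])                     # frames per sample
label_length = np.array([[5], [5]])                       # characters per sample

loss = K.ctc_batch_cost(y_true, y_pred, input_length, label_length)
print(K.eval(loss).shape)                                  # (2, 1): one loss per sample
```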
Data
We use the following dataset, www.openslr.org/18/, which contains 13,388 Chinese speech files with corresponding text transcriptions.
Implementation
Some libraries are required; install them if they are not available:
```
pip install python_speech_features librosa
```
If a NoBackendError occurs while running the code, install ffmpeg with the following command:
```
conda install -c conda-forge ffmpeg
```
Load the libraries
```python
# -*- coding: utf-8 -*-
from keras.models import Model
from keras.layers import Input, Activation, Conv1D, Lambda, Add, Multiply, BatchNormalization
from keras.optimizers import Adam, SGD
from keras import backend as K
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
%matplotlib inline
import random
import pickle
import glob
from tqdm import tqdm
import os
from python_speech_features import mfcc
import scipy.io.wavfile as wav
import librosa
from IPython.display import Audio
```
Load the transcription file paths and inspect one
```python
text_paths = glob.glob('data/*.trn')
total = len(text_paths)
print(total)

with open(text_paths[0], 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    print(lines)
```
Extract the transcriptions and audio file paths, keeping only the Chinese text and removing spaces
```python
texts = []
paths = []
for path in text_paths:
    with open(path, 'r', encoding='utf8') as fr:
        lines = fr.readlines()
        line = lines[0].strip('\n').replace(' ', '')
        texts.append(line)
        paths.append(path.rstrip('.trn'))

print(paths[0], texts[0])
```
Keep 13 MFCC dimensions, and define a function that loads an audio file and trims the silence at both ends, as well as a function that visualizes an audio file
```python
mfcc_dim = 13

def load_and_trim(path):
    audio, sr = librosa.load(path)
    energy = librosa.feature.rmse(audio)
    frames = np.nonzero(energy >= np.max(energy) / 5)
    indices = librosa.core.frames_to_samples(frames)[1]
    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]
    return audio, sr

def visualize(index):
    path = paths[index]
    text = texts[index]
    print('Audio Text:', text)

    audio, sr = load_and_trim(path)
    plt.figure(figsize=(12, 3))
    plt.plot(np.arange(len(audio)), audio)
    plt.title('Raw Audio Signal')
    plt.xlabel('Time')
    plt.ylabel('Audio Amplitude')
    plt.show()

    feature = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
    print('Shape of MFCC:', feature.shape)

    fig = plt.figure(figsize=(12, 5))
    ax = fig.add_subplot(111)
    im = ax.imshow(feature, cmap=plt.cm.jet, aspect='auto')
    plt.title('Normalized MFCC')
    plt.ylabel('Time')
    plt.xlabel('MFCC Coefficient')
    plt.colorbar(im, cax=make_axes_locatable(ax).append_axes('right', size='5%', pad=0.05))
    ax.set_xticks(np.arange(0, 13, 2), minor=False)
    plt.show()

    return path

Audio(visualize(0))
```
This plots the raw waveform and the MFCC features of the first audio file.

Compute the MFCC features for all audio files
```python
features = []
for i in tqdm(range(total)):
    path = paths[i]
    audio, sr = load_and_trim(path)
    features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551))

print(len(features), features[0].shape)
```
Normalize the MFCC features
```python
samples = random.sample(features, 100)
samples = np.vstack(samples)
mfcc_mean = np.mean(samples, axis=0)
mfcc_std = np.std(samples, axis=0)
print(mfcc_mean)
print(mfcc_std)

features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]
```
Build the vocabulary, which contains 2,883 distinct characters
```python
chars = {}
for text in texts:
    for c in text:
        chars[c] = chars.get(c, 0) + 1

chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
print(len(chars), chars[:100])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}
```
Split the data into training and test sets, and define a function that generates batches
```python
data_index = np.arange(total)
np.random.shuffle(data_index)
train_size = int(0.9 * total)
test_size = total - train_size
train_index = data_index[:train_size]
test_index = data_index[train_size:]

X_train = [features[i] for i in train_index]
Y_train = [texts[i] for i in train_index]
X_test = [features[i] for i in test_index]
Y_test = [texts[i] for i in test_index]

batch_size = 16

def batch_generator(x, y, batch_size=batch_size):
    offset = 0
    while True:
        offset += batch_size

        if offset == batch_size or offset >= len(x):
            data_index = np.arange(len(x))
            np.random.shuffle(data_index)
            x = [x[i] for i in data_index]
            y = [y[i] for i in data_index]
            offset = batch_size

        X_data = x[offset - batch_size: offset]
        Y_data = y[offset - batch_size: offset]

        X_maxlen = max([X_data[i].shape[0] for i in range(batch_size)])
        Y_maxlen = max([len(Y_data[i]) for i in range(batch_size)])

        X_batch = np.zeros([batch_size, X_maxlen, mfcc_dim])
        Y_batch = np.ones([batch_size, Y_maxlen]) * len(char2id)
        X_length = np.zeros([batch_size, 1], dtype='int32')
        Y_length = np.zeros([batch_size, 1], dtype='int32')

        for i in range(batch_size):
            X_length[i, 0] = X_data[i].shape[0]
            X_batch[i, :X_length[i, 0], :] = X_data[i]

            Y_length[i, 0] = len(Y_data[i])
            Y_batch[i, :Y_length[i, 0]] = [char2id[c] for c in Y_data[i]]

        inputs = {'X': X_batch, 'Y': Y_batch, 'X_length': X_length, 'Y_length': Y_length}
        outputs = {'ctc': np.zeros([batch_size])}

        yield (inputs, outputs)
```
Define the training parameters and the model architecture, then start training
```python
epochs = 50
num_blocks = 3
filters = 128

X = Input(shape=(None, mfcc_dim,), dtype='float32', name='X')
Y = Input(shape=(None,), dtype='float32', name='Y')
X_length = Input(shape=(1,), dtype='int32', name='X_length')
Y_length = Input(shape=(1,), dtype='int32', name='Y_length')

def conv1d(inputs, filters, kernel_size, dilation_rate):
    return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal',
                  activation=None, dilation_rate=dilation_rate)(inputs)

def batchnorm(inputs):
    return BatchNormalization()(inputs)

def activation(inputs, activation):
    return Activation(activation)(inputs)

def res_block(inputs, filters, kernel_size, dilation_rate):
    hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')
    hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')
    h0 = Multiply()([hf, hg])

    ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')
    hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')

    return Add()([ha, inputs]), hs

h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')
shortcut = []
for i in range(num_blocks):
    for r in [1, 2, 4, 8, 16]:
        h0, s = res_block(h0, filters, 7, r)
        shortcut.append(s)

h1 = activation(Add()(shortcut), 'relu')
h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')
Y_pred = activation(batchnorm(conv1d(h1, len(char2id) + 1, 1, 1)), 'softmax')
sub_model = Model(inputs=X, outputs=Y_pred)

def calc_ctc_loss(args):
    y, yp, ypl, yl = args
    return K.ctc_batch_cost(y, yp, ypl, yl)

ctc_loss = Lambda(calc_ctc_loss, output_shape=(1,), name='ctc')([Y, Y_pred, X_length, Y_length])
model = Model(inputs=[X, Y, X_length, Y_length], outputs=ctc_loss)
optimizer = SGD(lr=0.02, momentum=0.9, nesterov=True, clipnorm=5)
model.compile(loss={'ctc': lambda ctc_true, ctc_pred: ctc_pred}, optimizer=optimizer)

checkpointer = ModelCheckpoint(filepath='asr.h5', verbose=0)
lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)

history = model.fit_generator(
    generator=batch_generator(X_train, Y_train),
    steps_per_epoch=len(X_train) // batch_size,
    epochs=epochs,
    validation_data=batch_generator(X_test, Y_test),
    validation_steps=len(X_test) // batch_size,
    callbacks=[checkpointer, lr_decay])
```
Save the model and the dictionary
```python
sub_model.save('asr.h5')

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char, mfcc_mean, mfcc_std], fw)
```
Plot the training and validation loss curves
```python
train_loss = history.history['loss']
valid_loss = history.history['val_loss']
plt.plot(np.linspace(1, epochs, epochs), train_loss, label='train')
plt.plot(np.linspace(1, epochs, epochs), valid_loss, label='valid')
plt.legend(loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()
```

Load the model and randomly transcribe utterances from the training and test sets
```python
from keras.models import load_model
import pickle

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)

sub_model = load_model('asr.h5')

def random_predict(x, y):
    index = np.random.randint(len(x))
    feature = x[index]
    text = y[index]

    pred = sub_model.predict(np.expand_dims(feature, axis=0))
    pred_ids = K.eval(K.ctc_decode(pred, [feature.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
    pred_ids = pred_ids.flatten().tolist()

    print('True transcription:\n-- ', text, '\n')
    print('Predicted transcription:\n-- ' + ''.join([id2char[i] for i in pred_ids]), '\n')

random_predict(X_train, Y_train)
random_predict(X_test, Y_test)
```
For a randomly selected utterance from the training set, the ground-truth text and the recognized text are:
- 而此時正趕上咸陽地市機構變化原咸陽市改為秦都區咸陽地區改為咸陽市
- 而此時正趕上咸陽地市機構變化原咸陽市改為秦都區咸陽地區改為咸陽市
For a randomly selected utterance from the test set, the ground-truth text and the recognized text are:
- 全黨必須緊緊團結在以江澤民同志為核心的黨中央周圍一心一意穩紮穩打共創未來
- 人南必須經緊團結在以江澤民同志威核心的黨中央州圍一心一穩教扎穩打共創未
Load the trained model locally, randomly pick an audio file, and transcribe it
```python
# -*- coding: utf-8 -*-
from keras.models import load_model
from keras import backend as K
import numpy as np
import librosa
from python_speech_features import mfcc
import pickle
import glob

wavs = glob.glob('data/*.wav')

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)

mfcc_dim = 13
model = load_model('asr.h5')

index = np.random.randint(len(wavs))
print(wavs[index])

audio, sr = librosa.load(wavs[index])
energy = librosa.feature.rmse(audio)
frames = np.nonzero(energy >= np.max(energy) / 5)
indices = librosa.core.frames_to_samples(frames)[1]
audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]
X_data = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
X_data = (X_data - mfcc_mean) / (mfcc_std + 1e-14)
print(X_data.shape)

with open(wavs[index] + '.trn', 'r', encoding='utf8') as fr:
    label = fr.readlines()[0]
    print(label)

pred = model.predict(np.expand_dims(X_data, axis=0))
pred_ids = K.eval(K.ctc_decode(pred, [X_data.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
pred_ids = pred_ids.flatten().tolist()
print(''.join([id2char[i] for i in pred_ids]))
```