1. 程式人生 > >tensorflow 1.9.0 語音識別簡單實現

tensorflow 1.9.0 語音識別簡單實現

最近公司要上語音識別的功能,想著最近tensorflow比較火熱,便著手開始研究。網上也找了很多的案例,一一去實現,結果一直都出各種問題:比如tensorflow版本不一樣、資料集不可用、準確率極低,或者是程式碼看得我雲裡霧裡等各種坑爹的問題。後面經過一些整理和修改,這裡記錄一下:

使用的訓練資料

開發環境

tensorflow 1.9.0、python 3.6.6、numpy 1.15.0、librosa 0.6.2

程式碼

訓練的程式碼
#coding=utf-8
import tensorflow as tf
import numpy as np
import os
from collections import Counter
import librosa
import time

# Training sample directory (only the recordings of the word "seven" are used here).
wav_path = 'D:/AI/seven/'

# Label shared by every audio file in that directory.
wav_title = 'seven'


def get_wave_files(wav_path=wav_path):
    """Return the paths of all WAV files found under ``wav_path``.

    The directory tree is walked recursively with ``os.walk``.

    Args:
        wav_path: root directory to search.

    Returns:
        A sorted list of file paths whose name ends in ``.wav`` (any letter
        case).
    """
    wav_files = []
    for dirpath, _dirnames, filenames in os.walk(wav_path):
        for filename in filenames:
            # Compare case-insensitively so '.Wav' is accepted as well as
            # '.wav' / '.WAV'.
            if filename.lower().endswith('.wav'):
                # os.path.join does not double the separator when dirpath
                # already ends with one (as the default wav_path does).
                wav_files.append(os.path.join(dirpath, filename))
    # os.walk yields entries in filesystem order; sort so that the sample
    # order (and therefore the batches) is reproducible between runs.
    wav_files.sort()
    return wav_files

wav_files = get_wave_files()  # paths of all training audio files


def get_wav_label(wav_files=wav_files):
    """Pair every audio file with its transcript label.

    Every file receives the module-level ``wav_title`` as its label; the file
    name itself is not inspected.

    Args:
        wav_files: list of audio file paths.

    Returns:
        A ``(files, labels)`` tuple: a copy of ``wav_files`` and a list of the
        same length holding the label of each file.
    """
    new_wav_files = list(wav_files)
    labels = [wav_title] * len(new_wav_files)
    return new_wav_files, labels


wav_files, labels = get_wav_label()  # audio files and their labels, index-aligned
print("載入訓練樣本:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
print("樣本數:",len(wav_files))

# Build the character vocabulary from the labels (same approach as the
# dialogue / poetry generation examples).
all_words = [char for label in labels for char in label]
counter = Counter(all_words)
# Most frequent characters first; sorted() is stable, so ties keep the order in
# which the characters were first seen.
count_pairs = sorted(counter.items(), key=lambda pair: pair[1], reverse=True)

words, _ = zip(*count_pairs)
words_size = len(words)  # vocabulary size
print('詞彙表大小:',words_size)

# Map every character to an integer id (its rank in `words`).
# NOTE(review): ids start at 0, but get_next_batches pads labels with 0 and the
# training function drops every 0 entry of Y before building the CTC target, so
# the most frequent character looks like padding and appears to be dropped from
# the targets — confirm whether ids should start at 1.
word_num_map = {word: index for index, word in enumerate(words)}


def to_num(word):
    """Return the id of `word`, or len(words) for an unknown character."""
    return word_num_map.get(word, len(words))


labels_vector = [[to_num(char) for char in label] for label in labels]

label_max_len = np.max([len(vector) for vector in labels_vector])  # longest label, in characters
print('最長句子的字數:',label_max_len)

# Find the longest utterance, measured in MFCC frames.
wav_max_len = 0
for wav_file in wav_files:
    # librosa.load returns the samples as a floating point time series
    # (mono=True mixes the signal down to one channel) plus the sample rate.
    signal, sr = librosa.load(wav_file, mono=True)
    # Transpose the MFCC matrix so that len() counts along the second axis of
    # librosa's output (presumably the time frames — TODO confirm).
    mfcc = np.transpose(librosa.feature.mfcc(signal, sr), [1, 0])
    wav_max_len = max(wav_max_len, len(mfcc))
print("最長的語音:",wav_max_len)


batch_size = 100  # number of files per batch
n_batch = len(wav_files) // batch_size  # full batches per epoch; leftover files are not used

pointer = 0  # index of the next sample to read; advanced by get_next_batches


def get_next_batches(batch_size):
    """Load the next ``batch_size`` samples and zero-pad them to equal length.

    Reads files starting at the module-level ``pointer`` and advances it by
    ``batch_size``. The caller must make sure enough samples remain.

    Args:
        batch_size: number of samples to load.

    Returns:
        A ``(batches_wavs, batches_labels)`` tuple. ``batches_wavs`` holds one
        list of MFCC frames per sample, padded with all-zero frames up to
        ``wav_max_len``; ``batches_labels`` holds one list of label ids per
        sample, padded with 0 up to ``label_max_len``.
    """
    global pointer
    n_mfcc = 20  # width of one padding frame; matches the last dimension of X
    batches_wavs = []
    batches_labels = []
    for _ in range(batch_size):
        wav, sr = librosa.load(wav_files[pointer], mono=True)
        mfcc = np.transpose(librosa.feature.mfcc(wav, sr), [1, 0])
        batches_wavs.append(mfcc.tolist())  # plain lists so they can be padded below
        # Copy the label: padding it in place would permanently modify the
        # shared entry in labels_vector.
        batches_labels.append(list(labels_vector[pointer]))
        pointer += 1
    # Zero-pad to a common length.
    for mfcc in batches_wavs:
        mfcc.extend([0] * n_mfcc for _ in range(wav_max_len - len(mfcc)))
    for label in batches_labels:
        label.extend([0] * (int(label_max_len) - len(label)))
    return batches_wavs, batches_labels

# Input features: one batch of sequences with 20 values per step.
X = tf.placeholder(tf.float32, shape=[batch_size, None, 20])
# Length of every sequence in the batch: the number of steps whose feature
# values do not sum to zero (zero-padded steps sum to exactly 0).
_frame_sum = tf.reduce_sum(X, reduction_indices=2)
_is_real_frame = tf.cast(tf.not_equal(_frame_sum, 0.), tf.int32)
sequence_len = tf.reduce_sum(_is_real_frame, reduction_indices=1)
# Target label ids, one row per sample.
Y = tf.placeholder(tf.int32, shape=[batch_size, None])



# 1-D convolution layer.
conv1d_index = 0  # number of conv1d layers built so far; gives each layer its own variable scope


def conv1d_layer(input_tensor,size,dim,activation,scale,bias):
    """Build a stride-1, SAME-padded 1-D convolution layer.

    Args:
        input_tensor: input tensor; its last dimension is used as the number
            of input channels.
        size: filter width.
        dim: number of output channels.
        activation: 'tanh' or 'sigmoid'; any other value applies no activation.
        scale: the filter is initialised uniformly in [-scale, scale].
        bias: when true, a zero-initialised bias of shape [dim] is added to the
            convolution output. When false, the batch-norm variables are
            created instead (see the note in the body).

    Returns:
        The output tensor of the layer.
    """
    global conv1d_index
    with tf.variable_scope('conv1d_' + str(conv1d_index)):
        in_dim = input_tensor.get_shape().as_list()[-1]
        W = tf.get_variable('W', (size, in_dim, dim), dtype=tf.float32,
                            initializer=tf.random_uniform_initializer(minval=-scale, maxval=scale))
        out = tf.nn.conv1d(input_tensor, W, stride=1, padding='SAME')
        if bias:
            b = tf.get_variable('b', [dim], dtype=tf.float32, initializer=tf.constant_initializer(0))
            # The bias has to be added explicitly; creating the variable alone
            # has no effect on the output.
            out = out + b
        else:
            # NOTE(review): batch normalisation is NOT applied here. The
            # statements that were meant to apply it were placed after a
            # `return` inside a helper that was never called, so they never
            # ran. The variables are still created so the set of variables in
            # the graph (and in saved checkpoints) stays the same. Actually
            # enabling normalisation needs a training/inference switch and
            # initialisation of local variables — TODO.
            tf.get_variable('beta', dim, dtype=tf.float32, initializer=tf.constant_initializer(0))
            tf.get_variable('gamma', dim, dtype=tf.float32, initializer=tf.constant_initializer(1))
            # running mean
            tf.get_variable('mean', dim, dtype=tf.float32, initializer=tf.constant_initializer(0))
            # running variance
            tf.get_variable('variance', dim, dtype=tf.float32, initializer=tf.constant_initializer(1))
        if activation == 'tanh':
            out = tf.nn.tanh(out)
        elif activation == 'sigmoid':
            out = tf.nn.sigmoid(out)

        conv1d_index += 1
        return out


# Dilated (atrous) 1-D convolution layer.
aconv1d_index = 0  # number of aconv1d layers built so far; gives each layer its own variable scope


def aconv1d_layer(input_tensor, size, rate, activation, scale, bias):
    """Build a SAME-padded dilated 1-D convolution that keeps the channel count.

    The convolution is computed with ``tf.nn.atrous_conv2d`` on the input with
    an extra axis of length 1 inserted at position 1, which is squeezed out
    again afterwards.

    Args:
        input_tensor: input tensor; its last dimension is used as both the
            input and the output channel count.
        size: filter width.
        rate: dilation rate passed to ``tf.nn.atrous_conv2d``.
        activation: 'tanh' or 'sigmoid'; any other value applies no activation.
        scale: the filter is initialised uniformly in [-scale, scale].
        bias: when true, a zero-initialised bias is added to the convolution
            output. When false, the batch-norm variables are created instead
            (see the note in the body).

    Returns:
        The output tensor of the layer.
    """
    global aconv1d_index
    with tf.variable_scope('aconv1d_' + str(aconv1d_index)):
        channels = input_tensor.get_shape().as_list()[-1]
        W = tf.get_variable('W', (1, size, channels, channels), dtype=tf.float32,
                            initializer=tf.random_uniform_initializer(minval=-scale, maxval=scale))
        # Insert a dummy axis so the 2-D atrous convolution can be used, then
        # remove it again.
        out = tf.nn.atrous_conv2d(tf.expand_dims(input_tensor, axis=1), W, rate=rate, padding='SAME')
        out = tf.squeeze(out, [1])
        if bias:
            b = tf.get_variable('b', [channels], dtype=tf.float32, initializer=tf.constant_initializer(0))
            # The bias has to be added explicitly; creating the variable alone
            # has no effect on the output.
            out = out + b
        else:
            # NOTE(review): batch normalisation is NOT applied here. The
            # statements that were meant to apply it were placed after a
            # `return` inside a helper that was never called, so they never
            # ran. The variables are still created so the set of variables in
            # the graph (and in saved checkpoints) stays the same. Actually
            # enabling normalisation needs a training/inference switch and
            # initialisation of local variables — TODO.
            tf.get_variable('beta', channels, dtype=tf.float32, initializer=tf.constant_initializer(0))
            tf.get_variable('gamma', channels, dtype=tf.float32, initializer=tf.constant_initializer(1))
            tf.get_variable('mean', channels, dtype=tf.float32, initializer=tf.constant_initializer(0))
            tf.get_variable('variance', channels, dtype=tf.float32, initializer=tf.constant_initializer(1))
        if activation == 'tanh':
            out = tf.nn.tanh(out)
        elif activation == 'sigmoid':
            out = tf.nn.sigmoid(out)

        aconv1d_index += 1
        return out


# Network definition.
def speech_to_text_network(n_dim=128, n_blocks=3):
    """Build the convolutional network that maps X to per-step logits.

    An input 1x1 convolution is followed by ``n_blocks`` groups of gated
    residual blocks with dilation rates 1, 2, 4, 8 and 16. The outputs of all
    blocks are summed and passed through two further 1x1 convolutions.

    Args:
        n_dim: number of channels used inside the network.
        n_blocks: how many times the group of five dilation rates is repeated.

    Returns:
        The logit tensor; its last dimension has ``words_size`` entries.
    """
    dilation_rates = [1, 2, 4, 8, 16]

    def residual_block(input_sensor, size, rate):
        # Gated activation: a tanh "filter" branch multiplied by a sigmoid
        # "gate" branch, followed by a 1x1 convolution.
        filter_branch = aconv1d_layer(input_sensor, size=size, rate=rate, activation='tanh', scale=0.03, bias=False)
        gate_branch = aconv1d_layer(input_sensor, size=size, rate=rate, activation='sigmoid', scale=0.03, bias=False)
        gated = conv1d_layer(filter_branch * gate_branch, size=1, dim=n_dim, activation='tanh', scale=0.08, bias=False)
        # First value feeds the next block (residual connection), second one
        # is the block's contribution to the skip sum.
        return gated + input_sensor, gated

    # Input convolution.
    out = conv1d_layer(input_tensor=X, size=1, dim=n_dim, activation='tanh', scale=0.14, bias=False)

    # Skip connections: accumulate the output of every residual block.
    skip = 0
    for rate in dilation_rates * n_blocks:
        out, block_out = residual_block(out, size=7, rate=rate)
        skip += block_out

    # Two final 1x1 convolutions; the last one produces the logits.
    skip_channels = skip.get_shape().as_list()[-1]
    logit = conv1d_layer(skip, size=1, dim=skip_channels, activation='tanh', scale=0.08, bias=False)
    logit = conv1d_layer(logit, size=1, dim=words_size, activation=None, scale=0.04, bias=True)

    return logit



# Custom optimizer built on top of tf.train.Optimizer.
class MaxPropOptimizer(tf.train.Optimizer):
    """Optimizer that scales each gradient by a decayed running maximum.

    For every variable a slot ``m`` is kept and updated as
    ``m = max(beta2 * m + eps, |grad|)``; the variable is then decreased by
    ``learning_rate * grad / m``.
    """

    def __init__(self, learning_rate=0.001, beta2=0.999, use_locking=False, name="MaxProp"):
        super().__init__(use_locking, name)
        self._lr = learning_rate
        self._beta2 = beta2
        # Tensor versions of the hyper-parameters, created in _prepare().
        self._lr_t = None
        self._beta2_t = None

    def _prepare(self):
        self._lr_t = tf.convert_to_tensor(self._lr, name="learning_rate")
        self._beta2_t = tf.convert_to_tensor(self._beta2, name="beta2")

    def _create_slots(self, var_list):
        # One zero-initialised "m" slot per variable.
        for variable in var_list:
            self._zeros_slot(variable, "m", self._name)

    def _apply_dense(self, grad, var):
        dtype = var.dtype.base_dtype
        lr_t = tf.cast(self._lr_t, dtype)
        beta2_t = tf.cast(self._beta2_t, dtype)
        # A larger epsilon is used for half precision.
        eps = 1e-7 if dtype == tf.float16 else 1e-8
        m = self.get_slot(var, "m")
        m_t = m.assign(tf.maximum(beta2_t * m + eps, tf.abs(grad)))
        var_update = tf.assign_sub(var, lr_t * (grad / m_t))
        return tf.group(var_update, m_t)

    def _apply_sparse(self, grad, var):
        # Sparse gradients are handled by the dense update.
        return self._apply_dense(grad, var)


def train_speech_to_text_network():
    """Build the network, train it with CTC loss and save checkpoints.

    Trains for 16 epochs over ``n_batch`` batches each. The learning rate is
    set to ``0.001 * 0.97 ** epoch`` at the start of every epoch, and a
    checkpoint is written to './speech.module' on every fifth epoch
    (0, 5, 10, 15).
    """
    global pointer
    print("開始訓練:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    logit = speech_to_text_network()

    # CTC loss.
    # Entries of Y equal to 0 are treated as padding and skipped; the remaining
    # ids are shifted down by one to form the sparse target.
    # NOTE(review): the vocabulary mapping built earlier in this script assigns
    # id 0 to a real character, which would therefore be skipped here as well —
    # confirm.
    indices = tf.where(tf.not_equal(tf.cast(Y, tf.float32), 0.))
    target = tf.SparseTensor(indices=indices, values=tf.gather_nd(Y, indices) - 1, dense_shape=tf.cast(tf.shape(Y), tf.int64))
    loss = tf.nn.ctc_loss(target, logit, sequence_len, time_major=False)

    # Optimizer.
    lr = tf.Variable(0.001, dtype=tf.float32, trainable=False)
    # Build the learning-rate update once. Calling tf.assign inside the epoch
    # loop would add a new op to the graph on every epoch.
    new_lr = tf.placeholder(tf.float32, shape=[])
    update_lr = tf.assign(lr, new_lr)
    optimizer = MaxPropOptimizer(learning_rate=lr, beta2=0.99)
    var_list = list(tf.trainable_variables())
    gradient = optimizer.compute_gradients(loss, var_list=var_list)
    optimizer_op = optimizer.apply_gradients(gradient)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())  # initialise all variables

        saver = tf.train.Saver(tf.global_variables())

        for epoch in range(16):
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            print("第%d次迴圈迭代:"%(epoch))
            sess.run(update_lr, feed_dict={new_lr: 0.001 * (0.97 ** epoch)})

            pointer = 0  # restart from the first sample; get_next_batches advances it
            for batch in range(n_batch):
                batches_wavs, batches_labels = get_next_batches(batch_size)
                train_loss, _ = sess.run([loss, optimizer_op], feed_dict={X: batches_wavs, Y: batches_labels})
                print(epoch, batch, train_loss)
            if epoch % 5 == 0:
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
                print("第%d次模型儲存結果:"%(epoch//5))
                saver.save(sess, './speech.module', global_step=epoch)
    print("結束訓練時刻:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

# Run the training.
train_speech_to_text_network()

注意:上面我單獨訓練的是寫死的seven發音的檔案,正式環境下,需要換成自動讀取每個語音檔案和對應的標籤。訓練結束之後,會在執行目錄下生成一組以 speech.module 為字首的模型(checkpoint)檔案,後面識別時會用到。

識別程式碼:

#coding=utf-8
import tensorflow as tf
import numpy as np
import librosa


# Note: unlike the training script, the batch dimension here is 1.
# Input features: a single sequence with 20 values per step.
X = tf.placeholder(tf.float32, shape=[1, None, 20])
# Sequence length: the number of steps whose feature values do not sum to zero.
_frame_sum = tf.reduce_sum(X, reduction_indices=2)
_is_real_frame = tf.cast(tf.not_equal(_frame_sum, 0.), tf.int32)
sequence_len = tf.reduce_sum(_is_real_frame, reduction_indices=1)
# Label ids placeholder.
Y = tf.placeholder(tf.int32, shape=[1, None])


# 1-D convolution layer.
conv1d_index = 0  # number of conv1d layers built so far; gives each layer its own variable scope


def conv1d_layer(input_tensor,size,dim,activation,scale,bias):
    """Build a stride-1, SAME-padded 1-D convolution layer.

    Args:
        input_tensor: input tensor; its last dimension is used as the number
            of input channels.
        size: filter width.
        dim: number of output channels.
        activation: 'tanh' or 'sigmoid'; any other value applies no activation.
        scale: the filter is initialised uniformly in [-scale, scale].
        bias: when true, a zero-initialised bias of shape [dim] is added to the
            convolution output. When false, the batch-norm variables are
            created instead (see the note in the body).

    Returns:
        The output tensor of the layer.
    """
    global conv1d_index
    with tf.variable_scope('conv1d_' + str(conv1d_index)):
        in_dim = input_tensor.get_shape().as_list()[-1]
        W = tf.get_variable('W', (size, in_dim, dim), dtype=tf.float32,
                            initializer=tf.random_uniform_initializer(minval=-scale, maxval=scale))
        out = tf.nn.conv1d(input_tensor, W, stride=1, padding='SAME')
        if bias:
            b = tf.get_variable('b', [dim], dtype=tf.float32, initializer=tf.constant_initializer(0))
            # The bias has to be added explicitly; creating the variable alone
            # has no effect on the output.
            out = out + b
        else:
            # NOTE(review): batch normalisation is NOT applied here. The
            # statements that were meant to apply it were placed after a
            # `return` inside a helper that was never called, so they never
            # ran. The variables are still created so that every variable
            # expected by the checkpoint exists in the graph. Actually enabling
            # normalisation needs a training/inference switch and
            # initialisation of local variables — TODO.
            tf.get_variable('beta', dim, dtype=tf.float32, initializer=tf.constant_initializer(0))
            tf.get_variable('gamma', dim, dtype=tf.float32, initializer=tf.constant_initializer(1))
            # running mean
            tf.get_variable('mean', dim, dtype=tf.float32, initializer=tf.constant_initializer(0))
            # running variance
            tf.get_variable('variance', dim, dtype=tf.float32, initializer=tf.constant_initializer(1))
        if activation == 'tanh':
            out = tf.nn.tanh(out)
        elif activation == 'sigmoid':
            out = tf.nn.sigmoid(out)

        conv1d_index += 1
        return out


# Dilated (atrous) 1-D convolution layer.
aconv1d_index = 0  # number of aconv1d layers built so far; gives each layer its own variable scope


def aconv1d_layer(input_tensor, size, rate, activation, scale, bias):
    """Build a SAME-padded dilated 1-D convolution that keeps the channel count.

    The convolution is computed with ``tf.nn.atrous_conv2d`` on the input with
    an extra axis of length 1 inserted at position 1, which is squeezed out
    again afterwards.

    Args:
        input_tensor: input tensor; its last dimension is used as both the
            input and the output channel count.
        size: filter width.
        rate: dilation rate passed to ``tf.nn.atrous_conv2d``.
        activation: 'tanh' or 'sigmoid'; any other value applies no activation.
        scale: the filter is initialised uniformly in [-scale, scale].
        bias: when true, a zero-initialised bias is added to the convolution
            output. When false, the batch-norm variables are created instead
            (see the note in the body).

    Returns:
        The output tensor of the layer.
    """
    global aconv1d_index
    with tf.variable_scope('aconv1d_' + str(aconv1d_index)):
        channels = input_tensor.get_shape().as_list()[-1]
        W = tf.get_variable('W', (1, size, channels, channels), dtype=tf.float32,
                            initializer=tf.random_uniform_initializer(minval=-scale, maxval=scale))
        # Insert a dummy axis so the 2-D atrous convolution can be used, then
        # remove it again.
        out = tf.nn.atrous_conv2d(tf.expand_dims(input_tensor, axis=1), W, rate=rate, padding='SAME')
        out = tf.squeeze(out, [1])
        if bias:
            b = tf.get_variable('b', [channels], dtype=tf.float32, initializer=tf.constant_initializer(0))
            # The bias has to be added explicitly; creating the variable alone
            # has no effect on the output.
            out = out + b
        else:
            # NOTE(review): batch normalisation is NOT applied here. The
            # statements that were meant to apply it were placed after a
            # `return` inside a helper that was never called, so they never
            # ran. The variables are still created so that every variable
            # expected by the checkpoint exists in the graph. Actually enabling
            # normalisation needs a training/inference switch and
            # initialisation of local variables — TODO.
            tf.get_variable('beta', channels, dtype=tf.float32, initializer=tf.constant_initializer(0))
            tf.get_variable('gamma', channels, dtype=tf.float32, initializer=tf.constant_initializer(1))
            tf.get_variable('mean', channels, dtype=tf.float32, initializer=tf.constant_initializer(0))
            tf.get_variable('variance', channels, dtype=tf.float32, initializer=tf.constant_initializer(1))
        if activation == 'tanh':
            out = tf.nn.tanh(out)
        elif activation == 'sigmoid':
            out = tf.nn.sigmoid(out)

        aconv1d_index += 1
        return out



# Network definition.
def speech_to_text_network(n_dim=128, n_blocks=3, n_classes=4):
    """Build the convolutional network that maps X to per-step logits.

    An input 1x1 convolution is followed by ``n_blocks`` groups of gated
    residual blocks with dilation rates 1, 2, 4, 8 and 16. The outputs of all
    blocks are summed and passed through two further 1x1 convolutions.

    Args:
        n_dim: number of channels used inside the network.
        n_blocks: how many times the group of five dilation rates is repeated.
        n_classes: number of output channels of the last layer. It has to
            equal the output width of the network the checkpoint was trained
            with, otherwise restoring the checkpoint fails on a shape
            mismatch. The default of 4 is the value that used to be
            hard-coded here.

    Returns:
        The logit tensor; its last dimension has ``n_classes`` entries.
    """
    # Input convolution.
    out = conv1d_layer(input_tensor=X, size=1, dim=n_dim, activation='tanh', scale=0.14, bias=False)

    # skip connections
    def residual_block(input_sensor, size, rate):
        # Gated activation: tanh "filter" branch times sigmoid "gate" branch,
        # followed by a 1x1 convolution.
        conv_filter = aconv1d_layer(input_sensor, size=size, rate=rate, activation='tanh', scale=0.03, bias=False)
        conv_gate = aconv1d_layer(input_sensor, size=size, rate=rate, activation='sigmoid', scale=0.03, bias=False)
        out = conv_filter * conv_gate
        out = conv1d_layer(out, size=1, dim=n_dim, activation='tanh', scale=0.08, bias=False)
        # First value feeds the next block (residual connection), second one
        # is the block's contribution to the skip sum.
        return out + input_sensor, out

    skip = 0
    for _ in range(n_blocks):
        for r in [1, 2, 4, 8, 16]:
            out, s = residual_block(out, size=7, rate=r)
            skip += s

    # Two final 1x1 convolutions; the last one produces the logits.
    logit = conv1d_layer(skip, size=1, dim=skip.get_shape().as_list()[-1], activation='tanh', scale=0.08, bias=False)
    logit = conv1d_layer(logit, size=1, dim=n_classes, activation=None, scale=0.04, bias=True)

    return logit


# Try the trained model on one file.
def speech_to_text(wav_file, checkpoint_path="D:/AI/mpdules/speech.module-15"):
    """Decode one audio file with the trained network and print the result.

    Args:
        wav_file: path of the audio file to recognise.
        checkpoint_path: checkpoint to restore the network weights from. The
            default is the path that used to be hard-coded here;
            ``tf.train.latest_checkpoint('.')`` can be passed to use the most
            recent checkpoint in the current directory.

    Returns:
        The decoded label ids as returned by ``sess.run`` (also printed).
    """
    wav, sr = librosa.load(wav_file, mono=True)
    # Add a leading batch axis of size 1 and swap the last two axes so the
    # 20-wide feature axis comes last, as the X placeholder expects.
    mfcc = np.transpose(np.expand_dims(librosa.feature.mfcc(wav, sr), axis=0), [0, 2, 1])

    logit = speech_to_text_network()

    # Build the decoding ops before the session is opened so the graph is
    # complete when the weights are restored.
    # The beam-search decoder is given the logits with their first two axes
    # swapped.
    decoded = tf.transpose(logit, perm=[1, 0, 2])
    decoded, _ = tf.nn.ctc_beam_search_decoder(decoded, sequence_len, merge_repeated=False)
    # +1 undoes the shift by -1 that the training script applies to the label
    # ids when it builds the CTC targets.
    predict = tf.sparse_to_dense(decoded[0].indices, decoded[0].dense_shape, decoded[0].values) + 1

    saver = tf.train.Saver()
    with tf.Session() as sess:
        saver.restore(sess, checkpoint_path)
        output = sess.run(predict, feed_dict={X: mfcc})

    print(output)
    return output

speech_to_text("D:\\AI\\00f0204f_nohash_0.wav")
注:由於是測試,並沒有對程式碼架構進行優化,正式環境需要優化程式碼和架構。