Introduction

This post introduces the principles of Automatic Speech Recognition (ASR) and implements a recognizer with WaveNet.

Principle

The input to ASR is an audio clip; the output is the corresponding text.
The general pipeline for implementing ASR with Deep Neural Networks (DNN) is as follows:


  • Convert the raw audio into acoustic features
  • Feed the acoustic features into a neural network, which outputs a probability distribution for each frame
  • Decode the probabilities into the output text sequence
A commonly used acoustic feature is the Mel Frequency Cepstral Coefficients (MFCC); see www.practicalcryptography.com/miscellaneo…
After splitting the raw audio into short frames, an MFCC vector is computed for each frame, giving a two-dimensional array.
The first dimension is the number of frames (the longer the audio, the larger this dimension), and the second dimension is the MFCC feature dimension.
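As a rough illustration, here is a minimal sketch of computing that 2D array with python_speech_features; the file path is hypothetical, and 13 coefficients are assumed to match the setting used later in this post:

```python
# Minimal sketch: MFCC features for a single clip (hypothetical path 'data/example.wav').
import librosa
from python_speech_features import mfcc

audio, sr = librosa.load('data/example.wav')  # raw waveform and its sample rate
feature = mfcc(audio, sr, numcep=13)          # 2D array of shape (num_frames, 13)
print(feature.shape)                          # the first dimension grows with audio length
```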
With this numerical representation of the raw audio, WaveNet can be used to implement ASR.
The WaveNet model mainly uses stacked causal dilated convolutions (Causal Dilated Convolution) and skip connections.


Since the MFCC features form a one-dimensional sequence, Conv1D is used for the convolutions.
Causal means that each convolution output depends only on the current and earlier inputs, i.e., no future features are used; it can be thought of as shifting the convolution window toward the past.


Dilated means the convolution skips over inputs; stacking several dilated layers expands the receptive field rapidly, so the model can learn dependencies across long sequences.
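A minimal sketch of these two ideas in Keras follows; the filter count, kernel size, and dilation rates here are illustrative only, not the exact values used in the model later:

```python
# Minimal sketch of stacked causal dilated Conv1D layers (illustrative sizes only).
from keras.models import Model
from keras.layers import Input, Conv1D

x = Input(shape=(None, 13))           # variable-length sequence of 13-dim MFCC frames
h = x
for rate in [1, 2, 4, 8]:             # doubling dilation rates expand the receptive field
    h = Conv1D(filters=32, kernel_size=7, padding='causal',
               dilation_rate=rate, activation='tanh')(h)
Model(inputs=x, outputs=h).summary()  # output length equals input length; no future frames are used
```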
The last convolutional layer has as many filters as there are characters in the dictionary, so after a softmax, each frame's MFCC vector yields a probability distribution over the entire dictionary.
However, the number of frames is generally larger than the number of characters in the text: even for the same sentence, the duration and emphasis of each character and the pauses between characters can vary in countless ways.
In the earlier Chinese word segmentation task, the model's output probability sequence had the same length as the label sequence; in ASR the two lengths differ. Optical Character Recognition (OCR) faces a similar problem.
Here the CTC (Connectionist Temporal Classification) algorithm is used to compute the loss; see zhuanlan.zhihu.com/p/36488476
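For intuition, here is a toy sketch (not part of the actual pipeline) of the rule CTC uses to map a frame-level path to a label sequence: repeated symbols are merged first, then the blank symbol is removed, so many different frame alignments collapse to the same text:

```python
# Toy sketch of CTC's collapsing rule (blank denoted by '-'), for intuition only.
def ctc_collapse(path, blank='-'):
    out = []
    prev = None
    for symbol in path:
        if symbol != prev and symbol != blank:  # merge repeats, then drop blanks
            out.append(symbol)
        prev = symbol
    return ''.join(out)

# Several frame-level paths collapse to the same transcription "ab".
print(ctc_collapse('aa-b'), ctc_collapse('a--bb'), ctc_collapse('-ab-'))
```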
Data

The following dataset is used: www.openslr.org/18/, which contains 13,388 Chinese audio files with their corresponding text transcriptions.
Implementation

A few libraries are needed; install them if they are missing:
```
pip install python_speech_features librosa
```

If a NoBackendError appears while running the code, install ffmpeg with:
```
conda install -c conda-forge ffmpeg
```

Load the libraries:
```python
# -*- coding: utf-8 -*-
from keras.models import Model
from keras.layers import Input, Activation, Conv1D, Lambda, Add, Multiply, BatchNormalization
from keras.optimizers import Adam, SGD
from keras import backend as K
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
%matplotlib inline
import random
import pickle
import glob
from tqdm import tqdm
import os
from python_speech_features import mfcc
import scipy.io.wavfile as wav
import librosa
from IPython.display import Audio
```

Load the transcription file paths and inspect the first one:
```python
text_paths = glob.glob('data/*.trn')
total = len(text_paths)
print(total)

with open(text_paths[0], 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    print(lines)
```

Extract the transcriptions and the corresponding audio file paths, keeping only the Chinese characters and removing spaces:
```python
texts = []
paths = []
for path in text_paths:
    with open(path, 'r', encoding='utf8') as fr:
        lines = fr.readlines()
        line = lines[0].strip('\n').replace(' ', '')
        texts.append(line)
        paths.append(path.rstrip('.trn'))

print(paths[0], texts[0])
```

Keep 13 MFCC coefficients, and define a function that loads an audio file and trims the silence at both ends, together with a function to visualize an audio file:
```python
mfcc_dim = 13

def load_and_trim(path):
    audio, sr = librosa.load(path)
    energy = librosa.feature.rmse(audio)
    frames = np.nonzero(energy >= np.max(energy) / 5)
    indices = librosa.core.frames_to_samples(frames)[1]
    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]

    return audio, sr

def visualize(index):
    path = paths[index]
    text = texts[index]
    print('Audio Text:', text)

    audio, sr = load_and_trim(path)
    plt.figure(figsize=(12, 3))
    plt.plot(np.arange(len(audio)), audio)
    plt.title('Raw Audio Signal')
    plt.xlabel('Time')
    plt.ylabel('Audio Amplitude')
    plt.show()

    feature = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
    print('Shape of MFCC:', feature.shape)

    fig = plt.figure(figsize=(12, 5))
    ax = fig.add_subplot(111)
    im = ax.imshow(feature, cmap=plt.cm.jet, aspect='auto')
    plt.title('Normalized MFCC')
    plt.ylabel('Time')
    plt.xlabel('MFCC Coefficient')
    plt.colorbar(im, cax=make_axes_locatable(ax).append_axes('right', size='5%', pad=0.05))
    ax.set_xticks(np.arange(0, 13, 2), minor=False)
    plt.show()

    return path

Audio(visualize(0))
```

Running visualize(0) plots the raw waveform and the MFCC features of the first audio file.


Compute the MFCC features for all audio files:
```python
features = []
for i in tqdm(range(total)):
    path = paths[i]
    audio, sr = load_and_trim(path)
    features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551))

print(len(features), features[0].shape)
```

Normalize the MFCC features:
```python
samples = random.sample(features, 100)
samples = np.vstack(samples)
mfcc_mean = np.mean(samples, axis=0)
mfcc_std = np.std(samples, axis=0)
print(mfcc_mean)
print(mfcc_std)

features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]
```

Build the character dictionary, which contains 2883 distinct characters:
```python
chars = {}
for text in texts:
    for c in text:
        chars[c] = chars.get(c, 0) + 1

chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
print(len(chars), chars[:100])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}
```

Split the data into training and test sets, and define a function that produces batches:
```python
data_index = np.arange(total)
np.random.shuffle(data_index)

train_size = int(0.9 * total)
test_size = total - train_size
train_index = data_index[:train_size]
test_index = data_index[train_size:]

X_train = [features[i] for i in train_index]
Y_train = [texts[i] for i in train_index]
X_test = [features[i] for i in test_index]
Y_test = [texts[i] for i in test_index]

batch_size = 16

def batch_generator(x, y, batch_size=batch_size):
    offset = 0
    while True:
        offset += batch_size

        if offset == batch_size or offset >= len(x):
            data_index = np.arange(len(x))
            np.random.shuffle(data_index)
            x = [x[i] for i in data_index]
            y = [y[i] for i in data_index]
            offset = batch_size

        X_data = x[offset - batch_size: offset]
        Y_data = y[offset - batch_size: offset]

        X_maxlen = max([X_data[i].shape[0] for i in range(batch_size)])
        Y_maxlen = max([len(Y_data[i]) for i in range(batch_size)])

        X_batch = np.zeros([batch_size, X_maxlen, mfcc_dim])
        Y_batch = np.ones([batch_size, Y_maxlen]) * len(char2id)
        X_length = np.zeros([batch_size, 1], dtype='int32')
        Y_length = np.zeros([batch_size, 1], dtype='int32')

        for i in range(batch_size):
            X_length[i, 0] = X_data[i].shape[0]
            X_batch[i, :X_length[i, 0], :] = X_data[i]

            Y_length[i, 0] = len(Y_data[i])
            Y_batch[i, :Y_length[i, 0]] = [char2id[c] for c in Y_data[i]]

        inputs = {'X': X_batch, 'Y': Y_batch, 'X_length': X_length, 'Y_length': Y_length}
        outputs = {'ctc': np.zeros([batch_size])}

        yield (inputs, outputs)
```

Define the training parameters and the model structure, then start training:
```python
epochs = 50
num_blocks = 3
filters = 128

X = Input(shape=(None, mfcc_dim,), dtype='float32', name='X')
Y = Input(shape=(None,), dtype='float32', name='Y')
X_length = Input(shape=(1,), dtype='int32', name='X_length')
Y_length = Input(shape=(1,), dtype='int32', name='Y_length')

def conv1d(inputs, filters, kernel_size, dilation_rate):
    return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal',
                  activation=None, dilation_rate=dilation_rate)(inputs)

def batchnorm(inputs):
    return BatchNormalization()(inputs)

def activation(inputs, activation):
    return Activation(activation)(inputs)

def res_block(inputs, filters, kernel_size, dilation_rate):
    hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')
    hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')
    h0 = Multiply()([hf, hg])

    ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')
    hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')

    return Add()([ha, inputs]), hs

h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')
shortcut = []
for i in range(num_blocks):
    for r in [1, 2, 4, 8, 16]:
        h0, s = res_block(h0, filters, 7, r)
        shortcut.append(s)

h1 = activation(Add()(shortcut), 'relu')
h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')
Y_pred = activation(batchnorm(conv1d(h1, len(char2id) + 1, 1, 1)), 'softmax')
sub_model = Model(inputs=X, outputs=Y_pred)

def calc_ctc_loss(args):
    y, yp, ypl, yl = args
    return K.ctc_batch_cost(y, yp, ypl, yl)

ctc_loss = Lambda(calc_ctc_loss, output_shape=(1,), name='ctc')([Y, Y_pred, X_length, Y_length])
model = Model(inputs=[X, Y, X_length, Y_length], outputs=ctc_loss)
optimizer = SGD(lr=0.02, momentum=0.9, nesterov=True, clipnorm=5)
model.compile(loss={'ctc': lambda ctc_true, ctc_pred: ctc_pred}, optimizer=optimizer)

checkpointer = ModelCheckpoint(filepath='asr.h5', verbose=0)
lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)

history = model.fit_generator(
    generator=batch_generator(X_train, Y_train),
    steps_per_epoch=len(X_train) // batch_size,
    epochs=epochs,
    validation_data=batch_generator(X_test, Y_test),
    validation_steps=len(X_test) // batch_size,
    callbacks=[checkpointer, lr_decay])
```

Save the model and the dictionary:
```python
sub_model.save('asr.h5')

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char, mfcc_mean, mfcc_std], fw)
```

Plot the training and validation loss curves:
```python
train_loss = history.history['loss']
valid_loss = history.history['val_loss']
plt.plot(np.linspace(1, epochs, epochs), train_loss, label='train')
plt.plot(np.linspace(1, epochs, epochs), valid_loss, label='valid')
plt.legend(loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()
```

Load the model and recognize randomly chosen audio clips from the training and test sets:
```python
from keras.models import load_model
import pickle

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)

sub_model = load_model('asr.h5')

def random_predict(x, y):
    index = np.random.randint(len(x))
    feature = x[index]
    text = y[index]

    pred = sub_model.predict(np.expand_dims(feature, axis=0))
    pred_ids = K.eval(K.ctc_decode(pred, [feature.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
    pred_ids = pred_ids.flatten().tolist()

    print('True transcription:\n-- ', text, '\n')
    print('Predicted transcription:\n-- ' + ''.join([id2char[i] for i in pred_ids]), '\n')

random_predict(X_train, Y_train)
random_predict(X_test, Y_test)
```

For a randomly chosen clip from the training set, the ground-truth and recognized text are:
  • 而此时正赶上咸阳地市机构变化原咸阳市改为秦都区咸阳地区改为咸阳市
  • 而此时正赶上咸阳地市机构变化原咸阳市改为秦都区咸阳地区改为咸阳市
For a randomly chosen clip from the test set, the ground-truth and recognized text are:
  • 全党必须紧紧团结在以江泽民同志为核心的党中央周围一心一意稳扎稳打共创未来
  • 人南必须经紧团结在以江泽民同志威核心的党中央州围一心一稳教扎稳打共创未
Load the trained model locally, pick an audio file at random, and recognize it:
```python
# -*- coding: utf-8 -*-
from keras.models import load_model
from keras import backend as K
import numpy as np
import librosa
from python_speech_features import mfcc
import pickle
import glob

wavs = glob.glob('data/*.wav')
with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)

mfcc_dim = 13
model = load_model('asr.h5')

index = np.random.randint(len(wavs))
print(wavs[index])

audio, sr = librosa.load(wavs[index])
energy = librosa.feature.rmse(audio)
frames = np.nonzero(energy >= np.max(energy) / 5)
indices = librosa.core.frames_to_samples(frames)[1]
audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]

X_data = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
X_data = (X_data - mfcc_mean) / (mfcc_std + 1e-14)
print(X_data.shape)

with open(wavs[index] + '.trn', 'r', encoding='utf8') as fr:
    label = fr.readlines()[0]
    print(label)

pred = model.predict(np.expand_dims(X_data, axis=0))
pred_ids = K.eval(K.ctc_decode(pred, [X_data.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
pred_ids = pred_ids.flatten().tolist()
print(''.join([id2char[i] for i in pred_ids]))
```

Link: https://juejin.im/post/5ba4df8b6fb9a05d0b143463


