黑马程序员技术交流社区

标题: 【上海校区】深度有趣 | 29 方言种类分类 [打印本页]

作者: 不二晨    时间: 2018-9-25 09:57
标题: 【上海校区】深度有趣 | 29 方言种类分类
简介结合上节课的内容,使用WaveNet进行语音分类
原理对于每一个MFCC特征都输出一个概率分布,然后结合CTC算法即可实现语音识别
相比之下,语音分类要简单很多,因为对于整个MFCC特征序列只需要输出一个分类结果即可
语音分类和语音识别的区别,可以类比一下文本分类和序列标注的区别
具体实现时,只需要稍微修改一下网络结构即可
数据使用科大讯飞方言种类识别AI挑战赛提供的数据,challenge.xfyun.cn/,初赛提供了6种方言,复赛提供了10种方言
每种方言包括30个人每人200条共计6000条训练数据,以及10个人每人50条共计500条验证数据
数据以pcm格式提供,可以理解为wav文件去掉多余信息之后,仅保留语音数据的格式
实现以下以长沙、南昌、上海三种方言数据为例,介绍如何实现语音分类
加载库
# -*- coding:utf-8 -*-import numpy as npimport osfrom matplotlib import pyplot as pltfrom mpl_toolkits.axes_grid1 import make_axes_locatable%matplotlib inlinefrom sklearn.utils import shuffleimport globimport picklefrom tqdm import tqdmfrom keras.models import Modelfrom keras.preprocessing.sequence import pad_sequencesfrom keras.layers import Input, Activation, Conv1D, Add, Multiply, BatchNormalization, GlobalMaxPooling1D, Dropoutfrom keras.optimizers import Adamfrom keras.callbacks import ModelCheckpoint, ReduceLROnPlateaufrom python_speech_features import mfccimport librosafrom IPython.display import Audioimport wave复制代码加载pcm文件,共1W8条训练数据,1.5K条验证数据
train_files = glob.glob('data/*/train/*/*.pcm')dev_files = glob.glob('data/*/dev/*/*/*.pcm')print(len(train_files), len(dev_files), train_files[0])复制代码整理每条语音数据对应的分类标签
labels = {'train': [], 'dev': []}    for i in tqdm(range(len(train_files))):    path = train_files    label = path.split('/')[1]    labels['train'].append(label)    for i in tqdm(range(len(dev_files))):    path = dev_files    label = path.split('/')[1]    labels['dev'].append(label)print(len(labels['train']), len(labels['dev']))复制代码定义处理语音、pcm转wav、可视化语音的三个函数,由于语音片段长短不一,所以去除少于1s的短片段,对于长片段则切分为不超过3s的片段
mfcc_dim = 13sr = 16000min_length = 1 * srslice_length = 3 * srdef load_and_trim(path, sr=16000):    audio = np.memmap(path, dtype='h', mode='r')    audio = audio[2000:-2000]    audio = audio.astype(np.float32)    energy = librosa.feature.rmse(audio)    frames = np.nonzero(energy >= np.max(energy) / 5)    indices = librosa.core.frames_to_samples(frames)[1]    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]        slices = []    for i in range(0, audio.shape[0], slice_length):        s = audio[i: i + slice_length]        if s.shape[0] >= min_length:            slices.append(s)        return audio, slicesdef pcm2wav(pcm_path, wav_path, channels=1, bits=16, sample_rate=sr):    data = open(pcm_path, 'rb').read()    fw = wave.open(wav_path, 'wb')    fw.setnchannels(channels)    fw.setsampwidth(bits // 8)    fw.setframerate(sample_rate)    fw.writeframes(data)    fw.close()def visualize(index, source='train'):    if source == 'train':        path = train_files[index]    else:        path = dev_files[index]    print(path)            audio, slices = load_and_trim(path)    print('Duration: %.2f s' % (audio.shape[0] / sr))    plt.figure(figsize=(12, 3))    plt.plot(np.arange(len(audio)), audio)    plt.title('Raw Audio Signal')    plt.xlabel('Time')    plt.ylabel('Audio Amplitude')    plt.show()        feature = mfcc(audio, sr, numcep=mfcc_dim)    print('Shape of MFCC:', feature.shape)    fig = plt.figure(figsize=(12, 5))    ax = fig.add_subplot(111)    im = ax.imshow(feature, cmap=plt.cm.jet, aspect='auto')    plt.title('Normalized MFCC')    plt.ylabel('Time')    plt.xlabel('MFCC Coefficient')    plt.colorbar(im, cax=make_axes_locatable(ax).append_axes('right', size='5%', pad=0.05))    ax.set_xticks(np.arange(0, 13, 2), minor=False);    plt.show()        wav_path = 'example.wav'    pcm2wav(path, wav_path)        return wav_pathAudio(visualize(2))复制代码一句长沙话对应的波形和MFCC特征


整理数据,查看语音片段的长度分布,最后得到了18890个训练片段,1632个验证片段
X_train = []X_dev = []Y_train = []Y_dev = []lengths = []for i in tqdm(range(len(train_files))):    path = train_files    audio, slices = load_and_trim(path)    lengths.append(audio.shape[0] / sr)    for s in slices:        X_train.append(mfcc(s, sr, numcep=mfcc_dim))        Y_train.append(labels['train'])for i in tqdm(range(len(dev_files))):    path = dev_files    audio, slices = load_and_trim(path)    lengths.append(audio.shape[0] / sr)    for s in slices:        X_dev.append(mfcc(s, sr, numcep=mfcc_dim))        Y_dev.append(labels['dev'])    print(len(X_train), len(X_dev))plt.hist(lengths, bins=100)plt.show()复制代码

将MFCC特征进行归一化
samples = np.vstack(X_train)mfcc_mean = np.mean(samples, axis=0)mfcc_std = np.std(samples, axis=0)print(mfcc_mean)print(mfcc_std)X_train = [(x - mfcc_mean) / (mfcc_std + 1e-14) for x in X_train]X_dev = [(x - mfcc_mean) / (mfcc_std + 1e-14) for x in X_dev]maxlen = np.max([x.shape[0] for x in X_train + X_dev])X_train = pad_sequences(X_train, maxlen, 'float32', padding='post', value=0.0)X_dev = pad_sequences(X_dev, maxlen, 'float32', padding='post', value=0.0)print(X_train.shape, X_dev.shape)复制代码对分类标签进行处理
from sklearn.preprocessing import LabelEncoderfrom keras.utils import to_categoricalle = LabelEncoder()Y_train = le.fit_transform(Y_train)Y_dev = le.transform(Y_dev)print(le.classes_)class2id = {c: i for i, c in enumerate(le.classes_)}id2class = {i: c for i, c in enumerate(le.classes_)}num_class = len(le.classes_)Y_train = to_categorical(Y_train, num_class)Y_dev = to_categorical(Y_dev, num_class)print(Y_train.shape, Y_dev.shape)复制代码定义产生批数据的迭代器
batch_size = 16def batch_generator(x, y, batch_size=batch_size):     offset = 0    while True:        offset += batch_size                if offset == batch_size or offset >= len(x):            x, y = shuffle(x, y)            offset = batch_size                    X_batch = x[offset - batch_size: offset]            Y_batch = y[offset - batch_size: offset]                yield (X_batch, Y_batch)复制代码定义模型并训练,通过GlobalMaxPooling1D对整个序列的输出进行降维,从而变成标准的分类任务
epochs = 10num_blocks = 3filters = 128drop_rate = 0.25X = Input(shape=(None, mfcc_dim,), dtype='float32')def conv1d(inputs, filters, kernel_size, dilation_rate):    return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal', activation=None, dilation_rate=dilation_rate)(inputs)def batchnorm(inputs):    return BatchNormalization()(inputs)def activation(inputs, activation):    return Activation(activation)(inputs)def res_block(inputs, filters, kernel_size, dilation_rate):    hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')    hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')    h0 = Multiply()([hf, hg])        ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')    hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')        return Add()([ha, inputs]), hsh0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')shortcut = []for i in range(num_blocks):    for r in [1, 2, 4, 8, 16]:        h0, s = res_block(h0, filters, 7, r)        shortcut.append(s)h1 = activation(Add()(shortcut), 'relu')h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu') # batch_size, seq_len, filtersh1 = batchnorm(conv1d(h1, num_class, 1, 1)) # batch_size, seq_len, num_classh1 = GlobalMaxPooling1D()(h1) # batch_size, num_classY = activation(h1, 'softmax')optimizer = Adam(lr=0.01, clipnorm=5)model = Model(inputs=X, outputs=Y)model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])checkpointer = ModelCheckpoint(filepath='fangyan.h5', verbose=0)lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)history = model.fit_generator(    generator=batch_generator(X_train, Y_train),     steps_per_epoch=len(X_train) // batch_size,    epochs=epochs,     validation_data=batch_generator(X_dev, Y_dev),     validation_steps=len(X_dev) // batch_size,     callbacks=[checkpointer, lr_decay])复制代码绘制损失函数曲线和正确率曲线,经过10轮的训练后,训练集的正确率已经将近100%,而验证集则不太稳定,大概在89%左右
train_loss = history.history['loss']valid_loss = history.history['val_loss']plt.plot(train_loss, label='train')plt.plot(valid_loss, label='valid')plt.legend(loc='upper right')plt.xlabel('Epoch')plt.ylabel('Loss')plt.show()train_acc = history.history['acc']valid_acc = history.history['val_acc']plt.plot(train_acc, label='train')plt.plot(valid_acc, label='valid')plt.legend(loc='upper right')plt.xlabel('Epoch')plt.ylabel('Accuracy')plt.show()复制代码验证集结果不够好的原因可能是训练数据不足,虽然一共有1W8条训练数据,但实际上只有90个说话人
如果说话人更多一些、声音更多样一些,模型应该能够学到各种方言所对应的更为通用的特征




保存分类和方言名称之间的映射,以便后续使用
with open('resources.pkl', 'wb') as fw:    pickle.dump([class2id, id2class, mfcc_mean, mfcc_std], fw)复制代码在单机上加载训练好的模型,随机选择一条语音进行分类
# -*- coding:utf-8 -*-import numpy as npfrom keras.models import load_modelfrom keras.preprocessing.sequence import pad_sequencesimport librosafrom python_speech_features import mfccimport pickleimport waveimport globwith open('resources.pkl', 'rb') as fr:    [class2id, id2class, mfcc_mean, mfcc_std] = pickle.load(fr)model = load_model('fangyan.h5')paths = glob.glob('data/*/dev/*/*/*.pcm')path = np.random.choice(paths, 1)[0]label = path.split('/')[1]print(label, path)mfcc_dim = 13sr = 16000min_length = 1 * srslice_length = 3 * srdef load_and_trim(path, sr=16000):    audio = np.memmap(path, dtype='h', mode='r')    audio = audio[2000:-2000]    audio = audio.astype(np.float32)    energy = librosa.feature.rmse(audio)    frames = np.nonzero(energy >= np.max(energy) / 5)    indices = librosa.core.frames_to_samples(frames)[1]    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]        slices = []    for i in range(0, audio.shape[0], slice_length):        s = audio[i: i + slice_length]        slices.append(s)        return audio, slicesaudio, slices = load_and_trim(path)X_data = [mfcc(s, sr, numcep=mfcc_dim) for s in slices]X_data = [(x - mfcc_mean) / (mfcc_std + 1e-14) for x in X_data]maxlen = np.max([x.shape[0] for x in X_data])X_data = pad_sequences(X_data, maxlen, 'float32', padding='post', value=0.0)print(X_data.shape)prob = model.predict(X_data)prob = np.mean(prob, axis=0)pred = np.argmax(prob)prob = prob[pred]pred = id2class[pred]print('True:', label)print('Pred:', pred, 'Confidence:', prob)复制代码最后再提一下,既然是对三维tensor做分类,那么就和文本分类问题极其相似,所以也可以考虑使用BiLSTM之类的其他模型



链接:https://juejin.im/post/5ba4dfe1e51d450e551a09e7




作者: 不二晨    时间: 2018-10-10 11:46
奈斯
作者: 魔都黑马少年梦    时间: 2018-11-1 16:37





欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2