A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

简介使用RNN实现歌词和古诗的自动生成
RNN多用于处理序列数据,通过学习数据上下文之间的关系,可以在给定若干个连续数据点的基础上,预测下一个可能的数据点
以下是最基础的RNN公式,当然也可以使用LSTM(Long Short-Term Memory)或GRU(Gated Recurrent Unit)生成序列
准备一些序列数据,这里我们主要使用文本,例如歌词和古诗等
手动版先来个最手动的版本,用numpy实现歌词生成。歌词爬取自网络,主要参考了以下代码,gist.github.com/karpathy/d4…
加载库和歌词,去掉英文占比较多的歌词(可能为英文歌),还剩36616首歌
# -*- coding: utf-8 -*-import numpy as npsentences = []with open('../lyrics.txt', 'r', encoding='utf8') as fr:    lines = fr.readlines()    for line in lines:        line = line.strip()        count = 0        for c in line:            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):                count += 1        if count / len(line) < 0.1:            sentences.append(line)print('共%d首歌' % len(sentences))复制代码整理字和id之间的映射,共10131个字
chars = {}for sentence in sentences:    for c in sentence:        chars[c] = chars.get(c, 0) + 1chars = sorted(chars.items(), key=lambda x:x[1], reverse=True)chars = [char[0] for char in chars]vocab_size = len(chars)print('共%d个字' % vocab_size, chars[:20])char2id = {c: i for i, c in enumerate(chars)}id2char = {i: c for i, c in enumerate(chars)}复制代码定义一些训练参数和模型参数,整理训练数据
hidden_size = 100maxlen = 25learning_rate = 0.1X_data = []Y_data = []for sentence in sentences:    for i in range(0, len(sentence) - maxlen - 1, maxlen):        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])        Y_data.append([char2id[c] for c in sentence[i + 1: i + maxlen + 1]])print(len(X_data))Wxh = np.random.randn(hidden_size, vocab_size) * 0.01Whh = np.random.randn(hidden_size, hidden_size) * 0.01Why = np.random.randn(vocab_size, hidden_size) * 0.01bh = np.zeros((hidden_size, 1))by = np.zeros((vocab_size, 1))复制代码损失函数
def lossFun(inputs, targets, hprev):    xs, hs, ys, ps = {}, {}, {}, {}    hs[-1] = np.copy(hprev)    loss = 0    # forward pass    for t in range(len(inputs)):        xs[t] = np.zeros((vocab_size, 1))        xs[t][inputs[t]] = 1        hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t - 1]) + bh)        ys[t] = np.dot(Why, hs[t]) + by        ps[t] = np.exp(ys[t]) / np.sum(np.exp(ys[t]))        loss += -np.log(ps[t][targets[t], 0])        # backward pass    dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)    dbh, dby = np.zeros_like(bh), np.zeros_like(by)    dhnext = np.zeros_like(hs[0])    for t in reversed(range(len(inputs))):        dy = np.copy(ps[t])        dy[targets[t]] -= 1        dWhy += np.dot(dy, hs[t].T)        dby += dy        dh = np.dot(Why.T, dy) + dhnext        dhraw = (1 - hs[t] * hs[t]) * dh        dbh += dhraw        dWxh += np.dot(dhraw, xs[t].T)        dWhh += np.dot(dhraw, hs[t-1].T)        dhnext = np.dot(Whh.T, dhraw)        for dparam in [dWxh, dWhh, dWhy, dbh, dby]:        np.clip(dparam, -5, 5, out=dparam)    return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs) - 1]复制代码样本生成函数,每经过若干轮迭代就调用一次
def sample(h, seed_ix, n):    x = np.zeros((vocab_size, 1))    x[seed_ix] = 1    ixes = []    for t in range(n):        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)        y = np.dot(Why, h) + by        p = np.exp(y) / np.sum(np.exp(y))        ix = np.random.choice(range(vocab_size), p=p.ravel())        ixes.append(ix)                x = np.zeros((vocab_size, 1))        x[ix] = 1        return ixes复制代码初始化训练变量,这里使用Adagrad优化算法,所以需要一些额外的缓存变量
n = 0mWxh, mWhh, mWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)mbh, mby = np.zeros_like(bh), np.zeros_like(by)smooth_loss = -np.log(1.0 / vocab_size) * maxlen复制代码训练模型,会一直循环进行
while True:    if n == 0 or n == len(X_data):         hprev = np.zeros((hidden_size, 1))        n = 0        X = X_data[n]    Y = Y_data[n]    loss, dWxh, dWhh, dWhy, dbh, dby, hprev = lossFun(X, Y, hprev)    smooth_loss = smooth_loss * 0.999 + loss * 0.001    for param, dparam, mem in zip([Wxh, Whh, Why, bh, by], [dWxh, dWhh, dWhy, dbh, dby], [mWxh, mWhh, mWhy, mbh, mby]):        mem += dparam * dparam        param += -learning_rate * dparam / np.sqrt(mem + 1e-8)    if n % 100 == 0:        print('iter %d, loss: %f' % (n, smooth_loss))        sample_ix = sample(hprev, X[0], 200)        txt = ''.join(id2char[ix] for ix in sample_ix)        print(txt)    n += 1复制代码经过54W次迭代后,生成了这么一段话,虽然并不通顺,但似乎确实学习到了一些词语和句法
颜悲 心已中雨著街眼泪不知 留在这时祈忘的自己一样无常 你我的欢 当时是你能止学了绽放瞥袖 前朝来去勇气 让你是一双睡过以后  因为你飞雪中的街音里飞   此模糊的爱 只有谁要再多少时 管只是无度美醉不给主题衬  曾流盲双脚一片城本身边 来并肩常与尽是一点和缺 好爱得也还记得证着多梦 愛 做人来 这吃碎 我们精神蹲着你的门 口不信心终究理想透完了谁几度 我都在凭营力的光体 卖爱不说 爱你是我的好复制代码KerasKeras官方提供了使用LSTM生成文本的示例
github.com/fchollet/ke…
简单地改一下,数据还是使用之前的歌词
加载库
# -*- coding: utf-8 -*-from keras.models import Sequentialfrom keras.layers import Dense, LSTM, Embeddingfrom keras.callbacks import LambdaCallbackimport numpy as npimport randomimport sysimport pickle复制代码加载数据,整理字和id之间的映射
sentences = []with open('../lyrics.txt', 'r', encoding='utf8') as fr:    lines = fr.readlines()    for line in lines:        line = line.strip()        count = 0        for c in line:            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):                count += 1        if count / len(line) < 0.1:            sentences.append(line)print('共%d首歌' % len(sentences))chars = {}for sentence in sentences:    for c in sentence:        chars[c] = chars.get(c, 0) + 1chars = sorted(chars.items(), key=lambda x:x[1], reverse=True)chars = [char[0] for char in chars]vocab_size = len(chars)print('共%d个字' % vocab_size, chars[:20])char2id = {c: i for i, c in enumerate(chars)}id2char = {i: c for i, c in enumerate(chars)}with open('dictionary.pkl', 'wb') as fw:    pickle.dump([char2id, id2char], fw)复制代码整理训练数据,定义模型并编译
maxlen = 10step = 3embed_size = 128hidden_size = 128vocab_size = len(chars)batch_size = 64epochs = 20X_data = []Y_data = []for sentence in sentences:    for i in range(0, len(sentence) - maxlen, step):        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])        y = np.zeros(vocab_size, dtype=np.bool)        y[char2id[sentence[i + maxlen]]] = 1        Y_data.append(y)X_data = np.array(X_data)Y_data = np.array(Y_data)print(X_data.shape, Y_data.shape)model = Sequential()model.add(Embedding(input_dim=vocab_size, output_dim=embed_size, input_length=maxlen))model.add(LSTM(hidden_size, input_shape=(maxlen, embed_size)))model.add(Dense(vocab_size, activation='softmax'))model.compile(loss='categorical_crossentropy', optimizer='adam')复制代码定义序列样本生成函数
def sample(preds, diversity=1.0):    preds = np.asarray(preds).astype('float64')    preds = np.log(preds + 1e-10) / diversity    exp_preds = np.exp(preds)    preds = exp_preds / np.sum(exp_preds)    probas = np.random.multinomial(1, preds, 1)    return np.argmax(probas)复制代码定义每轮训练结束后的回调函数
def on_epoch_end(epoch, logs):    print('-' * 30)    print('Epoch', epoch)    index = random.randint(0, len(sentences))    for diversity in [0.2, 0.5, 1.0]:        print('----- diversity:', diversity)        sentence = sentences[index][:maxlen]        print('----- Generating with seed: ' + sentence)        sys.stdout.write(sentence)        for i in range(400):            x_pred = np.zeros((1, maxlen))            for t, char in enumerate(sentence):                x_pred[0, t] = char2id[char]            preds = model.predict(x_pred, verbose=0)[0]            next_index = sample(preds, diversity)            next_char = id2char[next_index]            sentence = sentence[1:] + next_char            sys.stdout.write(next_char)            sys.stdout.flush()复制代码训练模型并保存
model.fit(X_data, Y_data, batch_size=batch_size, epochs=epochs, callbacks=[LambdaCallback(on_epoch_end=on_epoch_end)])model.save('song_keras.h5')复制代码使用以下代码调用模型生成歌词,需提供一句起始歌词
# -*- coding: utf-8 -*-from keras.models import load_modelimport numpy as npimport pickleimport sysmaxlen = 10model = load_model('song_keras.h5')with open('dictionary.pkl', 'rb') as fr:    [char2id, id2char] = pickle.load(fr)def sample(preds, diversity=1.0):    preds = np.asarray(preds).astype('float64')    preds = np.log(preds + 1e-10) / diversity    exp_preds = np.exp(preds)    preds = exp_preds / np.sum(exp_preds)    probas = np.random.multinomial(1, preds, 1)    return np.argmax(probas)sentence = '能不能给我一首歌的时间'sentence = sentence[:maxlen]diversity = 1.0print('----- Generating with seed: ' + sentence)print('----- diversity:', diversity)sys.stdout.write(sentence)for i in range(400):    x_pred = np.zeros((1, maxlen))    for t, char in enumerate(sentence):        x_pred[0, t] = char2id[char]    preds = model.predict(x_pred, verbose=0)[0]    next_index = sample(preds, diversity)    next_char = id2char[next_index]    sentence = sentence[1:] + next_char    sys.stdout.write(next_char)    sys.stdout.flush()复制代码生成结果如下,比之前的结果似乎好一些,有意义的词语和短句更多
能不能给我一首歌的时间 要去人还有古年 你代表我所的 只愿为你做下一个成熟 从那个歌声中  你的别思量 写你的画面走过了西陌上雨张 小水没忘了 我欲再感受   我终于你开心哭过心事流出了我心痛 就看口提幽纹太多 独自一直行 你也在想 我感到最此的第一次 只想要闲想 穿行多高楼的星云 看见鞍上云 青竹琼楼又新叶 人潮春涌成度过 幸福呜 风雪落入丽筝凄凄 万顷枯枝回伸离袖弦  不幸以潮 到底必经认来我不变 都想你 这星辰 暮鼓 WA Lsevemusich hey Live 走进不在乎 不愿天涯 如此温柔 不够支离 多巧认真和你还太平行 哎呀呀呀 呀呀呀呀呀呀呀啊嘿 饿不好去哪儿呀 那我的聪明? 王王之以下 下也难改徒有爱还能敢相离 拨开你的嘴角 相识的一见 到你的世界所世 才发现我也不会躲藏 让我决定有人担心善良 像一个人世界内心长着  夜晚需来又头 与我专车征 战天几天不懂配游戏 也是自己应吗 你给我来的狠也复制代码TensorFlow换一下工具和数据,使用TensorFlow实现古诗生成,使用以下数据,github.com/chinese-poe…
加载库
# -*- coding: utf-8 -*-import tensorflow as tfimport numpy as npimport globimport jsonfrom collections import Counterfrom tqdm import tqdmfrom snownlp import SnowNLP复制代码加载数据,共105336首诗
poets = []paths = glob.glob('chinese-poetry/json/poet.*.json')for path in paths:    data = open(path, 'r').read()    data = json.loads(data)    for item in data:        content = ''.join(item['paragraphs'])        if len(content) >= 24 and len(content) <= 32:            content = SnowNLP(content)            poets.append('[' + content.han + ']')poets.sort(key=lambda x: len(x))print('共%d首诗' % len(poets), poets[0], poets[-1])复制代码整理字和id之间的映射,共8072个不同的字
chars = []for item in poets:    chars += [c for c in item]print('共%d个字' % len(chars))chars = sorted(Counter(chars).items(), key=lambda x:x[1], reverse=True)print('共%d个不同的字' % len(chars))print(chars[:10])chars = [c[0] for c in chars]char2id = {c: i + 1 for i, c in enumerate(chars)}id2char = {i + 1: c for i, c in enumerate(chars)}复制代码整理训练数据
batch_size = 64X_data = []Y_data = []for b in range(len(poets) // batch_size):    start = b * batch_size    end = b * batch_size + batch_size    batch = [[char2id[c] for c in poets] for i in range(start, end)]    maxlen = max(map(len, batch))    X_batch = np.full((batch_size, maxlen - 1), 0, np.int32)    Y_batch = np.full((batch_size, maxlen - 1), 0, np.int32)    for i in range(batch_size):        X_batch[i, :len(batch) - 1] = batch[:-1]        Y_batch[i, :len(batch) - 1] = batch[1:]        X_data.append(X_batch)    Y_data.append(Y_batch)    print(len(X_data), len(Y_data))复制代码定义模型结构和优化器
hidden_size = 256num_layer = 2embedding_size = 256X = tf.placeholder(tf.int32, [batch_size, None])Y = tf.placeholder(tf.int32, [batch_size, None])learning_rate = tf.Variable(0.0, trainable=False)cell = tf.nn.rnn_cell.MultiRNNCell(    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)],     state_is_tuple=True)initial_state = cell.zero_state(batch_size, tf.float32)embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))embedded = tf.nn.embedding_lookup(embeddings, X)# outputs: batch_size, max_time, hidden_size# last_states: 2 tuple(two LSTM), 2 tuple(c and h)#              batch_size, hidden_sizeoutputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)outputs = tf.reshape(outputs, [-1, hidden_size])                # batch_size * max_time, hidden_sizelogits = tf.layers.dense(outputs, units=len(char2id) + 1)       # batch_size * max_time, len(char2id) + 1logits = tf.reshape(logits, [batch_size, -1, len(char2id) + 1]) # batch_size, max_time, len(char2id) + 1probs = tf.nn.softmax(logits)                                   # batch_size, max_time, len(char2id) + 1loss = tf.reduce_mean(tf.contrib.seq2seq.sequence_loss(logits, Y, tf.ones_like(Y, dtype=tf.float32)))params = tf.trainable_variables()grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))复制代码训练模型,共训练50轮
sess = tf.Session()sess.run(tf.global_variables_initializer())for epoch in range(50):    sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))        data_index = np.arange(len(X_data))    np.random.shuffle(data_index)    X_data = [X_data for i in data_index]    Y_data = [Y_data for i in data_index]        losses = []    for i in tqdm(range(len(X_data))):        ls_,  _ = sess.run([loss, optimizer], feed_dict={X: X_data, Y: Y_data})        losses.append(ls_)        print('Epoch %d Loss %.5f' % (epoch, np.mean(losses)))复制代码保存模型,以便在单机上使用
saver = tf.train.Saver()saver.save(sess, './poet_generation_tensorflow')import picklewith open('dictionary.pkl', 'wb') as fw:    pickle.dump([char2id, id2char], fw)复制代码在单机上使用模型生成古诗,可随机生成或生成藏头诗
# -*- coding: utf-8 -*-import tensorflow as tfimport numpy as npimport picklewith open('dictionary.pkl', 'rb') as fr:    [char2id, id2char] = pickle.load(fr)batch_size = 1hidden_size = 256num_layer = 2embedding_size = 256X = tf.placeholder(tf.int32, [batch_size, None])Y = tf.placeholder(tf.int32, [batch_size, None])learning_rate = tf.Variable(0.0, trainable=False)cell = tf.nn.rnn_cell.MultiRNNCell(    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)],     state_is_tuple=True)initial_state = cell.zero_state(batch_size, tf.float32)embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))embedded = tf.nn.embedding_lookup(embeddings, X)outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)outputs = tf.reshape(outputs, [-1, hidden_size])logits = tf.layers.dense(outputs, units=len(char2id) + 1)probs = tf.nn.softmax(logits)targets = tf.reshape(Y, [-1])loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=targets))params = tf.trainable_variables()grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))sess = tf.Session()sess.run(tf.global_variables_initializer())saver = tf.train.Saver()saver.restore(sess, tf.train.latest_checkpoint('./'))def generate():    states_ = sess.run(initial_state)        gen = ''    c = '['    while c != ']':        gen += c        x = np.zeros((batch_size, 1))        x[:, 0] = char2id[c]        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})        probs_ = np.squeeze(probs_)        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))        c = id2char[pos]        return gen[1:]def generate_with_head(head):    states_ = sess.run(initial_state)        gen = ''    c = '['    i = 0    while c != ']':        gen += c        x = np.zeros((batch_size, 1))        x[:, 0] = char2id[c]        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})        probs_ = np.squeeze(probs_)        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))        if (c == '[' or c == '。' or c == ',') and i < len(head):            c = head            i += 1        else:            c = id2char[pos]        return gen[1:]print(generate())print(generate_with_head('深度学习'))复制代码生成结果如下,字数和标点符号都对上了,内容也像那么回事,反正也看不太懂
百计无心魄可无,知君又到两家书。自知君子有天禄,天下名通赤子虚。深山宜数月交驰,度世曾徒有客期。学子今来能入楚,习家不瘿莫辞卑。复制代码参考


链接:https://juejin.im/post/5ba4dd425188255c5442a7c8




2 个回复

倒序浏览
奈斯
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马