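Principle
The input is an image and the output is a sentence describing it; this is image captioning.
The basic idea is to take a pretrained image classification model and use the output of one of its convolutional layers as the representation of the original image, also called the contexts.
For example, the representation taken from conv5_3 of VGG19 has shape 14*14*512, that is, 512 feature maps of size 14*14.
Equivalently, the original image is divided into 14*14 = 196 blocks, and each block is described by a 512-dimensional feature.
An LSTM then generates words step by step from the contexts, producing the text that describes the original image.
When generating each word, the 196 blocks should be weighted differently; this is the attention mechanism.
Just as people do, the model pays different amounts of attention to different regions of the image when considering the next word: more relevant regions receive more attention, that is, higher weights.
Taking the weighted sum of the 196 512-dimensional features with the attention weights gives the attention-based context.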
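The weighted sum described above can be sketched in a few lines of NumPy. This is only an illustration under stated assumptions: the feature map and the attention scores are random stand-ins (in the real model the features come from conv5_3 and the scores are computed from the LSTM state), and the names `feature_map` and `scores` are hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)
num_block, num_filter = 14 * 14, 512

# Stand-in for the conv5_3 output of one image: 14 x 14 x 512.
feature_map = rng.standard_normal((14, 14, num_filter))

# Flatten the spatial grid: 196 blocks, each a 512-dimensional feature.
contexts = feature_map.reshape(num_block, num_filter)

# Hypothetical attention scores, one per block (random here; the real
# model derives them from the LSTM state and the block features).
scores = rng.standard_normal(num_block)

# Softmax turns the scores into positive weights that sum to 1.
alpha = np.exp(scores - scores.max())
alpha /= alpha.sum()

# Weighted sum over the 196 blocks gives one 512-dimensional context.
context = (contexts * alpha[:, None]).sum(axis=0)

print(contexts.shape, alpha.shape, context.shape)
```

The same computation appears later in the TensorFlow code as `tf.reduce_sum(contexts * tf.expand_dims(alpha, 2), 1)`, with an extra leading batch dimension.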
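Relating this to the Seq2Seq Learning introduced earlier, image captioning is a one-to-many case.
Data
The COCO2014 dataset is used (cocodataset.org/#download). The training set contains more than 80,000 images and the validation set more than 40,000, and captions are provided for every image.
Each image has more than one caption, so the training set has 411,593 captions in total and the validation set 201,489, about five captions per image on average.
Implementation
The implementation is based on the following project: github.com/yunjey/show…
Training
First, the training code. It is written against the TensorFlow 1.x API (tf.placeholder, tf.contrib, tf.Session).
Load the libraries.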
```python
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.utils import shuffle
from imageio import imread
import scipy.io
import cv2
import os
import json
from tqdm import tqdm
import pickle
```

Load the data. Because one image may have several captions, each sample is one image id paired with one caption. Images are center-cropped to a square and resized; captions longer than 20 words are dropped.
```python
batch_size = 128
maxlen = 20
image_size = 224

# Per-channel RGB means subtracted before feeding VGG19
MEAN_VALUES = np.array([123.68, 116.779, 103.939]).reshape((1, 1, 3))

def load_data(image_dir, annotation_path):
    with open(annotation_path, 'r') as fr:
        annotation = json.load(fr)

    ids = []
    captions = []
    image_dict = {}
    for i in tqdm(range(len(annotation['annotations']))):
        item = annotation['annotations'][i]
        # Lowercase, strip punctuation, and split into words
        caption = item['caption'].strip().lower()
        caption = caption.replace('.', '').replace(',', '').replace("'", '').replace('"', '')
        caption = caption.replace('&', 'and').replace('(', '').replace(')', '').replace('-', ' ').split()
        caption = [w for w in caption if len(w) > 0]

        if len(caption) <= maxlen:
            if not item['image_id'] in image_dict:
                img = imread(image_dir + '%012d.jpg' % item['image_id'])
                h = img.shape[0]
                w = img.shape[1]
                # Keep the central square region
                if h > w:
                    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
                else:
                    img = img[:, w // 2 - h // 2: w // 2 + h // 2]
                img = cv2.resize(img, (image_size, image_size))

                # Grayscale images: replicate the single channel three times
                if len(img.shape) < 3:
                    img = np.expand_dims(img, -1)
                    img = np.concatenate([img, img, img], axis=-1)

                image_dict[item['image_id']] = img

            ids.append(item['image_id'])
            captions.append(caption)

    return ids, captions, image_dict

train_json = 'data/train/captions_train2014.json'
train_ids, train_captions, train_dict = load_data('data/train/images/COCO_train2014_', train_json)
print(len(train_ids))
```

Take a look at a few caption annotations.
```python
data_index = np.arange(len(train_ids))
np.random.shuffle(data_index)
N = 4
data_index = data_index[:N]
plt.figure(figsize=(12, 20))
for i in range(N):
    caption = train_captions[data_index[i]]
    img = train_dict[train_ids[data_index[i]]]
    plt.subplot(4, 1, i + 1)
    plt.imshow(img)
    plt.title(' '.join(caption))
    plt.axis('off')
```

Build the vocabulary, 23,728 words in total, create the mappings between words and ids, and add three special tokens.
```python
# Count word frequencies and sort from most to least frequent
vocabulary = {}
for caption in train_captions:
    for word in caption:
        vocabulary[word] = vocabulary.get(word, 0) + 1

vocabulary = sorted(vocabulary.items(), key=lambda x:-x[1])
vocabulary = [w[0] for w in vocabulary]

# Ids 0-2 are reserved for the special tokens
word2id = {'<pad>': 0, '<start>': 1, '<end>': 2}
for i, w in enumerate(vocabulary):
    word2id[w] = i + 3
id2word = {i: w for w, i in word2id.items()}

print(len(vocabulary), vocabulary[:20])

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([vocabulary, word2id, id2word], fw)

def translate(ids):
    # Skip the special tokens (ids below 3)
    words = [id2word[i] for i in ids if i >= 3]
    return ' '.join(words) + '.'
```

Convert the captions to id sequences.
```python
def convert_captions(data):
    result = []
    for caption in data:
        vector = [word2id['<start>']]
        for word in caption:
            if word in word2id:
                vector.append(word2id[word])
        vector.append(word2id['<end>'])
        result.append(vector)

    # Pad every sequence with <pad> (id 0) up to maxlen + 2
    array = np.zeros((len(data), maxlen + 2), np.int32)
    for i in tqdm(range(len(result))):
        array[i, :len(result[i])] = result[i]
    return array

train_captions = convert_captions(train_captions)
print(train_captions.shape)
print(train_captions[0])
print(translate(train_captions[0]))
```

Use imagenet-vgg-verydeep-19.mat, the same file used earlier for image style transfer, to extract image features. Load the VGG19 model and define a function that returns the output of every VGG19 layer for a given input; variable_scope allows the network to be reused. The output of conv5_3 serves as the representation of the original image.
```python
vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    with tf.variable_scope('endpoints', reuse=reuse):
        def _weights(layer, expected_layer_name):
            W = vgg_layers[0][layer][0][0][2][0][0]
            b = vgg_layers[0][layer][0][0][2][0][1]
            layer_name = vgg_layers[0][layer][0][0][0][0]
            assert layer_name == expected_layer_name
            return W, b

        def _conv2d_relu(prev_layer, layer, layer_name):
            # Pretrained weights are loaded as constants, so VGG19 is not fine-tuned
            W, b = _weights(layer, layer_name)
            W = tf.constant(W)
            b = tf.constant(np.reshape(b, (b.size)))
            return tf.nn.relu(tf.nn.conv2d(prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b)

        def _avgpool(prev_layer):
            return tf.nn.avg_pool(prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        graph = {}
        graph['conv1_1'] = _conv2d_relu(inputs, 0, 'conv1_1')
        graph['conv1_2'] = _conv2d_relu(graph['conv1_1'], 2, 'conv1_2')
        graph['avgpool1'] = _avgpool(graph['conv1_2'])
        graph['conv2_1'] = _conv2d_relu(graph['avgpool1'], 5, 'conv2_1')
        graph['conv2_2'] = _conv2d_relu(graph['conv2_1'], 7, 'conv2_2')
        graph['avgpool2'] = _avgpool(graph['conv2_2'])
        graph['conv3_1'] = _conv2d_relu(graph['avgpool2'], 10, 'conv3_1')
        graph['conv3_2'] = _conv2d_relu(graph['conv3_1'], 12, 'conv3_2')
        graph['conv3_3'] = _conv2d_relu(graph['conv3_2'], 14, 'conv3_3')
        graph['conv3_4'] = _conv2d_relu(graph['conv3_3'], 16, 'conv3_4')
        graph['avgpool3'] = _avgpool(graph['conv3_4'])
        graph['conv4_1'] = _conv2d_relu(graph['avgpool3'], 19, 'conv4_1')
        graph['conv4_2'] = _conv2d_relu(graph['conv4_1'], 21, 'conv4_2')
        graph['conv4_3'] = _conv2d_relu(graph['conv4_2'], 23, 'conv4_3')
        graph['conv4_4'] = _conv2d_relu(graph['conv4_3'], 25, 'conv4_4')
        graph['avgpool4'] = _avgpool(graph['conv4_4'])
        graph['conv5_1'] = _conv2d_relu(graph['avgpool4'], 28, 'conv5_1')
        graph['conv5_2'] = _conv2d_relu(graph['conv5_1'], 30, 'conv5_2')
        graph['conv5_3'] = _conv2d_relu(graph['conv5_2'], 32, 'conv5_3')
        graph['conv5_4'] = _conv2d_relu(graph['conv5_3'], 34, 'conv5_4')
        graph['avgpool5'] = _avgpool(graph['conv5_4'])

        return graph

X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']
print(encoded)
```

Based on these contexts, implement initialization, word embedding, and feature projection.
```python
k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    return tf.layers.dense(inputs, units, activation, use_bias,
                           kernel_initializer=k_initializer, bias_initializer=b_initializer, name=name)

def batch_norm(inputs, name):
    return tf.contrib.layers.batch_norm(inputs, decay=0.95, center=True, scale=True, is_training=True,
                                        updates_collections=None, scope=name)

def dropout(inputs):
    return tf.layers.dropout(inputs, rate=0.5, training=True)

num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

# Y_in is the decoder input, Y_out the target shifted by one position
Y = tf.placeholder(tf.int32, [None, maxlen + 2])
Y_in = Y[:, :-1]
Y_out = Y[:, 1:]
mask = tf.to_float(tf.not_equal(Y_out, word2id['<pad>']))

# Initial LSTM state and memory come from the mean of the contexts
with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    state = dense(context_mean, hidden_size, name='initial_state')
    memory = dense(context_mean, hidden_size, name='initial_memory')

with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, Y_in)

with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)
loss = 0
alphas = []
```

Generate each word of the caption in turn. Each step computes the attention weights and the context, the selector, the LSTM update, the output, and the loss.
```python
for t in range(maxlen + 1):
    with tf.variable_scope('attend'):
        h0 = dense(state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
        h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
        h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
        h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
        h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

        alpha = tf.nn.softmax(h0) # batch_size, num_block
        # contexts:                 batch_size, num_block, num_filter
        # tf.expand_dims(alpha, 2): batch_size, num_block, 1
        context = tf.reduce_sum(contexts * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter
        alphas.append(alpha)

    with tf.variable_scope('selector'):
        beta = dense(state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
        context = tf.multiply(beta, context, name='selected_context') # batch_size, num_filter

    with tf.variable_scope('lstm'):
        h0 = tf.concat([embedded[:, t, :], context], 1) # batch_size, embedding_size + num_filter
        _, (memory, state) = lstm(inputs=h0, state=[memory, state])

    with tf.variable_scope('decode'):
        h0 = dropout(state)
        h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
        h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
        h0 += embedded[:, t, :]
        h0 = tf.nn.tanh(h0)

        h0 = dropout(h0)
        logits = dense(h0, len(word2id), activation=None, name='fc_logits')

    # Masked cross-entropy: padded positions contribute nothing
    loss += tf.reduce_sum(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=Y_out[:, t], logits=logits) * mask[:, t])
    # Share the weights across all time steps
    tf.get_variable_scope().reuse_variables()
```

Add an attention regularization term to the loss and define the optimizer.
```python
alphas = tf.transpose(tf.stack(alphas), (1, 0, 2)) # batch_size, maxlen + 1, num_block
alphas = tf.reduce_sum(alphas, 1) # batch_size, num_block
# Encourage every block to receive a similar total amount of attention over all steps
attention_loss = tf.reduce_sum(((maxlen + 1) / num_block - alphas) ** 2)
total_loss = (loss + attention_loss) / batch_size

with tf.variable_scope('optimizer', reuse=tf.AUTO_REUSE):
    global_step = tf.Variable(0, trainable=False)
    vars_t = [var for var in tf.trainable_variables() if not var.name.startswith('endpoints')]
    train = tf.contrib.layers.optimize_loss(total_loss, global_step, 0.001, 'Adam', clip_gradients=5.0, variables=vars_t)
```

Train the model, writing the values of some tensors to an events file so they can be inspected later in TensorBoard.
```python
sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()

OUTPUT_DIR = 'model'
if not os.path.exists(OUTPUT_DIR):
    os.mkdir(OUTPUT_DIR)

tf.summary.scalar('losses/loss', loss)
tf.summary.scalar('losses/attention_loss', attention_loss)
tf.summary.scalar('losses/total_loss', total_loss)
summary = tf.summary.merge_all()
writer = tf.summary.FileWriter(OUTPUT_DIR)

epochs = 20
for e in range(epochs):
    train_ids, train_captions = shuffle(train_ids, train_captions)
    for i in tqdm(range(len(train_ids) // batch_size)):
        X_batch = np.array([train_dict[x] for x in train_ids[i * batch_size: i * batch_size + batch_size]])
        Y_batch = train_captions[i * batch_size: i * batch_size + batch_size]

        _ = sess.run(train, feed_dict={X: X_batch, Y: Y_batch})

        # Log the losses every 100 batches
        if i > 0 and i % 100 == 0:
            writer.add_summary(sess.run(summary, feed_dict={X: X_batch, Y: Y_batch}),
                               e * len(train_ids) // batch_size + i)
            writer.flush()

    # Save a checkpoint at the end of each epoch
    saver.save(sess, os.path.join(OUTPUT_DIR, 'image_caption'))
```

The following command shows the training history in TensorBoard.
```shell
tensorboard --logdir=model
```

After 20 epochs of training, the loss curves look as follows.
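Validation
Next comes the validation code, which generates a caption for every image in the validation set and then compares it with the annotations for evaluation.
When generating each word, one option is to pick the word with the highest probability. This greedy approach is not necessarily optimal, because choosing the most probable word at the current step does not guarantee that the resulting sequence has the highest overall probability.
The Viterbi algorithm used in Chinese word segmentation does not apply either, because it requires the probability distribution over the whole sequence to be known before dynamic programming can find the most probable path.
A caption, however, is generated one word at a time, and the number of classes to choose from equals the vocabulary size, far more than the four classes used for sequence labeling in Chinese word segmentation, so it is impossible to enumerate every possible sequence.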
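A quick back-of-the-envelope calculation shows why enumeration is out of reach. The sketch below assumes the vocabulary of 23,728 words plus the three special tokens and the 21 decoding steps (maxlen + 1) used in this article; the variable names are illustrative only.

```python
vocab_size = 23728 + 3   # words plus <pad>, <start>, <end>
max_steps = 20 + 1       # maxlen + 1 decoding steps

# Exhaustive search: every possible sequence of max_steps tokens.
exhaustive = vocab_size ** max_steps
print('digits in the number of sequences:', len(str(exhaustive)))

# Keeping only a few hypotheses per step: each kept hypothesis is
# expanded over the vocabulary once per step.
beam_size = 3
beam_candidates = beam_size * vocab_size * max_steps
print('candidate scores with 3 hypotheses kept:', beam_candidates)
```

The second number grows linearly with the number of steps, which is what makes a pruned search practical.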
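A compromise is beam search, which has one parameter, the beam size. For example, with a beam size of 3:
- When generating the first word, keep the three most probable words.
- When generating the second word, expand each of those three words, producing nine candidate sequences, and keep the three most probable of the nine.
- When generating the n-th word, expand the three sequences kept in the previous step into nine new sequences and again keep the three most probable.
- It is like a tree: at every step each branch grows three sub-branches, then only the best three of all branches are kept and the rest are cut off.
- Repeat this process until the end token is generated or the sequence reaches the maximum length.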
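The steps above can be sketched independently of TensorFlow. The example below is a minimal illustration, not the implementation used later in this article: `beam_search`, `toy_step`, and `TOY_MODEL` are hypothetical names, the "model" is a hand-written table of next-word probabilities, and scores are summed log-probabilities rather than multiplied raw probabilities (equivalent for ranking, but numerically safer for long sequences).

```python
import math

def beam_search(step_fn, start, end, beam_size=3, max_len=20):
    """Return the best sequence found by beam search.

    step_fn(tokens) must return a dict {next_token: probability}.
    """
    beams = [([start], 0.0)]  # (tokens so far, summed log-probability)
    finished = []
    for _ in range(max_len):
        # Expand every kept hypothesis by every possible next token.
        candidates = []
        for tokens, score in beams:
            for token, prob in step_fn(tokens).items():
                if prob > 0:
                    candidates.append((tokens + [token], score + math.log(prob)))
        if not candidates:
            break
        # Keep only the best `beam_size` expansions.
        candidates.sort(key=lambda c: -c[1])
        beams = []
        for tokens, score in candidates[:beam_size]:
            if tokens[-1] == end:
                finished.append((tokens, score))
            else:
                beams.append((tokens, score))
        if not beams:
            break
    pool = finished if finished else beams
    return max(pool, key=lambda c: c[1])[0]

# Toy next-word table, chosen so that the greedy choice is not optimal.
TOY_MODEL = {
    '<start>': {'a': 0.6, 'the': 0.4},
    'a': {'dog': 0.5, 'cat': 0.5},
    'the': {'dog': 0.9, 'cat': 0.1},
    'dog': {'<end>': 1.0},
    'cat': {'<end>': 1.0},
}

def toy_step(tokens):
    return TOY_MODEL[tokens[-1]]

greedy = beam_search(toy_step, '<start>', '<end>', beam_size=1)
wider = beam_search(toy_step, '<start>', '<end>', beam_size=2)
print(greedy)
print(wider)
```

With a beam size of 1 the search commits to "a" (probability 0.6) and ends with a sequence of probability 0.3, while a beam size of 2 also keeps "the" alive and finds "the dog" with probability 0.36.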
Most of the validation code is the same as the training code.
Load the libraries.
```python
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.utils import shuffle
from imageio import imread
import scipy.io
import cv2
import os
import json
from tqdm import tqdm
import pickle
```

Load the data.
```python
batch_size = 128
maxlen = 20
image_size = 224

MEAN_VALUES = np.array([123.68, 116.779, 103.939]).reshape((1, 1, 3))

def load_data(image_dir, annotation_path):
    with open(annotation_path, 'r') as fr:
        annotation = json.load(fr)

    ids = []
    captions = []
    image_dict = {}
    for i in tqdm(range(len(annotation['annotations']))):
        item = annotation['annotations'][i]
        caption = item['caption'].strip().lower()
        caption = caption.replace('.', '').replace(',', '').replace("'", '').replace('"', '')
        caption = caption.replace('&', 'and').replace('(', '').replace(')', '').replace('-', ' ').split()
        caption = [w for w in caption if len(w) > 0]

        if len(caption) <= maxlen:
            if not item['image_id'] in image_dict:
                img = imread(image_dir + '%012d.jpg' % item['image_id'])
                h = img.shape[0]
                w = img.shape[1]
                if h > w:
                    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
                else:
                    img = img[:, w // 2 - h // 2: w // 2 + h // 2]
                img = cv2.resize(img, (image_size, image_size))

                if len(img.shape) < 3:
                    img = np.expand_dims(img, -1)
                    img = np.concatenate([img, img, img], axis=-1)

                image_dict[item['image_id']] = img

            ids.append(item['image_id'])
            captions.append(caption)

    return ids, captions, image_dict

val_json = 'data/val/captions_val2014.json'
val_ids, val_captions, val_dict = load_data('data/val/images/COCO_val2014_', val_json)
print(len(val_ids))
```

Collect the ground-truth captions.
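```python
# Map each image id to the list of its reference captions
gt = {}
for i in tqdm(range(len(val_ids))):
    val_id = val_ids[i]
    if not val_id in gt:
        gt[val_id] = []
    gt[val_id].append(' '.join(val_captions[i]) + ' .')
print(len(gt))
```

Load the vocabulary built in the training part.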
```python
with open('dictionary.pkl', 'rb') as fr:
    [vocabulary, word2id, id2word] = pickle.load(fr)

def translate(ids):
    words = [id2word[i] for i in ids if i >= 3]
    return ' '.join(words) + ' .'
```

Load the VGG19 model.
```python
vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    with tf.variable_scope('endpoints', reuse=reuse):
        def _weights(layer, expected_layer_name):
            W = vgg_layers[0][layer][0][0][2][0][0]
            b = vgg_layers[0][layer][0][0][2][0][1]
            layer_name = vgg_layers[0][layer][0][0][0][0]
            assert layer_name == expected_layer_name
            return W, b

        def _conv2d_relu(prev_layer, layer, layer_name):
            W, b = _weights(layer, layer_name)
            W = tf.constant(W)
            b = tf.constant(np.reshape(b, (b.size)))
            return tf.nn.relu(tf.nn.conv2d(prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b)

        def _avgpool(prev_layer):
            return tf.nn.avg_pool(prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        graph = {}
        graph['conv1_1'] = _conv2d_relu(inputs, 0, 'conv1_1')
        graph['conv1_2'] = _conv2d_relu(graph['conv1_1'], 2, 'conv1_2')
        graph['avgpool1'] = _avgpool(graph['conv1_2'])
        graph['conv2_1'] = _conv2d_relu(graph['avgpool1'], 5, 'conv2_1')
        graph['conv2_2'] = _conv2d_relu(graph['conv2_1'], 7, 'conv2_2')
        graph['avgpool2'] = _avgpool(graph['conv2_2'])
        graph['conv3_1'] = _conv2d_relu(graph['avgpool2'], 10, 'conv3_1')
        graph['conv3_2'] = _conv2d_relu(graph['conv3_1'], 12, 'conv3_2')
        graph['conv3_3'] = _conv2d_relu(graph['conv3_2'], 14, 'conv3_3')
        graph['conv3_4'] = _conv2d_relu(graph['conv3_3'], 16, 'conv3_4')
        graph['avgpool3'] = _avgpool(graph['conv3_4'])
        graph['conv4_1'] = _conv2d_relu(graph['avgpool3'], 19, 'conv4_1')
        graph['conv4_2'] = _conv2d_relu(graph['conv4_1'], 21, 'conv4_2')
        graph['conv4_3'] = _conv2d_relu(graph['conv4_2'], 23, 'conv4_3')
        graph['conv4_4'] = _conv2d_relu(graph['conv4_3'], 25, 'conv4_4')
        graph['avgpool4'] = _avgpool(graph['conv4_4'])
        graph['conv5_1'] = _conv2d_relu(graph['avgpool4'], 28, 'conv5_1')
        graph['conv5_2'] = _conv2d_relu(graph['conv5_1'], 30, 'conv5_2')
        graph['conv5_3'] = _conv2d_relu(graph['conv5_2'], 32, 'conv5_3')
        graph['conv5_4'] = _conv2d_relu(graph['conv5_3'], 34, 'conv5_4')
        graph['avgpool5'] = _avgpool(graph['conv5_4'])

        return graph

X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']
print(encoded)
```

The validation part needs several placeholders: because beam search is used, the relevant values have to be fed in each time a word is generated.
```python
k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    return tf.layers.dense(inputs, units, activation, use_bias,
                           kernel_initializer=k_initializer, bias_initializer=b_initializer, name=name)

# Batch norm and dropout run in inference mode here
def batch_norm(inputs, name):
    return tf.contrib.layers.batch_norm(inputs, decay=0.95, center=True, scale=True, is_training=False,
                                        updates_collections=None, scope=name)

def dropout(inputs):
    return tf.layers.dropout(inputs, rate=0.5, training=False)

num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    initial_state = dense(context_mean, hidden_size, name='initial_state')
    initial_memory = dense(context_mean, hidden_size, name='initial_memory')

# Values fed in at every decoding step during beam search
contexts_phr = tf.placeholder(tf.float32, [None, num_block, num_filter])
last_memory = tf.placeholder(tf.float32, [None, hidden_size])
last_state = tf.placeholder(tf.float32, [None, hidden_size])
last_word = tf.placeholder(tf.int32, [None])

with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, last_word)

with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts_phr, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)
```

The generation part needs no loop; a single step is defined once. During beam search, data is fed in repeatedly and the outputs are collected.
```python
with tf.variable_scope('attend'):
    h0 = dense(last_state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
    h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
    h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
    h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
    h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

    alpha = tf.nn.softmax(h0) # batch_size, num_block
    # contexts:                 batch_size, num_block, num_filter
    # tf.expand_dims(alpha, 2): batch_size, num_block, 1
    context = tf.reduce_sum(contexts_phr * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter

with tf.variable_scope('selector'):
    beta = dense(last_state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
    context = tf.multiply(beta, context, name='selected_context') # batch_size, num_filter

with tf.variable_scope('lstm'):
    h0 = tf.concat([embedded, context], 1) # batch_size, embedding_size + num_filter
    _, (current_memory, current_state) = lstm(inputs=h0, state=[last_memory, last_state])

with tf.variable_scope('decode'):
    h0 = dropout(current_state)
    h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
    h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
    h0 += embedded
    h0 = tf.nn.tanh(h0)

    h0 = dropout(h0)
    logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    probs = tf.nn.softmax(logits)
```

Load the trained model and run beam search on each batch, generating the words one by one.
Here the beam size is set to 1, mainly to save time; validation took 10 hours in total. In practice the beam size can be increased as appropriate.
```python
MODEL_DIR = 'model'
sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint(MODEL_DIR))

beam_size = 1
id2sentence = {}

# De-duplicate the image ids and pad the list to a multiple of batch_size
val_ids = list(set(val_ids))
if len(val_ids) % batch_size != 0:
    for i in range(batch_size - len(val_ids) % batch_size):
        val_ids.append(val_ids[0])
print(len(val_ids))

for i in tqdm(range(len(val_ids) // batch_size)):
    X_batch = np.array([val_dict[x] for x in val_ids[i * batch_size: i * batch_size + batch_size]])
    contexts_, initial_memory_, initial_state_ = sess.run([contexts, initial_memory, initial_state], feed_dict={X: X_batch})

    # result[b] holds the live hypotheses of image b, complete[b] the finished ones
    result = []
    complete = []
    for b in range(batch_size):
        result.append([{
            'sentence': [],
            'memory': initial_memory_[b],
            'state': initial_state_[b],
            'score': 1.0,
            'alphas': []
        }])
        complete.append([])

    for t in range(maxlen + 1):
        cache = [[] for b in range(batch_size)]
        step = 1 if t == 0 else beam_size
        for s in range(step):
            if t == 0:
                last_word_ = np.ones([batch_size], np.int32) * word2id['<start>']
            else:
                last_word_ = np.array([result[b][s]['sentence'][-1] for b in range(batch_size)], np.int32)

            last_memory_ = np.array([result[b][s]['memory'] for b in range(batch_size)], np.float32)
            last_state_ = np.array([result[b][s]['state'] for b in range(batch_size)], np.float32)

            current_memory_, current_state_, probs_, alpha_ = sess.run(
                [current_memory, current_state, probs, alpha], feed_dict={
                    contexts_phr: contexts_,
                    last_memory: last_memory_,
                    last_state: last_state_,
                    last_word: last_word_
                })

            for b in range(batch_size):
                # Take beam_size + 1 words so that at least beam_size remain if one is <end>
                word_and_probs = [[w, p] for w, p in enumerate(probs_[b])]
                word_and_probs.sort(key=lambda x:-x[1])
                word_and_probs = word_and_probs[:beam_size + 1]

                for w, p in word_and_probs:
                    item = {
                        'sentence': result[b][s]['sentence'] + [w],
                        'memory': current_memory_[b],
                        'state': current_state_[b],
                        'score': result[b][s]['score'] * p,
                        'alphas': result[b][s]['alphas'] + [alpha_[b]]
                    }

                    if id2word[w] == '<end>':
                        complete[b].append(item)
                    else:
                        cache[b].append(item)

        # Keep the beam_size best live hypotheses per image
        for b in range(batch_size):
            cache[b].sort(key=lambda x:-x['score'])
            cache[b] = cache[b][:beam_size]
        result = cache.copy()

    for b in range(batch_size):
        if len(complete[b]) == 0:
            final_sentence = result[b][0]['sentence']
        else:
            final_sentence = complete[b][0]['sentence']

        val_id = val_ids[i * batch_size + b]
        if not val_id in id2sentence:
            id2sentence[val_id] = [translate(final_sentence)]

print(len(id2sentence))
```

Write the generated captions to a file for later evaluation.
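```python
# One line per image: id ^ generated caption ^ reference captions joined by '_'
with open('generated.txt', 'w') as fw:
    for i in id2sentence.keys():
        fw.write(str(i) + '^' + id2sentence[i][0] + '^' + '_'.join(gt[i]) + '\n')
```

The following project is used for evaluation: github.com/tylin/coco-…. The evaluation metrics are BLEU, ROUGE, and CIDEr.
BLEU is widely used in tasks such as image captioning and machine translation; it can be understood roughly as the hit rate of 1-grams, 2-grams, 3-grams, and 4-grams.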
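To make the "n-gram hit rate" idea concrete, here is a minimal sketch of the clipped n-gram precision at the core of BLEU. It is an illustration only: the function names are hypothetical, the example sentences are made up, and the full BLEU score additionally combines the four precisions with a geometric mean and a brevity penalty, which this sketch leaves out.

```python
from collections import Counter

def ngrams(tokens, n):
    """All contiguous n-grams of a token list, as tuples."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

def modified_precision(candidate, references, n):
    """Fraction of candidate n-grams that also occur in a reference.

    Each n-gram is credited at most as often as it appears in any
    single reference (clipping), so repeating a word does not help.
    """
    cand_counts = Counter(ngrams(candidate, n))
    if not cand_counts:
        return 0.0
    max_ref = Counter()
    for ref in references:
        for gram, count in Counter(ngrams(ref, n)).items():
            max_ref[gram] = max(max_ref[gram], count)
    clipped = sum(min(count, max_ref[gram]) for gram, count in cand_counts.items())
    return clipped / sum(cand_counts.values())

candidate = 'a man riding a horse'.split()
references = ['a man is riding a horse'.split(),
              'a person rides a horse'.split()]

p1 = modified_precision(candidate, references, 1)
p2 = modified_precision(candidate, references, 2)
print(p1, p2)
```

Here every word of the candidate appears in a reference, so the 1-gram precision is 1.0, while the bigram "man riding" appears in neither reference, so the 2-gram precision is 3/4.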
For an introduction to BLEU, see Andrew Ng's deep learning specialization course: mooc.study.163.com/course/2001…
```python
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.cider.cider import Cider

# Read back the generated captions and the references, keyed by image id
id2sentence = {}
gt = {}
with open('generated.txt', 'r') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip('\n').split('^')
        i = line[0]
        id2sentence[i] = [line[1]]
        gt[i] = line[2].split('_')

scorers = [
    (Bleu(4), ['Bleu_1', 'Bleu_2', 'Bleu_3', 'Bleu_4']),
    (Rouge(), 'ROUGE_L'),
    (Cider(), 'CIDEr')
]

for scorer, name in scorers:
    score, _ = scorer.compute_score(gt, id2sentence)
    if type(score) == list:
        for n, s in zip(name, score):
            print(n, s)
    else:
        print(name, score)
```

The evaluation results are as follows; increasing the beam size appropriately can further improve every metric.
- Bleu_1: 0.6878
- Bleu_2: 0.4799
- Bleu_3: 0.3347
- Bleu_4: 0.2355
- ROUGE: 0.5304
- CIDEr: 0.7293
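Usage
Finally, the following code uses the trained model on a local machine to generate a caption for any image.
The overall structure is similar to the validation code, but since a caption is needed for only one image, the beam search part is much simpler.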
```python
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from imageio import imread
import scipy.io
import cv2
import os
import pickle

batch_size = 1
maxlen = 20
image_size = 224

MEAN_VALUES = np.array([123.68, 116.779, 103.939]).reshape((1, 1, 3))

with open('dictionary.pkl', 'rb') as fr:
    [vocabulary, word2id, id2word] = pickle.load(fr)

def translate(ids):
    words = [id2word[i] for i in ids if i >= 3]
    return ' '.join(words) + ' .'

vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    with tf.variable_scope('endpoints', reuse=reuse):
        def _weights(layer, expected_layer_name):
            W = vgg_layers[0][layer][0][0][2][0][0]
            b = vgg_layers[0][layer][0][0][2][0][1]
            layer_name = vgg_layers[0][layer][0][0][0][0]
            assert layer_name == expected_layer_name
            return W, b

        def _conv2d_relu(prev_layer, layer, layer_name):
            W, b = _weights(layer, layer_name)
            W = tf.constant(W)
            b = tf.constant(np.reshape(b, (b.size)))
            return tf.nn.relu(tf.nn.conv2d(prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b)

        def _avgpool(prev_layer):
            return tf.nn.avg_pool(prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        graph = {}
        graph['conv1_1'] = _conv2d_relu(inputs, 0, 'conv1_1')
        graph['conv1_2'] = _conv2d_relu(graph['conv1_1'], 2, 'conv1_2')
        graph['avgpool1'] = _avgpool(graph['conv1_2'])
        graph['conv2_1'] = _conv2d_relu(graph['avgpool1'], 5, 'conv2_1')
        graph['conv2_2'] = _conv2d_relu(graph['conv2_1'], 7, 'conv2_2')
        graph['avgpool2'] = _avgpool(graph['conv2_2'])
        graph['conv3_1'] = _conv2d_relu(graph['avgpool2'], 10, 'conv3_1')
        graph['conv3_2'] = _conv2d_relu(graph['conv3_1'], 12, 'conv3_2')
        graph['conv3_3'] = _conv2d_relu(graph['conv3_2'], 14, 'conv3_3')
        graph['conv3_4'] = _conv2d_relu(graph['conv3_3'], 16, 'conv3_4')
        graph['avgpool3'] = _avgpool(graph['conv3_4'])
        graph['conv4_1'] = _conv2d_relu(graph['avgpool3'], 19, 'conv4_1')
        graph['conv4_2'] = _conv2d_relu(graph['conv4_1'], 21, 'conv4_2')
        graph['conv4_3'] = _conv2d_relu(graph['conv4_2'], 23, 'conv4_3')
        graph['conv4_4'] = _conv2d_relu(graph['conv4_3'], 25, 'conv4_4')
        graph['avgpool4'] = _avgpool(graph['conv4_4'])
        graph['conv5_1'] = _conv2d_relu(graph['avgpool4'], 28, 'conv5_1')
        graph['conv5_2'] = _conv2d_relu(graph['conv5_1'], 30, 'conv5_2')
        graph['conv5_3'] = _conv2d_relu(graph['conv5_2'], 32, 'conv5_3')
        graph['conv5_4'] = _conv2d_relu(graph['conv5_3'], 34, 'conv5_4')
        graph['avgpool5'] = _avgpool(graph['conv5_4'])

        return graph

X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']

k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    return tf.layers.dense(inputs, units, activation, use_bias,
                           kernel_initializer=k_initializer, bias_initializer=b_initializer, name=name)

def batch_norm(inputs, name):
    return tf.contrib.layers.batch_norm(inputs, decay=0.95, center=True, scale=True, is_training=False,
                                        updates_collections=None, scope=name)

def dropout(inputs):
    return tf.layers.dropout(inputs, rate=0.5, training=False)

num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    initial_state = dense(context_mean, hidden_size, name='initial_state')
    initial_memory = dense(context_mean, hidden_size, name='initial_memory')

contexts_phr = tf.placeholder(tf.float32, [None, num_block, num_filter])
last_memory = tf.placeholder(tf.float32, [None, hidden_size])
last_state = tf.placeholder(tf.float32, [None, hidden_size])
last_word = tf.placeholder(tf.int32, [None])

with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, last_word)

with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts_phr, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)

with tf.variable_scope('attend'):
    h0 = dense(last_state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
    h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
    h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
    h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
    h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

    alpha = tf.nn.softmax(h0) # batch_size, num_block
    # contexts:                 batch_size, num_block, num_filter
    # tf.expand_dims(alpha, 2): batch_size, num_block, 1
    context = tf.reduce_sum(contexts_phr * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter

with tf.variable_scope('selector'):
    beta = dense(last_state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
    context = tf.multiply(beta, context, name='selected_context') # batch_size, num_filter

with tf.variable_scope('lstm'):
    h0 = tf.concat([embedded, context], 1) # batch_size, embedding_size + num_filter
    _, (current_memory, current_state) = lstm(inputs=h0, state=[last_memory, last_state])

with tf.variable_scope('decode'):
    h0 = dropout(current_state)
    h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
    h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
    h0 += embedded
    h0 = tf.nn.tanh(h0)

    h0 = dropout(h0)
    logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    probs = tf.nn.softmax(logits)

MODEL_DIR = 'model'
sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint(MODEL_DIR))

beam_size = 3
img = imread('test.png')
# Drop the alpha channel if present
if img.shape[-1] == 4:
    img = img[:, :, :-1]
h = img.shape[0]
w = img.shape[1]
if h > w:
    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
else:
    img = img[:, w // 2 - h // 2: w // 2 + h // 2]
img = cv2.resize(img, (image_size, image_size))
X_data = np.expand_dims(img, 0)

contexts_, initial_memory_, initial_state_ = sess.run([contexts, initial_memory, initial_state], feed_dict={X: X_data})

result = [{
    'sentence': [],
    'memory': initial_memory_[0],
    'state': initial_state_[0],
    'score': 1.0,
    'alphas': []
}]
complete = []
for t in range(maxlen + 1):
    cache = []
    step = 1 if t == 0 else beam_size
    for s in range(step):
        if t == 0:
            last_word_ = np.ones([batch_size], np.int32) * word2id['<start>']
        else:
            last_word_ = np.array([result[s]['sentence'][-1]], np.int32)

        last_memory_ = np.array([result[s]['memory']], np.float32)
        last_state_ = np.array([result[s]['state']], np.float32)

        current_memory_, current_state_, probs_, alpha_ = sess.run(
            [current_memory, current_state, probs, alpha], feed_dict={
                contexts_phr: contexts_,
                last_memory: last_memory_,
                last_state: last_state_,
                last_word: last_word_
            })

        word_and_probs = [[w, p] for w, p in enumerate(probs_[0])]
        word_and_probs.sort(key=lambda x:-x[1])
        word_and_probs = word_and_probs[:beam_size + 1]

        for w, p in word_and_probs:
            item = {
                'sentence': result[s]['sentence'] + [w],
                'memory': current_memory_[0],
                'state': current_state_[0],
                'score': result[s]['score'] * p,
                'alphas': result[s]['alphas'] + [alpha_[0]]
            }
            if id2word[w] == '<end>':
                complete.append(item)
            else:
                cache.append(item)

    cache.sort(key=lambda x:-x['score'])
    cache = cache[:beam_size]
    result = cache.copy()

if len(complete) == 0:
    final_sentence = result[0]['sentence']
    alphas = result[0]['alphas']
else:
    final_sentence = complete[0]['sentence']
    alphas = complete[0]['alphas']

sentence = translate(final_sentence)
print(sentence)

# Overlay the attention map of each word on the image
sentence = sentence.split(' ')
img = (img - img.min()) / (img.max() - img.min())
n = int(np.ceil(np.sqrt(len(sentence))))
plt.figure(figsize=(10, 8))
for i in range(len(sentence)):
    word = sentence[i]
    a = np.reshape(alphas[i], (14, 14))
    a = cv2.resize(a, (image_size, image_size))
    a = np.expand_dims(a, -1)
    a = (a - a.min()) / (a.max() - a.min())
    combine = 0.5 * img + 0.5 * a
    plt.subplot(n, n, i + 1)
    plt.text(0, 1, word, color='black', backgroundcolor='white', fontsize=12)
    plt.imshow(combine)
    plt.axis('off')
plt.show()
```

The generated caption is shown below. It accurately covers key words such as bride, groom, posing, and taking a photo, and the attention visualization reflects well which regions of the image the model attended to when generating each word.
Reference
Link: https://juejin.im/post/5ba4de5af265da0a89302eba