用训练好的RNN模型生成诗歌 (generate poetry with a trained RNN model)
# -*- coding: utf-8 -*-
import collections
import numpy as np
import tensorflow as tf
poetry_file = 'Desktop/datasets/poetry.txt'
# Poem collection
poetrys = []
with open(poetry_file, "r", encoding='utf-8') as f:
    for line in f:
        try:
            title, content = line.strip().split(':')
            content = content.replace(' ', '')
            # Skip poems containing annotations or unusual punctuation
            if '_' in content or '(' in content or '（' in content or '《' in content or '[' in content:
                continue
            if len(content) < 5 or len(content) > 79:
                continue
            content = '[' + content + ']'
            poetrys.append(content)
        except Exception as e:
            pass
#print(poetrys[:1])
# Sort poems by length (number of characters)
poetrys = sorted(poetrys, key=lambda line: len(line))
print('Total number of Tang poems: ', len(poetrys))
print(poetrys[-1])
# Count how often each character occurs
all_words = []
for poetry in poetrys:
    all_words += [word for word in poetry]
counter = collections.Counter(all_words)
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
words, _ = zip(*count_pairs)
print(words[:10])
# Keep all characters (a cutoff to the N most frequent could be applied) and append a blank as padding
words = words[:len(words)] + (' ',)
# Map each character to an integer ID
word_num_map = dict(zip(words, range(len(words))))
# Convert each poem into vector (ID) form; see TensorFlow练习1
to_num = lambda word: word_num_map.get(word, len(words))
poetrys_vector = [ list(map(to_num, poetry)) for poetry in poetrys]
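# Illustrative sanity check (an addition, not part of the original script):
# rebuild the reverse ID-to-character map and round-trip the first poem
# to confirm that the encoding is lossless.
num_to_word = {idx: w for w, idx in word_num_map.items()}
print(''.join(num_to_word[n] for n in poetrys_vector[0]))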
batch_size = 1
n_chunk = len(poetrys_vector) // batch_size
x_batches = []
y_batches = []
for i in range(n_chunk):
    start_index = i * batch_size
    end_index = start_index + batch_size
    batches = poetrys_vector[start_index:end_index]
    length = max(map(len, batches))
    xdata = np.full((batch_size, length), word_num_map[' '], np.int32)
    for row in range(batch_size):
        xdata[row, :len(batches[row])] = batches[row]
    ydata = np.copy(xdata)
    ydata[:, :-1] = xdata[:, 1:]
    """
    xdata          ydata
    [6,2,4,6,9]    [2,4,6,9,9]
    [1,4,2,8,5]    [4,2,8,5,5]
    """
    x_batches.append(xdata)
    y_batches.append(ydata)
print(x_batches[33:34])
print(y_batches[33:34])
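# Quick check (an addition, not part of the original script): the target sequence
# is the input shifted left by one position, i.e. at every time step the model
# is trained to predict the next character.
assert (y_batches[0][:, :-1] == x_batches[0][:, 1:]).all()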
# input_size is not fixed
# output_size is not fixed
batch_size = 1
model = 'lstm'  # alternatives: gru, rnn
rnn_size = 128
num_layer = 2
input_data = tf.placeholder(tf.int32, [batch_size, None])
output_target = tf.placeholder(tf.int32, [batch_size, None])
#keep_prob = tf.placeholder(tf.float32)
# Define the RNN
# Define the cells
lstm_cell_1 = tf.contrib.rnn.BasicLSTMCell(num_units=rnn_size, state_is_tuple=True)
lstm_cell_2 = tf.contrib.rnn.BasicLSTMCell(num_units=rnn_size, state_is_tuple=True)
#lstm_cell = tf.contrib.rnn.DropoutWrapper(cell=lstm_cell, input_keep_prob=1.0, output_keep_prob=keep_prob)
lstm_cell = tf.contrib.rnn.MultiRNNCell([lstm_cell_1, lstm_cell_2], state_is_tuple=True)
initial_state = lstm_cell.zero_state(batch_size, tf.float32)
# Shared parameters (softmax projection)
with tf.variable_scope('rnnlstm'):
    softmax_w = tf.get_variable("softmax_w", [rnn_size, len(words)+1])
    softmax_b = tf.get_variable("softmax_b", [len(words)+1])
# Word embedding lookup
with tf.device('/cpu:0'):  # force the embedding onto the CPU
    embedding = tf.get_variable("embedding", [len(words)+1, rnn_size])
    inputs = tf.nn.embedding_lookup(embedding, input_data)
# Run the cells over the input sequence
outputs, last_state = tf.nn.dynamic_rnn(lstm_cell, inputs, initial_state=initial_state, scope='rnnlstm')
# Flatten the cell outputs to [batch_size * time, rnn_size]
output = tf.reshape(outputs, [-1, rnn_size])
# Compute output logits and probabilities
Wx_plus_b = tf.matmul(output, softmax_w) + softmax_b
probs = tf.nn.softmax(Wx_plus_b)
# Training ops (defined here; this script only restores a trained checkpoint below)
targets = tf.reshape(output_target, [-1])
loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example([Wx_plus_b], [targets], [tf.ones_like(targets, dtype=tf.float32)], len(words))
cost = tf.reduce_mean(loss)
learning_rate = tf.Variable(0.0, trainable=False)
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), 5)
optimizer = tf.train.AdamOptimizer(learning_rate)
train_op = optimizer.apply_gradients(zip(grads, tvars))
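# Minimal training-loop sketch (an assumption, not part of this script): roughly
# how a checkpoint such as 'my_net/poetry.module-35' could be produced from the
# ops defined above. It is defined here for reference only and is never called.
def train(epochs=50):
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver(tf.all_variables())
        for epoch in range(epochs):
            # Decay the learning rate as training progresses
            sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))
            for batch in range(n_chunk):
                train_loss, _ = sess.run([cost, train_op],
                                         feed_dict={input_data: x_batches[batch],
                                                    output_target: y_batches[batch]})
            saver.save(sess, 'my_net/poetry.module', global_step=epoch)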
# Use the trained model to generate a poem
def gen_poetry():
    def to_word(prob_vector):
        # Sample a character index in proportion to its probability
        # (inverse transform sampling over the cumulative distribution)
        t = np.cumsum(prob_vector)
        s = np.sum(prob_vector)
        sample = int(np.searchsorted(t, np.random.rand(1) * s))
        return words[sample]

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver(tf.all_variables())
        saver.restore(sess, 'my_net/poetry.module-35')

        initial_state_ = sess.run(lstm_cell.zero_state(1, tf.float32))
        # Start generation from the begin-of-poem marker '['
        x = np.array([list(map(word_num_map.get, '['))])
        [probs_, state_] = sess.run([probs, last_state], {input_data: x, initial_state: initial_state_})
        word = to_word(probs_)
        #word = words[np.argmax(probs_)]
        poem = ''
        while word != ']':
            poem += word
            x = np.zeros((1, 1))
            x[0, 0] = word_num_map[word]
            # Feed the previous step's state back in so the RNN state is carried forward
            [probs_, state_] = sess.run([probs, last_state], {input_data: x, initial_state: state_})
            word = to_word(probs_)
            #word = words[np.argmax(probs_)]
        return poem
print(gen_poetry())