# char-rnn.py
from __future__ import absolute_import, division, print_function
import tensorflow as tf
import numpy as np
import os
import time
import argparse
tf.enable_eager_execution()
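# Note: this script uses the TensorFlow 1.x API (eager execution enabled manually,
# tf.keras.layers.CuDNNLSTM, tf.train.AdamOptimizer) and will not run unmodified on TF 2.x.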
parser = argparse.ArgumentParser()
parser.add_argument('--seq_length', type=int, default=100, help='Input sequence length given to the recurrent network')
parser.add_argument('--recurrent_layers', type=int, default=1, help='Number of stacked recurrent layers')
parser.add_argument('--recurrent_units', type=int, default=1024, help='Number of recurrent units in each layer')
parser.add_argument('--embedding_dim', type=int, default=256, help='Embedding dimension')
parser.add_argument('--epochs', type=int, default=3, help='Number of training epochs')
parser.add_argument('--batch_size', type=int, default=64, help='Size of the training batches')
opt = parser.parse_args()
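# Example invocation (flag values here are illustrative only, and assume the dataset
# file below has already been generated):
#   python char-rnn.py --seq_length 100 --recurrent_layers 2 --recurrent_units 512 --epochs 5 --batch_size 64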
filePath = 'dataset/preprocessed_data.txt'
# Read, then decode for py2 compat.
text = open(filePath, 'rb').read().decode(encoding='utf-8')
# length of text is the number of characters in it
print('Length of text: {} characters'.format(len(text)))
# Take a look at the first 250 characters in text
print(text[:250])
# The unique characters in the file
vocab = sorted(set(text))
print('{} unique characters'.format(len(vocab)))
# Creating a mapping from unique characters to indices
char2idx = {u: i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
text_as_int = np.array([char2idx[c] for c in text])
# Show how the first 13 characters from the text are mapped to integers
print('{} ---- characters mapped to int ---- > {}'.format(repr(text[:13]), text_as_int[:13]))
# The maximum length sentence we want for a single input in characters
seq_length = opt.seq_length
examples_per_epoch = len(text) // seq_length
# Create training examples / targets
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
for i in char_dataset.take(5):
    print(idx2char[i.numpy()])

sequences = char_dataset.batch(seq_length + 1, drop_remainder=True)
for item in sequences.take(5):
    print(repr(''.join(idx2char[item.numpy()])))

def split_input_target(chunk):
    input_text = chunk[:-1]
    target_text = chunk[1:]
    return input_text, target_text

dataset = sequences.map(split_input_target)
for input_example, target_example in dataset.take(1):
    print('Input data: ', repr(''.join(idx2char[input_example.numpy()])))
    print('Target data:', repr(''.join(idx2char[target_example.numpy()])))
for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
    print("Step {:4d}".format(i))
    print("  input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
    print("  expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))
# Batch size
BATCH_SIZE = opt.batch_size
steps_per_epoch = examples_per_epoch // BATCH_SIZE
# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
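# Each dataset element is now a pair of integer tensors of shape (BATCH_SIZE, seq_length):
# an input sequence and the same sequence shifted one character to the right.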
# Length of the vocabulary in chars
vocab_size = len(vocab)
# The embedding dimension
embedding_dim = opt.embedding_dim
# Number of RNN units
rnn_units = opt.recurrent_units
# Use the fused cuDNN LSTM kernel when a GPU is available; otherwise fall back to the
# regular LSTM layer with a sigmoid recurrent activation so its weights stay compatible
# with the cuDNN implementation.
if tf.test.is_gpu_available():
    rnn = tf.keras.layers.CuDNNLSTM
else:
    import functools
    rnn = functools.partial(
        tf.keras.layers.LSTM, recurrent_activation='sigmoid')

def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]))
    for _ in range(opt.recurrent_layers):
        model.add(rnn(rnn_units, return_sequences=True, recurrent_initializer='glorot_uniform', stateful=True))
    model.add(tf.keras.layers.Dense(vocab_size))
    return model
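# With the default flags this builds Embedding(vocab_size -> 256) followed by a single
# 1024-unit LSTM layer returning the full sequence, then Dense(vocab_size), i.e. one
# logit per vocabulary character at every time step.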
model = build_model(
    vocab_size=len(vocab),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
    batch_size=BATCH_SIZE)
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")
model.summary()
sampled_indices = tf.random.multinomial(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
print("Input: \n", repr("".join(idx2char[input_example_batch[0]])))
print()
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices])))
def loss(labels, logits):
    return tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
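# Rough sanity check: before training, the mean of this per-character cross-entropy
# should be close to np.log(vocab_size), since a freshly initialised model predicts a
# near-uniform distribution over the vocabulary; compare the value printed below.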
example_batch_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss: ", example_batch_loss.numpy().mean())
model.compile(
    optimizer=tf.train.AdamOptimizer(),
    loss=loss)
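# Note: the Dense output layer produces raw logits; the sparse softmax cross-entropy
# loss applies the softmax internally, so no activation is needed on the final layer.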
# Directory where the checkpoints will be saved
checkpoint_dir = '/mnt/apg-checkpoints/training_checkpoints_LSTM_HL_{}_HU_{}_seq_len_{}'.format(
    opt.recurrent_layers, opt.recurrent_units, opt.seq_length)
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)
EPOCHS = opt.epochs
history = model.fit(dataset.repeat(), epochs=EPOCHS, steps_per_epoch=steps_per_epoch, callbacks=[checkpoint_callback])
# Rebuild the model with batch_size=1 so it can generate one sequence at a time,
# then restore the weights from the most recent training checkpoint.
print('Latest checkpoint:', tf.train.latest_checkpoint(checkpoint_dir))
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([1, None]))
model.summary()
def generate_text(model, start_string):
    # Evaluation step (generating text using the learned model)
    # Number of characters to generate
    num_generate = 1000
    # Converting our start string to numbers (vectorizing)
    input_eval = [char2idx[s] for s in start_string]
    input_eval = tf.expand_dims(input_eval, 0)
    # Empty string to store our results
    text_generated = []
    # Low temperatures result in more predictable text.
    # Higher temperatures result in more surprising text.
    # Experiment to find the best setting.
    temperature = 1.0
    # Here batch size == 1
    model.reset_states()
    for i in range(num_generate):
        predictions = model(input_eval)
        # remove the batch dimension
        predictions = tf.squeeze(predictions, 0)
        # use a multinomial distribution to predict the character returned by the model
        predictions = predictions / temperature
        predicted_id = tf.random.multinomial(predictions, num_samples=1)[-1, 0].numpy()
        # Pass the predicted character as the next input to the model
        # along with the previous hidden state
        input_eval = tf.expand_dims([predicted_id], 0)
        text_generated.append(idx2char[predicted_id])
    return start_string + ''.join(text_generated)
print(generate_text(model, start_string=u"\\documentclass{"))
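# Other seed strings can be tried in the same way, provided every character in the
# seed appears in the training vocabulary (otherwise the char2idx lookup raises a
# KeyError), e.g.:
#   print(generate_text(model, start_string=u"\\begin{"))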