-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpyESN.py
255 lines (220 loc) · 10.6 KB
/
pyESN.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import numpy as np
def correct_dimensions(s, targetlength):
"""checks the dimensionality of some numeric argument s, broadcasts it
to the specified length if possible.
Args:
s: None, scalar or 1D array
targetlength: expected length of s
Returns:
None if s is None, else numpy vector of length targetlength
"""
if s is not None:
s = np.array(s)
if s.ndim == 0:
s = np.array([s] * targetlength)
elif s.ndim == 1:
if not len(s) == targetlength:
raise ValueError("arg must have length " + str(targetlength))
else:
raise ValueError("Invalid argument")
return s
def identity(x):
return x
class ESN():
def __init__(self, n_inputs, n_outputs, n_reservoir=200,
spectral_radius=0.95, sparsity=0, noise=0.001, input_shift=None,
input_scaling=None, teacher_forcing=True, feedback_scaling=None,
teacher_scaling=None, teacher_shift=None,
out_activation=identity, inverse_out_activation=identity,
random_state=None, silent=True):
"""
Args:
n_inputs: nr of input dimensions
n_outputs: nr of output dimensions
n_reservoir: nr of reservoir neurons
spectral_radius: spectral radius of the recurrent weight matrix
sparsity: proportion of recurrent weights set to zero
noise: noise added to each neuron (regularization)
input_shift: scalar or vector of length n_inputs to add to each
input dimension before feeding it to the network.
input_scaling: scalar or vector of length n_inputs to multiply
with each input dimension before feeding it to the netw.
teacher_forcing: if True, feed the target back into output units
teacher_scaling: factor applied to the target signal
teacher_shift: additive term applied to the target signal
out_activation: output activation function (applied to the readout)
inverse_out_activation: inverse of the output activation function
random_state: positive integer seed, np.rand.RandomState object,
or None to use numpy's builting RandomState.
silent: supress messages
"""
# check for proper dimensionality of all arguments and write them down.
self.n_inputs = n_inputs
self.n_reservoir = n_reservoir
self.n_outputs = n_outputs
self.spectral_radius = spectral_radius
self.sparsity = sparsity
self.noise = noise
self.input_shift = correct_dimensions(input_shift, n_inputs)
self.input_scaling = correct_dimensions(input_scaling, n_inputs)
self.teacher_scaling = teacher_scaling
self.teacher_shift = teacher_shift
self.out_activation = out_activation
self.inverse_out_activation = inverse_out_activation
self.random_state = random_state
# the given random_state might be either an actual RandomState object,
# a seed or None (in which case we use numpy's builtin RandomState)
if isinstance(random_state, np.random.RandomState):
self.random_state_ = random_state
elif random_state:
try:
self.random_state_ = np.random.RandomState(random_state)
except TypeError as e:
raise Exception("Invalid seed: " + str(e))
else:
self.random_state_ = np.random.mtrand._rand
self.teacher_forcing = teacher_forcing
self.silent = silent
self.initweights()
def initweights(self):
# initialize recurrent weights:
# begin with a random matrix centered around zero:
W = self.random_state_.rand(self.n_reservoir, self.n_reservoir) - 0.5
# delete the fraction of connections given by (self.sparsity):
W[self.random_state_.rand(*W.shape) < self.sparsity] = 0
# compute the spectral radius of these weights:
radius = np.max(np.abs(np.linalg.eigvals(W)))
# rescale them to reach the requested spectral radius:
self.W = W * (self.spectral_radius / radius)
# random input weights:
self.W_in = self.random_state_.rand(
self.n_reservoir, self.n_inputs) * 2 - 1
# random feedback (teacher forcing) weights:
self.W_feedb = self.random_state_.rand(
self.n_reservoir, self.n_outputs) * 2 - 1
def _update(self, state, input_pattern, output_pattern):
"""performs one update step.
i.e., computes the next network state by applying the recurrent weights
to the last state & and feeding in the current input and output patterns
"""
if self.teacher_forcing:
preactivation = (np.dot(self.W, state)
+ np.dot(self.W_in, input_pattern)
+ np.dot(self.W_feedb, output_pattern))
else:
preactivation = (np.dot(self.W, state)
+ np.dot(self.W_in, input_pattern))
return (np.tanh(preactivation)
+ self.noise * (self.random_state_.rand(self.n_reservoir) - 0.5))
def _scale_inputs(self, inputs):
"""for each input dimension j: multiplies by the j'th entry in the
input_scaling argument, then adds the j'th entry of the input_shift
argument."""
if self.input_scaling is not None:
inputs = np.dot(inputs, np.diag(self.input_scaling))
if self.input_shift is not None:
inputs = inputs + self.input_shift
return inputs
def _scale_teacher(self, teacher):
"""multiplies the teacher/target signal by the teacher_scaling argument,
then adds the teacher_shift argument to it."""
if self.teacher_scaling is not None:
teacher = teacher * self.teacher_scaling
if self.teacher_shift is not None:
teacher = teacher + self.teacher_shift
return teacher
def _unscale_teacher(self, teacher_scaled):
"""inverse operation of the _scale_teacher method."""
if self.teacher_shift is not None:
teacher_scaled = teacher_scaled - self.teacher_shift
if self.teacher_scaling is not None:
teacher_scaled = teacher_scaled / self.teacher_scaling
return teacher_scaled
def fit(self, inputs, outputs, inspect=False):
"""
Collect the network's reaction to training data, train readout weights.
Args:
inputs: array of dimensions (N_training_samples x n_inputs)
outputs: array of dimension (N_training_samples x n_outputs)
inspect: show a visualisation of the collected reservoir states
Returns:
the network's output on the training data, using the trained weights
"""
# transform any vectors of shape (x,) into vectors of shape (x,1):
if inputs.ndim < 2:
inputs = np.reshape(inputs, (len(inputs), -1))
if outputs.ndim < 2:
outputs = np.reshape(outputs, (len(outputs), -1))
# transform input and teacher signal:
inputs_scaled = self._scale_inputs(inputs)
teachers_scaled = self._scale_teacher(outputs)
if not self.silent:
print("harvesting states...")
# step the reservoir through the given input,output pairs:
states = np.zeros((inputs.shape[0], self.n_reservoir))
for n in range(1, inputs.shape[0]):
states[n, :] = self._update(states[n - 1], inputs_scaled[n, :],
teachers_scaled[n - 1, :])
# learn the weights, i.e. find the linear combination of collected
# network states that is closest to the target output
if not self.silent:
print("fitting...")
# we'll disregard the first few states:
transient = min(int(inputs.shape[1] / 10), 100)
# include the raw inputs:
extended_states = np.hstack((states, inputs_scaled))
# Solve for W_out:
self.W_out = np.dot(np.linalg.pinv(extended_states[transient:, :]),
self.inverse_out_activation(teachers_scaled[transient:, :])).T
# remember the last state for later:
self.laststate = states[-1, :]
self.lastinput = inputs[-1, :]
self.lastoutput = teachers_scaled[-1, :]
# optionally visualize the collected states
if inspect:
from matplotlib import pyplot as plt
# (^-- we depend on matplotlib only if this option is used)
plt.figure(
figsize=(states.shape[0] * 0.0025, states.shape[1] * 0.01))
plt.imshow(extended_states.T, aspect='auto',
interpolation='nearest')
plt.colorbar()
if not self.silent:
print("training error:")
# apply learned weights to the collected states:
pred_train = self._unscale_teacher(self.out_activation(
np.dot(extended_states, self.W_out.T)))
if not self.silent:
print(np.sqrt(np.mean((pred_train - outputs)**2)))
return pred_train, np.sqrt(np.mean((pred_train - outputs)**2))
def predict(self, inputs, continuation=True):
"""
Apply the learned weights to the network's reactions to new input.
Args:
inputs: array of dimensions (N_test_samples x n_inputs)
continuation: if True, start the network from the last training state
Returns:
Array of output activations
"""
if inputs.ndim < 2:
inputs = np.reshape(inputs, (len(inputs), -1))
n_samples = inputs.shape[0]
if continuation:
laststate = self.laststate
lastinput = self.lastinput
lastoutput = self.lastoutput
else:
laststate = np.zeros(self.n_reservoir)
lastinput = np.zeros(self.n_inputs)
lastoutput = np.zeros(self.n_outputs)
inputs = np.vstack([lastinput, self._scale_inputs(inputs)])
states = np.vstack(
[laststate, np.zeros((n_samples, self.n_reservoir))])
outputs = np.vstack(
[lastoutput, np.zeros((n_samples, self.n_outputs))])
for n in range(n_samples):
states[
n + 1, :] = self._update(states[n, :], inputs[n + 1, :], outputs[n, :])
outputs[n + 1, :] = self.out_activation(np.dot(self.W_out,
np.concatenate([states[n + 1, :], inputs[n + 1, :]])))
return self._unscale_teacher(self.out_activation(outputs[1:]))