import numpy as np
from QNetwork import optimizers
import sys # for sys.float_info.epsilon
import matplotlib.pyplot as plt
import matplotlib.patches as pltpatch  # for RegularPolygon and Rectangle
import matplotlib.collections as pltcoll
import math
###############################################
# Thanks to Dr. Chuck Anderson for his neural #
# network with optimizers implementation #
# https://www.cs.colostate.edu/~anderson/wp/ #
###############################################
class NeuralNetwork():
def __init__(self, n_inputs, n_hiddens_per_layer, n_outputs, activation_function='tanh'):
self.n_inputs = n_inputs
self.n_outputs = n_outputs
self.activation_function = activation_function
# Set self.n_hiddens_per_layer to [] if argument is 0, [], or [0]
if n_hiddens_per_layer == 0 or n_hiddens_per_layer == [] or n_hiddens_per_layer == [0]:
self.n_hiddens_per_layer = []
else:
self.n_hiddens_per_layer = n_hiddens_per_layer
# Initialize weights, by first building list of all weight matrix shapes.
n_in = n_inputs
shapes = []
for nh in self.n_hiddens_per_layer:
shapes.append((n_in + 1, nh))
n_in = nh
shapes.append((n_in + 1, n_outputs))
# self.all_weights: vector of all weights
# self.Ws: list of weight matrices by layer
self.all_weights, self.Ws = self.make_weights_and_views(shapes)
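        # Note: self.Ws holds views into self.all_weights, so any in-place update
        # the optimizer applies to all_weights is immediately reflected in the
        # per-layer weight matrices (and vice versa).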
# Define arrays to hold gradient values.
# One array for each W array with same shape.
self.all_gradients, self.dE_dWs = self.make_weights_and_views(shapes)
self.trained = False
self.total_epochs = 0
self.error_trace = []
self.Xmeans = None
self.Xstds = None
self.Tmeans = None
self.Tstds = None
def setup_standardization(self, Xmeans, Xstds, Tmeans, Tstds):
self.Xmeans = np.array(Xmeans)
self.Xstds = np.array(Xstds)
self.Tmeans = np.array(Tmeans)
self.Tstds = np.array(Tstds)
def make_weights_and_views(self, shapes):
        # vector of all weights built by horizontally stacking flattened matrices
# for each layer initialized with uniformly-distributed values.
all_weights = np.hstack([np.random.uniform(-1, 1, size=shape).flat / np.sqrt(shape[0])
for shape in shapes])
# Build list of views by reshaping corresponding elements from vector of all weights
# into correct shape for each layer.
views = []
start = 0
for shape in shapes:
            size = shape[0] * shape[1]
views.append(all_weights[start:start + size].reshape(shape))
start += size
return all_weights, views
# Return string that shows how the constructor was called
def __repr__(self):
return f'{type(self).__name__}({self.n_inputs}, {self.n_hiddens_per_layer}, {self.n_outputs}, \'{self.activation_function}\')'
# Return string that is more informative to the user about the state of this neural network.
def __str__(self):
        result = self.__repr__()
        if len(self.error_trace) > 0:
            result += f' trained for {len(self.error_trace)} epochs, final training error {self.error_trace[-1]:.4f}'
        return result
def train(self, X, T, n_epochs, learning_rate, method='sgd', verbose=True):
'''
train:
X: n_samples x n_inputs matrix of input samples, one per row
T: n_samples x n_outputs matrix of target output values, one sample per row
n_epochs: number of passes to take through all samples updating weights each pass
learning_rate: factor controlling the step size of each update
        method: either 'sgd' or 'adam'
'''
# Setup standardization parameters
if self.Xmeans is None:
self.Xmeans = X.mean(axis=0)
self.Xstds = X.std(axis=0)
self.Xstds[self.Xstds == 0] = 1 # So we don't divide by zero when standardizing
self.Tmeans = T.mean(axis=0)
self.Tstds = T.std(axis=0)
# Standardize X and T
X = (X - self.Xmeans) / self.Xstds
T = (T - self.Tmeans) / self.Tstds
# Instantiate Optimizers object by giving it vector of all weights
optimizer = optimizers.Optimizers(self.all_weights)
# Define function to convert value from error_f into error in original T units,
# but only if the network has a single output. Multiplying by self.Tstds for
# multiple outputs does not correctly unstandardize the error.
if len(self.Tstds) == 1:
error_convert_f = lambda err: (np.sqrt(err) * self.Tstds)[0] # to scalar
else:
            error_convert_f = lambda err: np.sqrt(err)  # RMSE in standardized units (already a scalar)
if method == 'sgd':
error_trace = optimizer.sgd(self.error_f, self.gradient_f,
fargs=[X, T], n_epochs=n_epochs,
learning_rate=learning_rate,
verbose=verbose,
error_convert_f=error_convert_f)
elif method == 'adam':
error_trace = optimizer.adam(self.error_f, self.gradient_f,
fargs=[X, T], n_epochs=n_epochs,
learning_rate=learning_rate,
verbose=verbose,
error_convert_f=error_convert_f)
else:
            raise ValueError("method must be 'sgd' or 'adam'")
self.error_trace = error_trace
# Return neural network object to allow applying other methods after training.
# Example: Y = nnet.train(X, T, 100, 0.01).use(X)
return self
def relu(self, s):
        s[s < 0] = 0  # note: modifies s in place; forward_pass always passes a freshly computed array
return s
def grad_relu(self, s):
return (s > 0).astype(int)
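    # forward_pass computes, for each layer, Z = Y_prev @ W[1:, :] + W[0:1, :]
    # (row 0 of every weight matrix holds the bias weights), applies tanh or relu
    # to hidden-layer Z's, leaves the final layer linear, and caches all layer
    # outputs in self.Ys for later use by gradient_f.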
def forward_pass(self, X):
'''X assumed already standardized. Output returned as standardized.'''
self.Ys = [X]
for W in self.Ws[:-1]:
if self.activation_function == 'relu':
self.Ys.append(self.relu(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
else:
self.Ys.append(np.tanh(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
last_W = self.Ws[-1]
self.Ys.append(self.Ys[-1] @ last_W[1:, :] + last_W[0:1, :])
return self.Ys
# Function to be minimized by optimizer method, mean squared error
def error_f(self, X, T):
Ys = self.forward_pass(X)
mean_sq_error = np.mean((T - Ys[-1]) ** 2)
return mean_sq_error
# Gradient of function to be minimized for use by optimizer method
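    # For E = mean((T - Y)^2), dE/dY = -2 (T - Y) / (n_samples * n_outputs); the
    # constant factor of 2 is dropped below since it only rescales the learning rate.
    # Each layer's weight gradient is Y_prev.T @ delta (the bias gradient is the
    # column sum of delta), and delta is back-propagated through the weights and
    # the activation derivative: 1 - Y**2 for tanh, or the 0/1 indicator for relu.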
def gradient_f(self, X, T):
'''Assumes forward_pass just called with layer outputs in self.Ys.'''
error = T - self.Ys[-1]
n_samples = X.shape[0]
n_outputs = T.shape[1]
delta = - error / (n_samples * n_outputs)
n_layers = len(self.n_hiddens_per_layer) + 1
# Step backwards through the layers to back-propagate the error (delta)
for layeri in range(n_layers - 1, -1, -1):
# gradient of all but bias weights
self.dE_dWs[layeri][1:, :] = self.Ys[layeri].T @ delta
# gradient of just the bias weights
self.dE_dWs[layeri][0:1, :] = np.sum(delta, 0)
# Back-propagate this layer's delta to previous layer
if self.activation_function == 'relu':
delta = delta @ self.Ws[layeri][1:, :].T * self.grad_relu(self.Ys[layeri])
else:
delta = delta @ self.Ws[layeri][1:, :].T * (1 - self.Ys[layeri] ** 2)
return self.all_gradients
def use(self, X):
'''X assumed to not be standardized'''
# Standardize X
X = (X - self.Xmeans) / self.Xstds
Ys = self.forward_pass(X)
Y = Ys[-1]
# Unstandardize output Y before returning it
return Y * self.Tstds + self.Tmeans
def draw(self, input_names=None, output_names=None, scale='by layer', gray=False):
plt.title('{} weights'.format(sum([Wi.size for Wi in self.Ws])))
def isOdd(x):
return x % 2 != 0
n_layers = len(self.Ws)
Wmax_overall = np.max(np.abs(np.hstack([w.reshape(-1) for w in self.Ws])))
# calculate xlim and ylim for whole network plot
# Assume 4 characters fit between each wire
# -0.5 is to leave 0.5 spacing before first wire
xlim = max(map(len, input_names)) / 4.0 if input_names else 1
ylim = 0
for li in range(n_layers):
            ni, no = self.Ws[li].shape  # no is the number of outputs of this layer
if not isOdd(li):
ylim += ni + 0.5
else:
xlim += ni + 0.5
        ni, no = self.Ws[n_layers - 1].shape  # no is the number of outputs of the last layer
if isOdd(n_layers):
xlim += no + 0.5
else:
ylim += no + 0.5
# Add space for output names
if output_names:
if isOdd(n_layers):
ylim += 0.25
else:
xlim += round(max(map(len, output_names)) / 4.0)
ax = plt.gca()
# changes from Jim Jazwiecki (jim.jazwiecki@gmail.com) CS480 student
character_width_factor = 0.07
padding = 2
if input_names:
x0 = max([1, max(map(len, input_names)) * (character_width_factor * 3.5)])
else:
x0 = 1
y0 = 0 # to allow for constant input to first layer
# First Layer
if input_names:
y = 0.55
for n in input_names:
y += 1
ax.text(x0 - (character_width_factor * padding), y, n, horizontalalignment="right", fontsize=20)
patches = []
for li in range(n_layers):
thisW = self.Ws[li]
if scale == 'by layer':
maxW = np.max(np.abs(thisW))
else:
maxW = Wmax_overall
ni, no = thisW.shape
if not isOdd(li):
# Even layer index. Vertical layer. Origin is upper left.
# Constant input
ax.text(x0 - 0.2, y0 + 0.5, '1', fontsize=20)
for i in range(ni):
ax.plot((x0, x0 + no - 0.5), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
# output lines
for i in range(no):
ax.plot((x0 + 1 + i - 0.5, x0 + 1 + i - 0.5), (y0, y0 + ni + 1), color='gray')
# cell "bodies"
xs = x0 + np.arange(no) + 0.5
ys = np.array([y0 + ni + 0.5] * no)
for x, y in zip(xs, ys):
                    patches.append(pltpatch.RegularPolygon((x, y - 0.4), 3, radius=0.3, orientation=0, color='#555555'))
# weights
                # one color per weight, chosen by sign (negative vs non-negative)
                if gray:
                    colors = np.array(['black', 'gray'])[(thisW.flatten() >= 0) + 0]
                else:
                    colors = np.array(['red', 'green'])[(thisW.flatten() >= 0) + 0]
xs = np.arange(no) + x0 + 0.5
ys = np.arange(ni) + y0 + 0.5
coords = np.meshgrid(xs, ys)
for x, y, w, c in zip(coords[0].flat, coords[1].flat,
np.abs(thisW / maxW).flat, colors):
patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
y0 += ni + 1
                x0 -= 1  # shift left to leave room for the next layer's constant input
else:
# Odd layer index. Horizontal layer. Origin is upper left.
# Constant input
ax.text(x0 + 0.5, y0 - 0.2, '1', fontsize=20)
# input lines
for i in range(ni):
ax.plot((x0 + i + 0.5, x0 + i + 0.5), (y0, y0 + no - 0.5), color='gray')
# output lines
for i in range(no):
                    ax.plot((x0, x0 + ni + 1), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
# cell 'bodies'
xs = np.array([x0 + ni + 0.5] * no)
ys = y0 + 0.5 + np.arange(no)
for x, y in zip(xs, ys):
                    patches.append(pltpatch.RegularPolygon((x - 0.4, y), 3, radius=0.3, orientation=-math.pi / 2, color='#555555'))
# weights
                # one color per weight, chosen by sign (negative vs non-negative)
                if gray:
                    colors = np.array(['black', 'gray'])[(thisW.flatten() >= 0) + 0]
                else:
                    colors = np.array(['red', 'green'])[(thisW.flatten() >= 0) + 0]
xs = np.arange(ni) + x0 + 0.5
ys = np.arange(no) + y0 + 0.5
coords = np.meshgrid(xs, ys)
for x, y, w, c in zip(coords[0].flat, coords[1].flat,
np.abs(thisW / maxW).flat, colors):
patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
x0 += ni + 1
                y0 -= 1  # shift up to leave room for the next layer's constant input
collection = pltcoll.PatchCollection(patches, match_original=True)
ax.add_collection(collection)
# Last layer output labels
if output_names:
if isOdd(n_layers):
x = x0 + 1.5
for n in output_names:
x += 1
ax.text(x, y0 + 0.5, n, fontsize=20)
else:
y = y0 + 0.6
for n in output_names:
y += 1
ax.text(x0 + 0.2, y, n, fontsize=20)
ax.axis([0, xlim, ylim, 0])
ax.axis('off')
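# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original class):
# fit the network to a noisy 1-D sine target, then predict with use().
# The toy data, layer sizes, and hyperparameters are arbitrary choices and
# assume that QNetwork.optimizers provides the 'adam' method used by train().
if __name__ == '__main__':
    np.random.seed(42)
    X = np.linspace(-2, 2, 100).reshape(-1, 1)
    T = np.sin(X) + 0.1 * np.random.normal(size=X.shape)
    nnet = NeuralNetwork(1, [10, 10], 1, activation_function='tanh')
    nnet.train(X, T, n_epochs=1000, learning_rate=0.01, method='adam', verbose=False)
    Y = nnet.use(X)
    rmse = np.sqrt(np.mean((T - Y) ** 2))
    print(nnet)
    print(f'Training RMSE in original units: {rmse:.4f}')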