Diffstat (limited to 'QNetwork')
-rw-r--r--  QNetwork/__init__.py                    0
-rw-r--r--  QNetwork/neuralnetwork_regression.py  332
-rw-r--r--  QNetwork/optimizers.py                116
3 files changed, 448 insertions, 0 deletions
diff --git a/QNetwork/__init__.py b/QNetwork/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/QNetwork/__init__.py
diff --git a/QNetwork/neuralnetwork_regression.py b/QNetwork/neuralnetwork_regression.py
new file mode 100644
index 0000000..b26be5e
--- /dev/null
+++ b/QNetwork/neuralnetwork_regression.py
@@ -0,0 +1,332 @@
+import numpy as np
+from QNetwork import optimizers
+import matplotlib.pyplot as plt
+import matplotlib.patches as pltpatch  # for RegularPolygon, Rectangle
+import matplotlib.collections as pltcoll
+import math
+
+######################################################################
+## class NeuralNetwork()
+######################################################################
+
+class NeuralNetwork():
+
+ def __init__(self, n_inputs, n_hiddens_per_layer, n_outputs, activation_function='tanh'):
+ self.n_inputs = n_inputs
+ self.n_outputs = n_outputs
+ self.activation_function = activation_function
+
+ # Set self.n_hiddens_per_layer to [] if argument is 0, [], or [0]
+ if n_hiddens_per_layer == 0 or n_hiddens_per_layer == [] or n_hiddens_per_layer == [0]:
+ self.n_hiddens_per_layer = []
+ else:
+ self.n_hiddens_per_layer = n_hiddens_per_layer
+
+ # Initialize weights, by first building list of all weight matrix shapes.
+ n_in = n_inputs
+ shapes = []
+ for nh in self.n_hiddens_per_layer:
+ shapes.append((n_in + 1, nh))
+ n_in = nh
+ shapes.append((n_in + 1, n_outputs))
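+        # Example: n_inputs=2, n_hiddens_per_layer=[5, 4], n_outputs=1 gives
+        # shapes [(3, 5), (6, 4), (5, 1)], each with one extra row of bias weights.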
+
+ # self.all_weights: vector of all weights
+ # self.Ws: list of weight matrices by layer
+ self.all_weights, self.Ws = self.make_weights_and_views(shapes)
+
+ # Define arrays to hold gradient values.
+ # One array for each W array with same shape.
+ self.all_gradients, self.dE_dWs = self.make_weights_and_views(shapes)
+
+ self.trained = False
+ self.total_epochs = 0
+ self.error_trace = []
+ self.Xmeans = None
+ self.Xstds = None
+ self.Tmeans = None
+ self.Tstds = None
+
+ def setup_standardization(self, Xmeans, Xstds, Tmeans, Tstds):
+ self.Xmeans = np.array(Xmeans)
+ self.Xstds = np.array(Xstds)
+ self.Tmeans = np.array(Tmeans)
+ self.Tstds = np.array(Tstds)
+
+ def make_weights_and_views(self, shapes):
+        # Vector of all weights built by horizontally stacking flattened matrices
+        # for each layer, initialized with uniformly-distributed values scaled
+        # by 1 / sqrt(number of inputs to the layer).
+ all_weights = np.hstack([np.random.uniform(-1, 1, size=shape).flat / np.sqrt(shape[0])
+ for shape in shapes])
+ # Build list of views by reshaping corresponding elements from vector of all weights
+ # into correct shape for each layer.
+ views = []
+ start = 0
+ for shape in shapes:
+            size = shape[0] * shape[1]
+ views.append(all_weights[start:start + size].reshape(shape))
+ start += size
+ return all_weights, views
+
+
+ # Return string that shows how the constructor was called
+ def __repr__(self):
+ return f'{type(self).__name__}({self.n_inputs}, {self.n_hiddens_per_layer}, {self.n_outputs}, \'{self.activation_function}\')'
+
+
+ # Return string that is more informative to the user about the state of this neural network.
+    def __str__(self):
+        result = self.__repr__()
+        if len(self.error_trace) > 0:
+            result += f' trained for {len(self.error_trace)} epochs, final training error {self.error_trace[-1]:.4f}'
+        return result
+
+
+ def train(self, X, T, n_epochs, learning_rate, method='sgd', verbose=True):
+ '''
+train:
+ X: n_samples x n_inputs matrix of input samples, one per row
+ T: n_samples x n_outputs matrix of target output values, one sample per row
+ n_epochs: number of passes to take through all samples updating weights each pass
+ learning_rate: factor controlling the step size of each update
+    method: either 'sgd' or 'adam'
+    verbose: if True, print the training error about ten times during training
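+
+Example (a minimal sketch, assuming X and T are numpy arrays already in memory):
+    nnet = NeuralNetwork(X.shape[1], [10, 10], T.shape[1])
+    nnet.train(X, T, n_epochs=100, learning_rate=0.01, method='adam')
+    Y = nnet.use(X)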
+ '''
+
+ # Setup standardization parameters
+ if self.Xmeans is None:
+ self.Xmeans = X.mean(axis=0)
+ self.Xstds = X.std(axis=0)
+ self.Xstds[self.Xstds == 0] = 1 # So we don't divide by zero when standardizing
+            self.Tmeans = T.mean(axis=0)
+            self.Tstds = T.std(axis=0)
+            self.Tstds[self.Tstds == 0] = 1  # So we don't divide by zero when standardizing
+
+ # Standardize X and T
+ X = (X - self.Xmeans) / self.Xstds
+ T = (T - self.Tmeans) / self.Tstds
+
+ # Instantiate Optimizers object by giving it vector of all weights
+ optimizer = optimizers.Optimizers(self.all_weights)
+
+ # Define function to convert value from error_f into error in original T units,
+ # but only if the network has a single output. Multiplying by self.Tstds for
+ # multiple outputs does not correctly unstandardize the error.
+        if len(self.Tstds) == 1:
+            error_convert_f = lambda err: (np.sqrt(err) * self.Tstds)[0]  # to scalar in original units
+        else:
+            error_convert_f = lambda err: np.sqrt(err)  # already a scalar; Tstds cannot unstandardize multiple outputs
+
+
+ if method == 'sgd':
+
+ error_trace = optimizer.sgd(self.error_f, self.gradient_f,
+ fargs=[X, T], n_epochs=n_epochs,
+ learning_rate=learning_rate,
+ verbose=verbose,
+ error_convert_f=error_convert_f)
+
+ elif method == 'adam':
+
+ error_trace = optimizer.adam(self.error_f, self.gradient_f,
+ fargs=[X, T], n_epochs=n_epochs,
+ learning_rate=learning_rate,
+ verbose=verbose,
+ error_convert_f=error_convert_f)
+
+ else:
+            raise ValueError("method must be 'sgd' or 'adam'")
+
+        self.error_trace = error_trace
+        self.total_epochs += len(error_trace)
+        self.trained = True
+
+ # Return neural network object to allow applying other methods after training.
+ # Example: Y = nnet.train(X, T, 100, 0.01).use(X)
+ return self
+
+ def relu(self, s):
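+        # Note: modifies s in place; forward_pass always passes a freshly computed array.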
+ s[s < 0] = 0
+ return s
+
+ def grad_relu(self, s):
+ return (s > 0).astype(int)
+
+ def forward_pass(self, X):
+ '''X assumed already standardized. Output returned as standardized.'''
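+        # Each weight matrix W stores the bias weights in its first row (W[0:1, :])
+        # and the input weights in the remaining rows (W[1:, :]).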
+ self.Ys = [X]
+ for W in self.Ws[:-1]:
+ if self.activation_function == 'relu':
+ self.Ys.append(self.relu(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
+ else:
+ self.Ys.append(np.tanh(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
+ last_W = self.Ws[-1]
+ self.Ys.append(self.Ys[-1] @ last_W[1:, :] + last_W[0:1, :])
+ return self.Ys
+
+ # Function to be minimized by optimizer method, mean squared error
+ def error_f(self, X, T):
+ Ys = self.forward_pass(X)
+ mean_sq_error = np.mean((T - Ys[-1]) ** 2)
+ return mean_sq_error
+
+ # Gradient of function to be minimized for use by optimizer method
+ def gradient_f(self, X, T):
+ '''Assumes forward_pass just called with layer outputs in self.Ys.'''
+ error = T - self.Ys[-1]
+ n_samples = X.shape[0]
+ n_outputs = T.shape[1]
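+        # Gradient of the mean squared error with respect to the output-layer values.
+        # The factor of 2 from differentiating the square is omitted here; it is
+        # absorbed into the learning rate.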
+ delta = - error / (n_samples * n_outputs)
+ n_layers = len(self.n_hiddens_per_layer) + 1
+ # Step backwards through the layers to back-propagate the error (delta)
+ for layeri in range(n_layers - 1, -1, -1):
+ # gradient of all but bias weights
+ self.dE_dWs[layeri][1:, :] = self.Ys[layeri].T @ delta
+ # gradient of just the bias weights
+ self.dE_dWs[layeri][0:1, :] = np.sum(delta, 0)
+ # Back-propagate this layer's delta to previous layer
+ if self.activation_function == 'relu':
+ delta = delta @ self.Ws[layeri][1:, :].T * self.grad_relu(self.Ys[layeri])
+ else:
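+                # tanh'(s) = 1 - tanh(s)**2, and Ys[layeri] already holds tanh(s)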
+ delta = delta @ self.Ws[layeri][1:, :].T * (1 - self.Ys[layeri] ** 2)
+ return self.all_gradients
+
+ def use(self, X):
+ '''X assumed to not be standardized'''
+ # Standardize X
+ X = (X - self.Xmeans) / self.Xstds
+ Ys = self.forward_pass(X)
+ Y = Ys[-1]
+ # Unstandardize output Y before returning it
+ return Y * self.Tstds + self.Tmeans
+
+ def draw(self, input_names=None, output_names=None, scale='by layer', gray=False):
+ plt.title('{} weights'.format(sum([Wi.size for Wi in self.Ws])))
+
+ def isOdd(x):
+ return x % 2 != 0
+
+ n_layers = len(self.Ws)
+
+ Wmax_overall = np.max(np.abs(np.hstack([w.reshape(-1) for w in self.Ws])))
+
+ # calculate xlim and ylim for whole network plot
+ # Assume 4 characters fit between each wire
+ # -0.5 is to leave 0.5 spacing before first wire
+ xlim = max(map(len, input_names)) / 4.0 if input_names else 1
+ ylim = 0
+
+ for li in range(n_layers):
+            ni, no = self.Ws[li].shape  # no is the number of outputs of this layer
+ if not isOdd(li):
+ ylim += ni + 0.5
+ else:
+ xlim += ni + 0.5
+
+        ni, no = self.Ws[n_layers-1].shape  # no is the number of outputs of the final layer
+ if isOdd(n_layers):
+ xlim += no + 0.5
+ else:
+ ylim += no + 0.5
+
+ # Add space for output names
+ if output_names:
+ if isOdd(n_layers):
+ ylim += 0.25
+ else:
+ xlim += round(max(map(len, output_names)) / 4.0)
+
+ ax = plt.gca()
+
+ # changes from Jim Jazwiecki (jim.jazwiecki@gmail.com) CS480 student
+ character_width_factor = 0.07
+ padding = 2
+ if input_names:
+ x0 = max([1, max(map(len, input_names)) * (character_width_factor * 3.5)])
+ else:
+ x0 = 1
+ y0 = 0 # to allow for constant input to first layer
+ # First Layer
+ if input_names:
+ y = 0.55
+ for n in input_names:
+ y += 1
+ ax.text(x0 - (character_width_factor * padding), y, n, horizontalalignment="right", fontsize=20)
+
+ patches = []
+ for li in range(n_layers):
+ thisW = self.Ws[li]
+ if scale == 'by layer':
+ maxW = np.max(np.abs(thisW))
+ else:
+ maxW = Wmax_overall
+ ni, no = thisW.shape
+ if not isOdd(li):
+ # Even layer index. Vertical layer. Origin is upper left.
+ # Constant input
+ ax.text(x0 - 0.2, y0 + 0.5, '1', fontsize=20)
+ for i in range(ni):
+ ax.plot((x0, x0 + no - 0.5), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
+ # output lines
+ for i in range(no):
+ ax.plot((x0 + 1 + i - 0.5, x0 + 1 + i - 0.5), (y0, y0 + ni + 1), color='gray')
+ # cell "bodies"
+ xs = x0 + np.arange(no) + 0.5
+ ys = np.array([y0 + ni + 0.5] * no)
+ for x, y in zip(xs, ys):
+                    patches.append(pltpatch.RegularPolygon((x, y - 0.4), 3, 0.3, 0, color='#555555'))
+ # weights
+ if gray:
+ colors = np.array(['black', 'gray'])[(thisW.flat >= 0) + 0]
+ else:
+ colors = np.array(['red', 'green'])[(thisW.flat >= 0) + 0]
+ xs = np.arange(no) + x0 + 0.5
+ ys = np.arange(ni) + y0 + 0.5
+ coords = np.meshgrid(xs, ys)
+ for x, y, w, c in zip(coords[0].flat, coords[1].flat,
+ np.abs(thisW / maxW).flat, colors):
+ patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
+ y0 += ni + 1
+                x0 -= 1  # shift for next layer's constant input
+ else:
+ # Odd layer index. Horizontal layer. Origin is upper left.
+ # Constant input
+ ax.text(x0 + 0.5, y0 - 0.2, '1', fontsize=20)
+ # input lines
+ for i in range(ni):
+ ax.plot((x0 + i + 0.5, x0 + i + 0.5), (y0, y0 + no - 0.5), color='gray')
+ # output lines
+ for i in range(no):
+                    ax.plot((x0, x0 + ni + 1), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
+ # cell 'bodies'
+ xs = np.array([x0 + ni + 0.5] * no)
+ ys = y0 + 0.5 + np.arange(no)
+ for x, y in zip(xs, ys):
+                    patches.append(pltpatch.RegularPolygon((x - 0.4, y), 3, 0.3, -math.pi / 2, color='#555555'))
+ # weights
+ if gray:
+ colors = np.array(['black', 'gray'])[(thisW.flat >= 0) + 0]
+ else:
+ colors = np.array(['red', 'green'])[(thisW.flat >= 0) + 0]
+ xs = np.arange(ni) + x0 + 0.5
+ ys = np.arange(no) + y0 + 0.5
+ coords = np.meshgrid(xs, ys)
+ for x, y, w, c in zip(coords[0].flat, coords[1].flat,
+ np.abs(thisW / maxW).flat, colors):
+ patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
+ x0 += ni + 1
+                y0 -= 1  # shift to allow for next layer's constant input
+
+ collection = pltcoll.PatchCollection(patches, match_original=True)
+ ax.add_collection(collection)
+
+ # Last layer output labels
+ if output_names:
+ if isOdd(n_layers):
+ x = x0 + 1.5
+ for n in output_names:
+ x += 1
+ ax.text(x, y0 + 0.5, n, fontsize=20)
+ else:
+ y = y0 + 0.6
+ for n in output_names:
+ y += 1
+ ax.text(x0 + 0.2, y, n, fontsize=20)
+ ax.axis([0, xlim, ylim, 0])
+ ax.axis('off')
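+
+
+if __name__ == '__main__':
+
+    # Minimal usage sketch with illustrative hyperparameters: fit y = x**2 with
+    # one hidden layer of 10 tanh units and confirm that use() returns
+    # predictions in the original units of T.
+    X = np.linspace(-1, 1, 50).reshape(-1, 1)
+    T = X ** 2
+    nnet = NeuralNetwork(1, [10], 1)
+    nnet.train(X, T, n_epochs=1000, learning_rate=0.01, method='adam')
+    print(nnet)
+    print('Predictions for first five samples:', nnet.use(X[:5]).ravel())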
diff --git a/QNetwork/optimizers.py b/QNetwork/optimizers.py
new file mode 100644
index 0000000..7d28f92
--- /dev/null
+++ b/QNetwork/optimizers.py
@@ -0,0 +1,116 @@
+import numpy as np
+
+######################################################################
+## class Optimizers()
+######################################################################
+
+class Optimizers():
+
+ def __init__(self, all_weights):
+        '''all_weights is a vector of all of a neural network's weights concatenated into a one-dimensional vector'''
+
+ self.all_weights = all_weights
+
+        # The following initializations are only used by adam.
+        # Initializing mt, vt, beta1t and beta2t here, rather than inside adam, allows
+        # multiple calls to adam to continue training with multiple subsets (batches) of training data.
+ self.mt = np.zeros_like(all_weights)
+ self.vt = np.zeros_like(all_weights)
+ self.beta1 = 0.9
+ self.beta2 = 0.999
+ self.beta1t = 1
+ self.beta2t = 1
+
+
+ def sgd(self, error_f, gradient_f, fargs=[], n_epochs=100, learning_rate=0.001, verbose=True, error_convert_f=None):
+ '''
+error_f: function that requires X and T as arguments (given in fargs) and returns mean squared error.
+gradient_f: function that requires X and T as arguments (in fargs) and returns gradient of mean squared error
+ with respect to each weight.
+error_convert_f: function that converts the standardized error from error_f to original T units.
+ '''
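+        # Plain batch gradient descent: one weight update per epoch over the full
+        # batch given in fargs,  w <- w - learning_rate * dE/dw.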
+
+ error_trace = []
+ epochs_per_print = n_epochs // 10
+
+ for epoch in range(n_epochs):
+
+ error = error_f(*fargs)
+ grad = gradient_f(*fargs)
+
+ # Update all weights using -= to modify their values in-place.
+ self.all_weights -= learning_rate * grad
+
+ if error_convert_f:
+ error = error_convert_f(error)
+ error_trace.append(error)
+
+ if verbose and ((epoch + 1) % max(1, epochs_per_print) == 0):
+ print(f'sgd: Epoch {epoch+1:d} Error={error:.5f}')
+
+ return error_trace
+
+ def adam(self, error_f, gradient_f, fargs=[], n_epochs=100, learning_rate=0.001, verbose=True, error_convert_f=None):
+ '''
+error_f: function that requires X and T as arguments (given in fargs) and returns mean squared error.
+gradient_f: function that requires X and T as arguments (in fargs) and returns gradient of mean squared error
+ with respect to each weight.
+error_convert_f: function that converts the standardized error from error_f to original T units.
+ '''
+
+ alpha = learning_rate # learning rate called alpha in original paper on adam
+ epsilon = 1e-8
+ error_trace = []
+ epochs_per_print = n_epochs // 10
+
+ for epoch in range(n_epochs):
+
+ error = error_f(*fargs)
+ grad = gradient_f(*fargs)
+
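+            # Exponentially-weighted running estimates of the gradient's first and
+            # second moments (Kingma & Ba, "Adam", Algorithm 1):
+            #     m_t = beta1 * m_{t-1} + (1 - beta1) * g_t
+            #     v_t = beta2 * v_{t-1} + (1 - beta2) * g_t**2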
+ self.mt[:] = self.beta1 * self.mt + (1 - self.beta1) * grad
+ self.vt[:] = self.beta2 * self.vt + (1 - self.beta2) * grad * grad
+ self.beta1t *= self.beta1
+ self.beta2t *= self.beta2
+
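+            # Bias-corrected moment estimates; beta1t and beta2t hold beta1**t and beta2**t:
+            #     m_hat = m_t / (1 - beta1**t),  v_hat = v_t / (1 - beta2**t)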
+ m_hat = self.mt / (1 - self.beta1t)
+ v_hat = self.vt / (1 - self.beta2t)
+
+ # Update all weights using -= to modify their values in-place.
+ self.all_weights -= alpha * m_hat / (np.sqrt(v_hat) + epsilon)
+
+ if error_convert_f:
+ error = error_convert_f(error)
+ error_trace.append(error)
+
+ if verbose and ((epoch + 1) % max(1, epochs_per_print) == 0):
+ print(f'Adam: Epoch {epoch+1:d} Error={error:.5f}')
+
+ return error_trace
+
+if __name__ == '__main__':
+
+ import matplotlib.pyplot as plt
+ plt.ion()
+
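+    # parabola and parabola_gradient read the global weight vector w; the
+    # optimizers update that same array in place (via -=), so each call sees
+    # the current weights.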
+ def parabola(wmin):
+ return ((w - wmin) ** 2)[0]
+
+ def parabola_gradient(wmin):
+ return 2 * (w - wmin)
+
+ w = np.array([0.0])
+ optimizer = Optimizers(w)
+
+ wmin = 5
+ optimizer.sgd(parabola, parabola_gradient, [wmin],
+ n_epochs=500, learning_rate=0.1)
+
+ print(f'sgd: Minimum of parabola is at {wmin}. Value found is {w}')
+
+ w = np.array([0.0])
+ optimizer = Optimizers(w)
+ optimizer.adam(parabola, parabola_gradient, [wmin],
+ n_epochs=500, learning_rate=0.1)
+
+ print(f'adam: Minimum of parabola is at {wmin}. Value found is {w}')