Diffstat (limited to 'QNetwork')
-rw-r--r--  QNetwork/__init__.py                 |   0
-rw-r--r--  QNetwork/neuralnetwork_regression.py | 332
-rw-r--r--  QNetwork/optimizers.py               | 116
3 files changed, 448 insertions, 0 deletions
diff --git a/QNetwork/__init__.py b/QNetwork/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/QNetwork/__init__.py
diff --git a/QNetwork/neuralnetwork_regression.py b/QNetwork/neuralnetwork_regression.py
new file mode 100644
index 0000000..b26be5e
--- /dev/null
+++ b/QNetwork/neuralnetwork_regression.py
@@ -0,0 +1,332 @@
+import numpy as np
+from QNetwork import optimizers
+import sys  # for sys.float_info.epsilon
+import matplotlib.pyplot as plt
+import matplotlib.patches as pltpatch  # for Arc
+import matplotlib.collections as pltcoll
+import math
+
+######################################################################
+## class NeuralNetwork()
+######################################################################
+
+class NeuralNetwork():
+
+    def __init__(self, n_inputs, n_hiddens_per_layer, n_outputs, activation_function='tanh'):
+        self.n_inputs = n_inputs
+        self.n_outputs = n_outputs
+        self.activation_function = activation_function
+
+        # Set self.n_hiddens_per_layer to [] if argument is 0, [], or [0]
+        if n_hiddens_per_layer == 0 or n_hiddens_per_layer == [] or n_hiddens_per_layer == [0]:
+            self.n_hiddens_per_layer = []
+        else:
+            self.n_hiddens_per_layer = n_hiddens_per_layer
+
+        # Initialize weights, by first building a list of all weight matrix shapes.
+        n_in = n_inputs
+        shapes = []
+        for nh in self.n_hiddens_per_layer:
+            shapes.append((n_in + 1, nh))
+            n_in = nh
+        shapes.append((n_in + 1, n_outputs))
+
+        # self.all_weights: vector of all weights
+        # self.Ws: list of weight matrices by layer
+        self.all_weights, self.Ws = self.make_weights_and_views(shapes)
+
+        # Define arrays to hold gradient values.
+        # One array for each W array with the same shape.
+        self.all_gradients, self.dE_dWs = self.make_weights_and_views(shapes)
+
+        self.trained = False
+        self.total_epochs = 0
+        self.error_trace = []
+        self.Xmeans = None
+        self.Xstds = None
+        self.Tmeans = None
+        self.Tstds = None
+
+    def setup_standardization(self, Xmeans, Xstds, Tmeans, Tstds):
+        self.Xmeans = np.array(Xmeans)
+        self.Xstds = np.array(Xstds)
+        self.Tmeans = np.array(Tmeans)
+        self.Tstds = np.array(Tstds)
+
+    def make_weights_and_views(self, shapes):
+        # Vector of all weights built by horizontally stacking flattened matrices
+        # for each layer, initialized with uniformly-distributed values.
+        all_weights = np.hstack([np.random.uniform(-1, 1, size=shape).flat / np.sqrt(shape[0])
+                                 for shape in shapes])
+        # Build list of views by reshaping corresponding elements from the vector of all weights
+        # into the correct shape for each layer.
+        views = []
+        start = 0
+        for shape in shapes:
+            size = shape[0] * shape[1]
+            views.append(all_weights[start:start + size].reshape(shape))
+            start += size
+        return all_weights, views
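[Note, not part of the committed files] make_weights_and_views depends on the reshaped slices being NumPy views that share memory with all_weights, so the optimizer's in-place updates of the flat vector are immediately visible in every layer matrix in self.Ws. A minimal sketch of that idea, with made-up shapes for a 2-input, 3-hidden-unit, 1-output net:

import numpy as np

# Hypothetical shapes: hidden layer (2 inputs + bias, 3 units), output layer (3 + bias, 1 unit).
shapes = [(3, 3), (4, 1)]
all_weights = np.hstack([np.random.uniform(-1, 1, size=s).flat / np.sqrt(s[0]) for s in shapes])

views, start = [], 0
for s in shapes:
    size = s[0] * s[1]
    views.append(all_weights[start:start + size].reshape(s))  # a view, not a copy
    start += size

all_weights -= 0.1 * np.ones_like(all_weights)   # in-place update, as the optimizers do
print(np.shares_memory(all_weights, views[0]))   # True: each layer matrix sees the update

This is why the per-layer matrices are carved out of the concatenated vector after np.hstack, rather than reusing the arrays that were stacked; np.hstack copies its inputs, so the stacked arrays themselves would not track later updates.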
+    # Return string that shows how the constructor was called.
+    def __repr__(self):
+        return f'{type(self).__name__}({self.n_inputs}, {self.n_hiddens_per_layer}, {self.n_outputs}, \'{self.activation_function}\')'
+
+    # Return string that is more informative to the user about the state of this neural network.
+    def __str__(self):
+        result = self.__repr__()
+        if len(self.error_trace) > 0:
+            result += f' trained for {len(self.error_trace)} epochs, final training error {self.error_trace[-1]:.4f}'
+        return result
+
+    def train(self, X, T, n_epochs, learning_rate, method='sgd', verbose=True):
+        '''
+train:
+  X: n_samples x n_inputs matrix of input samples, one per row
+  T: n_samples x n_outputs matrix of target output values, one sample per row
+  n_epochs: number of passes to take through all samples, updating weights each pass
+  learning_rate: factor controlling the step size of each update
+  method: either 'sgd' or 'adam'
+        '''
+
+        # Set up standardization parameters if not already set.
+        if self.Xmeans is None:
+            self.Xmeans = X.mean(axis=0)
+            self.Xstds = X.std(axis=0)
+            self.Xstds[self.Xstds == 0] = 1  # So we don't divide by zero when standardizing
+            self.Tmeans = T.mean(axis=0)
+            self.Tstds = T.std(axis=0)
+
+        # Standardize X and T.
+        X = (X - self.Xmeans) / self.Xstds
+        T = (T - self.Tmeans) / self.Tstds
+
+        # Instantiate Optimizers object by giving it the vector of all weights.
+        optimizer = optimizers.Optimizers(self.all_weights)
+
+        # Define function to convert the value from error_f into an error in the original T units,
+        # but only if the network has a single output.  Multiplying by self.Tstds for
+        # multiple outputs does not correctly unstandardize the error.
+        if len(self.Tstds) == 1:
+            error_convert_f = lambda err: (np.sqrt(err) * self.Tstds)[0]  # to scalar
+        else:
+            error_convert_f = lambda err: np.sqrt(err)  # RMSE left in standardized units
+
+        if method == 'sgd':
+
+            error_trace = optimizer.sgd(self.error_f, self.gradient_f,
+                                        fargs=[X, T], n_epochs=n_epochs,
+                                        learning_rate=learning_rate,
+                                        verbose=verbose,
+                                        error_convert_f=error_convert_f)
+
+        elif method == 'adam':
+
+            error_trace = optimizer.adam(self.error_f, self.gradient_f,
+                                         fargs=[X, T], n_epochs=n_epochs,
+                                         learning_rate=learning_rate,
+                                         verbose=verbose,
+                                         error_convert_f=error_convert_f)
+
+        else:
+            raise Exception("method must be 'sgd' or 'adam'")
+
+        self.error_trace = error_trace
+
+        # Return the neural network object to allow applying other methods after training.
+        # Example: Y = nnet.train(X, T, 100, 0.01).use(X)
+        return self
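[Note, not part of the committed files] For a single output, error_convert_f turns the standardized mean squared error back into an RMSE in the original T units by taking the square root and multiplying by Tstds. A small worked check of that identity, with made-up targets and predictions:

import numpy as np

# Hypothetical targets and predictions for one output column.
T = np.array([[1.0], [3.0], [7.0], [9.0]])
Y = np.array([[1.5], [2.5], [7.5], [8.5]])
Tmean, Tstd = T.mean(axis=0), T.std(axis=0)

# Mean squared error computed on standardized values, as error_f sees it during training.
mse_std = np.mean(((T - Tmean) / Tstd - (Y - Tmean) / Tstd) ** 2)
print(np.sqrt(mse_std) * Tstd)             # [0.5], RMSE converted back to original units
print(np.sqrt(np.mean((T - Y) ** 2)))      # 0.5, the same RMSE computed directly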
+    def relu(self, s):
+        s[s < 0] = 0
+        return s
+
+    def grad_relu(self, s):
+        return (s > 0).astype(int)
+
+    def forward_pass(self, X):
+        '''X assumed to be already standardized.  Output returned as standardized.'''
+        self.Ys = [X]
+        for W in self.Ws[:-1]:
+            if self.activation_function == 'relu':
+                self.Ys.append(self.relu(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
+            else:
+                self.Ys.append(np.tanh(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
+        last_W = self.Ws[-1]
+        self.Ys.append(self.Ys[-1] @ last_W[1:, :] + last_W[0:1, :])
+        return self.Ys
+
+    # Function to be minimized by the optimizer method: mean squared error.
+    def error_f(self, X, T):
+        Ys = self.forward_pass(X)
+        mean_sq_error = np.mean((T - Ys[-1]) ** 2)
+        return mean_sq_error
+
+    # Gradient of the function to be minimized, for use by the optimizer method.
+    def gradient_f(self, X, T):
+        '''Assumes forward_pass was just called, with layer outputs in self.Ys.'''
+        error = T - self.Ys[-1]
+        n_samples = X.shape[0]
+        n_outputs = T.shape[1]
+        delta = - error / (n_samples * n_outputs)
+        n_layers = len(self.n_hiddens_per_layer) + 1
+        # Step backwards through the layers to back-propagate the error (delta).
+        for layeri in range(n_layers - 1, -1, -1):
+            # Gradient of all but the bias weights.
+            self.dE_dWs[layeri][1:, :] = self.Ys[layeri].T @ delta
+            # Gradient of just the bias weights.
+            self.dE_dWs[layeri][0:1, :] = np.sum(delta, 0)
+            # Back-propagate this layer's delta to the previous layer.
+            if self.activation_function == 'relu':
+                delta = delta @ self.Ws[layeri][1:, :].T * self.grad_relu(self.Ys[layeri])
+            else:
+                delta = delta @ self.Ws[layeri][1:, :].T * (1 - self.Ys[layeri] ** 2)
+        return self.all_gradients
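[Note, not part of the committed files] A quick way to check that gradient_f is consistent with error_f is a central finite-difference comparison on a tiny network. This sketch assumes the class exactly as committed; the network size, epsilon, and seed are arbitrary choices for the example:

import numpy as np
from QNetwork.neuralnetwork_regression import NeuralNetwork

np.random.seed(0)
nnet = NeuralNetwork(2, [3], 1)               # tiny net: 2 inputs, 3 hidden units, 1 output
X = np.random.uniform(-1, 1, size=(5, 2))     # treated as already standardized for this check
T = np.random.uniform(-1, 1, size=(5, 1))

nnet.error_f(X, T)                            # runs forward_pass, filling nnet.Ys
analytic = nnet.gradient_f(X, T).copy()       # flat gradient vector, same layout as all_weights

eps = 1e-6
numeric = np.zeros_like(nnet.all_weights)
for i in range(len(nnet.all_weights)):
    w0 = nnet.all_weights[i]
    nnet.all_weights[i] = w0 + eps
    e_plus = nnet.error_f(X, T)
    nnet.all_weights[i] = w0 - eps
    e_minus = nnet.error_f(X, T)
    nnet.all_weights[i] = w0                  # restore the weight
    numeric[i] = (e_plus - e_minus) / (2 * eps)

print(np.max(np.abs(analytic - numeric)))     # should be very small, roughly 1e-8 or less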
+    def use(self, X):
+        '''X assumed to not be standardized.'''
+        # Standardize X.
+        X = (X - self.Xmeans) / self.Xstds
+        Ys = self.forward_pass(X)
+        Y = Ys[-1]
+        # Unstandardize output Y before returning it.
+        return Y * self.Tstds + self.Tmeans
+
+    def draw(self, input_names=None, output_names=None, scale='by layer', gray=False):
+        plt.title('{} weights'.format(sum([Wi.size for Wi in self.Ws])))
+
+        def isOdd(x):
+            return x % 2 != 0
+
+        n_layers = len(self.Ws)
+
+        Wmax_overall = np.max(np.abs(np.hstack([w.reshape(-1) for w in self.Ws])))
+
+        # Calculate xlim and ylim for the whole network plot.
+        # Assume 4 characters fit between each wire.
+        # -0.5 is to leave 0.5 spacing before the first wire.
+        xlim = max(map(len, input_names)) / 4.0 if input_names else 1
+        ylim = 0
+
+        for li in range(n_layers):
+            ni, no = self.Ws[li].shape  # no means number of outputs in this layer
+            if not isOdd(li):
+                ylim += ni + 0.5
+            else:
+                xlim += ni + 0.5
+
+        ni, no = self.Ws[n_layers - 1].shape  # no means number of outputs in this layer
+        if isOdd(n_layers):
+            xlim += no + 0.5
+        else:
+            ylim += no + 0.5
+
+        # Add space for output names.
+        if output_names:
+            if isOdd(n_layers):
+                ylim += 0.25
+            else:
+                xlim += round(max(map(len, output_names)) / 4.0)
+
+        ax = plt.gca()
+
+        # Changes from Jim Jazwiecki (jim.jazwiecki@gmail.com), CS480 student.
+        character_width_factor = 0.07
+        padding = 2
+        if input_names:
+            x0 = max([1, max(map(len, input_names)) * (character_width_factor * 3.5)])
+        else:
+            x0 = 1
+        y0 = 0  # to allow for constant input to first layer
+        # First layer
+        if input_names:
+            y = 0.55
+            for n in input_names:
+                y += 1
+                ax.text(x0 - (character_width_factor * padding), y, n, horizontalalignment="right", fontsize=20)
+
+        patches = []
+        for li in range(n_layers):
+            thisW = self.Ws[li]
+            if scale == 'by layer':
+                maxW = np.max(np.abs(thisW))
+            else:
+                maxW = Wmax_overall
+            ni, no = thisW.shape
+            if not isOdd(li):
+                # Even layer index.  Vertical layer.  Origin is upper left.
+                # Constant input
+                ax.text(x0 - 0.2, y0 + 0.5, '1', fontsize=20)
+                for i in range(ni):
+                    ax.plot((x0, x0 + no - 0.5), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
+                # output lines
+                for i in range(no):
+                    ax.plot((x0 + 1 + i - 0.5, x0 + 1 + i - 0.5), (y0, y0 + ni + 1), color='gray')
+                # cell "bodies"
+                xs = x0 + np.arange(no) + 0.5
+                ys = np.array([y0 + ni + 0.5] * no)
+                for x, y in zip(xs, ys):
+                    patches.append(pltpatch.RegularPolygon((x, y - 0.4), 3, 0.3, 0, color='#555555'))
+                # weights
+                if gray:
+                    colors = np.array(['black', 'gray'])[(thisW.flat >= 0) + 0]
+                else:
+                    colors = np.array(['red', 'green'])[(thisW.flat >= 0) + 0]
+                xs = np.arange(no) + x0 + 0.5
+                ys = np.arange(ni) + y0 + 0.5
+                coords = np.meshgrid(xs, ys)
+                for x, y, w, c in zip(coords[0].flat, coords[1].flat,
+                                      np.abs(thisW / maxW).flat, colors):
+                    patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
+                y0 += ni + 1
+                x0 += -1  # shift for next layer's constant input
+            else:
+                # Odd layer index.  Horizontal layer.  Origin is upper left.
+                # Constant input
+                ax.text(x0 + 0.5, y0 - 0.2, '1', fontsize=20)
+                # input lines
+                for i in range(ni):
+                    ax.plot((x0 + i + 0.5, x0 + i + 0.5), (y0, y0 + no - 0.5), color='gray')
+                # output lines
+                for i in range(no):
+                    ax.plot((x0, x0 + ni + 1), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
+                # cell 'bodies'
+                xs = np.array([x0 + ni + 0.5] * no)
+                ys = y0 + 0.5 + np.arange(no)
+                for x, y in zip(xs, ys):
+                    patches.append(pltpatch.RegularPolygon((x - 0.4, y), 3, 0.3, -math.pi / 2, color='#555555'))
+                # weights
+                if gray:
+                    colors = np.array(['black', 'gray'])[(thisW.flat >= 0) + 0]
+                else:
+                    colors = np.array(['red', 'green'])[(thisW.flat >= 0) + 0]
+                xs = np.arange(ni) + x0 + 0.5
+                ys = np.arange(no) + y0 + 0.5
+                coords = np.meshgrid(xs, ys)
+                for x, y, w, c in zip(coords[0].flat, coords[1].flat,
+                                      np.abs(thisW / maxW).flat, colors):
+                    patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
+                x0 += ni + 1
+                y0 -= 1  # shift to allow for next layer's constant input
+
+        collection = pltcoll.PatchCollection(patches, match_original=True)
+        ax.add_collection(collection)
+
+        # Last layer output labels
+        if output_names:
+            if isOdd(n_layers):
+                x = x0 + 1.5
+                for n in output_names:
+                    x += 1
+                    ax.text(x, y0 + 0.5, n, fontsize=20)
+            else:
+                y = y0 + 0.6
+                for n in output_names:
+                    y += 1
+                    ax.text(x0 + 0.2, y, n, fontsize=20)
+        ax.axis([0, xlim, ylim, 0])
+        ax.axis('off')
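[Note, not part of the committed files] An end-to-end sketch of the intended workflow (construct, train, use, then inspect error_trace and draw) on a made-up noisy-sine regression problem. It assumes the QNetwork package as committed is importable; names like Xtrain and nnet are just for the example:

import numpy as np
import matplotlib.pyplot as plt
from QNetwork.neuralnetwork_regression import NeuralNetwork

# Hypothetical 1-D regression data.
Xtrain = np.linspace(0, 10, 200).reshape(-1, 1)
Ttrain = np.sin(Xtrain) + 0.05 * np.random.normal(size=Xtrain.shape)

# train returns self, so construction and training can be chained.
nnet = NeuralNetwork(1, [10, 10], 1).train(Xtrain, Ttrain, n_epochs=1000,
                                           learning_rate=0.01, method='adam', verbose=False)
Y = nnet.use(Xtrain)                          # predictions back in original T units
print(np.sqrt(np.mean((Y - Ttrain) ** 2)))    # RMSE, comparable to the error trace

plt.figure(1)
plt.plot(nnet.error_trace)                    # RMSE per epoch, in original T units
plt.xlabel('Epoch')
plt.ylabel('RMSE')

plt.figure(2)
nnet.draw(input_names=['x'], output_names=['sin(x)'])
plt.show()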
diff --git a/QNetwork/optimizers.py b/QNetwork/optimizers.py
new file mode 100644
index 0000000..7d28f92
--- /dev/null
+++ b/QNetwork/optimizers.py
@@ -0,0 +1,116 @@
+import numpy as np
+
+######################################################################
+## class Optimizers()
+######################################################################
+
+class Optimizers():
+
+    def __init__(self, all_weights):
+        '''all_weights is a vector of all of a neural network's weights concatenated into a one-dimensional vector'''
+
+        self.all_weights = all_weights
+
+        # The following initializations are only used by adam.
+        # Initializing mt, vt, beta1t and beta2t here, rather than inside adam, allows multiple
+        # calls to adam to continue training with multiple subsets (batches) of training data.
+        self.mt = np.zeros_like(all_weights)
+        self.vt = np.zeros_like(all_weights)
+        self.beta1 = 0.9
+        self.beta2 = 0.999
+        self.beta1t = 1
+        self.beta2t = 1
+
+    def sgd(self, error_f, gradient_f, fargs=[], n_epochs=100, learning_rate=0.001, verbose=True, error_convert_f=None):
+        '''
+error_f: function that requires X and T as arguments (given in fargs) and returns mean squared error.
+gradient_f: function that requires X and T as arguments (in fargs) and returns gradient of mean squared error
+    with respect to each weight.
+error_convert_f: function that converts the standardized error from error_f to original T units.
+        '''
+
+        error_trace = []
+        epochs_per_print = n_epochs // 10
+
+        for epoch in range(n_epochs):
+
+            error = error_f(*fargs)
+            grad = gradient_f(*fargs)
+
+            # Update all weights using -= to modify their values in place.
+            self.all_weights -= learning_rate * grad
+
+            if error_convert_f:
+                error = error_convert_f(error)
+            error_trace.append(error)
+
+            if verbose and ((epoch + 1) % max(1, epochs_per_print) == 0):
+                print(f'sgd: Epoch {epoch + 1:d} Error={error:.5f}')
+
+        return error_trace
+
+    def adam(self, error_f, gradient_f, fargs=[], n_epochs=100, learning_rate=0.001, verbose=True, error_convert_f=None):
+        '''
+error_f: function that requires X and T as arguments (given in fargs) and returns mean squared error.
+gradient_f: function that requires X and T as arguments (in fargs) and returns gradient of mean squared error
+    with respect to each weight.
+error_convert_f: function that converts the standardized error from error_f to original T units.
+        '''
+
+        alpha = learning_rate  # The learning rate is called alpha in the original paper on Adam.
+        epsilon = 1e-8
+        error_trace = []
+        epochs_per_print = n_epochs // 10
+
+        for epoch in range(n_epochs):
+
+            error = error_f(*fargs)
+            grad = gradient_f(*fargs)
+
+            self.mt[:] = self.beta1 * self.mt + (1 - self.beta1) * grad
+            self.vt[:] = self.beta2 * self.vt + (1 - self.beta2) * grad * grad
+            self.beta1t *= self.beta1
+            self.beta2t *= self.beta2
+
+            m_hat = self.mt / (1 - self.beta1t)
+            v_hat = self.vt / (1 - self.beta2t)
+
+            # Update all weights using -= to modify their values in place.
+            self.all_weights -= alpha * m_hat / (np.sqrt(v_hat) + epsilon)
+
+            if error_convert_f:
+                error = error_convert_f(error)
+            error_trace.append(error)
+
+            if verbose and ((epoch + 1) % max(1, epochs_per_print) == 0):
+                print(f'Adam: Epoch {epoch + 1:d} Error={error:.5f}')
+
+        return error_trace
+
+if __name__ == '__main__':
+
+    import matplotlib.pyplot as plt
+    plt.ion()
+
+    def parabola(wmin):
+        return ((w - wmin) ** 2)[0]
+
+    def parabola_gradient(wmin):
+        return 2 * (w - wmin)
+
+    w = np.array([0.0])
+    optimizer = Optimizers(w)
+
+    wmin = 5
+    optimizer.sgd(parabola, parabola_gradient, [wmin],
+                  n_epochs=500, learning_rate=0.1)
+
+    print(f'sgd: Minimum of parabola is at {wmin}. Value found is {w}')
+
+    w = np.array([0.0])
+    optimizer = Optimizers(w)
+    optimizer.adam(parabola, parabola_gradient, [wmin],
+                   n_epochs=500, learning_rate=0.1)
+
+    print(f'adam: Minimum of parabola is at {wmin}. Value found is {w}')
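[Note, not part of the committed files] Because mt, vt, beta1t and beta2t live on the Optimizers instance rather than inside adam, repeated calls to adam on the same instance continue from the accumulated moment estimates, which is what the constructor comment about multiple batches refers to. A small sketch of that behavior on a made-up two-weight quadratic, in the style of the module's own __main__ demo:

import numpy as np
from QNetwork.optimizers import Optimizers

# Hypothetical quadratic in two weights; w is the flat weight vector updated in place.
w = np.array([0.0, 0.0])
wmin = np.array([3.0, -2.0])

def quad(wmin):
    return np.sum((w - wmin) ** 2)

def quad_gradient(wmin):
    return 2 * (w - wmin)

opt = Optimizers(w)
# Two separate calls: the Adam moment estimates persist between them,
# so the second call resumes rather than restarts the optimization.
opt.adam(quad, quad_gradient, [wmin], n_epochs=250, learning_rate=0.1, verbose=False)
opt.adam(quad, quad_gradient, [wmin], n_epochs=250, learning_rate=0.1, verbose=False)
print(w)   # should be close to [3., -2.]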