path: root/QNetwork/neuralnetwork_regression.py
author    bd-912 <bdunahu@gmail.com>    2023-11-12 20:10:57 -0700
committer bd-912 <bdunahu@gmail.com>    2023-11-12 20:26:49 -0700
commit    a2b56742da7b30afa00f33c9a806fa6031be68a5 (patch)
tree      94acd653183c0cc57e0434f39f5d3917eb99fdc0 /QNetwork/neuralnetwork_regression.py
parent    fa75138690814ad7a06194883a12f25c3936a15e (diff)
Added initial files
Diffstat (limited to 'QNetwork/neuralnetwork_regression.py')
-rw-r--r--  QNetwork/neuralnetwork_regression.py  332
1 file changed, 332 insertions, 0 deletions
diff --git a/QNetwork/neuralnetwork_regression.py b/QNetwork/neuralnetwork_regression.py
new file mode 100644
index 0000000..b26be5e
--- /dev/null
+++ b/QNetwork/neuralnetwork_regression.py
@@ -0,0 +1,332 @@
+import numpy as np
+from QNetwork import optimizers
+import sys # for sys.float_info.epsilon
+import matplotlib.pyplot as plt
+import matplotlib.patches as pltpatch # for Arc
+import matplotlib.collections as pltcoll
+import math
+
+######################################################################
+## class NeuralNetwork()
+######################################################################
+
+class NeuralNetwork():
+
+
+ def __init__(self, n_inputs, n_hiddens_per_layer, n_outputs, activation_function='tanh'):
+ self.n_inputs = n_inputs
+ self.n_outputs = n_outputs
+ self.activation_function = activation_function
+
+ # Set self.n_hiddens_per_layer to [] if argument is 0, [], or [0]
+ if n_hiddens_per_layer == 0 or n_hiddens_per_layer == [] or n_hiddens_per_layer == [0]:
+ self.n_hiddens_per_layer = []
+ else:
+ self.n_hiddens_per_layer = n_hiddens_per_layer
+
+ # Initialize weights, by first building list of all weight matrix shapes.
+ n_in = n_inputs
+ shapes = []
+ for nh in self.n_hiddens_per_layer:
+ shapes.append((n_in + 1, nh))
+ n_in = nh
+ shapes.append((n_in + 1, n_outputs))
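+        # Illustrative example: for n_inputs=2, n_hiddens_per_layer=[5, 3],
+        # n_outputs=1, shapes is [(3, 5), (6, 3), (4, 1)]; the extra first row of
+        # each matrix holds that layer's bias weights.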
+
+ # self.all_weights: vector of all weights
+ # self.Ws: list of weight matrices by layer
+ self.all_weights, self.Ws = self.make_weights_and_views(shapes)
+
+ # Define arrays to hold gradient values.
+ # One array for each W array with same shape.
+ self.all_gradients, self.dE_dWs = self.make_weights_and_views(shapes)
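+        # Because Ws and dE_dWs are numpy views into all_weights and all_gradients,
+        # any update the optimizer makes to the flat vectors is immediately
+        # reflected in the per-layer matrices (and vice versa).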
+
+ self.trained = False
+ self.total_epochs = 0
+ self.error_trace = []
+ self.Xmeans = None
+ self.Xstds = None
+ self.Tmeans = None
+ self.Tstds = None
+
+ def setup_standardization(self, Xmeans, Xstds, Tmeans, Tstds):
+ self.Xmeans = np.array(Xmeans)
+ self.Xstds = np.array(Xstds)
+ self.Tmeans = np.array(Tmeans)
+ self.Tstds = np.array(Tstds)
+
+ def make_weights_and_views(self, shapes):
+        # Vector of all weights built by horizontally stacking flattened matrices
+        # for each layer, initialized with uniformly-distributed values scaled by
+        # 1 / sqrt(number of inputs to that layer).
+ all_weights = np.hstack([np.random.uniform(-1, 1, size=shape).flat / np.sqrt(shape[0])
+ for shape in shapes])
+ # Build list of views by reshaping corresponding elements from vector of all weights
+ # into correct shape for each layer.
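+        # For example (illustrative): with shapes [(3, 5), (6, 3)], all_weights has
+        # 3*5 + 6*3 = 33 entries; views[0] is all_weights[:15] reshaped to (3, 5)
+        # and views[1] is all_weights[15:33] reshaped to (6, 3).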
+ views = []
+ start = 0
+ for shape in shapes:
+            size = shape[0] * shape[1]
+ views.append(all_weights[start:start + size].reshape(shape))
+ start += size
+ return all_weights, views
+
+
+ # Return string that shows how the constructor was called
+ def __repr__(self):
+ return f'{type(self).__name__}({self.n_inputs}, {self.n_hiddens_per_layer}, {self.n_outputs}, \'{self.activation_function}\')'
+
+
+ # Return string that is more informative to the user about the state of this neural network.
+    def __str__(self):
+        result = self.__repr__()
+        if len(self.error_trace) > 0:
+            result += f' trained for {len(self.error_trace)} epochs, final training error {self.error_trace[-1]:.4f}'
+        return result
+
+
+ def train(self, X, T, n_epochs, learning_rate, method='sgd', verbose=True):
+ '''
+train:
+ X: n_samples x n_inputs matrix of input samples, one per row
+ T: n_samples x n_outputs matrix of target output values, one sample per row
+ n_epochs: number of passes to take through all samples updating weights each pass
+ learning_rate: factor controlling the step size of each update
+ method: is either 'sgd' or 'adam'
+ '''
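+        # Hypothetical usage sketch (variable names and data are illustrative only):
+        #     nnet = NeuralNetwork(n_inputs=1, n_hiddens_per_layer=[10, 10], n_outputs=1)
+        #     nnet.train(X, T, n_epochs=500, learning_rate=0.01, method='adam')
+        #     Y = nnet.use(Xtest)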
+
+ # Setup standardization parameters
+ if self.Xmeans is None:
+ self.Xmeans = X.mean(axis=0)
+ self.Xstds = X.std(axis=0)
+ self.Xstds[self.Xstds == 0] = 1 # So we don't divide by zero when standardizing
+ self.Tmeans = T.mean(axis=0)
+ self.Tstds = T.std(axis=0)
+
+ # Standardize X and T
+ X = (X - self.Xmeans) / self.Xstds
+ T = (T - self.Tmeans) / self.Tstds
+
+ # Instantiate Optimizers object by giving it vector of all weights
+ optimizer = optimizers.Optimizers(self.all_weights)
+
+ # Define function to convert value from error_f into error in original T units,
+ # but only if the network has a single output. Multiplying by self.Tstds for
+ # multiple outputs does not correctly unstandardize the error.
+ if len(self.Tstds) == 1:
+ error_convert_f = lambda err: (np.sqrt(err) * self.Tstds)[0] # to scalar
+ else:
+            error_convert_f = lambda err: np.sqrt(err)  # err is already a scalar
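+        # Worked example (illustrative): with a single output whose Tstds is [2.5],
+        # a standardized mean squared error of 0.04 is reported as
+        # sqrt(0.04) * 2.5 = 0.5 in the original units of T.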
+
+
+ if method == 'sgd':
+
+ error_trace = optimizer.sgd(self.error_f, self.gradient_f,
+ fargs=[X, T], n_epochs=n_epochs,
+ learning_rate=learning_rate,
+ verbose=verbose,
+ error_convert_f=error_convert_f)
+
+ elif method == 'adam':
+
+ error_trace = optimizer.adam(self.error_f, self.gradient_f,
+ fargs=[X, T], n_epochs=n_epochs,
+ learning_rate=learning_rate,
+ verbose=verbose,
+ error_convert_f=error_convert_f)
+
+ else:
+ raise Exception("method must be 'sgd' or 'adam'")
+
+ self.error_trace = error_trace
+
+ # Return neural network object to allow applying other methods after training.
+ # Example: Y = nnet.train(X, T, 100, 0.01).use(X)
+ return self
+
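+    # relu and grad_relu implement the rectified linear activation and its
+    # derivative. Note that relu modifies its argument in place; that is safe
+    # here because forward_pass only applies it to freshly computed layer sums.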
+ def relu(self, s):
+ s[s < 0] = 0
+ return s
+
+ def grad_relu(self, s):
+ return (s > 0).astype(int)
+
+ def forward_pass(self, X):
+ '''X assumed already standardized. Output returned as standardized.'''
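+        # Row 0 of each weight matrix (W[0:1, :]) holds the bias weights; the
+        # remaining rows (W[1:, :]) multiply the previous layer's outputs.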
+ self.Ys = [X]
+ for W in self.Ws[:-1]:
+ if self.activation_function == 'relu':
+ self.Ys.append(self.relu(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
+ else:
+ self.Ys.append(np.tanh(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
+ last_W = self.Ws[-1]
+ self.Ys.append(self.Ys[-1] @ last_W[1:, :] + last_W[0:1, :])
+ return self.Ys
+
+ # Function to be minimized by optimizer method, mean squared error
+ def error_f(self, X, T):
+ Ys = self.forward_pass(X)
+ mean_sq_error = np.mean((T - Ys[-1]) ** 2)
+ return mean_sq_error
+
+ # Gradient of function to be minimized for use by optimizer method
+ def gradient_f(self, X, T):
+ '''Assumes forward_pass just called with layer outputs in self.Ys.'''
+ error = T - self.Ys[-1]
+ n_samples = X.shape[0]
+ n_outputs = T.shape[1]
+ delta = - error / (n_samples * n_outputs)
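+        # delta is proportional to the derivative of the mean squared error with
+        # respect to the output layer; the factor of 2 from d/dY (T - Y)**2 is
+        # absorbed into the learning rate.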
+ n_layers = len(self.n_hiddens_per_layer) + 1
+ # Step backwards through the layers to back-propagate the error (delta)
+ for layeri in range(n_layers - 1, -1, -1):
+ # gradient of all but bias weights
+ self.dE_dWs[layeri][1:, :] = self.Ys[layeri].T @ delta
+ # gradient of just the bias weights
+ self.dE_dWs[layeri][0:1, :] = np.sum(delta, 0)
+ # Back-propagate this layer's delta to previous layer
+ if self.activation_function == 'relu':
+ delta = delta @ self.Ws[layeri][1:, :].T * self.grad_relu(self.Ys[layeri])
+ else:
+ delta = delta @ self.Ws[layeri][1:, :].T * (1 - self.Ys[layeri] ** 2)
+ return self.all_gradients
+
+ def use(self, X):
+ '''X assumed to not be standardized'''
+ # Standardize X
+ X = (X - self.Xmeans) / self.Xstds
+ Ys = self.forward_pass(X)
+ Y = Ys[-1]
+ # Unstandardize output Y before returning it
+ return Y * self.Tstds + self.Tmeans
+
+ def draw(self, input_names=None, output_names=None, scale='by layer', gray=False):
+ plt.title('{} weights'.format(sum([Wi.size for Wi in self.Ws])))
+
+ def isOdd(x):
+ return x % 2 != 0
+
+ n_layers = len(self.Ws)
+
+ Wmax_overall = np.max(np.abs(np.hstack([w.reshape(-1) for w in self.Ws])))
+
+ # calculate xlim and ylim for whole network plot
+ # Assume 4 characters fit between each wire
+ # -0.5 is to leave 0.5 spacing before first wire
+ xlim = max(map(len, input_names)) / 4.0 if input_names else 1
+ ylim = 0
+
+ for li in range(n_layers):
+            ni, no = self.Ws[li].shape  # no is the number of outputs in this layer
+ if not isOdd(li):
+ ylim += ni + 0.5
+ else:
+ xlim += ni + 0.5
+
+        ni, no = self.Ws[n_layers-1].shape  # no is the number of outputs in this layer
+ if isOdd(n_layers):
+ xlim += no + 0.5
+ else:
+ ylim += no + 0.5
+
+ # Add space for output names
+ if output_names:
+ if isOdd(n_layers):
+ ylim += 0.25
+ else:
+ xlim += round(max(map(len, output_names)) / 4.0)
+
+ ax = plt.gca()
+
+ # changes from Jim Jazwiecki (jim.jazwiecki@gmail.com) CS480 student
+ character_width_factor = 0.07
+ padding = 2
+ if input_names:
+ x0 = max([1, max(map(len, input_names)) * (character_width_factor * 3.5)])
+ else:
+ x0 = 1
+ y0 = 0 # to allow for constant input to first layer
+ # First Layer
+ if input_names:
+ y = 0.55
+ for n in input_names:
+ y += 1
+ ax.text(x0 - (character_width_factor * padding), y, n, horizontalalignment="right", fontsize=20)
+
+ patches = []
+ for li in range(n_layers):
+ thisW = self.Ws[li]
+ if scale == 'by layer':
+ maxW = np.max(np.abs(thisW))
+ else:
+ maxW = Wmax_overall
+ ni, no = thisW.shape
+ if not isOdd(li):
+ # Even layer index. Vertical layer. Origin is upper left.
+ # Constant input
+ ax.text(x0 - 0.2, y0 + 0.5, '1', fontsize=20)
+ for i in range(ni):
+ ax.plot((x0, x0 + no - 0.5), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
+ # output lines
+ for i in range(no):
+ ax.plot((x0 + 1 + i - 0.5, x0 + 1 + i - 0.5), (y0, y0 + ni + 1), color='gray')
+ # cell "bodies"
+ xs = x0 + np.arange(no) + 0.5
+ ys = np.array([y0 + ni + 0.5] * no)
+ for x, y in zip(xs, ys):
+                    patches.append(pltpatch.RegularPolygon((x, y - 0.4), 3, 0.3, 0, color='#555555'))
+ # weights
+ if gray:
+ colors = np.array(['black', 'gray'])[(thisW.flat >= 0) + 0]
+ else:
+ colors = np.array(['red', 'green'])[(thisW.flat >= 0) + 0]
+ xs = np.arange(no) + x0 + 0.5
+ ys = np.arange(ni) + y0 + 0.5
+ coords = np.meshgrid(xs, ys)
+ for x, y, w, c in zip(coords[0].flat, coords[1].flat,
+ np.abs(thisW / maxW).flat, colors):
+ patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
+ y0 += ni + 1
+                x0 -= 1  # shift for next layer's constant input
+ else:
+ # Odd layer index. Horizontal layer. Origin is upper left.
+ # Constant input
+ ax.text(x0 + 0.5, y0 - 0.2, '1', fontsize=20)
+ # input lines
+ for i in range(ni):
+ ax.plot((x0 + i + 0.5, x0 + i + 0.5), (y0, y0 + no - 0.5), color='gray')
+ # output lines
+ for i in range(no):
+                    ax.plot((x0, x0 + ni + 1), (y0 + i + 0.5, y0 + i + 0.5), color='gray')
+ # cell 'bodies'
+ xs = np.array([x0 + ni + 0.5] * no)
+ ys = y0 + 0.5 + np.arange(no)
+ for x, y in zip(xs, ys):
+                    patches.append(pltpatch.RegularPolygon((x - 0.4, y), 3, 0.3, -math.pi / 2, color='#555555'))
+ # weights
+ if gray:
+ colors = np.array(['black', 'gray'])[(thisW.flat >= 0) + 0]
+ else:
+ colors = np.array(['red', 'green'])[(thisW.flat >= 0) + 0]
+ xs = np.arange(ni) + x0 + 0.5
+ ys = np.arange(no) + y0 + 0.5
+ coords = np.meshgrid(xs, ys)
+ for x, y, w, c in zip(coords[0].flat, coords[1].flat,
+ np.abs(thisW / maxW).flat, colors):
+ patches.append(pltpatch.Rectangle((x - w / 2, y - w / 2), w, w, color=c))
+ x0 += ni + 1
+                y0 -= 1  # shift to allow for next layer's constant input
+
+ collection = pltcoll.PatchCollection(patches, match_original=True)
+ ax.add_collection(collection)
+
+ # Last layer output labels
+ if output_names:
+ if isOdd(n_layers):
+ x = x0 + 1.5
+ for n in output_names:
+ x += 1
+ ax.text(x, y0 + 0.5, n, fontsize=20)
+ else:
+ y = y0 + 0.6
+ for n in output_names:
+ y += 1
+ ax.text(x0 + 0.2, y, n, fontsize=20)
+ ax.axis([0, xlim, ylim, 0])
+ ax.axis('off')