{ "cells": [ { "cell_type": "code", "execution_count": 1, "id": "85da5df2-c926-417c-bd7b-d214ad31ebe1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "pygame 2.5.1 (SDL 2.28.2, Python 3.11.5)\n", "Hello from the pygame community. https://www.pygame.org/contribute.html\n" ] } ], "source": [ "import numpy as np\n", "from collections import namedtuple\n", "from IPython.core.debugger import Pdb\n", "\n", "from GameEngine import multiplayer\n", "Point = namedtuple('Point', 'x, y')\n", "SA = namedtuple('SA', 'state, action')" ] }, { "cell_type": "markdown", "id": "d264ae4a-380c-47b6-9b29-c6fe16c6399c", "metadata": {}, "source": [ "### New Game Implementation\n", "\n", "I have an improved game implementation which allows for multiplayer snake games, as well as simplified training. This notebook will go over both of these, including an implementation of q-table learning, as well as a match between a manually filled out q-table and a learned one.\n", "\n", "I'll start by initializing the engine object:" ] }, { "cell_type": "code", "execution_count": 2, "id": "2a382240-906d-474f-94c0-9af1a5de97ae", "metadata": {}, "outputs": [], "source": [ "# defines game window size and block size, in pixels\n", "WINDOW_WIDTH = 480\n", "WINDOW_HEIGHT = 320\n", "GAME_UNITS = 80" ] }, { "cell_type": "code", "execution_count": 3, "id": "976eff80-c50a-492e-a49b-975d9905e274", "metadata": {}, "outputs": [], "source": [ "game_engine = multiplayer.Playfield(window_width=WINDOW_WIDTH,\n", " window_height=WINDOW_HEIGHT,\n", " units=GAME_UNITS,\n", " g_speed=45,\n", " s_size=1)" ] }, { "cell_type": "markdown", "id": "23de2fca-d31a-497f-9a0c-845c568e5df7", "metadata": {}, "source": [ "Here is a run-down of the current functions available to programs utilizing the game engine:\n", "\n", "**add_player**: Returns the player's number to the callee (between 0-3, for a total of 4 players). This number can be used with other functions to index that player's state.\n", "\n", "**get_heads_tails_and_goal**: Returns an array of player heads (in order of player number) the locations of all snake tails, as well as the goal location. Each is stored in an array of named tuples.\n", "\n", "**get_viable_actions**: Given a player's id, returns a list of integers corresponding to actions which will not immediately result in the snake's death.\n", "0 = UP\n", "1 = RIGHT\n", "2 = DOWN\n", "3 = LEFT\n", "\n", "**start_game**: Initializes goal, player, score, and playfield objects. Disables the ability to add new players. Enables use of player_advance function.\n", "\n", "**stop_game**: Sets the game_state to false, allowing new players to be added.\n", "\n", "**cleanup**: Quits the pygame window.\n", "\n", "**player_advance**: Given an array corresponding to each player's action (integers), returns a list of collision results, and updates the internal game state.\n", "\n", "**toggle_draw**: Turns off/on the game UI for faster training.\n", "\n", "I can do a test of these functions:" ] }, { "cell_type": "code", "execution_count": 4, "id": "64470c1a-ddc6-4ce1-b6d4-f1e17e3d0176", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Game starting with 1 players.\n" ] }, { "data": { "text/plain": [ "0" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "p1 = game_engine.add_player()\n", "game_engine.start_game()\n", "p1" ] }, { "cell_type": "code", "execution_count": 5, "id": "28d7fa43-4419-4940-9d4f-d6bd5ccc11ec", "metadata": {}, "outputs": [], "source": [ "viable_actions = game_engine.get_viable_actions(p1)" ] }, { "cell_type": "code", "execution_count": 6, "id": "ba45b03a-9c42-48e9-a259-5d01e667157d", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "game_engine.player_advance([np.random.choice(viable_actions)])" ] }, { "cell_type": "markdown", "id": "df7170ae-e95d-404b-b632-e8e9de46d69e", "metadata": {}, "source": [ "If you looked at the UI for this last statement, you should have seen that the game moved the snake (yellow) in a random direction away from immediate death." ] }, { "cell_type": "markdown", "id": "144bc5ff-756c-4e07-a855-4020a4474d52", "metadata": {}, "source": [ "### State-sensing methods, creating and reading a q-table\n", "Now, we can start redesigning some functions used to allow the snake to play intelligently. We'll use a multi-dimensional numpy array to store the expected rewards corresponding to each state and action. This is called a q-function, or a q-table in this case, and represents one of the most fundamental methods of reinforcement learning. More on this later...\n", "\n", "How many states do I need? Seeing how the new **get_viable_actions** method already prevents the snake from choosing life-ending moves, the snake is no longer tasked with learning or memorizing it.\n", "\n", "The snake does need to be able to interpret progress towards the goal, so I will reinclude one state; the compass direction from the snake's head to the goal. This means I need only 8 states (with entries for each four actions) to represent my game now." ] }, { "cell_type": "code", "execution_count": 7, "id": "88a69876-fbc7-4dc1-a471-d8449fada4e4", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "array([[0., 0., 0., 0.],\n", " [0., 0., 0., 0.],\n", " [0., 0., 0., 0.],\n", " [0., 0., 0., 0.],\n", " [0., 0., 0., 0.],\n", " [0., 0., 0., 0.],\n", " [0., 0., 0., 0.],\n", " [0., 0., 0., 0.]])" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "goal_relations = 8\n", "actions = 4\n", "q = np.zeros((goal_relations,\n", " actions))\n", "q" ] }, { "cell_type": "code", "execution_count": 8, "id": "3e15744e-1251-4315-8a4c-ed5788d04478", "metadata": {}, "outputs": [], "source": [ "def sense_goal(head, goal):\n", " '''\n", " maps head and goal location onto an\n", " integer corresponding to approx location\n", " '''\n", " diffs = Point(goal.x - head.x, goal.y - head.y)\n", "\n", " if diffs.x == 0 and diffs.y < 0:\n", " return 0\n", " if diffs.x > 0 and diffs.y < 0:\n", " return 1\n", " if diffs.x > 0 and diffs.y == 0:\n", " return 2\n", " if diffs.x > 0 and diffs.y > 0:\n", " return 3\n", " if diffs.x == 0 and diffs.y > 0:\n", " return 4\n", " if diffs.x < 0 and diffs.y > 0:\n", " return 5\n", " if diffs.x < 0 and diffs.y == 0:\n", " return 6\n", " return 7" ] }, { "cell_type": "markdown", "id": "addf716b-892c-4f7f-b71f-c6af8779dff7", "metadata": {}, "source": [ "I will use the getter provided by my engine, which queries various statistics about all agents in the game:\n", "1. An array of head positions\n", "2. An array of all tail locations\n", "3. The goal location" ] }, { "cell_type": "code", "execution_count": 9, "id": "7b5f0d57-8e26-4f95-b7ea-a6810936ad5d", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "([Point(x=400, y=160)], [Point(x=400, y=160)], Point(x=0, y=80))" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "game_engine.get_heads_tails_and_goal()" ] }, { "cell_type": "markdown", "id": "d3fd47ce-55fe-4d2f-9147-8848193f7ca1", "metadata": {}, "source": [ "Now to make a function to index our expected reward-to-go given a state using sense_goal. Because we only have one state-sensing function, this function really only serves as a neat interface to sense_goal:" ] }, { "cell_type": "code", "execution_count": 10, "id": "85e2bab1-c98b-400e-be41-a47ccd4bc163", "metadata": {}, "outputs": [], "source": [ "def index_actions(q, id):\n", " '''\n", " given q, player_id, an array of heads,\n", " and the goal position,\n", " indexes into the corresponding expected\n", " reward of each action\n", " '''\n", " heads, tails, goal = game_engine.get_heads_tails_and_goal()\n", " state = sense_goal(heads[id], goal)\n", " return state, q[state, :]" ] }, { "cell_type": "markdown", "id": "33ae53a8-989a-410c-a04f-2ac76561ce21", "metadata": {}, "source": [ "Returning state here simplifies some logic later when I train the agent. It will be passed along to my next function, but it can be ignored for now." ] }, { "cell_type": "code", "execution_count": 11, "id": "3808c200-2f67-43c4-a80f-c4c17dcfeacf", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([0., 0., 0., 0.])" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "_, rewards = index_actions(q, p1)\n", "rewards" ] }, { "cell_type": "markdown", "id": "6a2ef7f7-f6f7-4610-8e98-1d389327f3e8", "metadata": {}, "source": [ "In my learning agent, these actions will obviously be associated with different expected rewards. Essentially, I have a function that, given a state, tells me the expected utility of each action. Should I just choose the best one?\n", "\n", "There are two problems with a greedy approach...\n", "\n", "1. If the agent only sticks to the one policy it knows, then it will never truly learn the best solution. This is not the largest problem, due to how few states we have. Additionally, the snake is forced into a new state everytime it collects a goal. Still, a pure greedy approach results in slow, sub-optimal training.\n", "2. My snake is ignorant of which actions will result in a collision. The snake must understand that its Q function will commonly lead it astray in navigating its environment.\n", "\n", "In cases where the snake has explored enough to make its Q function useful and death is not a possibility, I still do want the snake to be greedy. I chose to implement a replacement argmin/max function to select actions from this table, which generates new actions in order from highest expected reward to lowest expected reward." ] }, { "cell_type": "code", "execution_count": 12, "id": "a172e347-75b7-4b0a-8dcc-07a6ba04f77e", "metadata": {}, "outputs": [], "source": [ "def argmin_gen(rewards):\n", " rewards = rewards.copy()\n", " for i in range(rewards.size):\n", " best_action = np.argmin(rewards)\n", " rewards[best_action] = float(\"inf\")\n", " yield best_action" ] }, { "cell_type": "code", "execution_count": 13, "id": "9674f225-e7df-4551-baa8-29eb23fcc1d5", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0\n", "1\n", "2\n", "3\n" ] } ], "source": [ "for action in argmin_gen(rewards):\n", " print(action)" ] }, { "cell_type": "markdown", "id": "b9bed101-661c-4e61-b8ad-94bbc1900e03", "metadata": {}, "source": [ "How will I use this? Remember that my game engine includes all the logic to calculate which actions are immediately dangerous. If the action generated is not a viable action, we will take the next best action, or the next, next best action, etc.\n", "\n", "What if no actions are viable? Then the agent has boxed itself in, and it doesn't matter what action we choose.\n", "\n", "Previously, I reset the snake if it got stuck in a learning-loop. I will instead use epsilon, as it is the tried-and-true method of enforcing exploration.\n", "\n", "Here is my greedy-epsilon function, combining the work of all the previous code:" ] }, { "cell_type": "code", "execution_count": 14, "id": "d3d65397-62a9-47b6-9c84-808282656f80", "metadata": {}, "outputs": [], "source": [ "def pick_greedy_action(q, id, epsilon):\n", " viable_actions = game_engine.get_viable_actions(id)\n", " state, rewards = index_actions(q, id)\n", "\n", " if np.random.uniform() < epsilon:\n", " # SA -- a STATE-ACTION pair (X as in function input)\n", " return SA(state, np.random.choice(viable_actions)) if viable_actions.size > 0 else SA(state, 0)\n", " for action in argmin_gen(rewards):\n", " if action in viable_actions:\n", " return SA(state, action)\n", " return SA(state, 0) # death" ] }, { "cell_type": "markdown", "id": "2169ac2b-df83-4f53-8a83-b35a2d1a521a", "metadata": {}, "source": [ "I'll set up epsilon to decay over our 500-step test..." ] }, { "cell_type": "code", "execution_count": 15, "id": "a5932d47-adbe-46de-b91e-582e40faf369", "metadata": {}, "outputs": [], "source": [ "n_steps = 200\n", "epsilon = 1\n", "final_epsilon = 0.001\n", "epsilon_decay = np.exp(np.log(final_epsilon) / (n_steps))" ] }, { "cell_type": "markdown", "id": "ee7b066a-330d-4cdd-bbb2-9d2a7ad2ceb4", "metadata": {}, "source": [ "And watch the snake explore:" ] }, { "cell_type": "code", "execution_count": 16, "id": "6beff583-e32a-4c15-8fcb-f5b5d45ad548", "metadata": {}, "outputs": [], "source": [ "for step in range(n_steps):\n", " _, p1_action = pick_greedy_action(q, p1, epsilon)\n", " game_engine.player_advance([p1_action])\n", " epsilon *= epsilon_decay" ] }, { "cell_type": "markdown", "id": "4ee2c2e6-933d-460e-b2fc-f5bf1f22e381", "metadata": {}, "source": [ "This snake obviously has no prior knowledge of how to earn the most reward, but it still does remarkably well because it is not allowed to die. It behaves as expected, favoring up and right when it is not forced to choose a random action.\n", "\n", "Our q_table only has 32 values as a result of removing the 16 danger states... It would be incredibly easy to manually select reward values to fill our q_table with..." ] }, { "cell_type": "code", "execution_count": 17, "id": "7af51359-872c-4d1b-b178-e27cf86eb3cc", "metadata": {}, "outputs": [], "source": [ "set_q = np.array([[-10., -2., 0., -2.],\n", " [-5., -5., 0., 0.],\n", " [-2., -10., 2., 0.],\n", " [0., -5., -5., 0.],\n", " [0., -2., -10., -2.],\n", " [0., 0., -5., -5.],\n", " [-2., 0., -2., -10.],\n", " [-5., 0., 0., -5.]])" ] }, { "cell_type": "code", "execution_count": 18, "id": "4da4d318-f7e0-412b-8545-8fd346e167b3", "metadata": {}, "outputs": [], "source": [ "epsilon = 0\n", "for step in range(n_steps):\n", " X = pick_greedy_action(set_q, p1, epsilon)\n", " game_engine.player_advance([X.action])\n", " epsilon *= epsilon_decay" ] }, { "cell_type": "markdown", "id": "5c36ab97-2ca0-4468-8d4c-ebd1e4deec23", "metadata": {}, "source": [ "And the snake already plays optimally, no learning required! This implementation might be more similar to a passive learning agent, in the sense I already told the snake what policy I want it to follow.\n", "\n", "Now that I have these methods, I will create functions to allow the snake to learn by its own, and then pair it off against the q-table I just built." ] }, { "cell_type": "markdown", "id": "0b9968af-0ec2-4b92-a19d-50912703dd4a", "metadata": {}, "source": [ "### Q-Learning, with Temporal Difference" ] }, { "cell_type": "markdown", "id": "ce537e44-ac8c-4f09-b89d-a330f13277da", "metadata": {}, "source": [ "A rational agent prioritizes actions that leads to the highest expected reward. The Q-function assigns an expected utility to each state-action pair, usually the expected reward-to-go.\n", "\n", "A popular method of adjusting this state-value function is a version of the temporal difference equation, which adjusts the utility associated with each input to agree with the maximum utility of its successor:\n", "\n", "$Q(s,a) = Q(s,a) + \\alpha[(R(s,a,s') + \\gamma * max _a'Q(s',a') - Q(s,a))]$\n", "\n", "The nature of this method is somewhat recursive, as it updates Q to agree with max(Q'), which in turn is updated to agree with max(Q'')... which is why the algorithm is sometimes referred to as bootstrapping.\n", "\n", "The discount factor $\\gamma$ can be used to weight immediate reward higher than future reward, though will be kept as 1 in my solution, which means we consider all future actions equally. All I need to do is assign an enticing enough reinforcement to goal-attaining actions, and use the temporal difference equation to update all other state transitions.\n", "\n", "In order to implement this equation, I simply need a function that takes the q-table to be updated, the old state-action pair, and then ew state-action pair, and the outcome as returned by the game engine so I can assign a reward.\n", "\n", "When the agent does reach the goal, I will manually set that state and action to the best reward, 0. Remember that the q-table is initialized with zeros, meaning untravelled actions are pre-assigned good rewards. Both this and epsilon will encourage exploration.\n", "\n", "Here is the complete function:" ] }, { "cell_type": "code", "execution_count": 19, "id": "a0d3942a-af6a-41f3-be74-167e3abaae0b", "metadata": {}, "outputs": [], "source": [ "reward = -1\n", "\n", "def update_q(q, old_X, new_X, outcome, lr=0.07):\n", " if outcome == multiplayer.CollisionType.GOAL:\n", " q[new_X.state, new_X.action] = 0\n", " else:\n", " td_error = reward + q[new_X.state, new_X.action] - q[old_X.state, old_X.action]\n", " q[old_X.state, old_X.action] += lr * td_error" ] }, { "cell_type": "markdown", "id": "01b21e01-174e-4fdd-ad70-dcc1e6483fb2", "metadata": {}, "source": [ "Now all that is needed is the training loop. I have high expectations for this agent, so I will only allow it 2000 moves to train itself! Here is where the outputs of pick_greedy_action come in handy, because they can be used as a direct index into Q:" ] }, { "cell_type": "code", "execution_count": 20, "id": "95a2ec72-e30c-4730-a876-21f054d3727f", "metadata": {}, "outputs": [], "source": [ "n_steps = 2000\n", "epsilon = 1\n", "final_epsilon = 0.003\n", "epsilon_decay = np.exp(np.log(final_epsilon) / (n_steps))" ] }, { "cell_type": "code", "execution_count": 21, "id": "a71dc022-5e51-46f8-bfed-37b95243fc5e", "metadata": {}, "outputs": [], "source": [ "p1_old_X = pick_greedy_action(q, p1, epsilon) # state, action\n", "game_engine.player_advance([p1_old_X.action])\n", "\n", "for step in range(n_steps):\n", " p1_new_X = pick_greedy_action(q, p1, epsilon) # state, action\n", " outcome = game_engine.player_advance([p1_new_X.action])\n", "\n", " update_q(q, p1_old_X, p1_new_X, outcome)\n", "\n", " epsilon *= epsilon_decay\n", " p1_old_X = p1_new_X" ] }, { "cell_type": "markdown", "id": "c6cbf429-c790-4bb9-8775-ed067844ab4e", "metadata": {}, "source": [ "Most of the time, results look promising. Here is everything it learned:" ] }, { "cell_type": "code", "execution_count": 22, "id": "ad12d31a-a1ec-45af-89b0-f66ca375b524", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[-7.67491582, -2.33111962, -0.60015858, -1.37540041],\n", " [-5.27152969, -7.98969718, -3.87453687, -1.24435724],\n", " [-6.31827426, -5.34343005, -5.15269717, -3.14416104],\n", " [-4.03006198, -7.7299521 , -5.56781757, -2.25634664],\n", " [-2.21238723, -3.2719937 , -7.19183517, -1.12966867],\n", " [-4.51471731, -3.0205144 , -7.82635816, -5.67783926],\n", " [-3.65731728, -1.23278448, -1.3241749 , -7.94001044],\n", " [-7.61199203, -2.0832036 , -2.27008676, -6.21290761]])" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "q" ] }, { "cell_type": "markdown", "id": "b2414728-c36c-45d6-8f2d-18e78e482054", "metadata": {}, "source": [ "### Multiplayer Demonstration, Saving Tables" ] }, { "cell_type": "markdown", "id": "24c73f20-2853-4ba7-a24e-22c5a4b8da5e", "metadata": {}, "source": [ "The most entertaining way to test the success of my implementation is pair the agents q and set_q against each other. I will first stop and set up a new game:" ] }, { "cell_type": "code", "execution_count": 23, "id": "a93f0fe5-2bc4-45d7-ba31-2306efaa9806", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Game over!\n", "Game starting with 2 players.\n" ] } ], "source": [ "game_engine.stop_game()\n", "p2 = game_engine.add_player()\n", "game_engine.start_game()" ] }, { "cell_type": "markdown", "id": "47a1adde-d95d-444e-9688-1290764f8cfa", "metadata": {}, "source": [ "Now, I can simply call player advance with both player's actions in order, and the engine will handle the rest. I will define a new game loop, similar to the previous one:" ] }, { "cell_type": "code", "execution_count": 24, "id": "d5b5d089-cb66-47de-b352-36c13fd7dead", "metadata": {}, "outputs": [], "source": [ "epsilon = 0" ] }, { "cell_type": "code", "execution_count": 25, "id": "4de326e6-82dc-48df-8920-d28d0154fbd9", "metadata": {}, "outputs": [], "source": [ "for step in range(n_steps):\n", " # p1 (YELLOW)\n", " p1_X = pick_greedy_action(set_q, p1, epsilon)\n", "\n", " # p2 (RED)\n", " p2_X = pick_greedy_action(q, p2, epsilon) # state, action\n", " \n", " game_engine.player_advance([p1_X.action, p2_X.action])\n", "\n", " epsilon *= epsilon_decay" ] }, { "cell_type": "markdown", "id": "820d7e5f-c3e9-4dae-82ce-3c9188d8a8d8", "metadata": {}, "source": [ "The learned agent usually plays almost as well as the artificially-learned one, which is okay given I hardly spent time optimizing the number of steps and learning rate. I plan to compare both of the agents again my neural-network approach, so the last I will do is save the q_tables to a file." ] }, { "cell_type": "code", "execution_count": 26, "id": "95227c89-e7db-4923-9160-dacfa1cf4af8", "metadata": {}, "outputs": [], "source": [ "np.save('superior_qt.npy', set_q)\n", "np.save('inferior_qt.npy', q)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 5 }