In [1]:
import numpy as np
from collections import namedtuple
from IPython.core.debugger import Pdb

from GameEngine import multiplayer
Point = namedtuple('Point', 'x, y')

pygame 2.5.1 (SDL 2.28.2, Python 3.11.5)
Hello from the pygame community. https://www.pygame.org/contribute.html


### New Game Implementation

I have an improved game implementation which allows for multiplayer snake games, as well as simplified training. This notebook will go over both of these, including an implementation of q-table learning, as well as a match between a manually filled out q-table and a learned one.

Let's start by initializing the engine object:

In [2]:
# defines game window size and block size, in pixels
WINDOW_WIDTH = 640
WINDOW_HEIGHT = 480
GAME_UNITS = 80

In [3]:
game_engine = multiplayer.Playfield(window_width=WINDOW_WIDTH,
                                    window_height=WINDOW_HEIGHT,
                                    units=GAME_UNITS,
                                    g_speed=35,
                                    s_size=1)

Here is a run-down of the current functions available to programs utilizing the game engine:

**add_player**: Returns the player's number to the callee (between 0-3, for a total of 4 players). This number can be used with other functions to index that player's state.

**get_heads_tails_and_goal**: Returns an array of player heads (in order of player number) the locations of all snake tails, as well as the goal location. Each is stored in an array of named tuples.

**get_viable_actions**: Given a player's id, returns a list of integers corresponding to actions which will not immediately result in the snake's death.
0 = UP
1 = RIGHT
2 = DOWN
3 = LEFT

**start_game**: Initializes goal, player, score, and playfield objects. Disables the ability to add new players. Enables use of player_advance function.

**stop_game**: Sets the game_state to false, allowing new players to be added.

**cleanup**: Quits the pygame window.

**player_advance**: Given an array corresponding to each player's action (integers), returns a list of collision results, and updates the internal game state.

**toggle_draw**: Turns off/on the game UI for faster training.

Let's do a test of these functions:

In [4]:
p1 = game_engine.add_player()
game_engine.start_game()
p1

Game starting with 1 players.


0

In [5]:
viable_actions = game_engine.get_viable_actions(p1)

In [6]:
game_engine.player_advance([np.random.choice(viable_actions)])

[<CollisionType.GOAL: 1>]

If you looked at the UI for this last statement, you should have seen that the game moved the snake (yellow) in a random direction away from immediate death.

### State-sensing methods, creating and reading a q-table
Now, we can start redesigning some functions used to allow the snake to play intelligently. We'll use a multi-dimensional numpy array to store the rewards corresponding to each state and action. This is called a q-function, or a q-table in this case.

How many states do I need? Seeing how the new **get_viable_actions** method already prevents the snake from choosing life-ending moves, the snake is no longer tasked with learning or memorizing it.

The snake doesneed to be able to interpret progress towards the goal, so I will reinclude one state-sensing to sense the goal direction. I need only 8 states (with entries for each four actions) to represent our game now.

In [7]:
goal_relations = 8
actions = 4
q = np.zeros((goal_relations,
              actions))
q

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

In [8]:
def sense_goal(head, goal):
    '''
    maps head and goal location onto an
    integer corresponding to approx location
    '''
    diffs = Point(goal.x - head.x, goal.y - head.y)

    if diffs.x == 0 and diffs.y <  0:
        return 0
    if diffs.x >  0 and diffs.y <  0:
        return 1
    if diffs.x >  0 and diffs.y == 0:
        return 2
    if diffs.x >  0 and diffs.y >  0:
        return 3
    if diffs.x == 0 and diffs.y >  0:
        return 4
    if diffs.x <  0 and diffs.y >  0:
        return 5
    if diffs.x <  0 and diffs.y == 0:
        return 6
    return 7

I will use the getter provided by my engine, which queries various statistics about all agents in the game:
1. An array of head positions
2. An array of all tail locations
3. The goal location

In [9]:
game_engine.get_heads_tails_and_goal()

([Point(x=480, y=0)],
 [Point(x=480, y=0), Point(x=560, y=0)],
 Point(x=0, y=400))

Now to use sense_goal as an index into our q table:

In [10]:
def index_actions(q, id):
    '''
    given q, player_id, an array of heads,
    and the goal position,
    indexes into the corresponding expected
    reward of each action
    '''
    heads, tails, goal = game_engine.get_heads_tails_and_goal()
    state = sense_goal(heads[id], goal)
    return state, q[state, :]

Returning state here simplifies some logic later when I train the agent. It will be passed along to my next function, but it can be ignored for now.

In [11]:
_, rewards = index_actions(q, p1)
rewards

array([0., 0., 0., 0.])

In our learning agent, these actions will obviously be associated with different expected rewards. But it is not enough to take the best reward, because the positions of the hazards have not been accounted for. I chose to implement a replacement argmin/max function to select actions from this table, which generates new actions in order from highest expected reward to lowest expected reward.

In [12]:
def argmin_gen(rewards):
    rewards = rewards.copy()
    for i in range(rewards.size):
        best_action = np.argmin(rewards)
        rewards[best_action] = float("inf")
        yield best_action

In [13]:
for action in argmin_gen(rewards):
    print(action)

0
1
2
3


How will we use this? If the action generated is not a viable action, we will take the next best action.

What if no actions are viable? Then the agent has boxed itself in, and it doesn't matter what action we choose.

Previously, I reset the snake if it got stuck in a learning-loop. I will instead use epsilon, as it gives me a bit more control over training. Here is my greedy-action selector function, combining the work of all the previous code:

In [14]:
def pick_greedy_action(q, id, epsilon):
    viable_actions = game_engine.get_viable_actions(id)
    state, rewards = index_actions(q, id)

    if np.random.uniform() < epsilon:
        return (state, np.random.choice(viable_actions)) if viable_actions.size > 0 else (state, 0)
    for action in argmin_gen(rewards):
        if action in viable_actions:
            return (state, action)
    return (state, 0) # death

We'll set up epsilon to decay over our 500-step test...

In [15]:
n_steps = 200
epsilon = 1
final_epsilon = 0.001
epsilon_decay =  np.exp(np.log(final_epsilon) / (n_steps))

And watch the snake explore:

In [16]:
for step in range(n_steps):
    _, p1_action = pick_greedy_action(q, p1, epsilon)
    game_engine.player_advance([p1_action])
    epsilon *= epsilon_decay

This snake obviously has no prior knowledge of how to earn the most reward, but it still does remarkably well because it is not allowed to die. It behaves as expected, favoring up and right when it is not forced to choose a random action.

Our q_table only has 32 values as a result of removing the 16 danger states... It would be incredibly easy to manually select reward values to fill our q_table with...

In [17]:
set_q = np.array([[-10., -2., 0., -2.],
                 [-5., -5., 0., 0.],
                 [-2., -10., 2., 0.],
                 [0., -5., -5., 0.],
                 [0., -2., -10., -2.],
                 [0., 0., -5., -5.],
                 [-2., 0., -2., -10.],
                 [-5., 0., 0., -5.]])

In [18]:
epsilon = 0
for step in range(n_steps):
    _, p1_action = pick_greedy_action(set_q, p1, epsilon)
    game_engine.player_advance([p1_action])
    epsilon *= epsilon_decay

And the snake already plays optimally, no learning required.

Now that we have these methods, I will create functions to allow the snake to learn by its own, and then pair it off against the q-table I just built.

### Learning and Temporal Difference

I will be using the temporal difference equation as the key learning element of my reinforcement function. In theory, it allows me adjust the expected reward of a state to agree with its observed successor. In practice, it will allow out agent to take actions that previously led it closer to the goal.

In order to use this equation, I simply need to create a function that takes the current state/action/outcome, and the previous state/action, as this will be updated in cases where the agent did not reach the goal.

When the agent does reach the goal, I will manually set that state and action to the best reward, 0. Remember that the q-table is initialized with zeros, meaning untravelled actions are pre-assigned good rewards. Both this and epsilon will encourage exploration.

Here is the complete function:

In [19]:
def update_q(q, old_state_action, new_state_action, outcome, lr=0.05):
    if outcome == multiplayer.CollisionType.GOAL:
        q[new_state_action[0], new_state_action[1]] = 0
    else:
        td_error = -1 + q[new_state_action[0], new_state_action[1]] - q[old_state_action[0], old_state_action[1]]
        q[old_state_action[0], old_state_action[1]] += lr * td_error

Now all that is needed is the training loop. I have high expectations for this agent, so I will only allow it 1500 moves to train itself! Here is where the outputs of pick_greedy_action come in handy:

In [20]:
n_steps = 1500
epsilon = 1
final_epsilon = 0.001
epsilon_decay =  np.exp(np.log(final_epsilon) / (n_steps))

In [21]:
p1_old_s_a = pick_greedy_action(q, p1, epsilon) # state, action
game_engine.player_advance([p1_old_s_a[1]])

for step in range(n_steps):
    p1_new_s_a = pick_greedy_action(q, p1, epsilon) # state, action
    outcome = game_engine.player_advance([p1_new_s_a[1]])

    update_q(q, p1_old_s_a, p1_new_s_a, outcome)

    epsilon *= epsilon_decay
    p1_old_s_a = p1_new_s_a

The results look promising. Here is everything it learned:

In [22]:
q

array([[-1.85942515, -0.28461569, -0.1475    , -2.25187465],
       [-6.22032336, -2.97794369, -0.54294824, -0.73044876],
       [-1.36441677, -5.89002527, -0.37131058, -0.44722626],
       [-4.05086759, -5.41891136, -2.6667349 , -1.52778296],
       [-0.74803435, -1.03051018, -5.33579039, -1.17403081],
       [-1.14848949, -1.11816252, -5.95403265, -2.19930379],
       [-0.91983985, -0.43064397, -0.97973069, -6.54929252],
       [-5.1999968 , -0.75777597, -0.38182729, -2.87468477]])

### Multiplayer Demonstration, Saving Tables

The most entertaining way to test the success of my implementation is pair the agents q and set_q against each other. I will first stop and set up a new game:

In [23]:
game_engine.stop_game()
p2 = game_engine.add_player()
game_engine.start_game()

Game over!
Game starting with 2 players.


Now, I can simply call player advance with both player's actions in order, and the engine will handle the rest. I will define a new game loop, similar to the previous one:

In [24]:
epsilon = 0

In [25]:
for step in range(n_steps):
    # p1
    _, p1_action = pick_greedy_action(set_q, p1, epsilon)

    # p2
    p2_new_s_a = pick_greedy_action(q, p2, epsilon) # state, action
    
    game_engine.player_advance([p1_action, p2_new_s_a[1]])

    epsilon *= epsilon_decay
    p2_old_s_a = p2_new_s_a

The learned agent normally plays significantly worse than the artificially-learned one, which is okay given I hardly spent time optimizing the number of steps and learning rate. I plan to compare both of the agents again my neural-network approach, so the last this I will do is save the q_tables to a file.

In [26]:
np.save('superior_qt.npy', set_q)
np.save('inferior_qt.npy', q)