In [1]:
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple
import queue
from IPython.core.debugger import Pdb
from IPython.display import display, clear_output

from GameEngine import multiplayer
from QTable import qtsnake

Point = namedtuple('Point', 'x, y')

pygame 2.5.1 (SDL 2.28.2, Python 3.11.5)
Hello from the pygame community. https://www.pygame.org/contribute.html


### The 'get_viable_actions' function

#### Hamiltonian Cycles

In [2]:
# defines game window size and block size, in pixels
WINDOW_WIDTH = 300
WINDOW_HEIGHT = 300
GAME_UNITS = 30
S_SIZE = 3
S_START = (0 + ((S_SIZE-1) * GAME_UNITS), 0 * GAME_UNITS) #FIXME

In [3]:
visited = np.zeros((WINDOW_WIDTH // GAME_UNITS,
 WINDOW_HEIGHT // GAME_UNITS))
visited

array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
 [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

In [4]:
COVERAGE_THRESHOLD = .8 * visited.size
COVERAGE_THRESHOLD

80.0

In [5]:
game_engine = multiplayer.Playfield(window_width=WINDOW_WIDTH,
 window_height=WINDOW_HEIGHT,
 units=GAME_UNITS,
 g_speed=35,
 s_size=S_SIZE)

In [6]:
p1 = game_engine.add_player(S_START)
game_engine.start_game()

Game starting with 1 players.


In [7]:
game_engine.player_advance([1])

[]

In [8]:
def get_visited(unreachable, width=WINDOW_WIDTH,
 height=WINDOW_HEIGHT, units=GAME_UNITS):
 '''
 given a numpy array corresponding to grid
 and a list of tails,
 marks unreachable places as visited
 '''
 visited = np.zeros((height//units, width//units))
 for node in unreachable:
 visited[node.y//units, node.x//units] = 1
 return visited

In [9]:
def is_instant_death(visited, expansion):
 if (min(expansion) < 0 or
 expansion.y >= visited.shape[0] or
 expansion.x >= visited.shape[1] or
 visited[expansion.y, expansion.x] != 0):
 return True
 return False

In [10]:
def generate_successors(visited, frontier, units=GAME_UNITS):
 '''
 a generator function used to generate
 every new reachable state

 actions corresponding to displacement
 0 1 2 3
 UP, RIGHT, DOWN, LEFT
 '''
 actions = [Point(0, -units), Point(units, 0),
 Point(0, units), Point(-units, 0)]
 all_actions = [0, 1, 2, 3]
 expansion = None

 for action in all_actions:
 ''' calculate new position '''
 expansion = Point((frontier.x + actions[action].x) // units,
 (frontier.y + actions[action].y) // units)
 if not is_instant_death(visited, expansion):
 yield expansion, action

In [11]:
visited = get_visited([Point(0,0)], 2, 2, 1)
visited

array([[1., 0.],
 [0., 0.]])

In [12]:
for successor in generate_successors(visited, Point(1,0), 1):
 print(successor)

(Point(x=1, y=1), 2)


In [13]:
ACTION_NAMES = ["UP", "RIGHT", "DOWN", "LEFT"]

In [14]:
def breadth_first_search(visited, frontier, threshold=COVERAGE_THRESHOLD, units=1, verbose=False):
 '''
 A general search algorithm which expands nodes determined by the frontier.
 '''

 while not frontier.empty():
 curr_node = frontier.get()
 visited[curr_node.y, curr_node.x] = 1
 if np.count_nonzero(visited) > threshold:
 return True
 for child, action in generate_successors(visited, curr_node, units):
 if visited[child.y, child.x] == 0:
 visited[child.y, child.x] = 1
 if verbose:
 print(f'From frontier {curr_node}, found child {child} from action {ACTION_NAMES[action]}.')
 print(visited)
 frontier.put(child)

 if verbose:
 print(f'Could not expand enough children!')
 return False						# return failure

In [15]:
visited = get_visited([Point(0,0), Point(1,0)], 4, 4, 1)
visited

array([[1., 1., 0., 0.],
 [0., 0., 0., 0.],
 [0., 0., 0., 0.],
 [0., 0., 0., 0.]])

In [16]:
frontier = queue.Queue()
frontier.put(Point(2,0))
breadth_first_search(visited, frontier, threshold=6, units=1, verbose=True)

From frontier Point(x=2, y=0), found child Point(x=3, y=0) from action RIGHT.
[[1. 1. 1. 1.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
From frontier Point(x=2, y=0), found child Point(x=2, y=1) from action DOWN.
[[1. 1. 1. 1.]
 [0. 0. 1. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
From frontier Point(x=3, y=0), found child Point(x=3, y=1) from action DOWN.
[[1. 1. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
From frontier Point(x=2, y=1), found child Point(x=2, y=2) from action DOWN.
[[1. 1. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 0.]
 [0. 0. 0. 0.]]
From frontier Point(x=2, y=1), found child Point(x=1, y=1) from action LEFT.
[[1. 1. 1. 1.]
 [0. 1. 1. 1.]
 [0. 0. 1. 0.]
 [0. 0. 0. 0.]]


True

In [17]:
visited = get_visited([Point(2,0), Point(2,1), Point(2,2), Point(2,3)], 4, 4, 1)
visited

array([[0., 0., 1., 0.],
 [0., 0., 1., 0.],
 [0., 0., 1., 0.],
 [0., 0., 1., 0.]])

In [18]:
head = Point(2,3)
for successor, action in generate_successors(visited, head, units=1):
 frontier = queue.Queue()
 frontier.put(successor)
 if breadth_first_search(visited, frontier, threshold=12, units=1, verbose=True):
 print(f'action {ACTION_NAMES[action]}: good')
 else:
 print(f'action {ACTION_NAMES[action]}: bad')

From frontier Point(x=3, y=3), found child Point(x=3, y=2) from action UP.
[[0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]]
From frontier Point(x=3, y=2), found child Point(x=3, y=1) from action UP.
[[0. 0. 1. 0.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]]
From frontier Point(x=3, y=1), found child Point(x=3, y=0) from action UP.
[[0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]]
Could not expand enough children!
action RIGHT: bad
From frontier Point(x=1, y=3), found child Point(x=1, y=2) from action UP.
[[0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 1. 1. 1.]
 [0. 1. 1. 1.]]
From frontier Point(x=1, y=3), found child Point(x=0, y=3) from action LEFT.
[[0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 1. 1. 1.]
 [1. 1. 1. 1.]]
From frontier Point(x=1, y=2), found child Point(x=1, y=1) from action UP.
[[0. 0. 1. 1.]
 [0. 1. 1. 1.]
 [0. 1. 1. 1.]
 [1. 1. 1. 1.]]
From frontier Point(x=1, y=2), found child Point(x=0, y=2) from action LEFT.
[[0. 0. 1. 1.]
 [0. 1. 1. 1.]
 [1. 1. 1. 1.]
 [1. 1. 

In [19]:
visited = get_visited([Point(2,0), Point(2,1), Point(2,2), Point(3,0), Point(0,0)], 4, 4, 1)
visited

array([[1., 0., 1., 1.],
 [0., 0., 1., 0.],
 [0., 0., 1., 0.],
 [0., 0., 0., 0.]])

In [20]:
head = Point(3,0)
for successor, action in generate_successors(visited, head, units=1):
 frontier = queue.Queue()
 frontier.put(successor)
 if breadth_first_search(visited, frontier, threshold=15, units=1, verbose=True):
 print(f'action {ACTION_NAMES[action]}: good')
 else:
 print(f'action {ACTION_NAMES[action]}: bad')

From frontier Point(x=3, y=1), found child Point(x=3, y=2) from action DOWN.
[[1. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 0. 0.]]
From frontier Point(x=3, y=2), found child Point(x=3, y=3) from action DOWN.
[[1. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 0. 1.]]
From frontier Point(x=3, y=3), found child Point(x=2, y=3) from action LEFT.
[[1. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]]
From frontier Point(x=2, y=3), found child Point(x=1, y=3) from action LEFT.
[[1. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 1. 1. 1.]]
From frontier Point(x=1, y=3), found child Point(x=1, y=2) from action UP.
[[1. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 1. 1. 1.]
 [0. 1. 1. 1.]]
From frontier Point(x=1, y=3), found child Point(x=0, y=3) from action LEFT.
[[1. 0. 1. 1.]
 [0. 0. 1. 1.]
 [0. 1. 1. 1.]
 [1. 1. 1. 1.]]
From frontier Point(x=1, y=2), found child Point(x=1, y=1) from action UP.
[[1. 0. 1. 1.]
 [0. 1. 1. 1.]
 [0. 1. 1. 1.]
 [1. 1. 1. 1.]]
From frontier Point(x=1, y=2), found c

In [21]:
def get_viable_actions(id):
 heads, tails, _ = game_engine.get_heads_tails_and_goal()
 visited = get_visited(heads + tails)

 viable_actions = []
 valid_actions = []

 for successor, action in generate_successors(visited, heads[id]):
 valid_actions.append(action)
 frontier = queue.Queue()
 frontier.put(successor)
 if breadth_first_search(visited.copy(), frontier):
 viable_actions.append(action)
 if np.count_nonzero(visited) >= COVERAGE_THRESHOLD:
 print(f'Coverage reached!')
 print(f'No more smart actions left!')
 if len(viable_actions) == 0:
 return valid_actions
 return viable_actions

In [22]:
superior_table = qtsnake.load_q('superior_qt.npy')

In [23]:
class QSnakeImproved(qtsnake.QSnake):

 def __init__(self, game_engine):
 super().__init__(game_engine)

 ''' override '''
 def pick_greedy_action(self, q, id, epsilon=0):
 viable_actions = get_viable_actions(id)
 state, rewards = self.index_actions(q, id)

 if np.random.uniform() < epsilon:
 return (state, np.random.choice(viable_actions)) if viable_actions.size > 0 else (state, 0)
 for action in self.argmin_gen(rewards):
 if action in viable_actions:
 return (state, action)
 return (state, 0) # death

In [24]:
q_table = QSnakeImproved(game_engine)

In [25]:
for step in range(2000):
 _, p1_action = q_table.pick_greedy_action(superior_table, p1)
 game_engine.player_advance([p1_action])