Files
project_game/ghosts/ghosts.py

469 lines
18 KiB
Python

from collections import deque
import time
import heapq
import random
# Probability (0..1) that a ghost throws away the chase target it just
# computed for this move and uses its roaming fallback movement instead.
# Keeps behavior mostly deterministic/chasing but adds slight variation.
RANDOMNESS = 0.12
class GhostManager:
    """Shared registry of ghosts plus the tiles they have claimed this turn.

    Ghosts register themselves once, then use the per-turn reservation set
    to avoid stepping onto a tile another ghost has already taken.
    """

    def __init__(self):
        # every ghost that has called register(), in registration order
        self.instances = []
        # turn number the current reservation set belongs to
        self.reserved_turn = -1
        # tiles claimed during `reserved_turn`
        self.reserved = set()
        # most recent turn number passed to start_turn()
        self.turn = -1

    def register(self, g):
        """Add ghost `g` to the list of tracked instances."""
        self.instances += [g]

    def start_turn(self, turn):
        """Record `turn`; wipe reservations the first time a new turn is seen."""
        is_new_turn = self.reserved_turn != turn
        if is_new_turn:
            self.reserved_turn = turn
            self.reserved.clear()
        self.turn = turn

    def reserve(self, pos):
        """Claim tile `pos` for the current turn."""
        self.reserved.add(pos)

    def is_reserved(self, pos):
        """Return True when `pos` has been claimed this turn."""
        return pos in self.reserved

    def occupied_positions(self, exclude=None):
        """Return the set of positions held by every ghost except `exclude`."""
        taken = set()
        for ghost in self.instances:
            if ghost is exclude:
                continue
            taken.add(ghost.position)
        return taken
# path search helpers
def bfs_find(maze, start, goal_test, on_board, blocked=None):
    """Breadth-first search from `start` to the nearest tile passing `goal_test`.

    Args:
        maze: grid indexed as ``maze[y][x]``; ``'#'`` cells are walls.
        start: ``(x, y)`` tile the search begins on.
        goal_test: callable taking a tile and returning truthy on a goal.
        on_board: callable taking a tile and returning truthy when the tile
            may be indexed in `maze`.
        blocked: optional container of tiles that must not be entered.

    Returns:
        The list of tiles from `start` to the first goal found (inclusive),
        or None when no goal tile can be reached.
    """
    obstacles = set() if blocked is None else blocked
    # expansion order is fixed so equal-length routes resolve the same way
    steps = ((1, 0), (-1, 0), (0, 1), (0, -1))
    parent = {start: None}
    frontier = deque((start,))
    while frontier:
        here = frontier.popleft()
        if goal_test(here):
            # walk the parent links back to the start, then flip the trail
            trail = [here]
            while parent[trail[-1]] is not None:
                trail.append(parent[trail[-1]])
            return trail[::-1]
        hx, hy = here
        for dx, dy in steps:
            cand = (hx + dx, hy + dy)
            if cand in parent or not on_board(cand):
                continue
            if maze[cand[1]][cand[0]] == '#' or cand in obstacles:
                continue
            parent[cand] = here
            frontier.append(cand)
    return None
def astar_find(maze, start, goal, on_board, blocked=None):
    """A* shortest path on a 4-connected grid using a Manhattan heuristic.

    Args:
        maze: grid indexed as ``maze[y][x]``; ``'#'`` cells are walls.
        start: ``(x, y)`` tile the search begins on.
        goal: ``(x, y)`` tile to reach.
        on_board: callable taking a tile and returning truthy when the tile
            may be entered/indexed in `maze`.
        blocked: optional container of tiles that must not be entered. The
            goal itself is always allowed even when it is in `blocked`.

    Returns:
        The list of tiles from `start` to `goal` (both inclusive), or None
        when the goal cannot be reached.
    """
    if blocked is None:
        blocked = set()

    def h(a, b):
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    openq = [(h(start, goal), 0, start)]
    came_from = {start: None}
    cost_so_far = {start: 0}
    while openq:
        _, cost, current = heapq.heappop(openq)
        # A tile can sit in the heap several times; once a cheaper route to
        # it has been recorded the older entries are stale. Expanding them
        # cannot improve anything, so skip them instead of redoing the work.
        if cost > cost_so_far[current]:
            continue
        if current == goal:
            path = []
            node = current
            while node is not None:
                path.append(node)
                node = came_from[node]
            path.reverse()
            return path
        x, y = current
        for dx, dy in ((1, 0), (-1, 0), (0, 1), (0, -1)):
            npos = (x + dx, y + dy)
            if not on_board(npos):
                continue
            if maze[npos[1]][npos[0]] == '#':
                continue
            if npos in blocked and npos != goal:
                continue
            new_cost = cost + 1
            if npos not in cost_so_far or new_cost < cost_so_far[npos]:
                cost_so_far[npos] = new_cost
                came_from[npos] = current
                heapq.heappush(openq, (new_cost + h(npos, goal), new_cost, npos))
    return None
def _trigger_game_over(game):
"""End the game in a way that's friendly to both headless and UI runs.
HeadlessGame defines an `ended` attribute and an `end()` method we can call.
In graphical runs we prefer to set a `game_over` flag so the main loop
/ input handling stays active and the player can press a key to quit.
"""
# prefer headless behavior: if the object *looks* like HeadlessGame
# (it exposes an `ended` attribute) then call end() immediately.
if getattr(game, 'ended', None) is not None and callable(getattr(game, 'end', None)):
try:
game.end()
return
except Exception:
# fall back to flagging if end() misbehaves
pass
# otherwise, set a flag the UI can react to and avoid calling end()
setattr(game, 'game_over', True)
# schedule a background timer to call end() after a short delay
# so the UI has a small 'caught' moment before the final screen.
if getattr(game, '_game_over_timer', None) is None:
try:
import threading
def _delayed_end():
try:
game.end()
except Exception:
setattr(game, 'ended', True)
t = threading.Timer(1.0, _delayed_end)
t.daemon = True
t.start()
setattr(game, '_game_over_timer', t)
except Exception:
# as a fallback, set a scheduled timestamp watched by GameOverWatcher
if getattr(game, '_game_over_scheduled', None) is None:
setattr(game, '_game_over_scheduled', time.time() + 1.0)
class Ghost:
    """A maze ghost that leaves its spawn, chases pacman and otherwise roams.

    Each call to `play_turn` moves the ghost by at most one tile. Ghosts
    coordinate through a shared manager: tiles occupied or reserved by other
    ghosts this turn, and the tiles other ghosts just left, are treated as
    blocked.

    Args:
        name: agent name; the part after the last ``_`` selects the
            targeting style (``red``, ``pink`` or ``blue``).
        char: character stored as `character`.
        start_pos: initial ``(x, y)`` tile.
        release_turn: turn from which an unreleased ghost may act. Ghosts
            are currently created already released, so this only matters if
            `released` is reset to False.
        maze: grid indexed ``maze[y][x]``; ``'#'`` is a wall, ``'G'`` is a
            spawn tile.
        dirs: mapping of direction key to ``(dx, dy)`` step.
        width, height: board size used for bounds checks.
        move_speed: minimum number of turns between two moves.
        manager: shared registry providing `register`, `start_turn`,
            `reserve`, `reserved`, `instances` and `occupied_positions`.
        chase_memory: turns a sighting of pacman keeps the chase alive.
        detection_radius: Manhattan distance within which pacman is sensed
            even without line of sight.
    """

    def __init__(self, name, char, start_pos, release_turn, maze, dirs, width, height, move_speed, manager, chase_memory=30, detection_radius=6):
        self.name = name
        self.character = char
        # rendering order: higher z renders on top of lower z; 4 puts ghosts
        # above pellets/walls and above PacMan (PacMan.z == 3)
        self.z = 4
        # private backing field for the `display` property; set directly so
        # construction does not emit the change log
        self._display = True
        self.position = start_pos
        self.release_turn = release_turn
        # default to released so ghosts start roaming immediately; for a
        # delayed release set this to False and use a positive release_turn
        self.released = True
        # direction key of the last step taken (random until the first move)
        self.current_dir = random.choice(list(dirs.keys()))
        self.last_move = -999
        self.chasing = False
        self.chase_until = -1
        # start equal to the position so renderers interpolating between
        # previous/current positions work before the first move
        self.prev_position = start_pos
        # once a ghost leaves the G-spawn area it should never re-enter
        self.left_spawn = False
        self.last_seen_pos = None
        self.last_seen_turn = -999
        self.ghost_type = name.split('_')[-1]
        # references
        self.MAZE = maze
        self.DIRS = dirs
        self.WIDTH = width
        self.HEIGHT = height
        self.MOVE_SPEED = move_speed
        self.manager = manager
        self.chase_memory = chase_memory
        self.detection_radius = detection_radius
        manager.register(self)

    @property
    def display(self):
        """Whether the ghost should be drawn (defaults to True)."""
        return getattr(self, '_display', True)

    @display.setter
    def display(self, val):
        # log every change of the flag to help trace flicker/invisibility
        old = getattr(self, '_display', True)
        if old != val:
            # try to include turn info if manager has it; fall back to None
            mgr_turn = getattr(self.manager, 'turn', None) if getattr(self, 'manager', None) is not None else None
            print(f"[Ghost][DEBUG] {self.name} display changed {old} -> {val} at manager.turn={mgr_turn}")
        self._display = val

    def on_board(self, pos):
        """Return True when `pos` lies inside the board rectangle."""
        x, y = pos
        return 0 <= x < self.WIDTH and 0 <= y < self.HEIGHT

    def can_walk(self, pos):
        """Return True when this ghost may step onto `pos`.

        Off-board tiles and walls are never walkable; spawn (``'G'``) tiles
        stop being walkable once the ghost has left the spawn area.
        """
        # ensure position is on board before indexing the maze
        if not self.on_board(pos):
            return False
        x, y = pos
        cell = self.MAZE[y][x]
        if cell == 'G' and self.left_spawn:
            return False
        return cell != '#'

    def is_spawn(self):
        """Return True while the ghost stands on a spawn (``'G'``) tile."""
        x, y = self.position
        return self.MAZE[y][x] == 'G'

    def line_of_sight(self, pac_pos):
        """Return True when `pac_pos` shares a row or column with no wall between."""
        gx, gy = self.position
        px, py = pac_pos
        if gx == px:
            step = 1 if py > gy else -1
            for y in range(gy + step, py, step):
                if self.MAZE[y][gx] == '#':
                    return False
            return True
        if gy == py:
            step = 1 if px > gx else -1
            for x in range(gx + step, px, step):
                if self.MAZE[gy][x] == '#':
                    return False
            return True
        return False

    def neighbors(self, pos):
        """Yield the tile one step from `pos` in every known direction."""
        x, y = pos
        for dx, dy in self.DIRS.values():
            yield (x + dx, y + dy)

    def next_pos(self, dir_key):
        """Return next position from current position following direction key."""
        if dir_key not in self.DIRS:
            return None
        dx, dy = self.DIRS[dir_key]
        return (self.position[0] + dx, self.position[1] + dy)

    def play_turn(self, game):
        """Take this ghost's action for the current turn (at most one step).

        Order of preference: leave the spawn area, step along a path toward
        the chase target, then roam. Nothing happens when the game is
        already flagged over, the ghost moved fewer than `MOVE_SPEED` turns
        ago, it is not yet released, or no 'pacman' agent exists.
        """
        # start turn bookkeeping
        self.manager.start_turn(game.turn_number)
        # if the UI/game has already flagged game_over, don't take actions
        if getattr(game, 'game_over', False):
            return
        if game.turn_number - self.last_move < self.MOVE_SPEED:
            return
        if not self.released:
            if game.turn_number < self.release_turn:
                return
            self.released = True
        pac = game.get_agent_by_name('pacman')
        if not pac:
            return

        occupied = self.manager.occupied_positions(exclude=self)
        # avoid swapping: treat other ghosts' previous positions as blocked
        swap_block = {
            g.prev_position
            for g in self.manager.instances
            if g is not self and getattr(g, 'prev_position', None) is not None
        }
        blocked = set(occupied) | set(self.manager.reserved) | swap_block

        # inside spawn: try to exit quickly
        if self.is_spawn():
            path = bfs_find(self.MAZE, self.position, lambda p: self.MAZE[p[1]][p[0]] != 'G', self.on_board, blocked)
            if path and len(path) > 1 and path[1] not in blocked:
                self._move_to(game, pac, path[1])
                return

        target = self._sense_and_target(game, pac)
        if target:
            step = self._chase_step(target, pac, blocked)
            if step is not None:
                self._move_to(game, pac, step)
                return

        step = self._roam_step(blocked)
        if step is not None:
            self._move_to(game, pac, step)

    def _move_to(self, game, pac, new_pos):
        """Commit a move to `new_pos` and do all per-move bookkeeping."""
        dx = new_pos[0] - self.position[0]
        dy = new_pos[1] - self.position[1]
        # remember the heading actually taken so "forward" follows movement
        for key, delta in self.DIRS.items():
            if tuple(delta) == (dx, dy):
                self.current_dir = key
                break
        self.prev_position = self.position
        self.position = new_pos
        # once we've moved out of a G tile, lock spawn re-entry
        if not self.is_spawn():
            self.left_spawn = True
        self.last_move = game.turn_number
        self.manager.reserve(self.position)
        if pac.position == self.position:
            # end the game; UI will display a generic 'Game Over'
            _trigger_game_over(game)

    def _clamp_to_board(self, x, y):
        """Return ``(x, y)`` pulled back inside the board rectangle."""
        return (max(0, min(self.WIDTH - 1, x)), max(0, min(self.HEIGHT - 1, y)))

    def _sense_and_target(self, game, pac):
        """Update chase state from what the ghost senses; return a target or None."""
        turn = game.turn_number
        if self.line_of_sight(pac.position):
            self.last_seen_pos = pac.position
            self.last_seen_turn = turn
            self.chasing = True
            self.chase_until = turn + self.chase_memory
        if self.chasing and turn > self.chase_until:
            self.chasing = False
        seen_recently = (turn - self.last_seen_turn) <= self.chase_memory
        distance = abs(self.position[0] - pac.position[0]) + abs(self.position[1] - pac.position[1])
        within_radius = distance <= self.detection_radius
        if not (self.chasing or seen_recently or within_radius):
            return None
        target = self._target_for_type(pac)
        # occasionally ignore the target so ghosts feel less deterministic
        if target and random.random() < RANDOMNESS:
            return None
        return target

    def _target_for_type(self, pac):
        """Return the tile this ghost type aims for, or None for unknown types."""
        heading = None
        if pac.current_dir and pac.current_dir in self.DIRS:
            heading = self.DIRS[pac.current_dir]
        px, py = pac.position[0], pac.position[1]

        if self.ghost_type == 'red':
            return pac.position

        if self.ghost_type == 'pink':
            if heading is None:
                return pac.position
            # aim two tiles ahead of pacman
            return self._clamp_to_board(px + heading[0] * 2, py + heading[1] * 2)

        if self.ghost_type == 'blue':
            red = next((g for g in self.manager.instances if g.ghost_type == 'red'), None)
            if red and heading is not None:
                # mirror the red ghost through the tile two ahead of pacman
                ahead = (px + heading[0] * 2, py + heading[1] * 2)
                return self._clamp_to_board(ahead[0] * 2 - red.position[0], ahead[1] * 2 - red.position[1])
            # unpredictable fallback: a tile near pacman half of the time
            if random.random() < 0.5:
                return self._clamp_to_board(px + random.randint(-3, 3), py + random.randint(-3, 3))
            return pac.position

        return None

    def _open_neighbors(self, blocked):
        """List walkable, unblocked adjacent tiles, excluding the previous tile."""
        return [
            p for p in (self.next_pos(d) for d in self.DIRS)
            if p is not None and self.can_walk(p) and p not in blocked and p != self.prev_position
        ]

    def _non_reversing_step(self, target, blocked):
        """Return the open adjacent tile nearest to `target`, or None.

        Only tiles adjacent to the ghost are considered, so the ghost never
        jumps further than one tile in a single move.
        """
        options = self._open_neighbors(blocked)
        if not options:
            return None
        return min(options, key=lambda p: abs(p[0] - target[0]) + abs(p[1] - target[1]))

    def _chase_step(self, target, pac, blocked):
        """Return the next tile on the way to `target`, or None when stuck."""
        # allow moving into pac position
        blocked_for_path = set(blocked)
        blocked_for_path.discard(pac.position)
        # can_walk (not just on_board) keeps the path out of spawn tiles once
        # the ghost has left the spawn area
        path = astar_find(self.MAZE, self.position, target, self.can_walk, blocked_for_path)
        if not path or len(path) < 2:
            return None
        next_pos = path[1]
        # avoid reversing when another adjacent tile is available
        if self.prev_position and next_pos == self.prev_position:
            alternative = self._non_reversing_step(target, blocked)
            if alternative is not None:
                next_pos = alternative
        if next_pos in blocked:
            return None
        return next_pos

    def _roam_step(self, blocked):
        """Pick a roaming step using progressively weaker rules; None if trapped."""
        # 1) keep going forward when possible
        forward = self.next_pos(self.current_dir)
        if forward and self.can_walk(forward) and forward not in blocked:
            if self.prev_position and forward == self.prev_position:
                options = self._open_neighbors(blocked)
                if options:
                    forward = random.choice(options)
            return forward
        # 2) any open, non-reversing neighbour
        options = self._open_neighbors(blocked)
        if options:
            return random.choice(options)
        # 3) ignore reservations/occupancy but still avoid an immediate reverse
        options_relaxed = self._open_neighbors(())
        if options_relaxed:
            return random.choice(options_relaxed)
        # 4) last resort: reverse so the ghost doesn't stall
        if self.prev_position and self.can_walk(self.prev_position):
            return self.prev_position
        return None
def create_ghosts(spawn_points, maze, dirs, width, height, ghost_release_delay, move_speed):
    """Build the red, blue and pink ghosts on a shared manager.

    Duplicate spawn points are dropped (first occurrence kept); if fewer
    than three remain the last one is repeated. The red ghost takes the
    first point, blue the middle one and pink the last. Release turns are
    0, `ghost_release_delay` and twice `ghost_release_delay` respectively.

    Returns:
        A list ``[red, blue, pink]`` of Ghost instances.
    """
    shared_manager = GhostManager()
    # de-duplicate while keeping the original order
    candidates = list(dict.fromkeys(spawn_points))
    while len(candidates) < 3:
        candidates.append(candidates[-1])
    spots = (candidates[0], candidates[len(candidates) // 2], candidates[-1])
    roster = (
        ('ghost_red', 'R', 0),
        ('ghost_blue', 'B', ghost_release_delay),
        ('ghost_pink', 'P', ghost_release_delay * 2),
    )
    return [
        Ghost(ghost_name, glyph, spot, release_at, maze, dirs, width, height, move_speed, shared_manager)
        for (ghost_name, glyph, release_at), spot in zip(roster, spots)
    ]