generated from mwc/project_game
469 lines
18 KiB
Python
469 lines
18 KiB
Python
from collections import deque
|
|
import time
|
|
import heapq
|
|
import random
|
|
|
|
# Chance, per ghost decision, that an already-chosen chase target is thrown
# away for that turn. Ghosts stay mostly deterministic chasers; this only adds
# slight variation to their behaviour.
RANDOMNESS = 0.12
|
|
|
|
class GhostManager:
    """Registry of ghost instances plus the tiles claimed during the current turn.

    A ghost reserves the tile it moves onto so ghosts acting later in the same
    turn can steer clear of it. The reservation set is emptied the first time
    a new turn number is passed to ``start_turn``.
    """

    def __init__(self):
        self.instances = []
        self.reserved = set()
        self.reserved_turn = -1
        self.turn = -1

    def register(self, g):
        """Start tracking ghost ``g``."""
        self.instances.append(g)

    def start_turn(self, turn):
        """Record ``turn`` as current; wipe reservations if it is a new turn."""
        self.turn = turn
        if self.reserved_turn == turn:
            # Another ghost already opened this turn; keep its reservations.
            return
        self.reserved_turn = turn
        self.reserved.clear()

    def reserve(self, pos):
        """Claim ``pos`` for the current turn."""
        self.reserved.add(pos)

    def is_reserved(self, pos):
        """Return True when ``pos`` has been claimed this turn."""
        return pos in self.reserved

    def occupied_positions(self, exclude=None):
        """Return the set of positions of every tracked ghost except ``exclude``."""
        others = (ghost for ghost in self.instances if ghost is not exclude)
        return {ghost.position for ghost in others}
|
|
|
|
|
|
# path search helpers
|
|
|
|
def bfs_find(maze, start, goal_test, on_board, blocked=None):
    """Breadth-first search for the nearest tile accepted by ``goal_test``.

    Args:
        maze: grid indexed as ``maze[y][x]``; ``'#'`` cells are walls.
        start: ``(x, y)`` tile the search begins at.
        goal_test: callable taking a position, true for an acceptable goal.
        on_board: callable taking a position, true when it is inside the grid.
        blocked: optional container of positions that may not be entered.

    Returns:
        The list of positions from ``start`` to the first goal reached
        (inclusive on both ends), or ``None`` when no goal is reachable.
    """
    forbidden = set() if blocked is None else blocked
    parent = {start: None}
    frontier = deque((start,))
    steps = ((1, 0), (-1, 0), (0, 1), (0, -1))

    while frontier:
        here = frontier.popleft()

        if goal_test(here):
            # Follow parent links back to the start, then flip the trail.
            trail = [here]
            while parent[trail[-1]] is not None:
                trail.append(parent[trail[-1]])
            return trail[::-1]

        cx, cy = here
        for sx, sy in steps:
            cand = (cx + sx, cy + sy)
            # The bounds check must precede the maze lookup.
            if cand in parent or not on_board(cand):
                continue
            if maze[cand[1]][cand[0]] == '#' or cand in forbidden:
                continue
            parent[cand] = here
            frontier.append(cand)

    return None
|
|
|
|
|
|
def astar_find(maze, start, goal, on_board, blocked=None):
    """A* search from ``start`` to ``goal`` using a Manhattan-distance estimate.

    Args:
        maze: grid indexed as ``maze[y][x]``; ``'#'`` cells are walls.
        start: ``(x, y)`` tile the search begins at.
        goal: ``(x, y)`` tile to reach.
        on_board: callable taking a position, true when it is inside the grid.
        blocked: optional container of positions that may not be entered;
            the goal itself is exempt from this restriction.

    Returns:
        The list of positions from ``start`` to ``goal`` (both included), or
        ``None`` when the goal cannot be reached.
    """
    forbidden = set() if blocked is None else blocked

    def estimate(pos):
        return abs(pos[0] - goal[0]) + abs(pos[1] - goal[1])

    parent = {start: None}
    best = {start: 0}
    # Heap entries are (estimated total, cost so far, position).
    frontier = [(estimate(start), 0, start)]

    while frontier:
        _, spent, here = heapq.heappop(frontier)

        if here == goal:
            trail = []
            step = here
            while step is not None:
                trail.append(step)
                step = parent[step]
            return trail[::-1]

        cx, cy = here
        for sx, sy in ((1, 0), (-1, 0), (0, 1), (0, -1)):
            cand = (cx + sx, cy + sy)
            if not on_board(cand):
                continue
            if maze[cand[1]][cand[0]] == '#':
                continue
            if cand != goal and cand in forbidden:
                continue

            tentative = spent + 1
            known = best.get(cand)
            if known is None or tentative < known:
                best[cand] = tentative
                parent[cand] = here
                heapq.heappush(frontier, (tentative + estimate(cand), tentative, cand))

    return None
|
|
|
|
|
|
def _trigger_game_over(game):
|
|
"""End the game in a way that's friendly to both headless and UI runs.
|
|
HeadlessGame defines an `ended` attribute and an `end()` method we can call.
|
|
In graphical runs we prefer to set a `game_over` flag so the main loop
|
|
/ input handling stays active and the player can press a key to quit.
|
|
"""
|
|
# prefer headless behavior: if the object *looks* like HeadlessGame
|
|
# (it exposes an `ended` attribute) then call end() immediately.
|
|
if getattr(game, 'ended', None) is not None and callable(getattr(game, 'end', None)):
|
|
try:
|
|
game.end()
|
|
return
|
|
except Exception:
|
|
# fall back to flagging if end() misbehaves
|
|
pass
|
|
|
|
# otherwise, set a flag the UI can react to and avoid calling end()
|
|
setattr(game, 'game_over', True)
|
|
|
|
# schedule a background timer to call end() after a short delay
|
|
# so the UI has a small 'caught' moment before the final screen.
|
|
if getattr(game, '_game_over_timer', None) is None:
|
|
try:
|
|
import threading
|
|
|
|
def _delayed_end():
|
|
try:
|
|
game.end()
|
|
except Exception:
|
|
setattr(game, 'ended', True)
|
|
|
|
t = threading.Timer(1.0, _delayed_end)
|
|
t.daemon = True
|
|
t.start()
|
|
setattr(game, '_game_over_timer', t)
|
|
except Exception:
|
|
# as a fallback, set a scheduled timestamp watched by GameOverWatcher
|
|
if getattr(game, '_game_over_scheduled', None) is None:
|
|
setattr(game, '_game_over_scheduled', time.time() + 1.0)
|
|
|
|
|
|
class Ghost:
    """A maze ghost that leaves its spawn, senses Pac-Man, then chases or roams.

    ``display`` is a property over the private ``_display`` field so that
    unexpected changes to the flag are logged (this helps trace flicker or
    invisibility) while plain ``ghost.display`` reads and writes keep working.
    """

    def __init__(self, name, char, start_pos, release_turn, maze, dirs, width, height, move_speed, manager, chase_memory=30, detection_radius=6):
        """Create a ghost and register it with ``manager``.

        Args:
            name: identifier; the text after the last ``_`` becomes
                ``ghost_type`` (``'red'``, ``'pink'`` or ``'blue'`` select a
                targeting rule in ``play_turn``).
            char: character stored as ``character``.
            start_pos: initial ``(x, y)`` tile.
            release_turn: turn from which an unreleased ghost may act.
            maze: grid indexed ``maze[y][x]``; ``'#'`` is a wall, ``'G'`` is
                a spawn tile.
            dirs: mapping of direction key to ``(dx, dy)`` step.
            width, height: board size used for bounds checks.
            move_speed: minimum number of turns between two moves.
            manager: shared ``GhostManager``-like coordinator.
            chase_memory: turns a sighting of Pac-Man keeps the chase alive.
            detection_radius: Manhattan distance within which Pac-Man is
                targeted even without line of sight.
        """
        self.name = name
        self.character = char
        # Rendering order: higher z draws on top. 4 keeps ghosts above
        # pellets/walls and above PacMan (PacMan.z == 3 per the original note).
        self.z = 4
        # Instance-level flag; written to the backing field directly so the
        # property setter does not log during construction.
        self._display = True
        self.position = start_pos
        self.release_turn = release_turn
        # Released by default so ghosts roam from turn 0. For a delayed
        # release, start with False and rely on release_turn.
        self.released = True
        self.current_dir = random.choice(list(dirs.keys()))
        self.last_move = -999
        self.chasing = False
        self.chase_until = -1
        # Start equal to the position so renderers that interpolate between
        # previous and current positions work before the first move.
        self.prev_position = start_pos
        # Once a ghost leaves the G-spawn area it must never re-enter it.
        self.left_spawn = False
        self.last_seen_pos = None
        self.last_seen_turn = -999
        self.ghost_type = name.split('_')[-1]

        # references
        self.MAZE = maze
        self.DIRS = dirs
        self.WIDTH = width
        self.HEIGHT = height
        self.MOVE_SPEED = move_speed
        self.manager = manager
        self.chase_memory = chase_memory
        self.detection_radius = detection_radius

        manager.register(self)

    @property
    def display(self):
        """Visibility flag read by renderers (defaults to True)."""
        return getattr(self, '_display', True)

    @display.setter
    def display(self, val):
        old = getattr(self, '_display', True)
        if old != val:
            # Include the manager's turn when available; fall back to None.
            mgr_turn = getattr(self.manager, 'turn', None) if getattr(self, 'manager', None) is not None else None
            print(f"[Ghost][DEBUG] {self.name} display changed {old} -> {val} at manager.turn={mgr_turn}")
        self._display = val

    def on_board(self, pos):
        """Return True when ``pos`` lies inside the WIDTH x HEIGHT board."""
        x, y = pos
        return 0 <= x < self.WIDTH and 0 <= y < self.HEIGHT

    def can_walk(self, pos):
        """Return True when this ghost may step onto ``pos``.

        Off-board tiles and walls are never walkable; spawn ('G') tiles stop
        being walkable once the ghost has left the spawn area.
        """
        # The bounds check must precede the maze lookup.
        if not self.on_board(pos):
            return False
        x, y = pos
        tile = self.MAZE[y][x]
        if tile == 'G' and self.left_spawn:
            return False
        return tile != '#'

    def is_spawn(self):
        """Return True when the ghost currently stands on a 'G' tile."""
        x, y = self.position
        return self.MAZE[y][x] == 'G'

    def line_of_sight(self, pac_pos):
        """Return True when ``pac_pos`` is visible along a row or column.

        The ghost sees a tile in its own column or row when no '#' tile lies
        strictly between the two positions; anything else is not visible.
        """
        gx, gy = self.position
        px, py = pac_pos
        if gx == px:
            step = 1 if py > gy else -1
            return all(self.MAZE[y][gx] != '#' for y in range(gy + step, py, step))
        if gy == py:
            step = 1 if px > gx else -1
            return all(self.MAZE[gy][x] != '#' for x in range(gx + step, px, step))
        return False

    def neighbors(self, pos):
        """Yield the tile one step from ``pos`` in each direction of DIRS."""
        x, y = pos
        for dx, dy in self.DIRS.values():
            yield (x + dx, y + dy)

    def next_pos(self, dir_key):
        """Return next position from current position following direction key."""
        if dir_key not in self.DIRS:
            return None
        dx, dy = self.DIRS[dir_key]
        return (self.position[0] + dx, self.position[1] + dy)

    def _spawn_tiles(self):
        """Return the set of board coordinates whose maze cell is 'G'."""
        return {
            (x, y)
            for y in range(self.HEIGHT)
            for x in range(self.WIDTH)
            if self.MAZE[y][x] == 'G'
        }

    def _blocked_tiles(self):
        """Return tiles this ghost should avoid this turn.

        Combines other ghosts' positions, tiles reserved this turn, and other
        ghosts' previous positions (the latter prevents two ghosts swapping).
        """
        occupied = self.manager.occupied_positions(exclude=self)
        swap_block = {
            g.prev_position
            for g in self.manager.instances
            if g is not self and getattr(g, 'prev_position', None) is not None
        }
        return set(occupied) | set(self.manager.reserved) | swap_block

    def _commit_move(self, new_pos, game, pac):
        """Move to ``new_pos`` and perform all per-move bookkeeping.

        Updates prev/current position, the heading, the spawn lock, the move
        timestamp and the manager reservation, then ends the game when the
        new tile is Pac-Man's.
        """
        old = self.position
        self.prev_position = old
        self.position = new_pos

        # Keep the heading in sync with the step actually taken so that the
        # "try forward" roaming rule follows the ghost's real direction.
        step = (new_pos[0] - old[0], new_pos[1] - old[1])
        for key, vec in self.DIRS.items():
            if tuple(vec) == step:
                self.current_dir = key
                break

        # Once we stand on a non-G tile, lock spawn re-entry.
        if not self.is_spawn():
            self.left_spawn = True
        self.last_move = game.turn_number
        self.manager.reserve(self.position)
        if pac.position == self.position:
            # End the game; the UI displays a generic 'Game Over'.
            _trigger_game_over(game)

    def _leave_spawn(self, game, pac, blocked):
        """Step along the shortest route to a non-'G' tile; True if moved."""
        path = bfs_find(
            self.MAZE,
            self.position,
            lambda p: self.MAZE[p[1]][p[0]] != 'G',
            self.on_board,
            blocked,
        )
        if path and len(path) > 1 and path[1] not in blocked:
            self._commit_move(path[1], game, pac)
            return True
        return False

    def _usable_target(self, tx, ty, fallback):
        """Clamp ``(tx, ty)`` to the board; use ``fallback`` if not walkable.

        A target on a wall (or on a locked spawn tile) can never be reached
        by the path search, so the ghost aims at ``fallback`` instead.
        """
        tile = (
            max(0, min(self.WIDTH - 1, tx)),
            max(0, min(self.HEIGHT - 1, ty)),
        )
        return tile if self.can_walk(tile) else fallback

    def _pick_target(self, pac, turn):
        """Update sensing state and return the tile to chase, or None.

        A target is produced while the ghost is chasing, saw Pac-Man within
        ``chase_memory`` turns, or is within ``detection_radius`` of him:
        red aims at Pac-Man, pink two tiles ahead of him, blue at the
        reflection of red through the tile two ahead of Pac-Man. Without a
        usable heading pink aims at Pac-Man; without red or a heading blue
        aims half the time at a random tile near Pac-Man, otherwise at him.
        """
        if self.line_of_sight(pac.position):
            self.last_seen_pos = pac.position
            self.last_seen_turn = turn
            self.chasing = True
            self.chase_until = turn + self.chase_memory

        if self.chasing and turn > self.chase_until:
            self.chasing = False

        seen_recently = (turn - self.last_seen_turn) <= self.chase_memory
        distance = abs(self.position[0] - pac.position[0]) + abs(self.position[1] - pac.position[1])
        within_radius = distance <= self.detection_radius
        if not (self.chasing or seen_recently or within_radius):
            return None

        heading = None
        if pac.current_dir and pac.current_dir in self.DIRS:
            heading = self.DIRS[pac.current_dir]
        px, py = pac.position[0], pac.position[1]

        if self.ghost_type == 'red':
            return pac.position

        if self.ghost_type == 'pink':
            if heading is None:
                return pac.position
            dx, dy = heading
            return self._usable_target(px + dx * 2, py + dy * 2, pac.position)

        if self.ghost_type == 'blue':
            red = next((g for g in self.manager.instances if g.ghost_type == 'red'), None)
            if red and heading is not None:
                dx, dy = heading
                ahead_x, ahead_y = px + dx * 2, py + dy * 2
                return self._usable_target(
                    ahead_x * 2 - red.position[0],
                    ahead_y * 2 - red.position[1],
                    pac.position,
                )
            # unpredictable fallback
            if random.random() < 0.5:
                return self._usable_target(
                    px + random.randint(-3, 3),
                    py + random.randint(-3, 3),
                    pac.position,
                )
            return pac.position

        return None

    def _step_towards(self, target, game, pac, blocked):
        """Take one A* step towards ``target``; return True if the ghost moved."""
        # Pac-Man's own tile must stay enterable, otherwise a capture could be
        # refused when he stands on another ghost's previous/reserved tile.
        path_blocked = set(blocked)
        path_blocked.discard(pac.position)
        if self.left_spawn:
            # Route around the spawn area instead of cutting through it.
            path_blocked |= self._spawn_tiles()

        path = astar_find(self.MAZE, self.position, target, self.on_board, path_blocked)
        if not path or len(path) < 2:
            return False

        next_pos = path[1]
        if next_pos == self.prev_position:
            # Avoid reversing: prefer another tile that is actually adjacent.
            # (Later path entries are several steps away and must not be
            # used, or the ghost would jump across the board.)
            alts = [
                p for p in self.neighbors(self.position)
                if p != self.prev_position and self.can_walk(p) and p not in path_blocked
            ]
            if alts:
                next_pos = random.choice(alts)

        # The goal is exempt from blocking inside A*, so re-check the step.
        if next_pos in path_blocked or not self.can_walk(next_pos):
            return False
        self._commit_move(next_pos, game, pac)
        return True

    def _roam(self, game, pac, blocked):
        """Fallback movement: forward if possible, else progressively weaker options."""
        forward = self.next_pos(self.current_dir)
        open_tiles = [
            p for p in self.neighbors(self.position)
            if self.can_walk(p) and p != self.prev_position
        ]
        free_tiles = [p for p in open_tiles if p not in blocked]

        if forward is not None and self.can_walk(forward) and forward not in blocked:
            if forward == self.prev_position and free_tiles:
                # Forward would undo the last step; pick an alternative.
                forward = random.choice(free_tiles)
            self._commit_move(forward, game, pac)
            return

        if free_tiles:
            self._commit_move(random.choice(free_tiles), game, pac)
            return

        # No unblocked option left. To keep ghosts roaming rather than
        # standing still:
        # 1) take any walkable neighbour, ignoring reservations (the new tile
        #    is still reserved so other ghosts get a hint);
        if open_tiles:
            self._commit_move(random.choice(open_tiles), game, pac)
            return

        # 2) as a last resort, reverse to the previous tile.
        if self.prev_position and self.can_walk(self.prev_position):
            self._commit_move(self.prev_position, game, pac)

    def play_turn(self, game):
        """Advance this ghost by at most one tile for the game's current turn.

        Does nothing when the game is flagged over, the ghost moved fewer
        than MOVE_SPEED turns ago, it is not yet released, or no 'pacman'
        agent exists. Otherwise it tries, in order: leaving the spawn area,
        stepping towards a chase target, and roaming.
        """
        turn = game.turn_number
        self.manager.start_turn(turn)

        # If the UI/game has already flagged game_over, take no action.
        if getattr(game, 'game_over', False):
            return
        if turn - self.last_move < self.MOVE_SPEED:
            return
        if not self.released:
            if turn < self.release_turn:
                return
            self.released = True

        pac = game.get_agent_by_name('pacman')
        if not pac:
            return

        blocked = self._blocked_tiles()

        # Inside spawn: try to exit quickly.
        if self.is_spawn() and self._leave_spawn(game, pac, blocked):
            return

        target = self._pick_target(pac, turn)
        # Occasionally ignore the target so ghosts feel less deterministic.
        if target is not None and random.random() < RANDOMNESS:
            target = None

        if target is not None and self._step_towards(target, game, pac, blocked):
            return

        self._roam(game, pac, blocked)
|
|
|
|
|
|
def create_ghosts(spawn_points, maze, dirs, width, height, ghost_release_delay, move_speed):
    """Build the red, blue and pink ghosts sharing one ``GhostManager``.

    Args:
        spawn_points: non-empty iterable of ``(x, y)`` spawn tiles; duplicates
            are ignored and list-typed points are converted to tuples.
        maze, dirs, width, height, move_speed: forwarded to every ``Ghost``.
        ghost_release_delay: release turn for blue; pink gets twice this
            value and red gets 0.

    Returns:
        ``[red, blue, pink]``, placed on the first, middle and last distinct
        spawn point. With fewer than three distinct points the last one is
        reused.

    Raises:
        ValueError: if ``spawn_points`` is empty.
    """
    # Lists are unhashable, so normalise them before de-duplicating while
    # keeping first-seen order.
    unique = list(dict.fromkeys(
        tuple(p) if isinstance(p, list) else p for p in spawn_points
    ))
    if not unique:
        raise ValueError("create_ghosts requires at least one spawn point")

    # Pad with the last point so three spawn slots always exist.
    unique.extend([unique[-1]] * (3 - len(unique)))
    s0, s1, s2 = unique[0], unique[len(unique) // 2], unique[-1]

    manager = GhostManager()
    roster = (
        ('ghost_red', 'R', s0, 0),
        ('ghost_blue', 'B', s1, ghost_release_delay),
        ('ghost_pink', 'P', s2, ghost_release_delay * 2),
    )
    return [
        Ghost(name, char, spawn, release, maze, dirs, width, height, move_speed, manager)
        for name, char, spawn, release in roster
    ]
|