generated from mwc/project_game
394 lines
12 KiB
Python
394 lines
12 KiB
Python
from ghosts.ghosts import create_ghosts, _trigger_game_over
|
|
import os
|
|
import sys
|
|
|
|
# ================= CONFIG =================
|
|
WIDTH = 40
|
|
HEIGHT = 15
|
|
MOVE_SPEED = 3
|
|
GHOST_RELEASE_DELAY = 50
|
|
CHASE_MEMORY = 30
|
|
# =========================================
|
|
|
|
RAW_MAP = [
|
|
"########################################",
|
|
"#......................................#",
|
|
"#.####.#####.#.##.#####.#####.#.##.#.###",
|
|
"#............#............#........#.###",
|
|
"#.#.####.###.#########.########.####.###",
|
|
"#.#......#............................##",
|
|
"#.######.#####################.######.##",
|
|
"#.........####################........##",
|
|
"#.#######.###GGGGGGGGGGGGG####.#########",
|
|
"#.........###GGGGGGGGGGGGG####.........#",
|
|
"#.####.#.########GGGGG###########.#.####",
|
|
"#.#....#.#.............#........#...####",
|
|
"#.#.####.###.######.####.#####.####.####",
|
|
"#...................................####",
|
|
"########################################",
|
|
]
|
|
|
|
MAZE = RAW_MAP
|
|
|
|
DIRS = {
|
|
"left": (-1, 0),
|
|
"right": (1, 0),
|
|
"up": (0, -1),
|
|
"down": (0, 1),
|
|
}
|
|
|
|
# ---------- Player start ----------
|
|
for y in range(HEIGHT):
|
|
for x in range(WIDTH):
|
|
if MAZE[y][x] == ".":
|
|
START_POS = (x, y)
|
|
break
|
|
else:
|
|
continue
|
|
break
|
|
|
|
|
|
def trigger_game_over(game):
|
|
# delegate to the ghosts module helper which handles headless vs UI
|
|
# and schedules a short delay for the UI.
|
|
try:
|
|
_trigger_game_over(game)
|
|
except Exception:
|
|
# fallback: attempt immediate end
|
|
if getattr(game, 'ended', None) is None:
|
|
setattr(game, 'ended', True)
|
|
try:
|
|
game.end()
|
|
except Exception:
|
|
setattr(game, 'game_over', True)
|
|
|
|
|
|
def add_points(game, pts):
|
|
"""Add points to the game's canonical score storage.
|
|
This updates common places where score may be kept so the
|
|
final display always reads the same value.
|
|
"""
|
|
# primary: top-level attribute
|
|
if hasattr(game, 'score'):
|
|
try:
|
|
game.score += pts
|
|
except Exception:
|
|
try:
|
|
setattr(game, 'score', getattr(game, 'score', 0) + pts)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
try:
|
|
setattr(game, 'score', getattr(game, 'score', 0) + pts)
|
|
except Exception:
|
|
pass
|
|
|
|
# secondary: if the game keeps a state dict-like object, update that too
|
|
st = getattr(game, 'state', None)
|
|
if isinstance(st, dict):
|
|
st['score'] = st.get('score', 0) + pts
|
|
|
|
|
|
class PacMan:
|
|
name = "pacman"
|
|
character = "C"
|
|
display = True
|
|
z = 3
|
|
|
|
def __init__(self):
|
|
self.position = START_POS
|
|
self.prev_position = START_POS
|
|
self.current_dir = None
|
|
self.buffered_dir = None
|
|
self.last_move = -999
|
|
|
|
def handle_keystroke(self, key, game):
|
|
if game.ended:
|
|
return
|
|
# guard against non-key events
|
|
if key is None:
|
|
return
|
|
|
|
# extract a readable name from common key event shapes
|
|
kn = None
|
|
if isinstance(key, str):
|
|
kn = key.lower()
|
|
else:
|
|
# prefer .char, then .name, then .key
|
|
for attr in ('char', 'name', 'key'):
|
|
val = getattr(key, attr, None)
|
|
if val:
|
|
kn = str(val).lower()
|
|
break
|
|
if not kn:
|
|
return
|
|
|
|
# ignore arrow keys explicitly
|
|
if any(x in kn for x in ('left', 'right', 'up', 'down', 'arrow')):
|
|
return
|
|
|
|
# normalize to tokens and look for a single-letter wasd token
|
|
import re
|
|
tokens = [t for t in re.sub(r'[^a-z0-9]', '_', kn).split('_') if t]
|
|
letter = None
|
|
for t in tokens:
|
|
if t in ('w', 'a', 's', 'd'):
|
|
letter = t
|
|
break
|
|
# fallback: if kn itself is a single char and is wasd
|
|
if letter is None and len(kn) == 1 and kn in ('w', 'a', 's', 'd'):
|
|
letter = kn
|
|
|
|
if not letter:
|
|
return
|
|
|
|
if letter == 'w':
|
|
self.buffered_dir = 'up'
|
|
elif letter == 'a':
|
|
self.buffered_dir = 'left'
|
|
elif letter == 's':
|
|
self.buffered_dir = 'down'
|
|
elif letter == 'd':
|
|
self.buffered_dir = 'right'
|
|
|
|
def can_move(self, d):
|
|
dx, dy = DIRS[d]
|
|
x, y = self.position[0] + dx, self.position[1] + dy
|
|
return 0 <= x < WIDTH and 0 <= y < HEIGHT and MAZE[y][x] != "#"
|
|
|
|
def play_turn(self, game):
|
|
if game.ended:
|
|
return
|
|
|
|
if game.turn_number - self.last_move < MOVE_SPEED:
|
|
return
|
|
|
|
if self.buffered_dir and self.can_move(self.buffered_dir):
|
|
self.current_dir = self.buffered_dir
|
|
|
|
if self.current_dir and self.can_move(self.current_dir):
|
|
dx, dy = DIRS[self.current_dir]
|
|
self.prev_position = self.position
|
|
self.position = (self.position[0] + dx, self.position[1] + dy)
|
|
self.last_move = game.turn_number
|
|
|
|
for agent in game.agents:
|
|
if getattr(agent, "name", "").startswith("ghost_"):
|
|
if agent.position == self.position:
|
|
trigger_game_over(game)
|
|
return
|
|
|
|
|
|
class Wall:
|
|
display = True
|
|
character = "#"
|
|
z = 0
|
|
|
|
def __init__(self, pos):
|
|
self.position = pos
|
|
self.name = f"wall_{pos}"
|
|
|
|
|
|
class Pellet:
|
|
display = True
|
|
character = "."
|
|
z = 1
|
|
|
|
def __init__(self, pos):
|
|
self.position = pos
|
|
self.name = f"pellet_{pos}"
|
|
|
|
def play_turn(self, game):
|
|
if game.ended:
|
|
return
|
|
|
|
pac = game.get_agent_by_name("pacman")
|
|
if pac and pac.position == self.position:
|
|
add_points(game, 10)
|
|
game.remove_agent_by_name(self.name)
|
|
# if there are no pellets left, we've collected them all -> max
|
|
any_pellets = any(getattr(a, 'name', '').startswith('pellet_') for a in game.agents)
|
|
if not any_pellets:
|
|
setattr(game, 'maxed', True)
|
|
# trigger the same game over flow as being caught by a ghost
|
|
trigger_game_over(game)
|
|
|
|
|
|
class GameOverWatcher:
|
|
"""Watches for a scheduled game-over timestamp and calls end()
|
|
once the wall-clock time has passed. This is a fallback if
|
|
threading.Timer isn't available; normally _trigger_game_over
|
|
will start a timer.
|
|
"""
|
|
name = "gameover_watcher"
|
|
display = False
|
|
|
|
def play_turn(self, game):
|
|
if getattr(game, 'ended', False):
|
|
return
|
|
if not getattr(game, 'game_over', False):
|
|
return
|
|
sched = getattr(game, '_game_over_scheduled', None)
|
|
if sched is None:
|
|
# no schedule found; call end immediately for safety
|
|
try:
|
|
game.end()
|
|
except Exception:
|
|
setattr(game, 'ended', True)
|
|
return
|
|
import time as _time
|
|
if _time.time() >= sched:
|
|
try:
|
|
game.end()
|
|
except Exception:
|
|
setattr(game, 'ended', True)
|
|
|
|
|
|
class GameOverlay:
|
|
"""Create temporary high-z agents that draw a centered overlay text
|
|
when `game.game_over` is set. The overlay stays in place until
|
|
the game actually ends.
|
|
"""
|
|
name = "game_overlay"
|
|
display = False
|
|
|
|
def __init__(self):
|
|
self._created = False
|
|
|
|
def _make_char_agent(self, ch, pos):
|
|
# simple per-char agent
|
|
class _C:
|
|
display = True
|
|
z = 100
|
|
def __init__(self, pos, ch):
|
|
self.position = pos
|
|
self.character = ch
|
|
self.name = f"overlay_{pos[0]}_{pos[1]}"
|
|
return _C(pos, ch)
|
|
|
|
def play_turn(self, game):
|
|
# create overlay once when game_over is flagged
|
|
if getattr(game, 'game_over', False) and not getattr(game, '_overlay_present', False):
|
|
# build two lines
|
|
w = getattr(game, 'board_size', (WIDTH, HEIGHT))[0] if hasattr(game, 'board_size') else WIDTH
|
|
h = getattr(game, 'board_size', (WIDTH, HEIGHT))[1] if hasattr(game, 'board_size') else HEIGHT
|
|
mx = " [max]" if getattr(game, 'maxed', False) else ""
|
|
lines = ["GAME OVER", f"SCORE: {getattr(game, 'score', 0)}{mx}"]
|
|
start_y = h//2 - len(lines)//2
|
|
overlay_agents = []
|
|
for i, line in enumerate(lines):
|
|
y = start_y + i
|
|
x0 = max(0, w//2 - len(line)//2)
|
|
for j, ch in enumerate(line):
|
|
pos = (x0 + j, y)
|
|
a = self._make_char_agent(ch, pos)
|
|
overlay_agents.append(a)
|
|
# attach overlay agents to the game and mark presence
|
|
game.agents.extend(overlay_agents)
|
|
game._overlay_present = True
|
|
game._overlay_agents = overlay_agents
|
|
|
|
# remove overlay agents once the game has ended
|
|
if getattr(game, 'ended', False) and getattr(game, '_overlay_present', False):
|
|
for a in list(getattr(game, '_overlay_agents', [])):
|
|
try:
|
|
# remove by name
|
|
game.remove_agent_by_name(a.name)
|
|
except Exception:
|
|
try:
|
|
game.agents.remove(a)
|
|
except Exception:
|
|
pass
|
|
game._overlay_present = False
|
|
game._overlay_agents = []
|
|
|
|
|
|
# ---------- Build agents ----------
|
|
agents = [PacMan()]
|
|
ghost_spawns = []
|
|
|
|
for y, row in enumerate(MAZE):
|
|
for x, ch in enumerate(row):
|
|
if ch == "#":
|
|
agents.append(Wall((x, y)))
|
|
elif ch == ".":
|
|
agents.append(Pellet((x, y)))
|
|
elif ch == "G":
|
|
ghost_spawns.append((x, y))
|
|
|
|
ghosts = create_ghosts(
|
|
ghost_spawns,
|
|
MAZE,
|
|
DIRS,
|
|
WIDTH,
|
|
HEIGHT,
|
|
GHOST_RELEASE_DELAY,
|
|
MOVE_SPEED + 1
|
|
)
|
|
|
|
agents.extend(ghosts)
|
|
|
|
# watcher to transition from flagged game_over to final end() after a short delay
|
|
agents.append(GameOverWatcher())
|
|
agents.append(GameOverlay())
|
|
|
|
# ---------- Start ----------
|
|
HEADLESS = os.environ.get("HEADLESS") == "1" or "--headless" in sys.argv
|
|
|
|
if HEADLESS:
|
|
class HeadlessGame:
|
|
def __init__(self, agents):
|
|
self.agents = agents
|
|
self.turn_number = 0
|
|
self.score = 0
|
|
self.ended = False
|
|
|
|
def get_agent_by_name(self, name):
|
|
for a in self.agents:
|
|
if getattr(a, "name", None) == name:
|
|
return a
|
|
return None
|
|
|
|
def remove_agent_by_name(self, name):
|
|
self.agents = [a for a in self.agents if getattr(a, "name", None) != name]
|
|
|
|
def end(self):
|
|
print("\n===== GAME OVER =====")
|
|
mx = " [max]" if getattr(self, 'maxed', False) else ""
|
|
print(f"Final Score: {self.score}{mx}")
|
|
print("=====================")
|
|
self.ended = True
|
|
|
|
def play(self, max_turns=500):
|
|
while not self.ended and self.turn_number < max_turns:
|
|
for agent in list(self.agents):
|
|
if hasattr(agent, "play_turn"):
|
|
agent.play_turn(self)
|
|
self.turn_number += 1
|
|
|
|
HeadlessGame(agents).play()
|
|
|
|
else:
|
|
# import the graphical runtime only when running with UI
|
|
from retro.game import Game
|
|
from retro.agent import ArrowKeyAgent
|
|
|
|
ArrowKeyAgent()
|
|
game = Game(agents, {"score": 0}, board_size=(WIDTH, HEIGHT))
|
|
game.score = 0
|
|
game.ended = False
|
|
|
|
print("Pac-Man | Correct Ghost AI")
|
|
game.play()
|
|
|
|
os.system("cls" if os.name == "nt" else "clear")
|
|
print("\n" * 3)
|
|
print("########################################")
|
|
print("# #")
|
|
print("# GAME OVER #")
|
|
print("# #")
|
|
mx = " [max]" if getattr(game, 'maxed', False) else ""
|
|
print(f"# FINAL SCORE: {game.score:<6}{mx} #")
|
|
print("# #")
|
|
print("########################################")
|