]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- add space invaders example
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 27 Apr 2018 19:05:21 +0000 (15:05 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 27 Apr 2018 19:05:56 +0000 (15:05 -0400)
Change-Id: I439b6012af8c2bd8f555744657b8091ac168242b

doc/build/orm/examples.rst
doc/build/orm/space_invaders.jpg [new file with mode: 0644]
examples/space_invaders/__init__.py [new file with mode: 0644]
examples/space_invaders/space_invaders.py [new file with mode: 0644]

index 76afab26f1a395ba978c4e14ffed11e81403369f..3dd81f856b983c4f0694f065e8a6cccb8b5d55de 100644 (file)
@@ -76,6 +76,13 @@ Relationship Join Conditions
 
 .. automodule:: examples.join_conditions
 
+.. _examples_spaceinvaders:
+
+Space Invaders
+--------------
+
+.. automodule:: examples.space_invaders
+
 .. _examples_xmlpersistence:
 
 XML Persistence
diff --git a/doc/build/orm/space_invaders.jpg b/doc/build/orm/space_invaders.jpg
new file mode 100644 (file)
index 0000000..dfee39d
Binary files /dev/null and b/doc/build/orm/space_invaders.jpg differ
diff --git a/examples/space_invaders/__init__.py b/examples/space_invaders/__init__.py
new file mode 100644 (file)
index 0000000..8816045
--- /dev/null
@@ -0,0 +1,24 @@
+"""
+A Space Invaders game using SQLite as the state machine.
+
+Originally developed in 2012.  Adapted to work in Python 3.
+
+Runs in a textual console using ASCII art.
+
+
+.. image:: space_invaders.jpg
+
+
+To run::
+
+    python -m examples.space_invaders.space_invaders
+
+While it runs, watch the SQL output in the log::
+
+    tail -f space_invaders.log
+
+enjoy!
+
+.. autosource::
+
+"""
\ No newline at end of file
diff --git a/examples/space_invaders/space_invaders.py b/examples/space_invaders/space_invaders.py
new file mode 100644 (file)
index 0000000..3ce280a
--- /dev/null
@@ -0,0 +1,754 @@
+import sys
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import create_engine, Integer, Column, ForeignKey, \
+    String, func
+from sqlalchemy.orm import relationship, Session, joinedload
+from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
+import curses
+import time
+import textwrap
+import re
+import random
+import logging
+
+_PY3 = sys.version_info > (3, 0)
+if _PY3:
+    xrange = range
+
+
+logging.basicConfig(
+    filename="space_invaders.log",
+    format="%(asctime)s,%(msecs)03d %(levelname)-5.5s %(message)s"
+)
+logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
+
+Base = declarative_base()
+
+WINDOW_LEFT = 10
+WINDOW_TOP = 2
+WINDOW_WIDTH = 70
+WINDOW_HEIGHT = 34
+VERT_PADDING = 2
+HORIZ_PADDING = 5
+ENEMY_VERT_SPACING = 4
+MAX_X = WINDOW_WIDTH - HORIZ_PADDING
+MAX_Y = WINDOW_HEIGHT - VERT_PADDING
+LEFT_KEY = ord("j")
+RIGHT_KEY = ord("l")
+FIRE_KEY = ord(" ")
+PAUSE_KEY = ord("p")
+
+COLOR_MAP = {
+    "K": curses.COLOR_BLACK,
+    "R": curses.COLOR_RED,
+    "B": curses.COLOR_BLUE,
+    "C": curses.COLOR_CYAN,
+    "G": curses.COLOR_GREEN,
+    "M": curses.COLOR_MAGENTA,
+    "R": curses.COLOR_RED,
+    "W": curses.COLOR_WHITE,
+    "Y": curses.COLOR_YELLOW
+}
+
+
+class Glyph(Base):
+    """Describe a "glyph", a graphical element
+    to be painted on the screen.
+
+    """
+    __tablename__ = 'glyph'
+    id = Column(Integer, primary_key=True)
+    name = Column(String)
+    type = Column(String)
+    width = Column(Integer)
+    height = Column(Integer)
+    data = Column(String)
+    alt_data = Column(String)
+    __mapper_args__ = {"polymorphic_on": type}
+
+    def __init__(self, name, img, alt=None):
+        self.name = name
+        self.data, self.width, self.height = \
+            self._encode_glyph(img)
+        if alt is not None:
+            self.alt_data, alt_w, alt_h = \
+                self._encode_glyph(alt)
+
+    def _encode_glyph(self, img):
+        """Receive a textual description of the glyph and
+        encode into a format understood by
+        GlyphCoordinate.render().
+
+        """
+        img = re.sub(r'^\n', "", textwrap.dedent(img))
+        color = "W"
+        lines = [line.rstrip() for line in img.split("\n")]
+        data = []
+        for line in lines:
+            render_line = []
+            line = list(line)
+            while line:
+                char = line.pop(0)
+                if char == '#':
+                    color = line.pop(0)
+                    continue
+                render_line.append((color, char))
+            data.append(render_line)
+        width = max([len(rl) for rl in data])
+        data = "".join(
+            "".join("%s%s" % (color, char) for color, char in render_line) +
+            ("W " * (width - len(render_line)))
+            for render_line in data
+        )
+        return data, width, len(lines)
+
+    def glyph_for_state(self, coord, state):
+        """Return the appropriate data representation
+        for this Glyph, based on the current coordinates
+        and state.
+
+        Subclasses may override this to provide animations.
+
+        """
+        return self.data
+
+
+class GlyphCoordinate(Base):
+    """Describe a glyph rendered at a certain x, y coordinate.
+
+    The GlyphCoordinate may also include optional values
+    such as the tick at time of render, a label, and a
+    score value.
+
+    """
+    __tablename__ = 'glyph_coordinate'
+    id = Column(Integer, primary_key=True)
+    glyph_id = Column(Integer, ForeignKey('glyph.id'))
+    x = Column(Integer)
+    y = Column(Integer)
+    tick = Column(Integer)
+    label = Column(String)
+    score = Column(Integer)
+    glyph = relationship(Glyph, innerjoin=True)
+
+    def __init__(
+            self,
+            session, glyph_name, x, y,
+            tick=None, label=None, score=None):
+        self.glyph = session.query(Glyph).\
+            filter_by(name=glyph_name).one()
+        self.x = x
+        self.y = y
+        self.tick = tick
+        self.label = label
+        self.score = score
+        session.add(self)
+
+    def render(self, window, state):
+        """Render the Glyph at this position."""
+
+        col = 0
+        row = 0
+        glyph = self.glyph
+        data = glyph.glyph_for_state(self, state)
+        for color, char in [
+            (data[i], data[i + 1])
+            for i in xrange(0, len(data), 2)
+        ]:
+
+            x = self.x + col
+            y = self.y + row
+            if 0 <= x <= MAX_X and 0 <= y <= MAX_Y:
+                window.addstr(
+                    y + VERT_PADDING,
+                    x + HORIZ_PADDING,
+                    char,
+                    _COLOR_PAIRS[color])
+            col += 1
+            if col == glyph.width:
+                col = 0
+                row += 1
+        if self.label:
+            self._render_label(window, False)
+
+    def _render_label(self, window, blank):
+        label = self.label if not blank else " " * len(self.label)
+        if self.x + self.width + len(self.label) < MAX_X:
+            window.addstr(self.y, self.x + self.width, label)
+        else:
+            window.addstr(self.y, self.x - len(self.label), label)
+
+    def blank(self, window):
+        """Render a blank box for this glyph's position and size."""
+
+        glyph = self.glyph
+        x = min(max(self.x, 0), MAX_X)
+        width = min(glyph.width, MAX_X - x) or 1
+        for y_a in xrange(self.y, self.y + glyph.height):
+            y = y_a
+            window.addstr(
+                y + VERT_PADDING,
+                x + HORIZ_PADDING,
+                " " * width)
+
+        if self.label:
+            self._render_label(window, True)
+
+    @hybrid_property
+    def width(self):
+        return self.glyph.width
+
+    @width.expression
+    def width(cls):
+        return Glyph.width
+
+    @hybrid_property
+    def height(self):
+        return self.glyph.height
+
+    @height.expression
+    def height(cls):
+        return Glyph.height
+
+    @hybrid_property
+    def bottom_bound(self):
+        return self.y + self.height >= MAX_Y
+
+    @hybrid_property
+    def top_bound(self):
+        return self.y <= 0
+
+    @hybrid_property
+    def left_bound(self):
+        return self.x <= 0
+
+    @hybrid_property
+    def right_bound(self):
+        return self.x + self.width >= MAX_X
+
+    @hybrid_property
+    def right_edge_bound(self):
+        return self.x > MAX_X
+
+    @hybrid_method
+    def intersects(self, other):
+        """Return True if this GlyphCoordinate intersects with
+        the given GlyphCoordinate."""
+
+        return ~(
+            (self.x + self.width < other.x) |
+            (self.x > other.x + other.width)
+        ) & ~(
+            (self.y + self.height < other.y) |
+            (self.y > other.y + other.height)
+        )
+
+
+class EnemyGlyph(Glyph):
+    """Describe an enemy."""
+    __mapper_args__ = {"polymorphic_identity": "enemy"}
+
+
+class ArmyGlyph(EnemyGlyph):
+    """Describe an enemy that's part of the "army". """
+    __mapper_args__ = {"polymorphic_identity": "army"}
+
+    def glyph_for_state(self, coord, state):
+        if state["flip"]:
+            return self.alt_data
+        else:
+            return self.data
+
+
+class SaucerGlyph(EnemyGlyph):
+    """Describe the enemy saucer flying overhead."""
+    __mapper_args__ = {"polymorphic_identity": "saucer"}
+
+    def glyph_for_state(self, coord, state):
+        if state["flip"] == 0:
+            return self.alt_data
+        else:
+            return self.data
+
+
+class MessageGlyph(Glyph):
+    """Describe a glyph for displaying a message."""
+    __mapper_args__ = {"polymorphic_identity": "message"}
+
+
+class PlayerGlyph(Glyph):
+    """Describe a glyph representing the player."""
+    __mapper_args__ = {"polymorphic_identity": "player"}
+
+
+class MissileGlyph(Glyph):
+    """Describe a glyph representing a missile."""
+    __mapper_args__ = {"polymorphic_identity": "missile"}
+
+
+class SplatGlyph(Glyph):
+    """Describe a glyph representing a "splat"."""
+    __mapper_args__ = {"polymorphic_identity": "splat"}
+
+    def glyph_for_state(self, coord, state):
+        age = state["tick"] - coord.tick
+        if age > 5:
+            return self.alt_data
+        else:
+            return self.data
+
+
+def init_glyph(session):
+    """Create the glyphs used during play."""
+
+    enemy1 = ArmyGlyph(
+        "enemy1", """
+         #W-#B^#R-#B^#W-
+         #G|   |
+        """,
+        """
+         #W>#B^#R-#B^#W<
+         #G^   ^
+        """
+    )
+
+    enemy2 = ArmyGlyph(
+        "enemy2", """
+         #W***
+        #R<#C~~~#R>
+        """,
+        """
+         #W@@@
+        #R<#C---#R>
+        """
+    )
+
+    enemy3 = ArmyGlyph(
+        "enemy3", """
+        #Y((--))
+        #M-~-~-~
+        """,
+        """
+        #Y[[--]]
+        #M~-~-~-
+        """
+    )
+
+    saucer = SaucerGlyph(
+        "saucer",
+        """#R~#Y^#R~#G<<((=#WOO#G=))>>""",
+        """#Y^#R~#Y^#G<<((=#WOO#G=))>>""",
+    )
+
+    splat1 = SplatGlyph(
+        "splat1",
+        """
+             #WVVVVV
+            #W> #R*** #W<
+             #W^^^^^
+        """,
+        """
+                #M|
+             #M- #Y+++ #M-
+                #M|
+        """
+    )
+
+    ship = PlayerGlyph("ship", """
+       #Y^
+     #G=====
+    """)
+
+    missile = MissileGlyph("missile", """
+        |
+    """)
+
+    start = MessageGlyph(
+        "start_message",
+        "J = move left; L = move right; SPACE = fire\n"
+        "           #GPress any key to start")
+    lose = MessageGlyph("lose_message",
+                        "#YY O U  L O S E ! ! !")
+    win = MessageGlyph(
+        "win_message",
+        "#RL E V E L  C L E A R E D ! ! !"
+    )
+    paused = MessageGlyph(
+        "pause_message",
+        "#WP A U S E D\n#GPress P to continue")
+    session.add_all(
+        [enemy1, enemy2, enemy3, ship, saucer,
+         missile, start, lose, win,
+         paused, splat1])
+
+
+def setup_curses():
+    """Setup terminal/curses state."""
+
+    window = curses.initscr()
+    curses.noecho()
+
+    window = curses.newwin(
+        WINDOW_HEIGHT + (VERT_PADDING * 2),
+        WINDOW_WIDTH + (HORIZ_PADDING * 2),
+        WINDOW_TOP - VERT_PADDING,
+        WINDOW_LEFT - HORIZ_PADDING)
+    curses.start_color()
+
+    global _COLOR_PAIRS
+    _COLOR_PAIRS = {}
+    for i, (k, v) in enumerate(COLOR_MAP.items(), 1):
+        curses.init_pair(i, v, curses.COLOR_BLACK)
+        _COLOR_PAIRS[k] = curses.color_pair(i)
+    return window
+
+
+def init_positions(session):
+    """Establish a new field of play.
+
+    This generates GlyphCoordinate objects
+    and persists them to the database.
+
+    """
+
+    # delete all existing coordinates
+    session.query(GlyphCoordinate).delete()
+
+    session.add(
+        GlyphCoordinate(
+            session, "ship",
+            WINDOW_WIDTH // 2 - 2,
+            WINDOW_HEIGHT - 4)
+    )
+
+    arrangement = (
+        ("enemy3", 50), ("enemy2", 25),
+        ("enemy1", 10), ("enemy2", 25),
+        ("enemy1", 10))
+    for (ship_vert, (etype, score)) in zip(
+            xrange(5, 30, ENEMY_VERT_SPACING), arrangement):
+        for ship_horiz in xrange(0, 50, 10):
+            session.add(
+                GlyphCoordinate(
+                    session, etype,
+                    ship_horiz,
+                    ship_vert,
+                    score=score)
+            )
+
+
+def draw(session, window, state):
+    """Load all current GlyphCoordinate objects from the
+    database and render.
+
+    """
+    for gcoord in session.query(GlyphCoordinate).\
+            options(joinedload("glyph")):
+        gcoord.render(window, state)
+    window.addstr(
+        1, WINDOW_WIDTH - 5,
+        "Score: %.4d" % state['score'])
+    window.move(0, 0)
+    window.refresh()
+
+
+def check_win(session, state):
+    """Return the number of army glyphs remaining -
+    the player wins if this is zero."""
+
+    return session.query(
+        func.count(GlyphCoordinate.id)
+    ).join(
+        GlyphCoordinate.glyph.of_type(ArmyGlyph)
+    ).scalar()
+
+
+def check_lose(session, state):
+    """Return the number of army glyphs either colliding
+    with the player or hitting the bottom of the screen.
+
+    The player loses if this is non-zero."""
+
+    player = state["player"]
+    return session.query(GlyphCoordinate).join(
+        GlyphCoordinate.glyph.of_type(ArmyGlyph)
+    ).filter(
+        GlyphCoordinate.intersects(player) |
+        GlyphCoordinate.bottom_bound
+    ).count()
+
+
+def render_message(session, window, msg, x, y):
+    """Render a message glyph.
+
+    Clears the area beneath the message first
+    and assumes the display will be paused
+    afterwards.
+
+    """
+    # create message box
+    msg = GlyphCoordinate(session, msg, x, y)
+
+    # clear existing glyphs which intersect
+    for gly in session.query(GlyphCoordinate).join(
+        GlyphCoordinate.glyph
+    ).filter(GlyphCoordinate.intersects(msg)):
+        gly.blank(window)
+
+    # render
+    msg.render(window, {})
+    window.refresh()
+    return msg
+
+
+def win(session, window, state):
+    """Handle the win case."""
+    render_message(session, window, "win_message", 15, 15)
+    time.sleep(2)
+    start(session, window, state, True)
+
+
+def lose(session, window, state):
+    """Handle the lose case."""
+    render_message(session, window, "lose_message", 15, 15)
+    time.sleep(2)
+    start(session, window, state)
+
+
+def pause(session, window, state):
+    """Pause the game."""
+    msg = render_message(session, window, "pause_message", 15, 15)
+    prompt(window)
+    msg.blank(window)
+    session.delete(msg)
+
+
+def prompt(window):
+    """Display a prompt, quashing any keystrokes
+    which might have remained."""
+
+    window.move(0, 0)
+    window.nodelay(1)
+    window.getch()
+    window.nodelay(0)
+    window.getch()
+    window.nodelay(1)
+
+
+def move_army(session, window, state):
+    """Update the army position based on the current
+    size of the field."""
+    speed = 30 // 25 * state["num_enemies"]
+
+    flip = (state["tick"] % speed) == 0
+
+    if not flip:
+        return
+    else:
+        state["flip"] = not state["flip"]
+
+    x_slide = 1
+
+    # get the lower/upper boundaries of the army
+    # along the X axis.
+    min_x, max_x = session.query(
+        func.min(GlyphCoordinate.x),
+        func.max(GlyphCoordinate.x + GlyphCoordinate.width),
+    ).join(
+        GlyphCoordinate.glyph.of_type(ArmyGlyph)
+    ).first()
+
+    if min_x is None or max_x is None:
+        # no enemies
+        return
+
+    direction = state["army_direction"]
+    move_y = False
+    if direction == 0 and max_x + x_slide >= MAX_X:
+        direction = state["army_direction"] = 1
+        move_y = True
+    elif direction == 1 and min_x - x_slide <= 0:
+        direction = state["army_direction"] = 0
+        move_y = True
+
+    for enemy_g in session.query(GlyphCoordinate).join(
+        GlyphCoordinate.glyph.of_type(ArmyGlyph)
+    ):
+        enemy_g.blank(window)
+
+        if move_y:
+            enemy_g.y += 1
+        elif direction == 0:
+            enemy_g.x += x_slide
+        elif direction == 1:
+            enemy_g.x -= x_slide
+
+
+def move_player(session, window, state):
+    """Receive player input and adjust state."""
+
+    ch = window.getch()
+    if ch not in (LEFT_KEY, RIGHT_KEY, FIRE_KEY, PAUSE_KEY):
+        return
+    elif ch == PAUSE_KEY:
+        pause(session, window, state)
+        return
+
+    player = state["player"]
+    if ch == RIGHT_KEY and not player.right_bound:
+        player.blank(window)
+        player.x += 1
+    elif ch == LEFT_KEY and not player.left_bound:
+        player.blank(window)
+        player.x -= 1
+    elif ch == FIRE_KEY and state["missile"] is None:
+        state["missile"] = GlyphCoordinate(
+            session,
+            "missile",
+            player.x + 3,
+            player.y - 1)
+
+
+def move_missile(session, window, state):
+    """Update the status of the current missile, if any."""
+
+    if state["missile"] is None or \
+            state["tick"] % 2 != 0:
+        return
+
+    missile = state["missile"]
+
+    # locate enemy glyphs which intersect with the
+    # missile's current position; i.e. a hit
+    glyph = session.query(GlyphCoordinate).\
+        join(GlyphCoordinate.glyph.of_type(EnemyGlyph)).\
+        filter(GlyphCoordinate.intersects(missile)).\
+        first()
+    missile.blank(window)
+    if glyph or missile.top_bound:
+        # missle is done
+        session.delete(missile)
+        state["missile"] = None
+        if glyph:
+            # score!
+            score(session, window, state, glyph)
+    else:
+        # move missle up one character.
+        missile.y -= 1
+
+
+def move_saucer(session, window, state):
+    """Update the status of the saucer."""
+
+    saucer_interval = 500
+    saucer_speed_interval = 4
+    if state["saucer"] is None and \
+            state["tick"] % saucer_interval != 0:
+        return
+
+    if state["saucer"] is None:
+        state["saucer"] = saucer = GlyphCoordinate(
+            session,
+            "saucer", -6, 1,
+            score=random.randrange(100, 600, 100))
+    elif state["tick"] % saucer_speed_interval == 0:
+        saucer = state["saucer"]
+        saucer.blank(window)
+        saucer.x += 1
+        if saucer.right_edge_bound:
+            session.delete(saucer)
+            state["saucer"] = None
+
+
+def update_splat(session, window, state):
+    """Render splat animations."""
+
+    for splat in session.query(GlyphCoordinate).\
+            join(GlyphCoordinate.glyph.of_type(SplatGlyph)):
+        age = state["tick"] - splat.tick
+        if age > 10:
+            splat.blank(window)
+            session.delete(splat)
+        else:
+            splat.render(window, state)
+
+
+def score(session, window, state, glyph):
+    """Process a glyph intersecting with a missile."""
+
+    glyph.blank(window)
+    session.delete(glyph)
+    if state["saucer"] is glyph:
+        state["saucer"] = None
+    state["score"] += glyph.score
+    # render a splat !
+    GlyphCoordinate(
+        session, "splat1", glyph.x, glyph.y,
+        tick=state["tick"], label=str(glyph.score))
+
+
+def update_state(session, window, state):
+    """Update all state for each game tick."""
+
+    num_enemies = state["num_enemies"] = check_win(session, state)
+    if num_enemies == 0:
+        win(session, window, state)
+    elif check_lose(session, state):
+        lose(session, window, state)
+    else:
+        # update the tick counter.
+        state["tick"] += 1
+        move_player(session, window, state)
+        move_missile(session, window, state)
+        move_army(session, window, state)
+        move_saucer(session, window, state)
+        update_splat(session, window, state)
+
+
+def start(session, window, state, continue_=False):
+    """Start a new field of play."""
+
+    render_message(session, window, "start_message", 15, 20)
+    prompt(window)
+
+    init_positions(session)
+
+    player = session.query(GlyphCoordinate).join(
+        GlyphCoordinate.glyph.of_type(PlayerGlyph)
+    ).one()
+    state.update({
+        "field_pos": 0,
+        "alt": False,
+        "tick": 0,
+        "missile": None,
+        "saucer": None,
+        "player": player,
+        "army_direction": 0,
+        "flip": False
+    })
+    if not continue_:
+        state["score"] = 0
+
+    window.clear()
+    window.box()
+    draw(session, window, state)
+
+
+def main():
+    """Initialize the database and establish the game loop."""
+
+    e = create_engine("sqlite://")
+    Base.metadata.create_all(e)
+    session = Session(e)
+    init_glyph(session)
+    session.commit()
+    window = setup_curses()
+    state = {}
+    start(session, window, state)
+    while True:
+        update_state(session, window, state)
+        draw(session, window, state)
+        time.sleep(.01)
+
+if __name__ == '__main__':
+    main()