Release: 1.3.0b1 beta release | Release Date: November 16, 2018

SQLAlchemy 1.3 Documentation

Contents | Index

Source code for examples.space_invaders.space_invaders

import sys
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Integer, Column, ForeignKey, \
    String, func
from sqlalchemy.orm import relationship, Session, joinedload
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
import curses
import time
import textwrap
import re
import random
import logging

_PY3 = sys.version_info > (3, 0)
if _PY3:
    xrange = range


logging.basicConfig(
    filename="space_invaders.log",
    format="%(asctime)s,%(msecs)03d %(levelname)-5.5s %(message)s"
)
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

Base = declarative_base()

WINDOW_LEFT = 10
WINDOW_TOP = 2
WINDOW_WIDTH = 70
WINDOW_HEIGHT = 34
VERT_PADDING = 2
HORIZ_PADDING = 5
ENEMY_VERT_SPACING = 4
MAX_X = WINDOW_WIDTH - HORIZ_PADDING
MAX_Y = WINDOW_HEIGHT - VERT_PADDING
LEFT_KEY = ord("j")
RIGHT_KEY = ord("l")
FIRE_KEY = ord(" ")
PAUSE_KEY = ord("p")

COLOR_MAP = {
    "K": curses.COLOR_BLACK,
    "R": curses.COLOR_RED,
    "B": curses.COLOR_BLUE,
    "C": curses.COLOR_CYAN,
    "G": curses.COLOR_GREEN,
    "M": curses.COLOR_MAGENTA,
    "R": curses.COLOR_RED,
    "W": curses.COLOR_WHITE,
    "Y": curses.COLOR_YELLOW
}


class Glyph(Base):
    """Describe a "glyph", a graphical element
    to be painted on the screen.

    """
    __tablename__ = 'glyph'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    type = Column(String)
    width = Column(Integer)
    height = Column(Integer)
    data = Column(String)
    alt_data = Column(String)
    __mapper_args__ = {"polymorphic_on": type}

    def __init__(self, name, img, alt=None):
        self.name = name
        self.data, self.width, self.height = \
            self._encode_glyph(img)
        if alt is not None:
            self.alt_data, alt_w, alt_h = \
                self._encode_glyph(alt)

    def _encode_glyph(self, img):
        """Receive a textual description of the glyph and
        encode into a format understood by
        GlyphCoordinate.render().

        """
        img = re.sub(r'^\n', "", textwrap.dedent(img))
        color = "W"
        lines = [line.rstrip() for line in img.split("\n")]
        data = []
        for line in lines:
            render_line = []
            line = list(line)
            while line:
                char = line.pop(0)
                if char == '#':
                    color = line.pop(0)
                    continue
                render_line.append((color, char))
            data.append(render_line)
        width = max([len(rl) for rl in data])
        data = "".join(
            "".join("%s%s" % (color, char) for color, char in render_line) +
            ("W " * (width - len(render_line)))
            for render_line in data
        )
        return data, width, len(lines)

    def glyph_for_state(self, coord, state):
        """Return the appropriate data representation
        for this Glyph, based on the current coordinates
        and state.

        Subclasses may override this to provide animations.

        """
        return self.data


class GlyphCoordinate(Base):
    """Describe a glyph rendered at a certain x, y coordinate.

    The GlyphCoordinate may also include optional values
    such as the tick at time of render, a label, and a
    score value.

    """
    __tablename__ = 'glyph_coordinate'
    id = Column(Integer, primary_key=True)
    glyph_id = Column(Integer, ForeignKey('glyph.id'))
    x = Column(Integer)
    y = Column(Integer)
    tick = Column(Integer)
    label = Column(String)
    score = Column(Integer)
    glyph = relationship(Glyph, innerjoin=True)

    def __init__(
            self,
            session, glyph_name, x, y,
            tick=None, label=None, score=None):
        self.glyph = session.query(Glyph).\
            filter_by(name=glyph_name).one()
        self.x = x
        self.y = y
        self.tick = tick
        self.label = label
        self.score = score
        session.add(self)

    def render(self, window, state):
        """Render the Glyph at this position."""

        col = 0
        row = 0
        glyph = self.glyph
        data = glyph.glyph_for_state(self, state)
        for color, char in [
            (data[i], data[i + 1])
            for i in xrange(0, len(data), 2)
        ]:

            x = self.x + col
            y = self.y + row
            if 0 <= x <= MAX_X and 0 <= y <= MAX_Y:
                window.addstr(
                    y + VERT_PADDING,
                    x + HORIZ_PADDING,
                    char,
                    _COLOR_PAIRS[color])
            col += 1
            if col == glyph.width:
                col = 0
                row += 1
        if self.label:
            self._render_label(window, False)

    def _render_label(self, window, blank):
        label = self.label if not blank else " " * len(self.label)
        if self.x + self.width + len(self.label) < MAX_X:
            window.addstr(self.y, self.x + self.width, label)
        else:
            window.addstr(self.y, self.x - len(self.label), label)

    def blank(self, window):
        """Render a blank box for this glyph's position and size."""

        glyph = self.glyph
        x = min(max(self.x, 0), MAX_X)
        width = min(glyph.width, MAX_X - x) or 1
        for y_a in xrange(self.y, self.y + glyph.height):
            y = y_a
            window.addstr(
                y + VERT_PADDING,
                x + HORIZ_PADDING,
                " " * width)

        if self.label:
            self._render_label(window, True)

    @hybrid_property
    def width(self):
        return self.glyph.width

    @width.expression
    def width(cls):
        return Glyph.width

    @hybrid_property
    def height(self):
        return self.glyph.height

    @height.expression
    def height(cls):
        return Glyph.height

    @hybrid_property
    def bottom_bound(self):
        return self.y + self.height >= MAX_Y

    @hybrid_property
    def top_bound(self):
        return self.y <= 0

    @hybrid_property
    def left_bound(self):
        return self.x <= 0

    @hybrid_property
    def right_bound(self):
        return self.x + self.width >= MAX_X

    @hybrid_property
    def right_edge_bound(self):
        return self.x > MAX_X

    @hybrid_method
    def intersects(self, other):
        """Return True if this GlyphCoordinate intersects with
        the given GlyphCoordinate."""

        return ~(
            (self.x + self.width < other.x) |
            (self.x > other.x + other.width)
        ) & ~(
            (self.y + self.height < other.y) |
            (self.y > other.y + other.height)
        )


class EnemyGlyph(Glyph):
    """Describe an enemy."""
    __mapper_args__ = {"polymorphic_identity": "enemy"}


class ArmyGlyph(EnemyGlyph):
    """Describe an enemy that's part of the "army". """
    __mapper_args__ = {"polymorphic_identity": "army"}

    def glyph_for_state(self, coord, state):
        if state["flip"]:
            return self.alt_data
        else:
            return self.data


class SaucerGlyph(EnemyGlyph):
    """Describe the enemy saucer flying overhead."""
    __mapper_args__ = {"polymorphic_identity": "saucer"}

    def glyph_for_state(self, coord, state):
        if state["flip"] == 0:
            return self.alt_data
        else:
            return self.data


class MessageGlyph(Glyph):
    """Describe a glyph for displaying a message."""
    __mapper_args__ = {"polymorphic_identity": "message"}


class PlayerGlyph(Glyph):
    """Describe a glyph representing the player."""
    __mapper_args__ = {"polymorphic_identity": "player"}


class MissileGlyph(Glyph):
    """Describe a glyph representing a missile."""
    __mapper_args__ = {"polymorphic_identity": "missile"}


class SplatGlyph(Glyph):
    """Describe a glyph representing a "splat"."""
    __mapper_args__ = {"polymorphic_identity": "splat"}

    def glyph_for_state(self, coord, state):
        age = state["tick"] - coord.tick
        if age > 5:
            return self.alt_data
        else:
            return self.data


def init_glyph(session):
    """Create the glyphs used during play."""

    enemy1 = ArmyGlyph(
        "enemy1", """
         #W-#B^#R-#B^#W-
         #G|   |
        """,
        """
         #W>#B^#R-#B^#W<
         #G^   ^
        """
    )

    enemy2 = ArmyGlyph(
        "enemy2", """
         #W***
        #R<#C~~~#R>
        """,
        """
         #W@@@
        #R<#C---#R>
        """
    )

    enemy3 = ArmyGlyph(
        "enemy3", """
        #Y((--))
        #M-~-~-~
        """,
        """
        #Y[[--]]
        #M~-~-~-
        """
    )

    saucer = SaucerGlyph(
        "saucer",
        """#R~#Y^#R~#G<<((=#WOO#G=))>>""",
        """#Y^#R~#Y^#G<<((=#WOO#G=))>>""",
    )

    splat1 = SplatGlyph(
        "splat1",
        """
             #WVVVVV
            #W> #R*** #W<
             #W^^^^^
        """,
        """
                #M|
             #M- #Y+++ #M-
                #M|
        """
    )

    ship = PlayerGlyph("ship", """
       #Y^
     #G=====
    """)

    missile = MissileGlyph("missile", """
        |
    """)

    start = MessageGlyph(
        "start_message",
        "J = move left; L = move right; SPACE = fire\n"
        "           #GPress any key to start")
    lose = MessageGlyph("lose_message",
                        "#YY O U  L O S E ! ! !")
    win = MessageGlyph(
        "win_message",
        "#RL E V E L  C L E A R E D ! ! !"
    )
    paused = MessageGlyph(
        "pause_message",
        "#WP A U S E D\n#GPress P to continue")
    session.add_all(
        [enemy1, enemy2, enemy3, ship, saucer,
         missile, start, lose, win,
         paused, splat1])


def setup_curses():
    """Setup terminal/curses state."""

    window = curses.initscr()
    curses.noecho()

    window = curses.newwin(
        WINDOW_HEIGHT + (VERT_PADDING * 2),
        WINDOW_WIDTH + (HORIZ_PADDING * 2),
        WINDOW_TOP - VERT_PADDING,
        WINDOW_LEFT - HORIZ_PADDING)
    curses.start_color()

    global _COLOR_PAIRS
    _COLOR_PAIRS = {}
    for i, (k, v) in enumerate(COLOR_MAP.items(), 1):
        curses.init_pair(i, v, curses.COLOR_BLACK)
        _COLOR_PAIRS[k] = curses.color_pair(i)
    return window


def init_positions(session):
    """Establish a new field of play.

    This generates GlyphCoordinate objects
    and persists them to the database.

    """

    # delete all existing coordinates
    session.query(GlyphCoordinate).delete()

    session.add(
        GlyphCoordinate(
            session, "ship",
            WINDOW_WIDTH // 2 - 2,
            WINDOW_HEIGHT - 4)
    )

    arrangement = (
        ("enemy3", 50), ("enemy2", 25),
        ("enemy1", 10), ("enemy2", 25),
        ("enemy1", 10))
    for (ship_vert, (etype, score)) in zip(
            xrange(5, 30, ENEMY_VERT_SPACING), arrangement):
        for ship_horiz in xrange(0, 50, 10):
            session.add(
                GlyphCoordinate(
                    session, etype,
                    ship_horiz,
                    ship_vert,
                    score=score)
            )


def draw(session, window, state):
    """Load all current GlyphCoordinate objects from the
    database and render.

    """
    for gcoord in session.query(GlyphCoordinate).\
            options(joinedload("glyph")):
        gcoord.render(window, state)
    window.addstr(
        1, WINDOW_WIDTH - 5,
        "Score: %.4d" % state['score'])
    window.move(0, 0)
    window.refresh()


def check_win(session, state):
    """Return the number of army glyphs remaining -
    the player wins if this is zero."""

    return session.query(
        func.count(GlyphCoordinate.id)
    ).join(
        GlyphCoordinate.glyph.of_type(ArmyGlyph)
    ).scalar()


def check_lose(session, state):
    """Return the number of army glyphs either colliding
    with the player or hitting the bottom of the screen.

    The player loses if this is non-zero."""

    player = state["player"]
    return session.query(GlyphCoordinate).join(
        GlyphCoordinate.glyph.of_type(ArmyGlyph)
    ).filter(
        GlyphCoordinate.intersects(player) |
        GlyphCoordinate.bottom_bound
    ).count()


def render_message(session, window, msg, x, y):
    """Render a message glyph.

    Clears the area beneath the message first
    and assumes the display will be paused
    afterwards.

    """
    # create message box
    msg = GlyphCoordinate(session, msg, x, y)

    # clear existing glyphs which intersect
    for gly in session.query(GlyphCoordinate).join(
        GlyphCoordinate.glyph
    ).filter(GlyphCoordinate.intersects(msg)):
        gly.blank(window)

    # render
    msg.render(window, {})
    window.refresh()
    return msg


def win(session, window, state):
    """Handle the win case."""
    render_message(session, window, "win_message", 15, 15)
    time.sleep(2)
    start(session, window, state, True)


def lose(session, window, state):
    """Handle the lose case."""
    render_message(session, window, "lose_message", 15, 15)
    time.sleep(2)
    start(session, window, state)


def pause(session, window, state):
    """Pause the game."""
    msg = render_message(session, window, "pause_message", 15, 15)
    prompt(window)
    msg.blank(window)
    session.delete(msg)


def prompt(window):
    """Display a prompt, quashing any keystrokes
    which might have remained."""

    window.move(0, 0)
    window.nodelay(1)
    window.getch()
    window.nodelay(0)
    window.getch()
    window.nodelay(1)


def move_army(session, window, state):
    """Update the army position based on the current
    size of the field."""
    speed = 30 // 25 * state["num_enemies"]

    flip = (state["tick"] % speed) == 0

    if not flip:
        return
    else:
        state["flip"] = not state["flip"]

    x_slide = 1

    # get the lower/upper boundaries of the army
    # along the X axis.
    min_x, max_x = session.query(
        func.min(GlyphCoordinate.x),
        func.max(GlyphCoordinate.x + GlyphCoordinate.width),
    ).join(
        GlyphCoordinate.glyph.of_type(ArmyGlyph)
    ).first()

    if min_x is None or max_x is None:
        # no enemies
        return

    direction = state["army_direction"]
    move_y = False
    if direction == 0 and max_x + x_slide >= MAX_X:
        direction = state["army_direction"] = 1
        move_y = True
    elif direction == 1 and min_x - x_slide <= 0:
        direction = state["army_direction"] = 0
        move_y = True

    for enemy_g in session.query(GlyphCoordinate).join(
        GlyphCoordinate.glyph.of_type(ArmyGlyph)
    ):
        enemy_g.blank(window)

        if move_y:
            enemy_g.y += 1
        elif direction == 0:
            enemy_g.x += x_slide
        elif direction == 1:
            enemy_g.x -= x_slide


def move_player(session, window, state):
    """Receive player input and adjust state."""

    ch = window.getch()
    if ch not in (LEFT_KEY, RIGHT_KEY, FIRE_KEY, PAUSE_KEY):
        return
    elif ch == PAUSE_KEY:
        pause(session, window, state)
        return

    player = state["player"]
    if ch == RIGHT_KEY and not player.right_bound:
        player.blank(window)
        player.x += 1
    elif ch == LEFT_KEY and not player.left_bound:
        player.blank(window)
        player.x -= 1
    elif ch == FIRE_KEY and state["missile"] is None:
        state["missile"] = GlyphCoordinate(
            session,
            "missile",
            player.x + 3,
            player.y - 1)


def move_missile(session, window, state):
    """Update the status of the current missile, if any."""

    if state["missile"] is None or \
            state["tick"] % 2 != 0:
        return

    missile = state["missile"]

    # locate enemy glyphs which intersect with the
    # missile's current position; i.e. a hit
    glyph = session.query(GlyphCoordinate).\
        join(GlyphCoordinate.glyph.of_type(EnemyGlyph)).\
        filter(GlyphCoordinate.intersects(missile)).\
        first()
    missile.blank(window)
    if glyph or missile.top_bound:
        # missle is done
        session.delete(missile)
        state["missile"] = None
        if glyph:
            # score!
            score(session, window, state, glyph)
    else:
        # move missle up one character.
        missile.y -= 1


def move_saucer(session, window, state):
    """Update the status of the saucer."""

    saucer_interval = 500
    saucer_speed_interval = 4
    if state["saucer"] is None and \
            state["tick"] % saucer_interval != 0:
        return

    if state["saucer"] is None:
        state["saucer"] = saucer = GlyphCoordinate(
            session,
            "saucer", -6, 1,
            score=random.randrange(100, 600, 100))
    elif state["tick"] % saucer_speed_interval == 0:
        saucer = state["saucer"]
        saucer.blank(window)
        saucer.x += 1
        if saucer.right_edge_bound:
            session.delete(saucer)
            state["saucer"] = None


def update_splat(session, window, state):
    """Render splat animations."""

    for splat in session.query(GlyphCoordinate).\
            join(GlyphCoordinate.glyph.of_type(SplatGlyph)):
        age = state["tick"] - splat.tick
        if age > 10:
            splat.blank(window)
            session.delete(splat)
        else:
            splat.render(window, state)


def score(session, window, state, glyph):
    """Process a glyph intersecting with a missile."""

    glyph.blank(window)
    session.delete(glyph)
    if state["saucer"] is glyph:
        state["saucer"] = None
    state["score"] += glyph.score
    # render a splat !
    GlyphCoordinate(
        session, "splat1", glyph.x, glyph.y,
        tick=state["tick"], label=str(glyph.score))


def update_state(session, window, state):
    """Update all state for each game tick."""

    num_enemies = state["num_enemies"] = check_win(session, state)
    if num_enemies == 0:
        win(session, window, state)
    elif check_lose(session, state):
        lose(session, window, state)
    else:
        # update the tick counter.
        state["tick"] += 1
        move_player(session, window, state)
        move_missile(session, window, state)
        move_army(session, window, state)
        move_saucer(session, window, state)
        update_splat(session, window, state)


def start(session, window, state, continue_=False):
    """Start a new field of play."""

    render_message(session, window, "start_message", 15, 20)
    prompt(window)

    init_positions(session)

    player = session.query(GlyphCoordinate).join(
        GlyphCoordinate.glyph.of_type(PlayerGlyph)
    ).one()
    state.update({
        "field_pos": 0,
        "alt": False,
        "tick": 0,
        "missile": None,
        "saucer": None,
        "player": player,
        "army_direction": 0,
        "flip": False
    })
    if not continue_:
        state["score"] = 0

    window.clear()
    window.box()
    draw(session, window, state)


def main():
    """Initialize the database and establish the game loop."""

    e = create_engine("sqlite://")
    Base.metadata.create_all(e)
    session = Session(e)
    init_glyph(session)
    session.commit()
    window = setup_curses()
    state = {}
    start(session, window, state)
    while True:
        update_state(session, window, state)
        draw(session, window, state)
        time.sleep(.01)

if __name__ == '__main__':
    main()