Source code for examples.space_invaders.space_invaders
importcursesimportloggingimportrandomimportreimporttextwrapimporttimefromsqlalchemyimportColumnfromsqlalchemyimportcreate_enginefromsqlalchemyimportForeignKeyfromsqlalchemyimportfuncfromsqlalchemyimportIntegerfromsqlalchemyimportStringfromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ext.hybridimporthybrid_methodfromsqlalchemy.ext.hybridimporthybrid_propertyfromsqlalchemy.ormimportjoinedloadfromsqlalchemy.ormimportrelationshipfromsqlalchemy.ormimportSessionlogging.basicConfig(filename="space_invaders.log",format="%(asctime)s,%(msecs)03d%(levelname)-5.5s%(message)s",)logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)Base=declarative_base()WINDOW_LEFT=10WINDOW_TOP=2WINDOW_WIDTH=70WINDOW_HEIGHT=34VERT_PADDING=2HORIZ_PADDING=5ENEMY_VERT_SPACING=4MAX_X=WINDOW_WIDTH-HORIZ_PADDINGMAX_Y=WINDOW_HEIGHT-VERT_PADDINGLEFT_KEY=ord("j")RIGHT_KEY=ord("l")FIRE_KEY=ord(" ")PAUSE_KEY=ord("p")COLOR_MAP={"K":curses.COLOR_BLACK,"B":curses.COLOR_BLUE,"C":curses.COLOR_CYAN,"G":curses.COLOR_GREEN,"M":curses.COLOR_MAGENTA,"R":curses.COLOR_RED,"W":curses.COLOR_WHITE,"Y":curses.COLOR_YELLOW,}classGlyph(Base):"""Describe a "glyph", a graphical element to be painted on the screen. """__tablename__="glyph"id=Column(Integer,primary_key=True)name=Column(String)type=Column(String)width=Column(Integer)height=Column(Integer)data=Column(String)alt_data=Column(String)__mapper_args__={"polymorphic_on":type}def__init__(self,name,img,alt=None):self.name=nameself.data,self.width,self.height=self._encode_glyph(img)ifaltisnotNone:self.alt_data,alt_w,alt_h=self._encode_glyph(alt)def_encode_glyph(self,img):"""Receive a textual description of the glyph and encode into a format understood by GlyphCoordinate.render(). """img=re.sub(r"^\n","",textwrap.dedent(img))color="W"lines=[line.rstrip()forlineinimg.split("\n")]data=[]forlineinlines:render_line=[]line=list(line)whileline:char=line.pop(0)ifchar=="#":color=line.pop(0)continuerender_line.append((color,char))data.append(render_line)width=max([len(rl)forrlindata])data="".join("".join("%s%s"%(color,char)forcolor,charinrender_line)+("W "*(width-len(render_line)))forrender_lineindata)returndata,width,len(lines)defglyph_for_state(self,coord,state):"""Return the appropriate data representation for this Glyph, based on the current coordinates and state. Subclasses may override this to provide animations. """returnself.dataclassGlyphCoordinate(Base):"""Describe a glyph rendered at a certain x, y coordinate. The GlyphCoordinate may also include optional values such as the tick at time of render, a label, and a score value. """__tablename__="glyph_coordinate"id=Column(Integer,primary_key=True)glyph_id=Column(Integer,ForeignKey("glyph.id"))x=Column(Integer)y=Column(Integer)tick=Column(Integer)label=Column(String)score=Column(Integer)glyph=relationship(Glyph,innerjoin=True)def__init__(self,session,glyph_name,x,y,tick=None,label=None,score=None):self.glyph=session.query(Glyph).filter_by(name=glyph_name).one()self.x=xself.y=yself.tick=tickself.label=labelself.score=scoresession.add(self)defrender(self,window,state):"""Render the Glyph at this position."""col=0row=0glyph=self.glyphdata=glyph.glyph_for_state(self,state)forcolor,charin[(data[i],data[i+1])foriinrange(0,len(data),2)]:x=self.x+coly=self.y+rowif0<=x<=MAX_Xand0<=y<=MAX_Y:window.addstr(y+VERT_PADDING,x+HORIZ_PADDING,char,_COLOR_PAIRS[color],)col+=1ifcol==glyph.width:col=0row+=1ifself.label:self._render_label(window,False)def_render_label(self,window,blank):label=self.labelifnotblankelse" "*len(self.label)ifself.x+self.width+len(self.label)<MAX_X:window.addstr(self.y,self.x+self.width,label)else:window.addstr(self.y,self.x-len(self.label),label)defblank(self,window):"""Render a blank box for this glyph's position and size."""glyph=self.glyphx=min(max(self.x,0),MAX_X)width=min(glyph.width,MAX_X-x)or1fory_ainrange(self.y,self.y+glyph.height):y=y_awindow.addstr(y+VERT_PADDING,x+HORIZ_PADDING," "*width)ifself.label:self._render_label(window,True)@hybrid_propertydefwidth(self):returnself.glyph.width@width.expressiondefwidth(cls):returnGlyph.width@hybrid_propertydefheight(self):returnself.glyph.height@height.expressiondefheight(cls):returnGlyph.height@hybrid_propertydefbottom_bound(self):returnself.y+self.height>=MAX_Y@hybrid_propertydeftop_bound(self):returnself.y<=0@hybrid_propertydefleft_bound(self):returnself.x<=0@hybrid_propertydefright_bound(self):returnself.x+self.width>=MAX_X@hybrid_propertydefright_edge_bound(self):returnself.x>MAX_X@hybrid_methoddefintersects(self,other):"""Return True if this GlyphCoordinate intersects with the given GlyphCoordinate."""return~((self.x+self.width<other.x)|(self.x>other.x+other.width))&~((self.y+self.height<other.y)|(self.y>other.y+other.height))classEnemyGlyph(Glyph):"""Describe an enemy."""__mapper_args__={"polymorphic_identity":"enemy"}classArmyGlyph(EnemyGlyph):"""Describe an enemy that's part of the "army"."""__mapper_args__={"polymorphic_identity":"army"}defglyph_for_state(self,coord,state):ifstate["flip"]:returnself.alt_dataelse:returnself.dataclassSaucerGlyph(EnemyGlyph):"""Describe the enemy saucer flying overhead."""__mapper_args__={"polymorphic_identity":"saucer"}defglyph_for_state(self,coord,state):ifstate["flip"]==0:returnself.alt_dataelse:returnself.dataclassMessageGlyph(Glyph):"""Describe a glyph for displaying a message."""__mapper_args__={"polymorphic_identity":"message"}classPlayerGlyph(Glyph):"""Describe a glyph representing the player."""__mapper_args__={"polymorphic_identity":"player"}classMissileGlyph(Glyph):"""Describe a glyph representing a missile."""__mapper_args__={"polymorphic_identity":"missile"}classSplatGlyph(Glyph):"""Describe a glyph representing a "splat"."""__mapper_args__={"polymorphic_identity":"splat"}defglyph_for_state(self,coord,state):age=state["tick"]-coord.tickifage>5:returnself.alt_dataelse:returnself.datadefinit_glyph(session):"""Create the glyphs used during play."""enemy1=ArmyGlyph("enemy1",""" #W-#B^#R-#B^#W- #G| | """,""" #W>#B^#R-#B^#W< #G^ ^ """,)enemy2=ArmyGlyph("enemy2",""" #W*** #R<#C~~~#R> """,""" #W@@@ #R<#C---#R> """,)enemy3=ArmyGlyph("enemy3",""" #Y((--)) #M-~-~-~ """,""" #Y[[--]] #M~-~-~- """,)saucer=SaucerGlyph("saucer","""#R~#Y^#R~#G<<((=#WOO#G=))>>""","""#Y^#R~#Y^#G<<((=#WOO#G=))>>""",)splat1=SplatGlyph("splat1",""" #WVVVVV #W> #R*** #W< #W^^^^^ """,""" #M| #M- #Y+++ #M- #M| """,)ship=PlayerGlyph("ship",""" #Y^ #G===== """,)missile=MissileGlyph("missile",""" | """,)start=MessageGlyph("start_message","J = move left; L = move right; SPACE = fire\n"" #GPress any key to start",)lose=MessageGlyph("lose_message","#YY O U L O S E ! ! !")win=MessageGlyph("win_message","#RL E V E L C L E A R E D ! ! !")paused=MessageGlyph("pause_message","#WP A U S E D\n#GPress P to continue")session.add_all([enemy1,enemy2,enemy3,ship,saucer,missile,start,lose,win,paused,splat1,])defsetup_curses():"""Setup terminal/curses state."""window=curses.initscr()curses.noecho()window=curses.newwin(WINDOW_HEIGHT+(VERT_PADDING*2),WINDOW_WIDTH+(HORIZ_PADDING*2),WINDOW_TOP-VERT_PADDING,WINDOW_LEFT-HORIZ_PADDING,)curses.start_color()global_COLOR_PAIRS_COLOR_PAIRS={}fori,(k,v)inenumerate(COLOR_MAP.items(),1):curses.init_pair(i,v,curses.COLOR_BLACK)_COLOR_PAIRS[k]=curses.color_pair(i)returnwindowdefinit_positions(session):"""Establish a new field of play. This generates GlyphCoordinate objects and persists them to the database. """# delete all existing coordinatessession.query(GlyphCoordinate).delete()session.add(GlyphCoordinate(session,"ship",WINDOW_WIDTH//2-2,WINDOW_HEIGHT-4))arrangement=(("enemy3",50),("enemy2",25),("enemy1",10),("enemy2",25),("enemy1",10),)forship_vert,(etype,score)inzip(range(5,30,ENEMY_VERT_SPACING),arrangement):forship_horizinrange(0,50,10):session.add(GlyphCoordinate(session,etype,ship_horiz,ship_vert,score=score))defdraw(session,window,state):"""Load all current GlyphCoordinate objects from the database and render. """forgcoordinsession.query(GlyphCoordinate).options(joinedload(GlyphCoordinate.glyph)):gcoord.render(window,state)window.addstr(1,WINDOW_WIDTH-5,"Score: %.4d"%state["score"])window.move(0,0)window.refresh()defcheck_win(session,state):"""Return the number of army glyphs remaining - the player wins if this is zero."""return(session.query(func.count(GlyphCoordinate.id)).join(GlyphCoordinate.glyph.of_type(ArmyGlyph)).scalar())defcheck_lose(session,state):"""Return the number of army glyphs either colliding with the player or hitting the bottom of the screen. The player loses if this is non-zero."""player=state["player"]return(session.query(GlyphCoordinate).join(GlyphCoordinate.glyph.of_type(ArmyGlyph)).filter(GlyphCoordinate.intersects(player)|GlyphCoordinate.bottom_bound).count())defrender_message(session,window,msg,x,y):"""Render a message glyph. Clears the area beneath the message first and assumes the display will be paused afterwards. """# create message boxmsg=GlyphCoordinate(session,msg,x,y)# clear existing glyphs which intersectforglyin(session.query(GlyphCoordinate).join(GlyphCoordinate.glyph).filter(GlyphCoordinate.intersects(msg))):gly.blank(window)# rendermsg.render(window,{})window.refresh()returnmsgdefwin(session,window,state):"""Handle the win case."""render_message(session,window,"win_message",15,15)time.sleep(2)start(session,window,state,True)deflose(session,window,state):"""Handle the lose case."""render_message(session,window,"lose_message",15,15)time.sleep(2)start(session,window,state)defpause(session,window,state):"""Pause the game."""msg=render_message(session,window,"pause_message",15,15)prompt(window)msg.blank(window)session.delete(msg)defprompt(window):"""Display a prompt, quashing any keystrokes which might have remained."""window.move(0,0)window.nodelay(1)window.getch()window.nodelay(0)window.getch()window.nodelay(1)defmove_army(session,window,state):"""Update the army position based on the current size of the field."""speed=30//25*state["num_enemies"]flip=(state["tick"]%speed)==0ifnotflip:returnelse:state["flip"]=notstate["flip"]x_slide=1# get the lower/upper boundaries of the army# along the X axis.min_x,max_x=(session.query(func.min(GlyphCoordinate.x),func.max(GlyphCoordinate.x+GlyphCoordinate.width),).join(GlyphCoordinate.glyph.of_type(ArmyGlyph)).first())ifmin_xisNoneormax_xisNone:# no enemiesreturndirection=state["army_direction"]move_y=Falseifdirection==0andmax_x+x_slide>=MAX_X:direction=state["army_direction"]=1move_y=Trueelifdirection==1andmin_x-x_slide<=0:direction=state["army_direction"]=0move_y=Trueforenemy_ginsession.query(GlyphCoordinate).join(GlyphCoordinate.glyph.of_type(ArmyGlyph)):enemy_g.blank(window)ifmove_y:enemy_g.y+=1elifdirection==0:enemy_g.x+=x_slideelifdirection==1:enemy_g.x-=x_slidedefmove_player(session,window,state):"""Receive player input and adjust state."""ch=window.getch()ifchnotin(LEFT_KEY,RIGHT_KEY,FIRE_KEY,PAUSE_KEY):returnelifch==PAUSE_KEY:pause(session,window,state)returnplayer=state["player"]ifch==RIGHT_KEYandnotplayer.right_bound:player.blank(window)player.x+=1elifch==LEFT_KEYandnotplayer.left_bound:player.blank(window)player.x-=1elifch==FIRE_KEYandstate["missile"]isNone:state["missile"]=GlyphCoordinate(session,"missile",player.x+3,player.y-1)defmove_missile(session,window,state):"""Update the status of the current missile, if any."""ifstate["missile"]isNoneorstate["tick"]%2!=0:returnmissile=state["missile"]# locate enemy glyphs which intersect with the# missile's current position; i.e. a hitglyph=(session.query(GlyphCoordinate).join(GlyphCoordinate.glyph.of_type(EnemyGlyph)).filter(GlyphCoordinate.intersects(missile)).first())missile.blank(window)ifglyphormissile.top_bound:# missile is donesession.delete(missile)state["missile"]=Noneifglyph:# score!score(session,window,state,glyph)else:# move missile up one character.missile.y-=1defmove_saucer(session,window,state):"""Update the status of the saucer."""saucer_interval=500saucer_speed_interval=4ifstate["saucer"]isNoneandstate["tick"]%saucer_interval!=0:returnifstate["saucer"]isNone:state["saucer"]=saucer=GlyphCoordinate(session,"saucer",-6,1,score=random.randrange(100,600,100))elifstate["tick"]%saucer_speed_interval==0:saucer=state["saucer"]saucer.blank(window)saucer.x+=1ifsaucer.right_edge_bound:session.delete(saucer)state["saucer"]=Nonedefupdate_splat(session,window,state):"""Render splat animations."""forsplatinsession.query(GlyphCoordinate).join(GlyphCoordinate.glyph.of_type(SplatGlyph)):age=state["tick"]-splat.tickifage>10:splat.blank(window)session.delete(splat)else:splat.render(window,state)defscore(session,window,state,glyph):"""Process a glyph intersecting with a missile."""glyph.blank(window)session.delete(glyph)ifstate["saucer"]isglyph:state["saucer"]=Nonestate["score"]+=glyph.score# render a splat !GlyphCoordinate(session,"splat1",glyph.x,glyph.y,tick=state["tick"],label=str(glyph.score),)defupdate_state(session,window,state):"""Update all state for each game tick."""num_enemies=state["num_enemies"]=check_win(session,state)ifnum_enemies==0:win(session,window,state)elifcheck_lose(session,state):lose(session,window,state)else:# update the tick counter.state["tick"]+=1move_player(session,window,state)move_missile(session,window,state)move_army(session,window,state)move_saucer(session,window,state)update_splat(session,window,state)defstart(session,window,state,continue_=False):"""Start a new field of play."""render_message(session,window,"start_message",15,20)prompt(window)init_positions(session)player=(session.query(GlyphCoordinate).join(GlyphCoordinate.glyph.of_type(PlayerGlyph)).one())state.update({"field_pos":0,"alt":False,"tick":0,"missile":None,"saucer":None,"player":player,"army_direction":0,"flip":False,})ifnotcontinue_:state["score"]=0window.clear()window.box()draw(session,window,state)defmain():"""Initialize the database and establish the game loop."""e=create_engine("sqlite://")Base.metadata.create_all(e)session=Session(e)init_glyph(session)session.commit()window=setup_curses()state={}start(session,window,state)whileTrue:update_state(session,window,state)draw(session,window,state)time.sleep(0.01)if__name__=="__main__":main()