# Source code for examples.versioned_rows.versioned_update_old_row
"""Illustrates the same UPDATE into INSERT technique of ``versioned_rows.py``,
but also emits an UPDATE on the **old** row to effect a change in timestamp.
Also includes a :meth:`.SessionEvents.do_orm_execute` hook to limit queries
to only the most recent version.

"""

import datetime
import time

from sqlalchemy import and_
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import DateTime
from sqlalchemy import event
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.orm import attributes
from sqlalchemy.orm import backref
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import make_transient
from sqlalchemy.orm import make_transient_to_detached
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import with_loader_criteria


Base = declarative_base()

# this will be the current time as the test runs; it is rebound by the
# script below each time time_passes() is called
now = None


# in practice this would be a real "now" function
def current_time():
    """Return the module-level ``now`` value used as the current time."""
    return now


class VersionedStartEnd:
    """Mixin for mapped classes whose rows are versioned by a time range.

    ``start`` and ``end`` are both part of the primary key.  Subclasses in
    this file also declare an ``id`` primary key column, which
    :meth:`new_version` reads.
    """

    start = Column(DateTime, primary_key=True)
    end = Column(DateTime, primary_key=True)

    def __init__(self, **kw):
        """Construct the object, defaulting ``start`` / ``end`` to three
        days before / after :func:`current_time` when not passed."""
        # reduce some verbosity when we make a new object
        kw.setdefault("start", current_time() - datetime.timedelta(days=3))
        kw.setdefault("end", current_time() + datetime.timedelta(days=3))
        super().__init__(**kw)

    def new_version(self, session):
        """Turn this persistent object into a new row and close the old one.

        ``self`` is made transient so that it will be INSERTed with a new
        ``start`` / ``end`` range, while a detached copy carrying the old
        primary key is added to ``session`` with its ``end`` set to
        :func:`current_time`, producing an UPDATE of the old row.

        :param session: the Session the old-row copy is added to.  The
         caller (see ``before_flush()``) is responsible for re-adding
         ``self`` to the session afterwards.
        """
        # our current identity key, which will be used on the "old"
        # version of us to emit an UPDATE. this is just for assertion purposes
        old_identity_key = inspect(self).key

        # make sure self.start / self.end are not expired; the bare
        # attribute access is intentional and loads them if needed, before
        # the object is detached from its row below
        self.id, self.start, self.end

        # turn us into an INSERT
        make_transient(self)

        # make the "old" version of us, which we will turn into an
        # UPDATE
        old_copy_of_us = self.__class__(
            id=self.id, start=self.start, end=self.end
        )

        # turn old_copy_of_us into an UPDATE
        make_transient_to_detached(old_copy_of_us)

        # the "old" object has our old identity key (that we no longer have)
        assert inspect(old_copy_of_us).key == old_identity_key

        # now put it back in the session
        session.add(old_copy_of_us)

        # now update the 'end' - SQLAlchemy sees this as a PK switch
        old_copy_of_us.end = current_time()

        # fun fact!  the new_version() routine is *not* called for
        # old_copy_of_us!  because we are already in the before_flush() hook!
        # this surprised even me.   I was thinking we had to guard against
        # it.  Still might be a good idea to do so.

        # the new row's range starts now and runs for two days
        self.start = current_time()
        self.end = current_time() + datetime.timedelta(days=2)


@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    """Version every modified, already-persistent VersionedStartEnd object.

    Objects in ``session.dirty`` are skipped when they are not
    ``VersionedStartEnd`` instances, when ``session.is_modified()`` reports
    no change, or when they have no identity yet.
    """
    for instance in session.dirty:
        if not isinstance(instance, VersionedStartEnd):
            continue
        if not session.is_modified(instance):
            continue

        if not attributes.instance_state(instance).has_identity:
            continue

        # make it transient
        instance.new_version(session)
        # re-add
        session.add(instance)


@event.listens_for(Session, "do_orm_execute", retval=True)
def do_orm_execute(execute_state):
    """ensure all queries for VersionedStartEnd include criteria"""

    # the comparisons below are strict and new_version() sets
    # start == current_time(), so compare against a time slightly later;
    # presumably that is why one second is added -- TODO confirm
    ct = current_time() + datetime.timedelta(seconds=1)
    execute_state.statement = execute_state.statement.options(
        with_loader_criteria(
            VersionedStartEnd,
            lambda cls: and_(ct > cls.start, ct < cls.end),
            include_aliases=True,
        )
    )


class Parent(VersionedStartEnd, Base):
    """Versioned parent row referring to a Child by its ``id`` only."""

    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    start = Column(DateTime, primary_key=True)
    end = Column(DateTime, primary_key=True)
    data = Column(String)

    child_n = Column(Integer)

    child = relationship(
        "Child",
        primaryjoin=("Child.id == foreign(Parent.child_n)"),
        # note the primaryjoin can also be:
        #
        #  "and_(Child.id == foreign(Parent.child_n), "
        #  "func.now().between(Child.start, Child.end))"
        #
        # however the do_orm_execute() hook above already adds the
        # start / end criteria for VersionedStartEnd classes.  You *can*
        # use the above primaryjoin as well, it just means the criteria
        # will be present twice for most parent->child load operations.
        #
        # NOTE(review): this comment originally named a before_compile()
        # hook and excepted joinedload; confirm whether that exception
        # still applies with with_loader_criteria().
        #
        uselist=False,
        backref=backref("parent", uselist=False),
    )


class Child(VersionedStartEnd, Base):
    """Versioned child row; ``parent`` is supplied by the Parent backref."""

    __tablename__ = "child"

    id = Column(Integer, primary_key=True)
    start = Column(DateTime, primary_key=True)
    end = Column(DateTime, primary_key=True)
    data = Column(String)

    def new_version(self, session):
        """Version this child and re-attach the new version to its parent.

        :param session: the Session passed through to
         ``VersionedStartEnd.new_version()``.
        """
        # expire parent's reference to us
        session.expire(self.parent, ["child"])

        # create new version
        VersionedStartEnd.new_version(self, session)

        # re-add ourselves to the parent
        self.parent.child = self


# every timestamp handed out by time_passes(), in order
times = []


def time_passes(s):
    """Commit ``s``, let time pass, and return a new, later timestamp.

    Each call appends ``datetime.datetime.now()`` to the module-level
    ``times`` list and returns it.  Every call after the first sleeps for
    one second beforehand.

    :param s: the Session to commit before the timestamp is taken.
    :return: the timestamp just appended to ``times``.
    """

    # close the transaction, if any, since PG time doesn't increment in the
    # transaction.
    s.commit()

    # sleep so that each timestamp is later than the previous one; the
    # timestamp itself comes from the Python clock
    if times:
        time.sleep(1)

    times.append(datetime.datetime.now())

    if len(times) > 1:
        assert times[-1] > times[-2]
    return times[-1]


e = create_engine("sqlite://", echo="debug")
Base.metadata.create_all(e)

s = Session(e)

now = time_passes(s)

c1 = Child(id=1, data="child 1")
p1 = Parent(id=1, data="c1", child=c1)

s.add(p1)
s.commit()

# assert raw DB data
assert s.query(Parent.__table__).all() == [
    (
        1,
        times[0] - datetime.timedelta(days=3),
        times[0] + datetime.timedelta(days=3),
        "c1",
        1,
    )
]
assert s.query(Child.__table__).all() == [
    (
        1,
        times[0] - datetime.timedelta(days=3),
        times[0] + datetime.timedelta(days=3),
        "child 1",
    )
]

now = time_passes(s)

p1_check = s.query(Parent).first()
assert p1_check is p1
assert p1_check.child is c1

p1.child.data = "elvis presley"

s.commit()

p2_check = s.query(Parent).first()
assert p2_check is p1_check
c2_check = p2_check.child
# same object
assert c2_check is c1

# new data
assert c1.data == "elvis presley"

# new end time
assert c1.end == now + datetime.timedelta(days=2)

# assert raw DB data
assert s.query(Parent.__table__).all() == [
    (
        1,
        times[0] - datetime.timedelta(days=3),
        times[0] + datetime.timedelta(days=3),
        "c1",
        1,
    )
]
assert s.query(Child.__table__).order_by(Child.end).all() == [
    (1, times[0] - datetime.timedelta(days=3), times[1], "child 1"),
    (1, times[1], times[1] + datetime.timedelta(days=2), "elvis presley"),
]

now = time_passes(s)

p1.data = "c2 elvis presley"

s.commit()

# assert raw DB data.  now there are two parent rows.
assert s.query(Parent.__table__).order_by(Parent.end).all() == [
    (1, times[0] - datetime.timedelta(days=3), times[2], "c1", 1),
    (
        1,
        times[2],
        times[2] + datetime.timedelta(days=2),
        "c2 elvis presley",
        1,
    ),
]
assert s.query(Child.__table__).order_by(Child.end).all() == [
    (1, times[0] - datetime.timedelta(days=3), times[1], "child 1"),
    (1, times[1], times[1] + datetime.timedelta(days=2), "elvis presley"),
]

# add some more rows to test that these aren't coming back for
# queries
s.add(Parent(id=2, data="unrelated", child=Child(id=2, data="unrelated")))
s.commit()

# Query only knows about one parent for id=1
p3_check = s.query(Parent).filter_by(id=1).one()

assert p3_check is p1
assert p3_check.child is c1

# and one child.
c3_check = s.query(Child).filter(Child.parent == p3_check).one()
assert c3_check is c1

# one child one parent....
c3_check = (
    s.query(Child)
    .join(Parent.child)
    .filter(Parent.id == p3_check.id)
    .one()
)