# Source code for examples.versioned_rows.versioned_map
"""A variant of the versioned_rows example built around the
concept of a "vertical table" structure, like those illustrated in
:ref:`examples_vertical_tables` examples.

Here we store a dictionary of key/value pairs, storing the k/v's in a
"vertical" fashion where each key gets a row. The value is split out
into two separate datatypes, string and int - the range of datatype
storage can be adjusted for individual needs.

Changes to the "data" attribute of a ConfigData object result in the
ConfigData object being copied into a new one, and new associations to
its data are created.  Values which aren't changed between versions are
referenced by both the former and the newer ConfigData object.
Overall, only INSERT statements are emitted - no rows are UPDATed or
DELETEd.

An optional feature is also illustrated which associates individual
key/value pairs with the ConfigData object in which it first
originated.  Since a new row is only persisted when a new value is
created for a particular key, the recipe provides a way to query among
the full series of changes which occurred for any particular key in
the dictionary.

The set of all ConfigData in a particular table represents a single
series of versions.  By adding additional columns to ConfigData, the
system can be made to store multiple version streams distinguished by
those additional values.

"""

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import attributes
from sqlalchemy.orm import backref
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import make_transient
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import validates
from sqlalchemy.orm.collections import attribute_keyed_dict


@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    """Apply the new_version() method of objects which are
    marked as dirty during a flush.

    Only objects that define ``new_version`` and that the session
    reports as modified are versioned; everything else is left alone.

    """
    for instance in session.dirty:
        if hasattr(instance, "new_version") and session.is_modified(instance):
            # make it transient
            instance.new_version(session)

            # re-add
            session.add(instance)


Base = declarative_base()


class ConfigData(Base):
    """Represent a series of key/value pairs.

    ConfigData will generate a new version of itself
    upon change.

    The "data" dictionary provides access via
    string name mapped to a string/int value.

    """

    __tablename__ = "config"

    id = Column(Integer, primary_key=True)
    """Primary key column of this ConfigData."""

    elements = relationship(
        "ConfigValueAssociation",
        collection_class=attribute_keyed_dict("name"),
        backref=backref("config_data"),
        lazy="subquery",
    )
    """Dictionary-backed collection of ConfigValueAssociation objects,
    keyed to the name of the associated ConfigValue.

    Note there's no "cascade" here.  ConfigValueAssociation objects
    are never deleted or changed.
    """

    # NOTE: deliberately a plain function (no ``self``); it is consumed
    # below, inside the class body, as the association proxy's creator.
    def _new_value(name, value):
        """Create a new entry for usage in the 'elements' dictionary."""
        return ConfigValueAssociation(ConfigValue(name, value))

    data = association_proxy("elements", "value", creator=_new_value)
    """Proxy to the 'value' elements of each related ConfigValue,
    via the 'elements' dictionary.
    """

    def __init__(self, data):
        """Populate the proxied "data" dictionary from ``data``."""
        self.data = data

    @validates("elements")
    def _associate_with_element(self, key, element):
        """Associate incoming ConfigValues with this
        ConfigData, if not already associated.

        This is an optional feature which allows
        more comprehensive history tracking.

        """
        if element.config_value.originating_config is None:
            element.config_value.originating_config = self
        return element

    def new_version(self, session):
        """Turn this object into a brand new, not-yet-inserted version.

        The object is made transient and its primary key cleared, and
        its 'elements' collection is rebuilt so that the new version
        owns its own ConfigValueAssociation rows.

        """
        # convert to an INSERT
        make_transient(self)
        self.id = None

        # history of the 'elements' collection.
        # this is a tuple of groups: (added, unchanged, deleted)
        hist = attributes.get_history(self, "elements")

        # rewrite the 'elements' collection
        # from scratch, removing all history
        attributes.set_committed_value(self, "elements", {})

        # new elements in the "added" group
        # are moved to our new collection.
        for elem in hist.added:
            self.elements[elem.name] = elem

        # copy elements in the 'unchanged' group.
        # the new ones associate with the new ConfigData,
        # the old ones stay associated with the old ConfigData
        for elem in hist.unchanged:
            self.elements[elem.name] = ConfigValueAssociation(
                elem.config_value
            )

        # we also need to expire changes on each ConfigValueAssociation
        # that is to remain associated with the old ConfigData.
        # Here, each one takes care of that in its new_version()
        # method, though we could do that here as well.


class ConfigValueAssociation(Base):
    """Relate ConfigData objects to associated ConfigValue objects."""

    __tablename__ = "config_value_association"

    config_id = Column(ForeignKey("config.id"), primary_key=True)
    """Reference the primary key of the ConfigData object."""

    config_value_id = Column(ForeignKey("config_value.id"), primary_key=True)
    """Reference the primary key of the ConfigValue object."""

    config_value = relationship("ConfigValue", lazy="joined", innerjoin=True)
    """Reference the related ConfigValue object."""

    def __init__(self, config_value):
        """Wrap ``config_value`` in a new association."""
        self.config_value = config_value

    def new_version(self, session):
        """Expire all pending state, as ConfigValueAssociation is
        immutable."""

        session.expire(self)

    @property
    def name(self):
        """Name of the associated ConfigValue."""
        return self.config_value.name

    @property
    def value(self):
        """Value of the associated ConfigValue."""
        return self.config_value.value

    @value.setter
    def value(self, value):
        """Intercept set events.

        Create a new ConfigValueAssociation upon change,
        replacing this one in the parent ConfigData's dictionary.

        If no net change, do nothing.

        """
        if value != self.config_value.value:
            self.config_data.elements[self.name] = ConfigValueAssociation(
                ConfigValue(self.config_value.name, value)
            )


class ConfigValue(Base):
    """Represent an individual key/value pair at a given point in time.

    ConfigValue is immutable.

    """

    __tablename__ = "config_value"

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    # NOT NULL: a ConfigValue can only be flushed once it has an
    # originating ConfigData, which ConfigData's "elements" validator
    # assigns when none is set.
    originating_config_id = Column(
        Integer, ForeignKey("config.id"), nullable=False
    )
    int_value = Column(Integer)
    string_value = Column(String(255))

    def __init__(self, name, value):
        """Store ``name`` and route ``value`` to the matching column."""
        self.name = name
        self.value = value

    originating_config = relationship("ConfigData")
    """Reference to the originating ConfigData.

    This is optional, and allows history tracking of
    individual values.

    """

    def new_version(self, session):
        """Refuse versioning; a ConfigValue never changes once created."""
        raise NotImplementedError("ConfigValue is immutable.")

    @property
    def value(self):
        """Return whichever typed column is populated, or None."""
        for k in ("int_value", "string_value"):
            v = getattr(self, k)
            if v is not None:
                return v
        return None

    @value.setter
    def value(self, value):
        """Store ints in ``int_value``; store anything else, converted
        with ``str()``, in ``string_value``.  The other column is
        cleared."""
        if isinstance(value, int):
            self.int_value = value
            self.string_value = None
        else:
            self.string_value = str(value)
            self.int_value = None


if __name__ == "__main__":
    engine = create_engine("sqlite://", echo=True)
    Base.metadata.create_all(engine)

    # Use a distinct name so the imported Session class is not rebound.
    session_factory = sessionmaker(engine)

    with session_factory() as sess:
        config = ConfigData(
            {"user_name": "twitter", "hash_id": "4fedffca37eaf", "x": 27, "y": 450}
        )

        sess.add(config)
        sess.commit()
        version_one = config.id

        config.data["user_name"] = "yahoo"
        sess.commit()

        version_two = config.id

        assert version_one != version_two

        # two versions have been created.

        assert config.data == {
            "user_name": "yahoo",
            "hash_id": "4fedffca37eaf",
            "x": 27,
            "y": 450,
        }

        old_config = sess.get(ConfigData, version_one)
        assert old_config.data == {
            "user_name": "twitter",
            "hash_id": "4fedffca37eaf",
            "x": 27,
            "y": 450,
        }

        # the history of any key can be acquired using
        # the originating_config_id attribute
        history = (
            sess.query(ConfigValue)
            .filter(ConfigValue.name == "user_name")
            .order_by(ConfigValue.originating_config_id)
            .all()
        )

        assert [(h.value, h.originating_config_id) for h in history] == (
            [("twitter", version_one), ("yahoo", version_two)]
        )