ORM Examples

The SQLAlchemy distribution includes a variety of code examples illustrating a select set of patterns, some typical and some not so typical. All are runnable and can be found in the /examples directory of the distribution. Descriptions and source code for all can be found here.

Additional SQLAlchemy examples, some user contributed, are available on the wiki at https://www.sqlalchemy.org/trac/wiki/UsageRecipes.

Mapping Recipes

Adjacency List

An example of a dictionary-of-dictionaries structure mapped using an adjacency list model.

E.g.:

node = TreeNode('rootnode')
node.append('node1')
node.append('node3')
session.add(node)
session.commit()

dump_tree(node)

Listing of files:

Associations

Examples illustrating the usage of the “association object” pattern, where an intermediary class mediates the relationship between two classes that are associated in a many-to-many pattern.

Listing of files:

  • proxied_association.py - Same example as basic_association, adding in usage of sqlalchemy.ext.associationproxy to make explicit references to OrderItem optional.

  • basic_association.py - Illustrate a many-to-many relationship between an “Order” and a collection of “Item” objects, associating a purchase price with each via an association object called “OrderItem”

  • dict_of_sets_with_default.py - An advanced association proxy example which illustrates nesting of association proxies to produce multi-level Python collections, in this case a dictionary with string keys and sets of integers as values, which conceal the underlying mapped classes.

Asyncio Integration

Examples illustrating the asyncio engine feature of SQLAlchemy.

Listing of files:

  • async_orm.py - Illustrates use of the sqlalchemy.ext.asyncio.AsyncSession object for asynchronous ORM use.

  • async_orm_writeonly.py - Illustrates using write only relationships for simpler handling of ORM collections under asyncio.

  • gather_orm_statements.py - Illustrates how to run many statements concurrently using asyncio.gather() along many asyncio database connections, merging ORM results into a single AsyncSession.

  • basic.py - Illustrates the asyncio engine / connection interface.

  • greenlet_orm.py - Illustrates use of the sqlalchemy.ext.asyncio.AsyncSession object for asynchronous ORM use, including the optional run_sync() method.

Directed Graphs

An example of persistence for a directed graph structure. The graph is stored as a collection of edges, each referencing both a “lower” and an “upper” node in a table of nodes. Basic persistence and querying for lower- and upper- neighbors are illustrated:

n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())

Listing of files:

Dynamic Relations as Dictionaries

Illustrates how to place a dictionary-like facade on top of a “dynamic” relation, so that dictionary operations (assuming simple string keys) can operate upon a large collection without loading the full collection at once.

Listing of files:

Generic Associations

Illustrates various methods of associating multiple types of parents with a particular child object.

The examples all use the declarative extension along with declarative mixins. Each one presents the identical use case at the end - two classes, Customer and Supplier, both subclassing the HasAddresses mixin, which ensures that the parent class is provided with an addresses collection which contains Address objects.

The discriminator_on_association.py and generic_fk.py scripts are modernized versions of recipes presented in the 2007 blog post Polymorphic Associations with SQLAlchemy.

Listing of files:

  • table_per_association.py - Illustrates a mixin which provides a generic association via a individually generated association tables for each parent class. The associated objects themselves are persisted in a single table shared among all parents.

  • table_per_related.py - Illustrates a generic association which persists association objects within individual tables, each one generated to persist those objects on behalf of a particular parent class.

  • discriminator_on_association.py - Illustrates a mixin which provides a generic association using a single target table and a single association table, referred to by all parent tables. The association table contains a “discriminator” column which determines what type of parent object associates to each particular row in the association table.

  • generic_fk.py - Illustrates a so-called “generic foreign key”, in a similar fashion to that of popular frameworks such as Django, ROR, etc. This approach bypasses standard referential integrity practices, in that the “foreign key” column is not actually constrained to refer to any particular table; instead, in-application logic is used to determine which table is referenced.

Materialized Paths

Illustrates the “materialized paths” pattern for hierarchical data using the SQLAlchemy ORM.

Listing of files:

Nested Sets

Illustrates a rudimentary way to implement the “nested sets” pattern for hierarchical data using the SQLAlchemy ORM.

Listing of files:

Performance

A performance profiling suite for a variety of SQLAlchemy use cases.

Each suite focuses on a specific use case with a particular performance profile and associated implications:

  • bulk inserts

  • individual inserts, with or without transactions

  • fetching large numbers of rows

  • running lots of short queries

All suites include a variety of use patterns illustrating both Core and ORM use, and are generally sorted in order of performance from worst to greatest, inversely based on amount of functionality provided by SQLAlchemy, greatest to least (these two things generally correspond perfectly).

A command line tool is presented at the package level which allows individual suites to be run:

$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
                                      [--num NUM] [--profile] [--dump]
                                      [--echo]

                                      {bulk_inserts,large_resultsets,single_inserts}

positional arguments:
  {bulk_inserts,large_resultsets,single_inserts}
                        suite to run

optional arguments:
  -h, --help            show this help message and exit
  --test TEST           run specific test name
  --dburl DBURL         database URL, default sqlite:///profile.db
  --num NUM             Number of iterations/items/etc for tests;
                        default is module-specific
  --profile             run profiling and dump call counts
  --dump                dump full call profile (implies --profile)
  --echo                Echo SQL output

An example run looks like:

$ python -m examples.performance bulk_inserts

Or with options:

$ python -m examples.performance bulk_inserts \
    --dburl mysql+mysqldb://scott:tiger@localhost/test \
    --profile --num 1000

File Listing

Listing of files:

  • bulk_updates.py - This series of tests will illustrate different ways to UPDATE a large number of rows in bulk (under construction! there’s just one test at the moment)

  • large_resultsets.py - In this series of tests, we are looking at time to load a large number of very small and simple rows.

  • bulk_inserts.py - This series of tests illustrates different ways to INSERT a large number of rows in bulk.

  • short_selects.py - This series of tests illustrates different ways to SELECT a single record by primary key

  • single_inserts.py - In this series of tests, we’re looking at a method that inserts a row within a distinct transaction, and afterwards returns to essentially a “closed” state. This would be analogous to an API call that starts up a database connection, inserts the row, commits and closes.

  • __main__.py - Allows the examples/performance package to be run as a script.

Running all tests with time

This is the default form of run:

$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
              test_bulk_insert_dictionaries, test_core,
              test_core_query_caching, test_dbapi_raw_w_connect,
              test_dbapi_raw_w_pool

test_orm_commit : Individual INSERT/COMMIT pairs via the
    ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
    the "bulk" API  (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
    the "bulk" API with dictionaries (10000 iterations);
    total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
    (10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
    with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection pool (10000 iterations); total time 8.001813 sec

Dumping Profiles for Individual Tests

A Python profile output can be dumped for all tests, or more commonly individual tests:

$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
         186109 function calls (186102 primitive calls) in 1.089 seconds

   Ordered by: internal time, call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1000    0.634    0.001    0.634    0.001 {method 'commit' of 'sqlite3.Connection' objects}
     1000    0.154    0.000    0.154    0.000 {method 'execute' of 'sqlite3.Cursor' objects}
     1000    0.021    0.000    0.074    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
     1000    0.015    0.000    0.034    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
        1    0.012    0.012    1.091    1.091 examples/performance/single_inserts.py:79(test_core)

    ...

Writing your Own Suites

The profiler suite system is extensible, and can be applied to your own set of tests. This is a valuable technique to use in deciding upon the proper approach for some performance-critical set of routines. For example, if we wanted to profile the difference between several kinds of loading, we can create a file test_loads.py, with the following content:

from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
engine = None
session = None


class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    children = relationship("Child")


class Child(Base):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey('parent.id'))


# Init with name of file, default number of items
Profiler.init("test_loads", 1000)


@Profiler.setup_once
def setup_once(dburl, echo, num):
    "setup once.  create an engine, insert fixture data"
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    sess = Session(engine)
    sess.add_all([
        Parent(children=[Child() for j in range(100)])
        for i in range(num)
    ])
    sess.commit()


@Profiler.setup
def setup(dburl, echo, num):
    "setup per test.  create a new Session."
    global session
    session = Session(engine)
    # pre-connect so this part isn't profiled (if we choose)
    session.connection()


@Profiler.profile
def test_lazyload(n):
    "load everything, no eager loading."

    for parent in session.query(Parent):
        parent.children


@Profiler.profile
def test_joinedload(n):
    "load everything, joined eager loading."

    for parent in session.query(Parent).options(joinedload("children")):
        parent.children


@Profiler.profile
def test_subqueryload(n):
    "load everything, subquery eager loading."

    for parent in session.query(Parent).options(subqueryload("children")):
        parent.children

if __name__ == '__main__':
    Profiler.main()

We can run our new script directly:

$ python test_loads.py  --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec

Space Invaders

A Space Invaders game using SQLite as the state machine.

Originally developed in 2012. Adapted to work in Python 3.

Runs in a textual console using ASCII art.

../_images/space_invaders.jpg

To run:

python -m examples.space_invaders.space_invaders

While it runs, watch the SQL output in the log:

tail -f space_invaders.log

enjoy!

Listing of files:

Versioning Objects

Versioning with a History Table

Illustrates an extension which creates version tables for entities and stores records for each change. The given extensions generate an anonymous “history” class which represents historical versions of the target object.

Compare to the Versioning using Temporal Rows examples which write updates as new rows in the same table, without using a separate history table.

Usage is illustrated via a unit test module test_versioning.py, which can be run like any other module, using unittest internally:

python -m examples.versioned_history.test_versioning

A fragment of example usage, using declarative:

from history_meta import Versioned, versioned_session

class Base(DeclarativeBase):
    pass

class SomeClass(Versioned, Base):
    __tablename__ = 'sometable'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id

Session = sessionmaker(bind=engine)
versioned_session(Session)

sess = Session()
sc = SomeClass(name='sc1')
sess.add(sc)
sess.commit()

sc.name = 'sc1modified'
sess.commit()

assert sc.version == 2

SomeClassHistory = SomeClass.__history_mapper__.class_

assert sess.query(SomeClassHistory).\
            filter(SomeClassHistory.version == 1).\
            all() \
            == [SomeClassHistory(version=1, name='sc1')]

The Versioned mixin is designed to work with declarative. To use the extension with classical mappers, the _history_mapper function can be applied:

from history_meta import _history_mapper

m = mapper(SomeClass, sometable)
_history_mapper(m)

SomeHistoryClass = SomeClass.__history_mapper__.class_

The versioning example also integrates with the ORM optimistic concurrency feature documented at Configuring a Version Counter. To enable this feature, set the flag Versioned.use_mapper_versioning to True:

class SomeClass(Versioned, Base):
    __tablename__ = 'sometable'

    use_mapper_versioning = True

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id

Above, if two instance of SomeClass with the same version identifier are updated and sent to the database for UPDATE concurrently, if the database isolation level allows the two UPDATE statements to proceed, one will fail because it no longer is against the last known version identifier.

Listing of files:

Versioning using Temporal Rows

Several examples that illustrate the technique of intercepting changes that would be first interpreted as an UPDATE on a row, and instead turning it into an INSERT of a new row, leaving the previous row intact as a historical version.

Compare to the Versioning with a History Table example which writes a history row to a separate history table.

Listing of files:

  • versioned_rows.py - Illustrates a method to intercept changes on objects, turning an UPDATE statement on a single row into an INSERT statement, so that a new row is inserted with the new data, keeping the old row intact.

  • versioned_rows_w_versionid.py - Illustrates a method to intercept changes on objects, turning an UPDATE statement on a single row into an INSERT statement, so that a new row is inserted with the new data, keeping the old row intact.

  • versioned_map.py - A variant of the versioned_rows example built around the concept of a “vertical table” structure, like those illustrated in Vertical Attribute Mapping examples.

  • versioned_update_old_row.py - Illustrates the same UPDATE into INSERT technique of versioned_rows.py, but also emits an UPDATE on the old row to affect a change in timestamp. Also includes a SessionEvents.do_orm_execute() hook to limit queries to only the most recent version.

Vertical Attribute Mapping

Illustrates “vertical table” mappings.

A “vertical table” refers to a technique where individual attributes of an object are stored as distinct rows in a table. The “vertical table” technique is used to persist objects which can have a varied set of attributes, at the expense of simple query control and brevity. It is commonly found in content/document management systems in order to represent user-created structures flexibly.

Two variants on the approach are given. In the second, each row references a “datatype” which contains information about the type of information stored in the attribute, such as integer, string, or date.

Example:

shrew = Animal(u'shrew')
shrew[u'cuteness'] = 5
shrew[u'weasel-like'] = False
shrew[u'poisonous'] = True

session.add(shrew)
session.flush()

q = (session.query(Animal).
     filter(Animal.facts.any(
       and_(AnimalFact.key == u'weasel-like',
            AnimalFact.value == True))))
print('weasel-like animals', q.all())

Listing of files:

Inheritance Mapping Recipes

Basic Inheritance Mappings

Working examples of single-table, joined-table, and concrete-table inheritance as described in Mapping Class Inheritance Hierarchies.

Listing of files:

  • joined.py - Joined-table (table-per-subclass) inheritance example.

  • concrete.py - Concrete-table (table-per-class) inheritance example.

  • single.py - Single-table (table-per-hierarchy) inheritance example.

Special APIs

Attribute Instrumentation

Examples illustrating modifications to SQLAlchemy’s attribute management system.

Listing of files:

Horizontal Sharding

A basic example of using the SQLAlchemy Sharding API. Sharding refers to horizontally scaling data across multiple databases.

The basic components of a “sharded” mapping are:

  • multiple Engine instances, each assigned a “shard id”. These Engine instances may refer to different databases, or different schemas / accounts within the same database, or they can even be differentiated only by options that will cause them to access different schemas or tables when used.

  • a function which can return a single shard id, given an instance to be saved; this is called “shard_chooser”

  • a function which can return a list of shard ids which apply to a particular instance identifier; this is called “id_chooser”.If it returns all shard ids, all shards will be searched.

  • a function which can return a list of shard ids to try, given a particular Query (“query_chooser”). If it returns all shard ids, all shards will be queried and the results joined together.

In these examples, different kinds of shards are used against the same basic example which accommodates weather data on a per-continent basis. We provide example shard_chooser, id_chooser and query_chooser functions. The query_chooser illustrates inspection of the SQL expression element in order to attempt to determine a single shard being requested.

The construction of generic sharding routines is an ambitious approach to the issue of organizing instances among multiple databases. For a more plain-spoken alternative, the “distinct entity” approach is a simple method of assigning objects to different tables (and potentially database nodes) in an explicit way - described on the wiki at EntityName.

Listing of files:

  • separate_databases.py - Illustrates sharding using distinct SQLite databases.

  • separate_tables.py - Illustrates sharding using a single SQLite database, that will however have multiple tables using a naming convention.

  • separate_schema_translates.py - Illustrates sharding using a single database with multiple schemas, where a different “schema_translates_map” can be used for each shard.

  • asyncio.py - Illustrates sharding API used with asyncio.

Extending the ORM

ORM Query Events

Recipes which illustrate augmentation of ORM SELECT behavior as used by Session.execute() with 2.0 style use of select(), as well as the 1.x style Query object.

Examples include demonstrations of the with_loader_criteria() option as well as the SessionEvents.do_orm_execute() hook.

As of SQLAlchemy 1.4, the Query construct is unified with the Select construct, so that these two objects are mostly the same.

Listing of files:

  • temporal_range.py - Illustrates a custom per-query criteria that will be applied to selected entities.

  • filter_public.py - Illustrates a global criteria applied to entities of a particular type.

Dogpile Caching

Illustrates how to embed dogpile.cache functionality with ORM queries, allowing full cache control as well as the ability to pull “lazy loaded” attributes from long term cache.

In this demo, the following techniques are illustrated:

  • Using the SessionEvents.do_orm_execute() event hook

  • Basic technique of circumventing Session.execute() to pull from a custom cache source instead of the database.

  • Rudimental caching with dogpile.cache, using “regions” which allow global control over a fixed set of configurations.

  • Using custom UserDefinedOption objects to configure options in a statement object.

See also

Re-Executing Statements - includes a general example of the technique presented here.

E.g.:

# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))

# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))

# execute and results
result = session.execute(stmt)

print(result.scalars().all())

To run, both SQLAlchemy and dogpile.cache must be installed or on the current PYTHONPATH. The demo will create a local directory for datafiles, insert initial data, and run. Running the demo a second time will utilize the cache files already present, and exactly one SQL statement against two tables will be emitted - the displayed result however will utilize dozens of lazyloads that all pull from cache.

The demo scripts themselves, in order of complexity, are run as Python modules so that relative imports work:

python -m examples.dogpile_caching.helloworld

python -m examples.dogpile_caching.relationship_caching

python -m examples.dogpile_caching.advanced

python -m examples.dogpile_caching.local_session_caching

Listing of files:

  • environment.py - Establish data / cache file paths, and configurations, bootstrap fixture data if necessary.

  • caching_query.py - Represent functions and classes which allow the usage of Dogpile caching with SQLAlchemy. Introduces a query option called FromCache.

  • model.py - The datamodel, which represents Person that has multiple Address objects, each with PostalCode, City, Country.

  • fixture_data.py - Installs some sample data. Here we have a handful of postal codes for a few US/Canadian cities. Then, 100 Person records are installed, each with a randomly selected postal code.

  • helloworld.py - Illustrate how to load some data, and cache the results.

  • relationship_caching.py - Illustrates how to add cache options on relationship endpoints, so that lazyloads load from cache.

  • advanced.py - Illustrate usage of Query combined with the FromCache option, including front-end loading, cache invalidation and collection caching.

  • local_session_caching.py - This example creates a new dogpile.cache backend that will persist data in a dictionary which is local to the current session. remove() the session and the cache is gone.