SQLAlchemy 2.0 Documentation
SQLAlchemy ORM
- ORM Quick Start
- ORM Mapped Class Configuration
- Relationship Configuration
- Basic Relationship Patterns
- Adjacency List Relationships
- Configuring how Relationship Joins¶
- Handling Multiple Join Paths
- Specifying Alternate Join Conditions
- Creating Custom Foreign Conditions
- Using custom operators in join conditions
- Custom operators based on SQL functions
- Overlapping Foreign Keys
- Non-relational Comparisons / Materialized Path
- Self-Referential Many-to-Many Relationship
- Composite “Secondary” Joins
- Relationship to Aliased Class
- Row-Limited Relationships with Window Functions
- Building Query-Enabled Properties
- Notes on using the viewonly relationship parameter
- Working with Large Collections
- Collection Customization and API Details
- Special Relationship Persistence Patterns
- Using the legacy ‘backref’ relationship parameter
- Relationships API
- ORM Querying Guide
- Using the Session
- Events and Internals
- ORM Extensions
- ORM Examples
Project Versions
- Previous: Adjacency List Relationships
- Next: Working with Large Collections
- Up: Home
- On this page:
- Configuring how Relationship Joins
- Handling Multiple Join Paths
- Specifying Alternate Join Conditions
- Creating Custom Foreign Conditions
- Using custom operators in join conditions
- Custom operators based on SQL functions
- Overlapping Foreign Keys
- Non-relational Comparisons / Materialized Path
- Self-Referential Many-to-Many Relationship
- Composite “Secondary” Joins
- Relationship to Aliased Class
- Row-Limited Relationships with Window Functions
- Building Query-Enabled Properties
- Notes on using the viewonly relationship parameter
Configuring how Relationship Joins¶
relationship()
will normally create a join between two tables
by examining the foreign key relationship between the two tables
to determine which columns should be compared. There are a variety
of situations where this behavior needs to be customized.
Handling Multiple Join Paths¶
One of the most common situations to deal with is when there are more than one foreign key path between two tables.
Consider a Customer
class that contains two foreign keys to an Address
class:
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customer"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String)
billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))
billing_address = relationship("Address")
shipping_address = relationship("Address")
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
street = mapped_column(String)
city = mapped_column(String)
state = mapped_column(String)
zip = mapped_column(String)
The above mapping, when we attempt to use it, will produce the error:
sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables. Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.
The above message is pretty long. There are many potential messages
that relationship()
can return, which have been carefully tailored
to detect a variety of common configurational issues; most will suggest
the additional configuration that’s needed to resolve the ambiguity
or other missing information.
In this case, the message wants us to qualify each relationship()
by instructing for each one which foreign key column should be considered, and
the appropriate form is as follows:
class Customer(Base):
__tablename__ = "customer"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String)
billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))
billing_address = relationship("Address", foreign_keys=[billing_address_id])
shipping_address = relationship("Address", foreign_keys=[shipping_address_id])
Above, we specify the foreign_keys
argument, which is a Column
or list
of Column
objects which indicate those columns to be considered “foreign”,
or in other words, the columns that contain a value referring to a parent table.
Loading the Customer.billing_address
relationship from a Customer
object will use the value present in billing_address_id
in order to
identify the row in Address
to be loaded; similarly, shipping_address_id
is used for the shipping_address
relationship. The linkage of the two
columns also plays a role during persistence; the newly generated primary key
of a just-inserted Address
object will be copied into the appropriate
foreign key column of an associated Customer
object during a flush.
When specifying foreign_keys
with Declarative, we can also use string
names to specify, however it is important that if using a list, the list
is part of the string:
billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")
In this specific example, the list is not necessary in any case as there’s only
one Column
we need:
billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")
Warning
When passed as a Python-evaluable string, the
relationship.foreign_keys
argument is interpreted using Python’s
eval()
function. DO NOT PASS UNTRUSTED INPUT TO THIS STRING. See
Evaluation of relationship arguments for details on declarative
evaluation of relationship()
arguments.
Specifying Alternate Join Conditions¶
The default behavior of relationship()
when constructing a join
is that it equates the value of primary key columns
on one side to that of foreign-key-referring columns on the other.
We can change this criterion to be anything we’d like using the
relationship.primaryjoin
argument, as well as the relationship.secondaryjoin
argument in the case when a “secondary” table is used.
In the example below, using the User
class
as well as an Address
class which stores a street address, we
create a relationship boston_addresses
which will only
load those Address
objects which specify a city of “Boston”:
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String)
boston_addresses = relationship(
"Address",
primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
)
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
user_id = mapped_column(Integer, ForeignKey("user.id"))
street = mapped_column(String)
city = mapped_column(String)
state = mapped_column(String)
zip = mapped_column(String)
Within this string SQL expression, we made use of the and_()
conjunction
construct to establish two distinct predicates for the join condition - joining
both the User.id
and Address.user_id
columns to each other, as well as
limiting rows in Address
to just city='Boston'
. When using
Declarative, rudimentary SQL functions like and_()
are automatically
available in the evaluated namespace of a string relationship()
argument.
Warning
When passed as a Python-evaluable string, the
relationship.primaryjoin
argument is interpreted using
Python’s
eval()
function. DO NOT PASS UNTRUSTED INPUT TO THIS STRING. See
Evaluation of relationship arguments for details on declarative
evaluation of relationship()
arguments.
The custom criteria we use in a relationship.primaryjoin
is generally only significant when SQLAlchemy is rendering SQL in
order to load or represent this relationship. That is, it’s used in
the SQL statement that’s emitted in order to perform a per-attribute
lazy load, or when a join is constructed at query time, such as via
Select.join()
, or via the eager “joined” or “subquery” styles of
loading. When in-memory objects are being manipulated, we can place
any Address
object we’d like into the boston_addresses
collection, regardless of what the value of the .city
attribute
is. The objects will remain present in the collection until the
attribute is expired and re-loaded from the database where the
criterion is applied. When a flush occurs, the objects inside of
boston_addresses
will be flushed unconditionally, assigning value
of the primary key user.id
column onto the foreign-key-holding
address.user_id
column for each row. The city
criteria has no
effect here, as the flush process only cares about synchronizing
primary key values into referencing foreign key values.
Creating Custom Foreign Conditions¶
Another element of the primary join condition is how those columns
considered “foreign” are determined. Usually, some subset
of Column
objects will specify ForeignKey
, or otherwise
be part of a ForeignKeyConstraint
that’s relevant to the join condition.
relationship()
looks to this foreign key status as it decides
how it should load and persist data for this relationship. However, the
relationship.primaryjoin
argument can be used to create a join condition that
doesn’t involve any “schema” level foreign keys. We can combine relationship.primaryjoin
along with relationship.foreign_keys
and relationship.remote_side
explicitly in order to
establish such a join.
Below, a class HostEntry
joins to itself, equating the string content
column to the ip_address
column, which is a PostgreSQL type called INET
.
We need to use cast()
in order to cast one side of the join to the
type of the other:
from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class HostEntry(Base):
__tablename__ = "host_entry"
id = mapped_column(Integer, primary_key=True)
ip_address = mapped_column(INET)
content = mapped_column(String(50))
# relationship() using explicit foreign_keys, remote_side
parent_host = relationship(
"HostEntry",
primaryjoin=ip_address == cast(content, INET),
foreign_keys=content,
remote_side=ip_address,
)
The above relationship will produce a join like:
SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)
An alternative syntax to the above is to use the foreign()
and
remote()
annotations,
inline within the relationship.primaryjoin
expression.
This syntax represents the annotations that relationship()
normally
applies by itself to the join condition given the relationship.foreign_keys
and
relationship.remote_side
arguments. These functions may
be more succinct when an explicit join condition is present, and additionally
serve to mark exactly the column that is “foreign” or “remote” independent
of whether that column is stated multiple times or within complex
SQL expressions:
from sqlalchemy.orm import foreign, remote
class HostEntry(Base):
__tablename__ = "host_entry"
id = mapped_column(Integer, primary_key=True)
ip_address = mapped_column(INET)
content = mapped_column(String(50))
# relationship() using explicit foreign() and remote() annotations
# in lieu of separate arguments
parent_host = relationship(
"HostEntry",
primaryjoin=remote(ip_address) == cast(foreign(content), INET),
)
Using custom operators in join conditions¶
Another use case for relationships is the use of custom operators, such
as PostgreSQL’s “is contained within” <<
operator when joining with
types such as INET
and CIDR
.
For custom boolean operators we use the Operators.bool_op()
function:
inet_column.bool_op("<<")(cidr_column)
A comparison like the above may be used directly with
relationship.primaryjoin
when constructing
a relationship()
:
class IPA(Base):
__tablename__ = "ip_address"
id = mapped_column(Integer, primary_key=True)
v4address = mapped_column(INET)
network = relationship(
"Network",
primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
viewonly=True,
)
class Network(Base):
__tablename__ = "network"
id = mapped_column(Integer, primary_key=True)
v4representation = mapped_column(CIDR)
Above, a query such as:
select(IPA).join(IPA.network)
Will render as:
SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation
Custom operators based on SQL functions¶
A variant to the use case for Operators.op.is_comparison
is
when we aren’t using an operator, but a SQL function. The typical example
of this use case is the PostgreSQL PostGIS functions however any SQL
function on any database that resolves to a binary condition may apply.
To suit this use case, the FunctionElement.as_comparison()
method
can modify any SQL function, such as those invoked from the func
namespace, to indicate to the ORM that the function produces a comparison of
two expressions. The below example illustrates this with the
Geoalchemy2 library:
from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign
class Polygon(Base):
__tablename__ = "polygon"
id = mapped_column(Integer, primary_key=True)
geom = mapped_column(Geometry("POLYGON", srid=4326))
points = relationship(
"Point",
primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
viewonly=True,
)
class Point(Base):
__tablename__ = "point"
id = mapped_column(Integer, primary_key=True)
geom = mapped_column(Geometry("POINT", srid=4326))
Above, the FunctionElement.as_comparison()
indicates that the
func.ST_Contains()
SQL function is comparing the Polygon.geom
and
Point.geom
expressions. The foreign()
annotation additionally notes
which column takes on the “foreign key” role in this particular relationship.
New in version 1.3: Added FunctionElement.as_comparison()
.
Overlapping Foreign Keys¶
A rare scenario can arise when composite foreign keys are used, such that a single column may be the subject of more than one column referred to via foreign key constraint.
Consider an (admittedly complex) mapping such as the Magazine
object,
referred to both by the Writer
object and the Article
object
using a composite primary key scheme that includes magazine_id
for both; then to make Article
refer to Writer
as well,
Article.magazine_id
is involved in two separate relationships;
Article.magazine
and Article.writer
:
class Magazine(Base):
__tablename__ = "magazine"
id = mapped_column(Integer, primary_key=True)
class Article(Base):
__tablename__ = "article"
article_id = mapped_column(Integer)
magazine_id = mapped_column(ForeignKey("magazine.id"))
writer_id = mapped_column()
magazine = relationship("Magazine")
writer = relationship("Writer")
__table_args__ = (
PrimaryKeyConstraint("article_id", "magazine_id"),
ForeignKeyConstraint(
["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
),
)
class Writer(Base):
__tablename__ = "writer"
id = mapped_column(Integer, primary_key=True)
magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
magazine = relationship("Magazine")
When the above mapping is configured, we will see this warning emitted:
SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.
What this refers to originates from the fact that Article.magazine_id
is
the subject of two different foreign key constraints; it refers to
Magazine.id
directly as a source column, but also refers to
Writer.magazine_id
as a source column in the context of the
composite key to Writer
. If we associate an Article
with a
particular Magazine
, but then associate the Article
with a
Writer
that’s associated with a different Magazine
, the ORM
will overwrite Article.magazine_id
non-deterministically, silently
changing which magazine to which we refer; it may
also attempt to place NULL into this column if we de-associate a
Writer
from an Article
. The warning lets us know this is the case.
To solve this, we need to break out the behavior of Article
to include
all three of the following features:
Article
first and foremost writes toArticle.magazine_id
based on data persisted in theArticle.magazine
relationship only, that is a value copied fromMagazine.id
.Article
can write toArticle.writer_id
on behalf of data persisted in theArticle.writer
relationship, but only theWriter.id
column; theWriter.magazine_id
column should not be written intoArticle.magazine_id
as it ultimately is sourced fromMagazine.id
.Article
takesArticle.magazine_id
into account when loadingArticle.writer
, even though it doesn’t write to it on behalf of this relationship.
To get just #1 and #2, we could specify only Article.writer_id
as the
“foreign keys” for Article.writer
:
class Article(Base):
# ...
writer = relationship("Writer", foreign_keys="Article.writer_id")
However, this has the effect of Article.writer
not taking
Article.magazine_id
into account when querying against Writer
:
SELECT article.article_id AS article_article_id,
article.magazine_id AS article_magazine_id,
article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id
Therefore, to get at all of #1, #2, and #3, we express the join condition
as well as which columns to be written by combining
relationship.primaryjoin
fully, along with either the
relationship.foreign_keys
argument, or more succinctly by
annotating with foreign()
:
class Article(Base):
# ...
writer = relationship(
"Writer",
primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
"Writer.magazine_id == Article.magazine_id)",
)
Non-relational Comparisons / Materialized Path¶
Warning
this section details an experimental feature.
Using custom expressions means we can produce unorthodox join conditions that don’t obey the usual primary/foreign key model. One such example is the materialized path pattern, where we compare strings for overlapping path tokens in order to produce a tree structure.
Through careful use of foreign()
and remote()
, we can build
a relationship that effectively produces a rudimentary materialized path
system. Essentially, when foreign()
and remote()
are
on the same side of the comparison expression, the relationship is considered
to be “one to many”; when they are on different sides, the relationship
is considered to be “many to one”. For the comparison we’ll use here,
we’ll be dealing with collections so we keep things configured as “one to many”:
class Element(Base):
__tablename__ = "element"
path = mapped_column(String, primary_key=True)
descendants = relationship(
"Element",
primaryjoin=remote(foreign(path)).like(path.concat("/%")),
viewonly=True,
order_by=path,
)
Above, if given an Element
object with a path attribute of "/foo/bar2"
,
we seek for a load of Element.descendants
to look like:
SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path
Self-Referential Many-to-Many Relationship¶
See also
This section documents a two-table variant of the “adjacency list” pattern, which is documented at Adjacency List Relationships. Be sure to review the self-referential querying patterns in subsections Self-Referential Query Strategies and Configuring Self-Referential Eager Loading which apply equally well to the mapping pattern discussed here.
Many to many relationships can be customized by one or both of relationship.primaryjoin
and relationship.secondaryjoin
- the latter is significant for a relationship that
specifies a many-to-many reference using the relationship.secondary
argument.
A common situation which involves the usage of relationship.primaryjoin
and relationship.secondaryjoin
is when establishing a many-to-many relationship from a class to itself, as shown below:
from typing import List
from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship
class Base(DeclarativeBase):
pass
node_to_node = Table(
"node_to_node",
Base.metadata,
Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)
class Node(Base):
__tablename__ = "node"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str]
right_nodes: Mapped[List["Node"]] = relationship(
"Node",
secondary=node_to_node,
primaryjoin=id == node_to_node.c.left_node_id,
secondaryjoin=id == node_to_node.c.right_node_id,
back_populates="left_nodes",
)
left_nodes: Mapped[List["Node"]] = relationship(
"Node",
secondary=node_to_node,
primaryjoin=id == node_to_node.c.right_node_id,
secondaryjoin=id == node_to_node.c.left_node_id,
back_populates="right_nodes",
)
Where above, SQLAlchemy can’t know automatically which columns should connect
to which for the right_nodes
and left_nodes
relationships. The relationship.primaryjoin
and relationship.secondaryjoin
arguments establish how we’d like to join to the association table.
In the Declarative form above, as we are declaring these conditions within the Python
block that corresponds to the Node
class, the id
variable is available directly
as the Column
object we wish to join with.
Alternatively, we can define the relationship.primaryjoin
and relationship.secondaryjoin
arguments using strings, which is suitable
in the case that our configuration does not have either the Node.id
column
object available yet or the node_to_node
table perhaps isn’t yet available.
When referring to a plain Table
object in a declarative string, we
use the string name of the table as it is present in the MetaData
:
class Node(Base):
__tablename__ = "node"
id = mapped_column(Integer, primary_key=True)
label = mapped_column(String)
right_nodes = relationship(
"Node",
secondary="node_to_node",
primaryjoin="Node.id==node_to_node.c.left_node_id",
secondaryjoin="Node.id==node_to_node.c.right_node_id",
backref="left_nodes",
)
Warning
When passed as a Python-evaluable string, the
relationship.primaryjoin
and
relationship.secondaryjoin
arguments are interpreted using
Python’s eval()
function. DO NOT PASS UNTRUSTED INPUT TO THESE
STRINGS. See Evaluation of relationship arguments for details on
declarative evaluation of relationship()
arguments.
A classical mapping situation here is similar, where node_to_node
can be joined
to node.c.id
:
from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry
metadata_obj = MetaData()
mapper_registry = registry()
node_to_node = Table(
"node_to_node",
metadata_obj,
Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)
node = Table(
"node",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("label", String),
)
class Node:
pass
mapper_registry.map_imperatively(
Node,
node,
properties={
"right_nodes": relationship(
Node,
secondary=node_to_node,
primaryjoin=node.c.id == node_to_node.c.left_node_id,
secondaryjoin=node.c.id == node_to_node.c.right_node_id,
backref="left_nodes",
)
},
)
Note that in both examples, the relationship.backref
keyword specifies a left_nodes
backref - when
relationship()
creates the second relationship in the reverse
direction, it’s smart enough to reverse the
relationship.primaryjoin
and
relationship.secondaryjoin
arguments.
See also
Adjacency List Relationships - single table version
Self-Referential Query Strategies - tips on querying with self-referential mappings
Configuring Self-Referential Eager Loading - tips on eager loading with self- referential mapping
Composite “Secondary” Joins¶
Note
This section features far edge cases that are somewhat supported by SQLAlchemy, however it is recommended to solve problems like these in simpler ways whenever possible, by using reasonable relational layouts and / or in-Python attributes.
Sometimes, when one seeks to build a relationship()
between two tables
there is a need for more than just two or three tables to be involved in
order to join them. This is an area of relationship()
where one seeks
to push the boundaries of what’s possible, and often the ultimate solution to
many of these exotic use cases needs to be hammered out on the SQLAlchemy mailing
list.
In more recent versions of SQLAlchemy, the relationship.secondary
parameter can be used in some of these cases in order to provide a composite
target consisting of multiple tables. Below is an example of such a
join condition (requires version 0.9.2 at least to function as is):
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
b_id = mapped_column(ForeignKey("b.id"))
d = relationship(
"D",
secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
secondaryjoin="D.id == B.d_id",
uselist=False,
viewonly=True,
)
class B(Base):
__tablename__ = "b"
id = mapped_column(Integer, primary_key=True)
d_id = mapped_column(ForeignKey("d.id"))
class C(Base):
__tablename__ = "c"
id = mapped_column(Integer, primary_key=True)
a_id = mapped_column(ForeignKey("a.id"))
d_id = mapped_column(ForeignKey("d.id"))
class D(Base):
__tablename__ = "d"
id = mapped_column(Integer, primary_key=True)
In the above example, we provide all three of relationship.secondary
,
relationship.primaryjoin
, and relationship.secondaryjoin
,
in the declarative style referring to the named tables a
, b
, c
, d
directly. A query from A
to D
looks like:
sess.scalars(select(A).join(A.d)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (
b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id
JOIN c AS c_1 ON c_1.d_id = d_1.id)
ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id
In the above example, we take advantage of being able to stuff multiple
tables into a “secondary” container, so that we can join across many
tables while still keeping things “simple” for relationship()
, in that
there’s just “one” table on both the “left” and the “right” side; the
complexity is kept within the middle.
Warning
A relationship like the above is typically marked as
viewonly=True
, using relationship.viewonly
,
and should be considered as read-only. While there are
sometimes ways to make relationships like the above writable, this is
generally complicated and error prone.
Relationship to Aliased Class¶
In the previous section, we illustrated a technique where we used
relationship.secondary
in order to place additional
tables within a join condition. There is one complex join case where
even this technique is not sufficient; when we seek to join from A
to B
, making use of any number of C
, D
, etc. in between,
however there are also join conditions between A
and B
directly. In this case, the join from A
to B
may be
difficult to express with just a complex
relationship.primaryjoin
condition, as the intermediary
tables may need special handling, and it is also not expressible with
a relationship.secondary
object, since the
A->secondary->B
pattern does not support any references between
A
and B
directly. When this extremely advanced case
arises, we can resort to creating a second mapping as a target for the
relationship. This is where we use AliasedClass
in order to make a
mapping to a class that includes all the additional tables we need for
this join. In order to produce this mapper as an “alternative” mapping
for our class, we use the aliased()
function to produce the new
construct, then use relationship()
against the object as though it
were a plain mapped class.
Below illustrates a relationship()
with a simple join from A
to
B
, however the primaryjoin condition is augmented with two additional
entities C
and D
, which also must have rows that line up with
the rows in both A
and B
simultaneously:
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
b_id = mapped_column(ForeignKey("b.id"))
class B(Base):
__tablename__ = "b"
id = mapped_column(Integer, primary_key=True)
class C(Base):
__tablename__ = "c"
id = mapped_column(Integer, primary_key=True)
a_id = mapped_column(ForeignKey("a.id"))
some_c_value = mapped_column(String)
class D(Base):
__tablename__ = "d"
id = mapped_column(Integer, primary_key=True)
c_id = mapped_column(ForeignKey("c.id"))
b_id = mapped_column(ForeignKey("b.id"))
some_d_value = mapped_column(String)
# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)
A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)
With the above mapping, a simple join looks like:
sess.scalars(select(A).join(A.b)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id
Integrating AliasedClass Mappings with Typing and Avoiding Early Mapper Configuration¶
The creation of the aliased()
construct against a mapped class
forces the configure_mappers()
step to proceed, which will resolve
all current classes and their relationships. This may be problematic if
unrelated mapped classes needed by the current mappings have not yet been
declared, or if the configuration of the relationship itself needs access
to as-yet undeclared classes. Additionally, SQLAlchemy’s Declarative pattern
works with Python typing most effectively when relationships are declared
up front.
To organize the construction of the relationship to work with these issues, a
configure level event hook like MapperEvents.before_mapper_configured()
may be used, which will invoke the configuration code only when all mappings
are ready for configuration:
from sqlalchemy import event
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
b_id = mapped_column(ForeignKey("b.id"))
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
# do the above configuration in a configuration hook
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
B_viacd = aliased(B, j, flat=True)
A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)
Above, the function _configure_ab_relationship()
will be invoked only
when a fully configured version of A
is requested, at which point the
classes B
, D
and C
would be available.
For an approach that integrates with inline typing, a similar technique can be used to effectively generate a “singleton” creation pattern for the aliased class where it is late-initialized as a global variable, which can then be used in the relationship inline:
from typing import Any
B_viacd: Any = None
b_viacd_join: Any = None
class A(Base):
__tablename__ = "a"
id: Mapped[int] = mapped_column(primary_key=True)
b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))
# 1. the relationship can be declared using lambdas, allowing it to resolve
# to targets that are late-configured
b: Mapped[B] = relationship(
lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
)
# 2. configure the targets of the relationship using a before_mapper_configured
# hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
# 3. set up the join() and AliasedClass as globals from within
# the configuration hook.
global B_viacd, b_viacd_join
b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
B_viacd = aliased(B, b_viacd_join, flat=True)
Using the AliasedClass target in Queries¶
In the previous example, the A.b
relationship refers to the B_viacd
entity as the target, and not the B
class directly. To add additional
criteria involving the A.b
relationship, it’s typically necessary to
reference the B_viacd
directly rather than using B
, especially in a
case where the target entity of A.b
is to be transformed into an alias or a
subquery. Below illustrates the same relationship using a subquery, rather than
a join:
subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()
B_viacd_subquery = aliased(B, subq)
A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)
A query using the above A.b
relationship will render a subquery:
sess.scalars(select(A).join(A.b)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column
FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id
If we want to add additional criteria based on the A.b
join, we must do
so in terms of B_viacd_subquery
rather than B
directly:
sess.scalars(
select(A)
.join(A.b)
.where(B_viacd_subquery.some_b_column == "some b")
.order_by(B_viacd_subquery.id)
).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column
FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id
WHERE anon_1.some_b_column = ? ORDER BY anon_1.id
Row-Limited Relationships with Window Functions¶
Another interesting use case for relationships to AliasedClass
objects are situations where
the relationship needs to join to a specialized SELECT of any form. One
scenario is when the use of a window function is desired, such as to limit
how many rows should be returned for a relationship. The example below
illustrates a non-primary mapper relationship that will load the first
ten items for each collection:
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
class B(Base):
__tablename__ = "b"
id = mapped_column(Integer, primary_key=True)
a_id = mapped_column(ForeignKey("a.id"))
partition = select(
B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()
partitioned_b = aliased(B, partition)
A.partitioned_bs = relationship(
partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)
We can use the above partitioned_bs
relationship with most of the loader
strategies, such as selectinload()
:
for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
print(a1.partitioned_bs) # <-- will be no more than ten objects
Where above, the “selectinload” query looks like:
SELECT
a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index < %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id
Above, for each matching primary key in “a”, we will get the first ten “bs” as ordered by “b.id”. By partitioning on “a_id” we ensure that each “row number” is local to the parent “a_id”.
Such a mapping would ordinarily also include a “plain” relationship from “A” to “B”, for persistence operations as well as when the full set of “B” objects per “A” is desired.
Building Query-Enabled Properties¶
Very ambitious custom join conditions may fail to be directly persistable, and
in some cases may not even load correctly. To remove the persistence part of
the equation, use the flag relationship.viewonly
on the
relationship()
, which establishes it as a read-only
attribute (data written to the collection will be ignored on flush()).
However, in extreme cases, consider using a regular Python property in
conjunction with Query
as follows:
class User(Base):
__tablename__ = "user"
id = mapped_column(Integer, primary_key=True)
@property
def addresses(self):
return object_session(self).query(Address).with_parent(self).filter(...).all()
In other cases, the descriptor can be built to make use of existing in-Python data. See the section on Using Descriptors and Hybrids for more general discussion of special Python attributes.
See also
Notes on using the viewonly relationship parameter¶
The relationship.viewonly
parameter when applied to a
relationship()
construct indicates that this relationship()
will not take part in any ORM unit of work operations, and additionally
that the attribute does not expect to participate within in-Python mutations
of its represented collection. This means
that while the viewonly relationship may refer to a mutable Python collection
like a list or set, making changes to that list or set as present on a
mapped instance will have no effect on the ORM flush process.
To explore this scenario consider this mapping:
from __future__ import annotations
import datetime
from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str | None]
all_tasks: Mapped[list[Task]] = relationship()
current_week_tasks: Mapped[list[Task]] = relationship(
primaryjoin=lambda: and_(
User.id == Task.user_account_id,
# this expression works on PostgreSQL but may not be supported
# by other database engines
Task.task_date >= func.now() - datetime.timedelta(days=7),
),
viewonly=True,
)
class Task(Base):
__tablename__ = "task"
id: Mapped[int] = mapped_column(primary_key=True)
user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
description: Mapped[str | None]
task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
user: Mapped[User] = relationship(back_populates="current_week_tasks")
The following sections will note different aspects of this configuration.
In-Python mutations including backrefs are not appropriate with viewonly=True¶
The above mapping targets the User.current_week_tasks
viewonly relationship
as the backref target of the Task.user
attribute. This is not
currently flagged by SQLAlchemy’s ORM configuration process, however is a
configuration error. Changing the .user
attribute on a Task
will not
affect the .current_week_tasks
attribute:
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]
There is another parameter called relationship.sync_backrefs
which can be turned on here to allow .current_week_tasks
to be mutated in this
case, however this is not considered to be a best practice with a viewonly
relationship, which instead should not be relied upon for in-Python mutations.
In this mapping, backrefs can be configured between User.all_tasks
and
Task.user
, as these are both not viewonly and will synchronize normally.
Beyond the issue of backref mutations being disabled for viewonly relationships,
plain changes to the User.all_tasks
collection in Python
are also not reflected in the User.current_week_tasks
collection until
changes have been flushed to the database.
Overall, for a use case where a custom collection should respond immediately to
in-Python mutations, the viewonly relationship is generally not appropriate. A
better approach is to use the Hybrid Attributes feature of SQLAlchemy, or
for instance-only cases to use a Python @property
, where a user-defined
collection that is generated in terms of the current Python instance can be
implemented. To change our example to work this way, we repair the
relationship.back_populates
parameter on Task.user
to
reference User.all_tasks
, and
then illustrate a simple @property
that will deliver results in terms of
the immediate User.all_tasks
collection:
class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str | None]
all_tasks: Mapped[list[Task]] = relationship(back_populates="user")
@property
def current_week_tasks(self) -> list[Task]:
past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
return [t for t in self.all_tasks if t.task_date >= past_seven_days]
class Task(Base):
__tablename__ = "task"
id: Mapped[int] = mapped_column(primary_key=True)
user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
description: Mapped[str | None]
task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
user: Mapped[User] = relationship(back_populates="all_tasks")
Using an in-Python collection calculated on the fly each time, we are guaranteed to have the correct answer at all times, without the need to use a database at all:
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]
viewonly=True collections / attributes do not get re-queried until expired¶
Continuing with the original viewonly attribute, if we do in fact make changes
to the User.all_tasks
collection on a persistent object, the
viewonly collection can only show the net result of this change after two
things occur. The first is that the change to User.all_tasks
is
flushed, so that the new data is available in the database, at least
within the scope of the local transaction. The second is that the User.current_week_tasks
attribute is expired and reloaded via a new SQL query to the database.
To support this requirement, the simplest flow to use is one where the
viewonly relationship is consumed only in operations that are primarily read
only to start with. Such as below, if we retrieve a User
fresh from
the database, the collection will be current:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]
When we make modifications to u1.all_tasks
, if we want to see these changes
reflected in the u1.current_week_tasks
viewonly relationship, these changes need to be flushed
and the u1.current_week_tasks
attribute needs to be expired, so that
it will lazy load on next access. The simplest approach to this is
to use Session.commit()
, keeping the Session.expire_on_commit
parameter set at its default of True
:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.commit()
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]
Above, the call to Session.commit()
flushed the changes to u1.all_tasks
to the database, then expired all objects, so that when we accessed u1.current_week_tasks
,
a :term:` lazy load` occurred which fetched the contents for this attribute
freshly from the database.
To intercept operations without actually committing the transaction,
the attribute needs to be explicitly expired
first. A simplistic way to do this is to just call it directly. In
the example below, Session.flush()
sends pending changes to the
database, then Session.expire()
is used to expire the u1.current_week_tasks
collection so that it re-fetches on next access:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.flush()
... sess.expire(u1, ["current_week_tasks"])
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
We can in fact skip the call to Session.flush()
, assuming a
Session
that keeps Session.autoflush
at its
default value of True
, as the expired current_week_tasks
attribute will
trigger autoflush when accessed after expiration:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.expire(u1, ["current_week_tasks"])
... print(u1.current_week_tasks) # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
Continuing with the above approach to something more elaborate, we can apply
the expiration programmatically when the related User.all_tasks
collection
changes, using event hooks. This an advanced
technique, where simpler architectures like @property
or sticking to
read-only use cases should be examined first. In our simple example, this
would be configured as:
from sqlalchemy import event, inspect
@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
inspect(target).session.expire(target, ["current_week_tasks"])
With the above hooks, mutation operations are intercepted and result in
the User.current_week_tasks
collection to be expired automatically:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]
The AttributeEvents
event hooks used above are also triggered
by backref mutations, so with the above hooks a change to Task.user
is
also intercepted:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... t1 = Task(task_date=datetime.datetime.now())
... t1.user = u1
... sess.add(t1)
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]
flambé! the dragon and The Alchemist image designs created and generously donated by Rotem Yaari.
Created using Sphinx 7.2.6. Documentation last generated: Thu 12 Dec 2024 05:39:06 PM EST