"""Illustrates use of the ``sqlalchemy.ext.asyncio.AsyncSession`` objectfor asynchronous ORM use."""from__future__importannotationsimportasyncioimportdatetimefromtypingimportListfromtypingimportOptionalfromsqlalchemyimportForeignKeyfromsqlalchemyimportfuncfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportAsyncAttrsfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.futureimportselectfromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnfromsqlalchemy.ormimportrelationshipfromsqlalchemy.ormimportselectinloadclassBase(AsyncAttrs,DeclarativeBase):passclassA(Base):__tablename__="a"id:Mapped[int]=mapped_column(primary_key=True)data:Mapped[Optional[str]]create_date:Mapped[datetime.datetime]=mapped_column(server_default=func.now())bs:Mapped[List[B]]=relationship()classB(Base):__tablename__="b"id:Mapped[int]=mapped_column(primary_key=True)a_id:Mapped[int]=mapped_column(ForeignKey("a.id"))data:Mapped[Optional[str]]asyncdefasync_main():"""Main program function."""engine=create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test",echo=True,)asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.drop_all)asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)# expire_on_commit=False will prevent attributes from being expired# after commit.async_session=async_sessionmaker(engine,expire_on_commit=False)asyncwithasync_session()assession:asyncwithsession.begin():session.add_all([A(bs=[B(),B()],data="a1"),A(bs=[B()],data="a2"),A(bs=[B(),B()],data="a3"),])# for relationship loading, eager loading should be applied.stmt=select(A).options(selectinload(A.bs))# AsyncSession.execute() is used for 2.0 style ORM execution# (same as the synchronous API).result=awaitsession.scalars(stmt)# result is a buffered Result object.fora1inresult:print(a1)print(f"created at: {a1.create_date}")forb1ina1.bs:print(b1)# for streaming ORM results, AsyncSession.stream() may be used.result=awaitsession.stream(stmt)# result is a streaming AsyncResult object.asyncfora1inresult.scalars():print(a1)forb1ina1.bs:print(b1)result=awaitsession.scalars(select(A).order_by(A.id))a1=result.first()a1.data="new data"awaitsession.commit()# use the AsyncAttrs interface to accommodate for a lazy loadforb1inawaita1.awaitable_attrs.bs:print(b1)asyncio.run(async_main())