# Source code for examples.asyncio.async_orm_writeonly
"""Illustrates using **write only relationships** for simpler handling
of ORM collections under asyncio.

"""

from __future__ import annotations

import asyncio
import datetime
from typing import Optional

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import WriteOnlyMapped


class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base shared by the mapped classes in this example."""


class A(Base):
    """Parent entity mapped to table ``a``; owns a write-only ``bs``."""

    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[Optional[str]]
    # populated by the database via a server-side default of now()
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now()
    )

    # collection relationships are declared with WriteOnlyMapped.  There
    # is no separate collection type
    bs: WriteOnlyMapped[B] = relationship()


class B(Base):
    """Child entity mapped to table ``b``; refers to ``a.id`` via ``a_id``."""

    __tablename__ = "b"

    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[Optional[str]]


async def async_main() -> None:
    """Main program function.

    Drops and re-creates the tables, inserts three ``A`` rows with
    related ``B`` rows, reads them back using both buffered and
    streaming results, and finally updates the first ``A`` row.
    """

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    # start from a clean schema on every run
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async_session = async_sessionmaker(engine, expire_on_commit=False)

    async with async_session() as session:
        async with session.begin():
            # WriteOnlyMapped may be populated using any iterable,
            # e.g. lists, sets, etc.
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        stmt = select(A)

        result = await session.scalars(stmt)

        for a1 in result:
            print(a1)
            print(f"created at: {a1.create_date}")

            # to iterate a collection, emit a SELECT statement
            for b1 in await session.scalars(a1.bs.select()):
                print(b1)

        result = await session.stream(stmt)

        async for a1 in result.scalars():
            print(a1)

            # similar using "streaming" (server side cursors)
            async for b1 in (await session.stream(a1.bs.select())).scalars():
                print(b1)

        await session.commit()

        result = await session.scalars(select(A).order_by(A.id))

        a1 = result.first()

        a1.data = "new data"

        # persist the modification; without this commit the pending
        # change is discarded when the session context exits
        await session.commit()

    # for an AsyncEngine created in function scope, close and
    # clean up pooled connections before the event loop shuts down
    await engine.dispose()


asyncio.run(async_main())