Each ``User`` can have any number of ``Keyword`` objects, and vice-versa
(the many-to-many pattern is described at :ref:`relationships_many_to_many`)::
- from sqlalchemy import Column, Integer, String, ForeignKey, Table
+ from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
class User(Base):
- __tablename__ = 'user'
+ __tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String(64))
kw = relationship("Keyword", secondary=lambda: userkeywords_table)
def __init__(self, name):
self.name = name
+
class Keyword(Base):
- __tablename__ = 'keyword'
+ __tablename__ = "keyword"
id = Column(Integer, primary_key=True)
- keyword = Column('keyword', String(64))
+ keyword = Column("keyword", String(64))
def __init__(self, keyword):
self.keyword = keyword
- userkeywords_table = Table('userkeywords', Base.metadata,
- Column('user_id', Integer, ForeignKey("user.id"),
- primary_key=True),
- Column('keyword_id', Integer, ForeignKey("keyword.id"),
- primary_key=True)
+
+ userkeywords_table = Table(
+ "userkeywords",
+ Base.metadata,
+ Column("user_id", Integer, ForeignKey("user.id"), primary_key=True),
+ Column("keyword_id", Integer, ForeignKey("keyword.id"), primary_key=True),
)
Reading and manipulating the collection of "keyword" strings associated
with ``User`` requires traversal from each collection element to the ``.keyword``
attribute, which can be awkward::
- >>> user = User('jek')
- >>> user.kw.append(Keyword('cheese-inspector'))
+ >>> user = User("jek")
+ >>> user.kw.append(Keyword("cheese-inspector"))
>>> print(user.kw)
[<__main__.Keyword object at 0x12bf830>]
>>> print(user.kw[0].keyword)
cheese-inspector
from sqlalchemy.ext.associationproxy import association_proxy
+
class User(Base):
- __tablename__ = 'user'
+ __tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String(64))
kw = relationship("Keyword", secondary=lambda: userkeywords_table)
def __init__(self, name):
    self.name = name
# proxy the 'keyword' attribute from the 'kw' relationship
- keywords = association_proxy('kw', 'keyword')
+ keywords = association_proxy("kw", "keyword")
We can now reference the ``.keywords`` collection as a listing of strings,
which is both readable and writable. New ``Keyword`` objects are created
for us transparently::
- >>> user = User('jek')
- >>> user.keywords.append('cheese-inspector')
+ >>> user = User("jek")
+ >>> user.keywords.append("cheese-inspector")
>>> user.keywords
['cheese-inspector']
- >>> user.keywords.append('snack ninja')
+ >>> user.keywords.append("snack ninja")
>>> user.kw
[<__main__.Keyword object at 0x12cdd30>, <__main__.Keyword object at 0x12cde30>]
When a value is added to the proxied collection, the association proxy by default constructs a
new instance of the "intermediary" object using its constructor, passing as a
single argument the given value. In our example above, an operation like::
- user.keywords.append('cheese-inspector')
+ user.keywords.append("cheese-inspector")
Is translated by the association proxy into the operation::
- user.kw.append(Keyword('cheese-inspector'))
+ user.kw.append(Keyword("cheese-inspector"))
The example works here because we have designed the constructor for ``Keyword``
to accept a single positional argument, ``keyword``. For those cases where a
single-argument constructor isn't feasible, the association proxy's creational
behavior can be customized using the ``creator`` argument, which references a
callable that will produce a new object instance given the singular value::
# ...
# use Keyword(keyword=kw) on append() events
- keywords = association_proxy('kw', 'keyword',
- creator=lambda kw: Keyword(keyword=kw))
+ keywords = association_proxy(
+ "kw", "keyword", creator=lambda kw: Keyword(keyword=kw)
+ )
The ``creator`` function accepts a single argument in the case of a list-
or set-based collection, or a scalar attribute; in the case of a dictionary-based
collection it accepts two arguments, "key" and "value", as shown in the dictionary
example later in this section. The example below proxies the ``.user_keywords``
collection of ``User`` to the ``.keyword`` attribute present on each
``UserKeyword``::
- from sqlalchemy import Column, Integer, String, ForeignKey
+ from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import backref, declarative_base, relationship
Base = declarative_base()
+
class User(Base):
- __tablename__ = 'user'
+ __tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String(64))
# association proxy of "user_keywords" collection
# to "keyword" attribute
- keywords = association_proxy('user_keywords', 'keyword')
+ keywords = association_proxy("user_keywords", "keyword")
def __init__(self, name):
self.name = name
+
class UserKeyword(Base):
- __tablename__ = 'user_keyword'
- user_id = Column(Integer, ForeignKey('user.id'), primary_key=True)
- keyword_id = Column(Integer, ForeignKey('keyword.id'), primary_key=True)
+ __tablename__ = "user_keyword"
+ user_id = Column(Integer, ForeignKey("user.id"), primary_key=True)
+ keyword_id = Column(Integer, ForeignKey("keyword.id"), primary_key=True)
special_key = Column(String(50))
# bidirectional attribute/collection of "user"/"user_keywords"
- user = relationship(User,
- backref=backref("user_keywords",
- cascade="all, delete-orphan")
- )
+ user = relationship(
+ User, backref=backref("user_keywords", cascade="all, delete-orphan")
+ )
# reference to the "Keyword" object
keyword = relationship("Keyword")
def __init__(self, keyword=None, user=None, special_key=None):
    self.user = user
    self.keyword = keyword
    self.special_key = special_key
+
class Keyword(Base):
- __tablename__ = 'keyword'
+ __tablename__ = "keyword"
id = Column(Integer, primary_key=True)
- keyword = Column('keyword', String(64))
+ keyword = Column("keyword", String(64))
def __init__(self, keyword):
self.keyword = keyword
def __repr__(self):
- return 'Keyword(%s)' % repr(self.keyword)
+ return "Keyword(%s)" % repr(self.keyword)
With the above configuration, we can operate upon the ``.keywords`` collection
of each ``User`` object, each of which exposes a collection of ``Keyword``
objects that are obtained from the underlying ``UserKeyword`` elements::
- >>> user = User('log')
- >>> for kw in (Keyword('new_from_blammo'), Keyword('its_big')):
+ >>> user = User("log")
+ >>> for kw in (Keyword("new_from_blammo"), Keyword("its_big")):
... user.keywords.append(kw)
...
>>> print(user.keywords)
[Keyword('new_from_blammo'), Keyword('its_big')]
In contrast to the earlier example, where the proxy exposed
a collection of strings, rather than a collection of composed objects, here the
collection contains ``Keyword`` objects.
In this case, each ``.keywords.append()`` operation is equivalent to::
- >>> user.user_keywords.append(UserKeyword(Keyword('its_heavy')))
+ >>> user.user_keywords.append(UserKeyword(Keyword("its_heavy")))
The ``UserKeyword`` association object has two attributes that are both
populated within the scope of the ``append()`` operation of the association
proxy. Passing the ``User`` object directly to the ``UserKeyword``
constructor, as below, likewise has the effect of appending the new ``UserKeyword`` to
the ``User.user_keywords`` collection (via the relationship)::
- >>> UserKeyword(Keyword('its_wood'), user, special_key='my special key')
+ >>> UserKeyword(Keyword("its_wood"), user, special_key="my special key")
The association proxy returns to us a collection of ``Keyword`` objects represented
by all these operations::

    >>> user.keywords
    [Keyword('new_from_blammo'), Keyword('its_big'), Keyword('its_heavy'), Keyword('its_wood')]
To use a dictionary-based collection, we additionally pass a ``creator``
argument to the ``User.keywords`` proxy so that these values are assigned appropriately
when new elements are added to the dictionary::
- from sqlalchemy import Column, Integer, String, ForeignKey
+ from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import backref, declarative_base, relationship
from sqlalchemy.orm.collections import attribute_mapped_collection
Base = declarative_base()
+
class User(Base):
- __tablename__ = 'user'
+ __tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String(64))
# proxy to 'user_keywords', instantiating UserKeyword
# assigning the new key to 'special_key', values to
# 'keyword'.
- keywords = association_proxy('user_keywords', 'keyword',
- creator=lambda k, v:
- UserKeyword(special_key=k, keyword=v)
- )
+ keywords = association_proxy(
+ "user_keywords",
+ "keyword",
+ creator=lambda k, v: UserKeyword(special_key=k, keyword=v),
+ )
def __init__(self, name):
self.name = name
+
class UserKeyword(Base):
- __tablename__ = 'user_keyword'
- user_id = Column(Integer, ForeignKey('user.id'), primary_key=True)
- keyword_id = Column(Integer, ForeignKey('keyword.id'), primary_key=True)
+ __tablename__ = "user_keyword"
+ user_id = Column(Integer, ForeignKey("user.id"), primary_key=True)
+ keyword_id = Column(Integer, ForeignKey("keyword.id"), primary_key=True)
special_key = Column(String)
# bidirectional user/user_keywords relationships, mapping
# user_keywords with a dictionary against "special_key" as key.
- user = relationship(User, backref=backref(
- "user_keywords",
- collection_class=attribute_mapped_collection("special_key"),
- cascade="all, delete-orphan"
- )
- )
+ user = relationship(
+ User,
+ backref=backref(
+ "user_keywords",
+ collection_class=attribute_mapped_collection("special_key"),
+ cascade="all, delete-orphan",
+ ),
+ )
keyword = relationship("Keyword")
+
class Keyword(Base):
- __tablename__ = 'keyword'
+ __tablename__ = "keyword"
id = Column(Integer, primary_key=True)
- keyword = Column('keyword', String(64))
+ keyword = Column("keyword", String(64))
def __init__(self, keyword):
self.keyword = keyword
def __repr__(self):
- return 'Keyword(%s)' % repr(self.keyword)
+ return "Keyword(%s)" % repr(self.keyword)
We illustrate the ``.keywords`` collection as a dictionary, mapping the
``UserKeyword.special_key`` value to ``Keyword`` objects::
- >>> user = User('log')
+ >>> user = User("log")
- >>> user.keywords['sk1'] = Keyword('kw1')
- >>> user.keywords['sk2'] = Keyword('kw2')
+ >>> user.keywords["sk1"] = Keyword("kw1")
+ >>> user.keywords["sk2"] = Keyword("kw2")
>>> print(user.keywords)
{'sk1': Keyword('kw1'), 'sk2': Keyword('kw2')}
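Removal is proxied as well; as a minimal sketch (continuing the objects above, and
assuming a later flush within a :class:`.Session`), deleting a key removes the
corresponding ``UserKeyword`` from ``user.user_keywords``, which the
``delete-orphan`` cascade configured on the backref then marks for deletion::

    >>> del user.keywords["sk1"]
    >>> print(user.keywords)
    {'sk2': Keyword('kw2')}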
To go a step further and produce a dictionary that maps strings to strings, we can build
an association proxy on ``User`` that refers to an association proxy
present on ``UserKeyword``::
- from sqlalchemy import Column, Integer, String, ForeignKey
+ from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import backref, declarative_base, relationship
from sqlalchemy.orm.collections import attribute_mapped_collection
Base = declarative_base()
+
class User(Base):
- __tablename__ = 'user'
+ __tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String(64))
# the same 'user_keywords'->'keyword' proxy as in
# the basic dictionary example.
keywords = association_proxy(
- 'user_keywords',
- 'keyword',
- creator=lambda k, v: UserKeyword(special_key=k, keyword=v)
+ "user_keywords",
+ "keyword",
+ creator=lambda k, v: UserKeyword(special_key=k, keyword=v),
)
# another proxy that is directly column-targeted
def __init__(self, name):
self.name = name
+
class UserKeyword(Base):
- __tablename__ = 'user_keyword'
- user_id = Column(ForeignKey('user.id'), primary_key=True)
- keyword_id = Column(ForeignKey('keyword.id'), primary_key=True)
+ __tablename__ = "user_keyword"
+ user_id = Column(ForeignKey("user.id"), primary_key=True)
+ keyword_id = Column(ForeignKey("keyword.id"), primary_key=True)
special_key = Column(String)
user = relationship(
User,
backref=backref(
"user_keywords",
collection_class=attribute_mapped_collection("special_key"),
- cascade="all, delete-orphan"
- )
+ cascade="all, delete-orphan",
+ ),
)
# the relationship to Keyword is now called 'kw'
kw = relationship("Keyword")
# 'keyword' is changed to be a proxy to the
# 'keyword' attribute of 'Keyword'
- keyword = association_proxy('kw', 'keyword')
+ keyword = association_proxy("kw", "keyword")
+
class Keyword(Base):
- __tablename__ = 'keyword'
+ __tablename__ = "keyword"
id = Column(Integer, primary_key=True)
- keyword = Column('keyword', String(64))
+ keyword = Column("keyword", String(64))
def __init__(self, keyword):
self.keyword = keyword
-
``User.keywords`` is now a dictionary of string to string, where
``UserKeyword`` and ``Keyword`` objects are created and removed for us
transparently using the association proxy. In the example below, we illustrate
working with this string-to-string dictionary directly.
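A minimal sketch of this usage, assuming the composite mapping above (dictionary keys
populate ``special_key``, while plain string values produce ``Keyword`` objects
behind the scenes via the two proxies)::

    >>> user = User("log")
    >>> user.keywords["sk1"] = "kw1"
    >>> user.keywords["sk2"] = "kw2"
    >>> print(user.keywords)
    {'sk1': 'kw1', 'sk2': 'kw2'}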
Turning to the ``cascade_scalar_deletes`` flag, given a mapping as::
class A(Base):
- __tablename__ = 'test_a'
+ __tablename__ = "test_a"
id = Column(Integer, primary_key=True)
- ab = relationship(
- 'AB', backref='a', uselist=False)
+ ab = relationship("AB", backref="a", uselist=False)
b = association_proxy(
- 'ab', 'b', creator=lambda b: AB(b=b),
- cascade_scalar_deletes=True)
+ "ab", "b", creator=lambda b: AB(b=b), cascade_scalar_deletes=True
+ )
class B(Base):
- __tablename__ = 'test_b'
+ __tablename__ = "test_b"
id = Column(Integer, primary_key=True)
- ab = relationship('AB', backref='b', cascade='all, delete-orphan')
+ ab = relationship("AB", backref="b", cascade="all, delete-orphan")
class AB(Base):
- __tablename__ = 'test_ab'
+ __tablename__ = "test_ab"
a_id = Column(Integer, ForeignKey(A.id), primary_key=True)
b_id = Column(Integer, ForeignKey(B.id), primary_key=True)
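With ``cascade_scalar_deletes=True``, removing the proxied scalar is intended to remove
the association row as well; a minimal sketch of the expected behavior, assuming the
mapping above::

    a1 = A()
    a1.b = B()  # the creator produces an AB object linking a1 to the new B

    # with cascade_scalar_deletes=True, clearing the proxied scalar also
    # removes a1.ab, rather than leaving an AB object with .b set to None
    a1.b = None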
from sqlalchemy.ext.asyncio import create_async_engine
+
async def async_main():
engine = create_async_engine(
- "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
+ "postgresql+asyncpg://scott:tiger@localhost/test",
+ echo=True,
)
async with engine.begin() as conn:
)
async with engine.connect() as conn:
-
# select a Result, which will be delivered with buffered
# results
result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
# clean-up pooled connections
await engine.dispose()
+
asyncio.run(async_main())
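Within ``async_main()`` above, the buffered :class:`_engine.Result` returned by
``conn.execute()`` may then be consumed using the plain synchronous iteration API;
a minimal sketch::

    # iterate the pre-buffered rows; no awaits are needed at this stage
    for row in result:
        print("row: %s" % (row,))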
Above, the :meth:`_asyncio.AsyncConnection.run_sync` method may be used to
async_result = await conn.stream(select(t1))
async for row in async_result:
- print("row: %s" % (row, ))
+ print("row: %s" % (row,))
.. _asyncio_orm:
import asyncio
- from sqlalchemy import Column
- from sqlalchemy import DateTime
- from sqlalchemy import ForeignKey
- from sqlalchemy import func
- from sqlalchemy import Integer
- from sqlalchemy import String
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.ext.asyncio import async_sessionmaker
- from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, func
+ from sqlalchemy.ext.asyncio import (
+     AsyncSession,
+     async_sessionmaker,
+     create_async_engine,
+ )
from sqlalchemy.future import select
- from sqlalchemy.orm import declarative_base
- from sqlalchemy.orm import relationship
- from sqlalchemy.orm import selectinload
+ from sqlalchemy.orm import declarative_base, relationship, selectinload
Base = declarative_base()
:meth:`_asyncio.AsyncSession.commit`, as in the line at the end where we
access an attribute::
- # create AsyncSession with expire_on_commit=False
- async_session = AsyncSession(engine, expire_on_commit=False)
-
- # sessionmaker version
- async_session = async_sessionmaker(
-     engine, expire_on_commit=False
- )
- async with async_session() as session:
-     result = await session.execute(select(A).order_by(A.id))
-     a1 = result.scalars().first()
-     # commit would normally expire all attributes
-     await session.commit()
-     # access attribute subsequent to commit; this is what
-     # expire_on_commit=False allows
-     print(a1.data)
+ # create AsyncSession with expire_on_commit=False
+ async_session = AsyncSession(engine, expire_on_commit=False)
+ # sessionmaker version
+ async_session = async_sessionmaker(engine, expire_on_commit=False)
+ async with async_session() as session:
+     result = await session.execute(select(A).order_by(A.id))
+     a1 = result.scalars().first()
+     # commit would normally expire all attributes
+     await session.commit()
+     # access attribute subsequent to commit; this is what
+     # expire_on_commit=False allows
+     print(a1.data)
* The :paramref:`_schema.Column.server_default` value on the ``created_at``
column will not be refreshed by default after an INSERT; instead, it is
:ref:`session_run_sync`, or by using its ``.statement`` attribute
to obtain a normal select::
- user = await session.get(User, 42)
- addresses = (await session.scalars(user.addresses.statement)).all()
- stmt = user.addresses.statement.where(
- Address.email_address.startswith("patrick")
- )
- addresses_filter = (await session.scalars(stmt)).all()
+ user = await session.get(User, 42)
+ addresses = (await session.scalars(user.addresses.statement)).all()
+ stmt = user.addresses.statement.where(
+ Address.email_address.startswith("patrick")
+ )
+ addresses_filter = (await session.scalars(stmt)).all()
.. seealso::
import asyncio
- from sqlalchemy.ext.asyncio import create_async_engine
- from sqlalchemy.ext.asyncio import AsyncSession
+ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
+
def fetch_and_update_objects(session):
"""run traditional sync-style ORM code in a function that will be
async def async_main():
engine = create_async_engine(
- "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
+ "postgresql+asyncpg://scott:tiger@localhost/test",
+ echo=True,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
# clean-up pooled connections
await engine.dispose()
+
asyncio.run(async_main())
The above approach of running certain functions within a "sync" runner
import asyncio
- from sqlalchemy import text
+ from sqlalchemy import event, text
from sqlalchemy.engine import Engine
- from sqlalchemy import event
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import Session
## Core events ##
- engine = create_async_engine(
- "postgresql+asyncpg://scott:tiger@localhost:5432/test"
- )
+ engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
+
# connect event on instance of Engine
@event.listens_for(engine.sync_engine, "connect")
cursor.execute("select 'execute from event'")
print(cursor.fetchone()[0])
+
# before_execute event on all Engine instances
@event.listens_for(Engine, "before_execute")
def my_before_execute(
- conn, clauseelement, multiparams, params, execution_options
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
):
print("before execute!")
session = AsyncSession(engine)
+
# before_commit event on instance of Session
@event.listens_for(session.sync_session, "before_commit")
def my_before_commit(session):
connection = session.connection()
result = connection.execute(text("select 'execute from event'"))
print(result.first())
+
# after_commit event on all Session instances
@event.listens_for(Session, "after_commit")
def my_after_commit(session):
print("after commit!")
+
async def go():
await session.execute(text("select 1"))
await session.commit()
await session.close()
await engine.dispose()
+
asyncio.run(go())
+
The above example prints something along the lines of::
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection ...>>
it's perfectly fine for it to be a Python ``lambda:``, as the awaitable return
value will be invoked after being returned::
- from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import event
+ from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(...)
+
@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, ...):
dbapi_connection.run_async(
- lambda connection: connection.set_type_codec('MyCustomType', encoder, decoder, ...)
+ lambda connection: connection.set_type_codec(
+ "MyCustomType", encoder, decoder, ...
+ )
)
Above, the object passed to the ``register_custom_types`` event handler
to disable pooling using :class:`~sqlalchemy.pool.NullPool`, preventing the Engine
from using any connection more than once::
+ from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
+
engine = create_async_engine(
- "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
+ "postgresql+asyncpg://user:pass@host/dbname",
+ poolclass=NullPool,
)
-
.. _asyncio_scoped_session:
Using asyncio scoped session
from asyncio import current_task
- from sqlalchemy.ext.asyncio import async_sessionmaker
- from sqlalchemy.ext.asyncio import async_scoped_session
- from sqlalchemy.ext.asyncio import AsyncSession
-
- async_session_factory = async_sessionmaker(some_async_engine, expire_on_commit=False)
- AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=current_task)
+ from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker
+
+ async_session_factory = async_sessionmaker(
+     some_async_engine,
+     expire_on_commit=False,
+ )
+ AsyncScopedSession = async_scoped_session(
+     async_session_factory,
+     scopefunc=current_task,
+ )
some_async_session = AsyncScopedSession()
:class:`_asyncio.async_scoped_session` also includes **proxy behavior** similar to
that of :class:`.scoped_session`, meaning it can be treated as an
:class:`_asyncio.AsyncSession` directly, keeping in mind that the usual ``await``
keywords are necessary, including for the ``remove()`` method.
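A minimal sketch of that proxy usage, assuming the ``AsyncScopedSession`` registry
configured above and a hypothetical ``some_object`` to persist::

    async def some_function(some_object):
        # method calls on the registry proxy to the current task's AsyncSession
        AsyncScopedSession.add(some_object)
        await AsyncScopedSession.commit()

        # tear down the task-local session once the task is complete
        await AsyncScopedSession.remove()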
import asyncio
- from sqlalchemy.ext.asyncio import create_async_engine
- from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import inspect
+ from sqlalchemy.ext.asyncio import create_async_engine
+
- engine = create_async_engine(
-     "postgresql+asyncpg://scott:tiger@localhost/test"
- )
+ engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")
def use_inspector(conn):
inspector = inspect(conn)
# return any value to the caller
return inspector.get_table_names()
+
async def async_main():
async with engine.connect() as conn:
tables = await conn.run_sync(use_inspector)
+
asyncio.run(async_main())
.. seealso::