outer joins are created for all joined-table inheriting
mappers requested. Note that the auto-create of joins
is not compatible with concrete table inheritance.
-
+
The existing select_table flag on mapper() is now
deprecated and is synonymous with
with_polymorphic('*', select_table). Note that the
underlying "guts" of select_table have been
completely removed and replaced with the newer,
more flexible approach.
-
+
The new approach also automatically allows eager loads
to work for subclasses, if they are present, for
example::
- sess.query(Company).options(
- eagerload_all(
- ))
+ sess.query(Company).options(eagerload_all())
to load Company objects, their employees, and the
'machines' collection of employees who happen to be
del_ = delete(SomeMappedClass).where(SomeMappedClass.id == 5)
- upd = update(SomeMappedClass).where(SomeMappedClass.id == 5).values(name='ed')
+ upd = update(SomeMappedClass).where(SomeMappedClass.id == 5).values(name="ed")
.. change::
:tags: bug, orm
to the original, older use case for :meth:`_query.Query.select_from`, which is that
of restating the mapped entity in terms of a different selectable::
- session.query(User.name).\
- select_from(user_table.select().where(user_table.c.id > 5))
+ session.query(User.name).select_from(user_table.select().where(user_table.c.id > 5))
Which produces::
original. Allows symmetry when using :class:`_engine.Engine` and
:class:`_engine.Connection` objects as context managers::
- with conn.connect() as c: # leaves the Connection open
- c.execute("...")
+ with conn.connect() as c: # leaves the Connection open
+ c.execute("...")
with engine.connect() as c: # closes the Connection
- c.execute("...")
+ c.execute("...")
.. change::
:tags: engine
ad-hoc keyword arguments within the :attr:`.Index.kwargs` collection,
after construction::
- idx = Index('a', 'b')
- idx.kwargs['mysql_someargument'] = True
+ idx = Index("a", "b")
+ idx.kwargs["mysql_someargument"] = True
To suit the use case of allowing custom arguments at construction time,
the :meth:`.DialectKWArgs.argument_for` method now allows this registration::
- Index.argument_for('mysql', 'someargument', False)
+ Index.argument_for("mysql", "someargument", False)
- idx = Index('a', 'b', mysql_someargument=True)
+ idx = Index("a", "b", mysql_someargument=True)
.. seealso::
::
- mapper(Foo, foo_table, properties={
- 'bars':dynamic_loader(Bar, backref='foo', <other relation() opts>)
- })
+ mapper(
+ Foo,
+ foo_table,
+ properties={
+ "bars": dynamic_loader(
+ Bar,
+ backref="foo",
+ # <other relation() opts>
+ )
+ },
+ )
session = create_session(autoflush=True)
foo = session.query(Foo).first()
- foo.bars.append(Bar(name='lala'))
+ foo.bars.append(Bar(name="lala"))
- for bar in foo.bars.filter(Bar.name=='lala'):
+ for bar in foo.bars.filter(Bar.name == "lala"):
print(bar)
session.commit()
::
- mapper(Class, table, properties={
- 'foo' : deferred(table.c.foo, group='group1'),
- 'bar' : deferred(table.c.bar, group='group1'),
- 'bat' : deferred(table.c.bat, group='group1'),
+ mapper(
+ Class,
+ table,
+ properties={
+ "foo": deferred(table.c.foo, group="group1"),
+ "bar": deferred(table.c.bar, group="group1"),
+ "bat": deferred(table.c.bat, group="group1"),
+ },
)
- session.query(Class).options(undefer_group('group1')).filter(...).all()
+ session.query(Class).options(undefer_group("group1")).filter(...).all()
and ``eagerload_all()`` sets a chain of attributes to be
eager in one pass:
::
- my_table.select(my_table.c.id.in_(1,2,3)
- my_table.select(my_table.c.id.in_(*listOfIds)
+ my_table.select(my_table.c.id.in_(1, 2, 3))
+ my_table.select(my_table.c.id.in_(*listOfIds))
should be changed to
::
- my_table.select(my_table.c.id.in_([1,2,3])
- my_table.select(my_table.c.id.in_(listOfIds)
+ my_table.select(my_table.c.id.in_([1, 2, 3]))
+ my_table.select(my_table.c.id.in_(listOfIds))
Schema and Reflection
=====================
::
- subq = session.query(Keyword.id.label('keyword_id')).filter(Keyword.name.in_(['beans', 'carrots'])).subquery()
- recipes = session.query(Recipe).filter(exists().
- where(Recipe.id==recipe_keywords.c.recipe_id).
- where(recipe_keywords.c.keyword_id==subq.c.keyword_id)
+ subq = (
+ session.query(Keyword.id.label("keyword_id"))
+ .filter(Keyword.name.in_(["beans", "carrots"]))
+ .subquery()
+ )
+ recipes = session.query(Recipe).filter(
+ exists()
+ .where(Recipe.id == recipe_keywords.c.recipe_id)
+ .where(recipe_keywords.c.keyword_id == subq.c.keyword_id)
)
* **Explicit ORM aliases are recommended for aliased joins**
::
class MyType(AdaptOldConvertMethods, TypeEngine):
- # ...
+ ...
* The ``quote`` flag on ``Column`` and ``Table`` as well as
the ``quote_schema`` flag on ``Table`` now control quoting
dt = datetime.datetime(2008, 6, 27, 12, 0, 0, 125) # 125 usec
# old way
- '2008-06-27 12:00:00.125'
+ "2008-06-27 12:00:00.125"
# new way
- '2008-06-27 12:00:00.000125'
+ "2008-06-27 12:00:00.000125"
So if an existing SQLite file-based database intends to be
used across 0.4 and 0.5, you either have to upgrade the
::
from sqlalchemy.databases.sqlite import DateTimeMixin
+
DateTimeMixin.__legacy_microseconds__ = True
Connection Pool no longer threadlocal by default
::
- query.join('orders', 'items')
+ query.join("orders", "items")
query.join(User.orders, Order.items)
* the ``in_()`` method on columns and similar only accepts a
class MyQuery(Query):
def get(self, ident):
- # ...
+ ...
+
session = sessionmaker(query_cls=MyQuery)()
::
from sqlalchemy.orm import aliased
+
address_alias = aliased(Address)
print(session.query(User, address_alias).join((address_alias, User.addresses)).all())
::
class Parent(Base):
- __tablename__ = 'parent'
+ __tablename__ = "parent"
id = Column(Integer, primary_key=True)
+
class Child(Parent):
- __tablename__ = 'child'
- id = Column(Integer, ForeignKey('parent.id'), primary_key=True)
+ __tablename__ = "child"
+ id = Column(Integer, ForeignKey("parent.id"), primary_key=True)
Above, the attribute ``Child.id`` refers to both the
``child.id`` column as well as ``parent.id`` - this due to
::
class Child(Parent):
- __tablename__ = 'child'
- id = Column(Integer, ForeignKey('parent.id'), primary_key=True)
- some_related = relationship("SomeRelated",
- primaryjoin="Child.id==SomeRelated.child_id")
+ __tablename__ = "child"
+ id = Column(Integer, ForeignKey("parent.id"), primary_key=True)
+ some_related = relationship(
+ "SomeRelated", primaryjoin="Child.id==SomeRelated.child_id"
+ )
+
class SomeRelated(Base):
- __tablename__ = 'some_related'
+ __tablename__ = "some_related"
id = Column(Integer, primary_key=True)
- child_id = Column(Integer, ForeignKey('child.id'))
+ child_id = Column(Integer, ForeignKey("child.id"))
Prior to 0.7 the ``Child.id`` expression would reference
``Parent.id``, and it would be necessary to map ``child.id``
class Parent(Base):
- __tablename__ = 'parent'
+ __tablename__ = "parent"
id = Column(Integer, primary_key=True)
- child_id_one = Column(Integer, ForeignKey('child.id'))
- child_id_two = Column(Integer, ForeignKey('child.id'))
+ child_id_one = Column(Integer, ForeignKey("child.id"))
+ child_id_two = Column(Integer, ForeignKey("child.id"))
child_one = relationship("Child", foreign_keys=child_id_one)
child_two = relationship("Child", foreign_keys=child_id_two)
+
class Child(Base):
- __tablename__ = 'child'
+ __tablename__ = "child"
id = Column(Integer, primary_key=True)
* relationships against self-referential, composite foreign
need to be set as a class-bound attribute, string-based names can be resumed
afterwards::
- session.query(Company).\
- options(
- subqueryload(Company.employees.of_type(Engineer)).
- subqueryload("machines")
- )
- )
+ session.query(Company).options(
+ subqueryload(Company.employees.of_type(Engineer)).subqueryload("machines")
+ )
**Old Way**
to be set flexibly::
# setup values
- stmt = text("SELECT id, name FROM user "
- "WHERE name=:name AND timestamp=:timestamp").\
- bindparams(name="ed", timestamp=datetime(2012, 11, 10, 15, 12, 35))
+ stmt = text(
+ "SELECT id, name FROM user WHERE name=:name AND timestamp=:timestamp"
+ ).bindparams(name="ed", timestamp=datetime(2012, 11, 10, 15, 12, 35))
# setup types and/or values
- stmt = text("SELECT id, name FROM user "
- "WHERE name=:name AND timestamp=:timestamp").\
- bindparams(
- bindparam("name", value="ed"),
- bindparam("timestamp", type_=DateTime()
- ).bindparam(timestamp=datetime(2012, 11, 10, 15, 12, 35))
+ stmt = (
+ text("SELECT id, name FROM user WHERE name=:name AND timestamp=:timestamp")
+ .bindparams(bindparam("name", value="ed"), bindparam("timestamp", type_=DateTime()))
+ .bindparam(timestamp=datetime(2012, 11, 10, 15, 12, 35))
+ )
* :meth:`_expression.TextClause.columns` supersedes the ``typemap`` option
of :func:`_expression.text`, returning a new construct :class:`.TextAsFrom`::
stmt = stmt.alias()
stmt = select([addresses]).select_from(
- addresses.join(stmt), addresses.c.user_id == stmt.c.id)
+ addresses.join(stmt), addresses.c.user_id == stmt.c.id
+ )
# or into a cte():
stmt = stmt.cte("x")
stmt = select([addresses]).select_from(
- addresses.join(stmt), addresses.c.user_id == stmt.c.id)
+ addresses.join(stmt), addresses.c.user_id == stmt.c.id
+ )
:ticket:`2877`
* Binding to a Mixin or Abstract Class::
class MyClass(SomeMixin, Base):
- __tablename__ = 'my_table'
+ __tablename__ = "my_table"
# ...
- session = Session(binds={SomeMixin: some_engine})
+ session = Session(binds={SomeMixin: some_engine})
* Binding to inherited concrete subclasses individually based on table::
class BaseClass(Base):
- __tablename__ = 'base'
+ __tablename__ = "base"
# ...
+
class ConcreteSubClass(BaseClass):
- __tablename__ = 'concrete'
+ __tablename__ = "concrete"
# ...
- __mapper_args__ = {'concrete': True}
-
+ __mapper_args__ = {"concrete": True}
- session = Session(binds={
- base_table: some_engine,
- concrete_table: some_other_engine
- })
+ session = Session(binds={base_table: some_engine, concrete_table: some_other_engine})
:ticket:`3035`
statement as well as for the SELECT used by the "fetch" strategy::
session.query(User).filter(User.id == 15).update(
- {"name": "foob"}, synchronize_session='fetch')
+ {"name": "foob"}, synchronize_session="fetch"
+ )
- session.query(User).filter(User.id == 15).delete(
- synchronize_session='fetch')
+ session.query(User).filter(User.id == 15).delete(synchronize_session="fetch")
* Queries against individual columns::
:obj:`.column_property`::
class User(Base):
- # ...
+ ...
+
+ score = column_property(func.coalesce(self.tables.users.c.name, None))
- score = column_property(func.coalesce(self.tables.users.c.name, None)))
session.query(func.max(User.score)).scalar()
``Person/Manager/Engineer->Company`` setup from the mapping documentation,
using with_polymorphic::
- sess.query(Person.name)
- .filter(
- sess.query(Company.name).
- filter(Company.company_id == Person.company_id).
- correlate(Person).as_scalar() == "Elbonia, Inc.")
+ sess.query(Person.name).filter(
+ sess.query(Company.name)
+ .filter(Company.company_id == Person.company_id)
+ .correlate(Person)
+ .as_scalar()
+ == "Elbonia, Inc."
+ )
The above query now produces::
# aliasing.
paliased = aliased(Person)
- sess.query(paliased.name)
- .filter(
- sess.query(Company.name).
- filter(Company.company_id == paliased.company_id).
- correlate(paliased).as_scalar() == "Elbonia, Inc.")
+ sess.query(paliased.name).filter(
+ sess.query(Company.name)
+ .filter(Company.company_id == paliased.company_id)
+ .correlate(paliased)
+ .as_scalar()
+ == "Elbonia, Inc."
+ )
The :func:`.aliased` construct guarantees that the "polymorphic selectable"
is wrapped in a subquery. By referring to it explicitly in the correlated
engine = create_engine("postgresql+psycopg2://")
+
@event.listens_for(engine, "handle_error")
def cancel_disconnect(ctx):
if isinstance(ctx.original_exception, KeyboardInterrupt):
from sqlalchemy.dialects.postgresql import insert
- insert_stmt = insert(my_table). \\
- values(id='some_id', data='some data to insert')
+ insert_stmt = insert(my_table).values(id="some_id", data="some data to insert")
do_update_stmt = insert_stmt.on_conflict_do_update(
- index_elements=[my_table.c.id],
- set_=dict(data='some data to update')
+ index_elements=[my_table.c.id], set_=dict(data="some data to update")
)
conn.execute(do_update_stmt)
def _set_name(self, value):
self.first_name = value
+
class FirstNameOnly(Base):
@hybrid_property
def name(self):
"polymorphic in" loading to make use of the baked query extension
to reduce call overhead::
- stmt = select([table]).where(
- table.c.col.in_(bindparam('foo', expanding=True))
+ stmt = select([table]).where(table.c.col.in_(bindparam("foo", expanding=True)))
conn.execute(stmt, {"foo": [1, 2, 3]})
The feature should be regarded as **experimental** within the 1.2 series.
@util.dependency_for("sqlalchemy.sql.dml")
def insert(self, dml, *args, **kw):
+ ...
Where the above function would be rewritten to no longer have the ``dml`` parameter
on the outside. This would confuse code-linting tools into seeing a missing parameter
with engine.connect() as conn:
result = conn.execute(
table.select().order_by(table.c.id),
- execution_options={"stream_results": True}
+ execution_options={"stream_results": True},
)
for chunk in result.partitions(500):
# process up to 500 records
+ ...
:meth:`_engine.Result.columns` - allows slicing and reorganizing of rows:
result = session.execute(select(User).order_by(User.id))
for user_obj in result.scalars():
- # ...
+ ...
:meth:`_engine.Result.mappings` - instead of named-tuple rows, returns
dictionaries:
addresses = relationship(Address, backref=backref("user", viewonly=True))
+
class Address(Base):
- # ...
+ ...
u1 = session.query(User).filter_by(name="x").first()
from sqlalchemy import exc
# for warnings not included in regex-based filter below, just log
- warnings.filterwarnings(
- "always", category=exc.RemovedIn20Warning
- )
+ warnings.filterwarnings("always", category=exc.RemovedIn20Warning)
# for warnings related to execute() / scalar(), raise
for msg in [
r"The (?:Executable|Engine)\.(?:execute|scalar)\(\) function",
- r"The current statement is being autocommitted using implicit "
- "autocommit,",
+ r"The current statement is being autocommitted using implicit autocommit,",
r"The connection.execute\(\) method in SQLAlchemy 2.0 will accept "
"parameters as a single dictionary or a single sequence of "
"dictionaries only.",
r"The Connection.connect\(\) function/method is considered legacy",
r".*DefaultGenerator.execute\(\)",
]:
- warnings.filterwarnings(
- "error", message=msg, category=exc.RemovedIn20Warning,
- )
+ warnings.filterwarnings(
+ "error",
+ message=msg,
+ category=exc.RemovedIn20Warning,
+ )
3. As each sub-category of warnings are resolved in the application, new
warnings that are caught by the "always" filter can be added to the list
- ::
- session.execute(
- select(User)
- ).scalars().all()
+ session.execute(select(User)).scalars().all()
# or
session.scalars(select(User)).all()
* - ::
- session.query(User).\
- filter_by(name='some user').one()
+ session.query(User).filter_by(name="some user").one()
- ::
- session.execute(
- select(User).
- filter_by(name="some user")
- ).scalar_one()
+ session.execute(select(User).filter_by(name="some user")).scalar_one()
- :ref:`migration_20_unify_select`
* - ::
- session.query(User).\
- filter_by(name='some user').first()
-
+ session.query(User).filter_by(name="some user").first()
- ::
- session.scalars(
- select(User).
- filter_by(name="some user").
- limit(1)
- ).first()
+ session.scalars(select(User).filter_by(name="some user").limit(1)).first()
- :ref:`migration_20_unify_select`
* - ::
- session.query(User).options(
- joinedload(User.addresses)
- ).all()
+ session.query(User).options(joinedload(User.addresses)).all()
- ::
- session.scalars(
- select(User).
- options(
- joinedload(User.addresses)
- )
- ).unique().all()
+ session.scalars(select(User).options(joinedload(User.addresses))).unique().all()
- :ref:`joinedload_not_uniqued`
* - ::
- session.query(User).\
- join(Address).\
- filter(Address.email == 'e@sa.us').\
- all()
+ session.query(User).join(Address).filter(Address.email == "e@sa.us").all()
- ::
session.execute(
- select(User).
- join(Address).
- where(Address.email == 'e@sa.us')
+ select(User).join(Address).where(Address.email == "e@sa.us")
).scalars().all()
- :ref:`migration_20_unify_select`
* - ::
- session.query(User).from_statement(
- text("select * from users")
- ).all()
+ session.query(User).from_statement(text("select * from users")).all()
- ::
- session.scalars(
- select(User).
- from_statement(
- text("select * from users")
- )
- ).all()
+ session.scalars(select(User).from_statement(text("select * from users"))).all()
- :ref:`orm_queryguide_selecting_text`
* - ::
- session.query(User).\
- join(User.addresses).\
- options(
- contains_eager(User.addresses)
- ).\
- populate_existing().all()
+ session.query(User).join(User.addresses).options(
+ contains_eager(User.addresses)
+ ).populate_existing().all()
- ::
session.execute(
- select(User).
- join(User.addresses).
- options(contains_eager(User.addresses)).
- execution_options(populate_existing=True)
+ select(User)
+ .join(User.addresses)
+ .options(contains_eager(User.addresses))
+ .execution_options(populate_existing=True)
).scalars().all()
-
*
- ::
- session.query(User).\
- filter(User.name == 'foo').\
- update(
- {"fullname": "Foo Bar"},
- synchronize_session="evaluate"
- )
-
+ session.query(User).filter(User.name == "foo").update(
+ {"fullname": "Foo Bar"}, synchronize_session="evaluate"
+ )
- ::
session.execute(
- update(User).
- where(User.name == 'foo').
- values(fullname="Foo Bar").
- execution_options(synchronize_session="evaluate")
+ update(User)
+ .where(User.name == "foo")
+ .values(fullname="Foo Bar")
+ .execution_options(synchronize_session="evaluate")
)
- :ref:`orm_expression_update_delete`
session = Session(engine)
# becomes legacy use case
- user = session.query(User).filter_by(name='some user').one()
+ user = session.query(User).filter_by(name="some user").one()
# becomes legacy use case
- user = session.query(User).filter_by(name='some user').first()
+ user = session.query(User).filter_by(name="some user").first()
# becomes legacy use case
user = session.query(User).get(5)
# becomes legacy use case
- for user in session.query(User).join(User.addresses).filter(Address.email == 'some@email.com'):
- # ...
+ for user in (
+ session.query(User).join(User.addresses).filter(Address.email == "some@email.com")
+ ):
+ ...
# becomes legacy use case
users = session.query(User).options(joinedload(User.addresses)).order_by(User.id).all()
# becomes legacy use case
- users = session.query(User).from_statement(
- text("select * from users")
- ).all()
+ users = session.query(User).from_statement(text("select * from users")).all()
# etc
session = Session(engine)
- user = session.execute(
- select(User).filter_by(name="some user")
- ).scalar_one()
+ user = session.execute(select(User).filter_by(name="some user")).scalar_one()
# for first(), no LIMIT is applied automatically; add limit(1) if LIMIT
# is desired on the query
- user = session.execute(
- select(User).filter_by(name="some user").limit(1)
- ).scalars().first()
+ user = (
+ session.execute(select(User).filter_by(name="some user").limit(1)).scalars().first()
+ )
# get() moves to the Session directly
user = session.get(User, 5)
for user in session.execute(
select(User).join(User.addresses).filter(Address.email == "some@email.case")
).scalars():
- # ...
+ ...
# when using joinedload() against collections, use unique() on the result
- users = session.execute(
- select(User).options(joinedload(User.addresses)).order_by(User.id)
- ).unique().all()
+ users = (
+ session.execute(select(User).options(joinedload(User.addresses)).order_by(User.id))
+ .unique()
+ .all()
+ )
# select() has ORM-ish methods like from_statement() that only work
# if the statement is against ORM entities
- users = session.execute(
- select(User).from_statement(text("select * from users"))
- ).scalars().all()
+ users = (
+ session.execute(select(User).from_statement(text("select * from users")))
+ .scalars()
+ .all()
+ )
**Discussion**
class User(Base):
- __tablename__ = 'user_account'
+ __tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
addresses: Mapped[List["Address"]] = relationship()
+
class Address(Base):
__tablename__ = "address"
email_address: Mapped[str]
user_id = mapped_column(ForeignKey("user_account.id"))
-
With the above mapping, the attributes are typed and express themselves
all the way from statement to result set::
# declarative base with a type-level override, using a type that is
# expected to be used in multiple places
class Base(DeclarativeBase):
- registry = registry(type_annotation_map={
- str50: String(50),
- })
-
+ registry = registry(
+ type_annotation_map={
+ str50: String(50),
+ }
+ )
Second, Declarative will extract full
:func:`_orm.mapped_column` definitions from the left hand type if
emitted, a new transaction begins implicitly::
with engine.connect() as connection:
- connection.execute(<some statement>)
+ connection.execute("<some statement>")
connection.commit() # commits "some statement"
# new transaction starts
- connection.execute(<some other statement>)
+ connection.execute("<some other statement>")
connection.rollback() # rolls back "some other statement"
# new transaction starts
- connection.execute(<a third statement>)
+ connection.execute("<a third statement>")
connection.commit() # commits "a third statement"
.. versionadded:: 2.0 "commit as you go" style is a new feature of
to the :class:`_orm.Session`. The :class:`_orm.Session` uses a new
:class:`_engine.Connection` for each transaction::
- schema_engine = engine.execution_options(schema_translate_map = { ... } )
+ schema_engine = engine.execution_options(schema_translate_map={...})
session = Session(schema_engine)
# **Don't** do this:
+
def my_stmt(parameter, thing=False):
stmt = lambda_stmt(lambda: select(table))
stmt += (
- lambda s: s.where(table.c.x > parameter) if thing
+ lambda s: s.where(table.c.x > parameter)
+ if thing
else s.where(table.c.y == parameter)
+ )
return stmt
+
# **Do** do this:
+
def my_stmt(parameter, thing=False):
stmt = lambda_stmt(lambda: select(table))
if thing:
>>> def my_stmt(x, y):
... def get_x():
... return x
- ...
+ ...
... def get_y():
... return y
- ...
+ ...
... stmt = lambda_stmt(lambda: select(func.max(get_x(), get_y())))
... return stmt
>>> with engine.connect() as conn:
>>> def my_stmt(x, y):
... def get_x():
... return x
- ...
+ ...
... def get_y():
... return y
- ...
+ ...
... x_param, y_param = get_x(), get_y()
... stmt = lambda_stmt(lambda: select(func.max(x_param, y_param)))
... return stmt
which is typically a subclass of :class:`sqlalchemy.engine.default.DefaultDialect`.
In this example let's say it's called ``FooDialect`` and its module is accessed
via ``foodialect.dialect``.
-3. The entry point can be established in setup.py as follows::
+3. The entry point can be established in ``setup.cfg`` as follows:
- entry_points = """
- [sqlalchemy.dialects]
- foodialect = foodialect.dialect:FooDialect
- """
+ .. sourcecode:: ini
+
+ [options.entry_points]
+ sqlalchemy.dialects =
+ foodialect = foodialect.dialect:FooDialect
If the dialect is providing support for a particular DBAPI on top of
an existing SQLAlchemy-supported database, the name can be given
including a database-qualification. For example, if ``FooDialect``
-were in fact a MySQL dialect, the entry point could be established like this::
+were in fact a MySQL dialect, the entry point could be established like this:
+
+.. sourcecode:: ini
- entry_points = """
- [sqlalchemy.dialects]
- mysql.foodialect = foodialect.dialect:FooDialect
- """
+ [options.entry_points]
+ sqlalchemy.dialects =
+ mysql.foodialect = foodialect.dialect:FooDialect
The above entrypoint would then be accessed as ``create_engine("mysql+foodialect://")``.
>>> from sqlalchemy.orm import relationship
>>> class User(Base):
... __tablename__ = "user_account"
- ...
+ ...
... id = Column(Integer, primary_key=True)
... name = Column(String(30))
... fullname = Column(String)
- ...
+ ...
... addresses = relationship("Address", back_populates="user")
- ...
+ ...
... def __repr__(self):
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
>>> class Address(Base):
... __tablename__ = "address"
- ...
+ ...
... id = Column(Integer, primary_key=True)
... email_address = Column(String, nullable=False)
... user_id = Column(Integer, ForeignKey("user_account.id"))
- ...
+ ...
... user = relationship("User", back_populates="addresses")
- ...
+ ...
... def __repr__(self):
... return f"Address(id={self.id!r}, email_address={self.email_address!r})"
>>> conn = engine.connect()
engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
+
def run_in_process(some_data_record):
with engine.connect() as conn:
conn.execute(text("..."))
+
def initializer():
"""ensure the parent proc's database connections are not touched
- in the new connection pool"""
+ in the new connection pool"""
engine.dispose(close=False)
+
with Pool(10, initializer=initializer) as p:
p.map(run_in_process, data)
-
.. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
parameter to allow the replacement of a connection pool in a child
process without interfering with the connections used by the parent
engine = create_engine("mysql://user:pass@host/dbname")
+
def run_in_process():
with engine.connect() as conn:
conn.execute(text("..."))
+
# before process starts, ensure engine.dispose() is called
engine.dispose()
p = Process(target=run_in_process)
E.g.::
Table(
- 'mydata', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', ArrayOfEnum(ENUM('a', 'b, 'c', name='myenum')))
-
+ "mydata",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", ArrayOfEnum(ENUM("a", "b", "c", name="myenum"))),
)
This type is not included as a built-in type as it would be incompatible
The relationships ``Child.parent`` and ``Parent.children`` appear to be in conflict.
The solution is to apply :paramref:`_orm.relationship.back_populates`::
- class Parent(Base):
+ class Parent(Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
children = relationship("Child", back_populates="parent")
metadata_obj = MetaData(bind=engine)
Base = declarative_base(metadata=metadata_obj)
+
class MyClass(Base):
- # ...
+ ...
session = Session()
Session = sessionmaker(engine)
Base = declarative_base()
+
class MyClass(Base):
- # ...
+ ...
session = Session()
the :meth:`_engine.Connection.execute` method of a :class:`_engine.Connection`::
with engine.connect() as conn:
- result = conn.execute(stmt)
+ result = conn.execute(stmt)
When using the ORM, a similar facility is available via the :class:`.Session`::
metadata_obj = MetaData()
# ... add Table objects to metadata
- ti = metadata_obj.sorted_tables:
+ ti = metadata_obj.sorted_tables
for t in ti:
print(t)
* Use :class:`.Bundle` objects to organize column-based results::
- u_b = Bundle('user', User.id, User.name)
- a_b = Bundle('address', Address.id, Address.email)
+ u_b = Bundle("user", User.id, User.name)
+ a_b = Bundle("address", Address.id, Address.email)
for user, address in session.execute(select(u_b, a_b).join(User.addresses)):
- # ...
+ ...
* Use result caching - see :ref:`examples_caching` for an in-depth example
of this.
The usage of the :class:`.Session` should fit within a structure similar to this::
try:
- <use session>
+ # <use session>
session.commit()
except:
- session.rollback()
- raise
+ session.rollback()
+ raise
finally:
- session.close() # optional, depends on use case
+ session.close() # optional, depends on use case
Many things can cause a failure within the try/except besides flushes.
Applications should ensure some system of "framing" is applied to ORM-oriented
on mapped classes. When a class is mapped as such::
class MyClass(Base):
- __tablename__ = 'foo'
+ __tablename__ = "foo"
id = Column(Integer, primary_key=True)
data = Column(String)
as an ORDER BY clause by calling upon the :meth:`_expression.Select.where`
and :meth:`_expression.Select.order_by` methods::
- stmt = select(user.c.name).\
- where(user.c.id > 5).\
- where(user.c.name.like('e%').\
- order_by(user.c.name)
+ stmt = (
+ select(user.c.name)
+ .where(user.c.id > 5)
+ .where(user.c.name.like("e%"))
+ .order_by(user.c.name)
+ )
Each method call above returns a copy of the original
:class:`_expression.Select` object with additional qualifiers
single department. A SQLAlchemy mapping might look like::
class Department(Base):
- __tablename__ = 'department'
+ __tablename__ = "department"
id = Column(Integer, primary_key=True)
name = Column(String(30))
employees = relationship("Employee")
+
class Employee(Base):
- __tablename__ = 'employee'
+ __tablename__ = "employee"
id = Column(Integer, primary_key=True)
name = Column(String(30))
- dep_id = Column(Integer, ForeignKey('department.id'))
+ dep_id = Column(Integer, ForeignKey("department.id"))
.. seealso::
single department. A SQLAlchemy mapping might look like::
class Department(Base):
- __tablename__ = 'department'
+ __tablename__ = "department"
id = Column(Integer, primary_key=True)
name = Column(String(30))
+
class Employee(Base):
- __tablename__ = 'employee'
+ __tablename__ = "employee"
id = Column(Integer, primary_key=True)
name = Column(String(30))
- dep_id = Column(Integer, ForeignKey('department.id'))
+ dep_id = Column(Integer, ForeignKey("department.id"))
department = relationship("Department")
.. seealso::
used in :term:`one to many` as follows::
class Department(Base):
- __tablename__ = 'department'
+ __tablename__ = "department"
id = Column(Integer, primary_key=True)
name = Column(String(30))
employees = relationship("Employee", backref="department")
+
class Employee(Base):
- __tablename__ = 'employee'
+ __tablename__ = "employee"
id = Column(Integer, primary_key=True)
name = Column(String(30))
- dep_id = Column(Integer, ForeignKey('department.id'))
+ dep_id = Column(Integer, ForeignKey("department.id"))
A backref can be applied to any relationship, including one to many,
many to one, and :term:`many to many`.
specified using plain table metadata::
class Employee(Base):
- __tablename__ = 'employee'
+ __tablename__ = "employee"
id = Column(Integer, primary_key=True)
name = Column(String(30))
projects = relationship(
"Project",
- secondary=Table('employee_project', Base.metadata,
- Column("employee_id", Integer, ForeignKey('employee.id'),
- primary_key=True),
- Column("project_id", Integer, ForeignKey('project.id'),
- primary_key=True)
- ),
- backref="employees"
- )
+ secondary=Table(
+ "employee_project",
+ Base.metadata,
+ Column("employee_id", Integer, ForeignKey("employee.id"), primary_key=True),
+ Column("project_id", Integer, ForeignKey("project.id"), primary_key=True),
+ ),
+ backref="employees",
+ )
+
class Project(Base):
- __tablename__ = 'project'
+ __tablename__ = "project"
id = Column(Integer, primary_key=True)
name = Column(String(30))
A SQLAlchemy declarative mapping for the above might look like::
class Employee(Base):
- __tablename__ = 'employee'
+ __tablename__ = "employee"
id = Column(Integer, primary_key=True)
name = Column(String(30))
class Project(Base):
- __tablename__ = 'project'
+ __tablename__ = "project"
id = Column(Integer, primary_key=True)
name = Column(String(30))
class EmployeeProject(Base):
- __tablename__ = 'employee_project'
+ __tablename__ = "employee_project"
- employee_id = Column(Integer, ForeignKey('employee.id'), primary_key=True)
- project_id = Column(Integer, ForeignKey('project.id'), primary_key=True)
+ employee_id = Column(Integer, ForeignKey("employee.id"), primary_key=True)
+ project_id = Column(Integer, ForeignKey("project.id"), primary_key=True)
role_name = Column(String(30))
project = relationship("Project", backref="project_employees")
employee = relationship("Employee", backref="employee_projects")
-
Employees can be added to a project given a role name::
proj = Project(name="Client A")
emp1 = Employee(name="emp1")
emp2 = Employee(name="emp2")
- proj.project_employees.extend([
- EmployeeProject(employee=emp1, role_name="tech lead"),
- EmployeeProject(employee=emp2, role_name="account executive")
- ])
+ proj.project_employees.extend(
+ [
+ EmployeeProject(employee=emp1, role_name="tech lead"),
+ EmployeeProject(employee=emp2, role_name="account executive"),
+ ]
+ )
.. seealso::
# use a List, Python 3.8 and earlier
children: Mapped[List["Child"]] = relationship()
-
When using mappings without the :class:`_orm.Mapped` annotation, such as when
using :ref:`imperative mappings <orm_imperative_mapping>` or untyped
Python code, as well as in a few special cases, the collection class for a
from sqlalchemy.orm.collections import collection
+
class MyList(list):
@collection.remover
def zark(self, item):
# do something special...
+ ...
@collection.iterator
def hey_use_this_instead_for_iteration(self):
- # ...
+ ...
There is no requirement to be list-, or set-like at all. Collection classes
can be any shape, so long as they have the append, remove and iterate
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
+
class Base(DeclarativeBase):
pass
+
class SomeClass(Base):
- __tablename__ = 'some_table'
+ __tablename__ = "some_table"
# primary_key=True, therefore will be NOT NULL
id: Mapped[int] = mapped_column(primary_key=True)
user = await session.get(User, 42)
addresses = (await session.scalars(user.addresses.statement)).all()
- stmt = user.addresses.statement.where(
- Address.email_address.startswith("patrick")
- )
+ stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
addresses_filter = (await session.scalars(stmt)).all()
.. seealso::
@event.listens_for(engine.sync_engine, "connect")
- def register_custom_types(dbapi_connection, ...):
+ def register_custom_types(dbapi_connection, *args):
dbapi_connection.run_async(
lambda connection: connection.set_type_codec(
- "MyCustomType", encoder, decoder, ...
+ "MyCustomType",
+ encoder,
+ decoder, # ...
)
)
@declared_attr.cascading
def id(cls):
if has_inherited_table(cls):
- return Column(
- ForeignKey('myclass.id'), primary_key=True
- )
+ return Column(ForeignKey("myclass.id"), primary_key=True)
else:
return Column(Integer, primary_key=True)
+
class MyClass(HasIdMixin, Base):
- __tablename__ = 'myclass'
+ __tablename__ = "myclass"
# ...
+
class MySubClass(MyClass):
- ""
+ """"""
+
# ...
The behavior of the above configuration is that ``MySubClass``
from sqlalchemy import event
- @event.listens_for(PtoQ, 'before_update')
+
+ @event.listens_for(PtoQ, "before_update")
def receive_before_update(mapper, connection, target):
- if target.some_required_attr_on_q is None:
+ if target.some_required_attr_on_q is None:
connection.execute(q_table.insert(), {"id": target.id})
where above, a row is INSERTed into the ``q_table`` table by creating an
class Foo(Base):
- __tablename__ = 'foo'
+ __tablename__ = "foo"
pk = mapped_column(Integer, primary_key=True)
bar = mapped_column(Integer)
+
e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True)
Base.metadata.create_all(e)
session = Session(e)
- foo = Foo(pk=sql.select(sql.func.coalesce(sql.func.max(Foo.pk) + 1, 1))
+ foo = Foo(pk=sql.select(sql.func.coalesce(sql.func.max(Foo.pk) + 1, 1)))
session.add(foo)
session.commit()
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Session
+
class BaseA(DeclarativeBase):
pass
+
class BaseB(DeclarativeBase):
pass
+
class User(BaseA):
- # ...
+ ...
+
class Address(BaseA):
- # ...
+ ...
class GameInfo(BaseB):
- # ...
+ ...
+
class GameStats(BaseB):
- # ...
+ ...
Session = sessionmaker()
# all User/Address operations will be on engine 1, all
# Game operations will be on engine 2
- Session.configure(binds={BaseA:engine1, BaseB:engine2})
+ Session.configure(binds={BaseA: engine1, BaseB: engine2})
Above, classes which descend from ``BaseA`` and ``BaseB`` will have their
SQL operations routed to one of two engines based on which superclass
... name: Mapped[str]
... fullname: Mapped[Optional[str]]
... books: Mapped[List["Book"]] = relationship(back_populates="owner")
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
>>> class Book(Base):
... summary: Mapped[str] = mapped_column(Text)
... cover_photo: Mapped[bytes] = mapped_column(LargeBinary)
... owner: Mapped["User"] = relationship(back_populates="books")
- ...
+ ...
... def __repr__(self) -> str:
... return f"Book(id={self.id!r}, title={self.title!r})"
>>> engine = create_engine("sqlite+pysqlite:///:memory:", echo=True)
... fullname: Mapped[Optional[str]]
... species: Mapped[Optional[str]]
... addresses: Mapped[List["Address"]] = relationship(back_populates="user")
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(name={self.name!r}, fullname={self.fullname!r})"
>>> class Address(Base):
... user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
... email_address: Mapped[str]
... user: Mapped[User] = relationship(back_populates="addresses")
- ...
+ ...
... def __repr__(self) -> str:
... return f"Address(email_address={self.email_address!r})"
>>> class LogRecord(Base):
... message: Mapped[str]
... code: Mapped[str]
... timestamp: Mapped[datetime.datetime]
- ...
+ ...
... def __repr__(self):
... return f"LogRecord({self.message!r}, {self.code!r}, {self.timestamp!r})"
... id: Mapped[int] = mapped_column(primary_key=True)
... name: Mapped[str]
... type: Mapped[str]
- ...
+ ...
... def __repr__(self):
... return f"{self.__class__.__name__}({self.name!r})"
- ...
+ ...
... __mapper_args__ = {
... "polymorphic_identity": "employee",
... "polymorphic_on": "type",
... __tablename__ = "manager"
... id: Mapped[int] = mapped_column(ForeignKey("employee.id"), primary_key=True)
... manager_name: Mapped[str]
- ...
+ ...
... def __repr__(self):
... return f"{self.__class__.__name__}({self.name!r}, manager_name={self.manager_name!r})"
- ...
+ ...
... __mapper_args__ = {
... "polymorphic_identity": "manager",
... }
... __tablename__ = "engineer"
... id: Mapped[int] = mapped_column(ForeignKey("employee.id"), primary_key=True)
... engineer_info: Mapped[str]
- ...
+ ...
... def __repr__(self):
... return f"{self.__class__.__name__}({self.name!r}, engineer_info={self.engineer_info!r})"
- ...
+ ...
... __mapper_args__ = {
... "polymorphic_identity": "engineer",
... }
... type: Mapped[str]
... company_id: Mapped[int] = mapped_column(ForeignKey("company.id"))
... company: Mapped[Company] = relationship(back_populates="employees")
- ...
+ ...
... def __repr__(self):
... return f"{self.__class__.__name__}({self.name!r})"
- ...
+ ...
... __mapper_args__ = {
... "polymorphic_identity": "employee",
... "polymorphic_on": "type",
... id: Mapped[int] = mapped_column(primary_key=True)
... manager_id: Mapped[int] = mapped_column(ForeignKey("manager.id"))
... document_name: Mapped[str]
- ...
+ ...
... def __repr__(self):
... return f"Paperwork({self.document_name!r})"
>>>
... fullname: Mapped[Optional[str]]
... addresses: Mapped[List["Address"]] = relationship(back_populates="user")
... orders: Mapped[List["Order"]] = relationship()
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
>>> class Address(Base):
... user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
... email_address: Mapped[str]
... user: Mapped[User] = relationship(back_populates="addresses")
- ...
+ ...
... def __repr__(self) -> str:
... return f"Address(id={self.id!r}, email_address={self.email_address!r})"
>>> order_items_table = Table(
... id: Mapped[int] = mapped_column(primary_key=True)
... name: Mapped[str]
... type: Mapped[str]
- ...
+ ...
... def __repr__(self):
... return f"{self.__class__.__name__}({self.name!r})"
- ...
+ ...
... __mapper_args__ = {
... "polymorphic_identity": "employee",
... "polymorphic_on": "type",
.. sourcecode:: python
stmt = (
- select(User).
- where(User.name.in_(names)).
- execution_options(populate_existing=True).
- options(selectinload(User.addresses)
+ select(User)
+ .where(User.name.in_(names))
+ .execution_options(populate_existing=True)
+ .options(selectinload(User.addresses))
)
# will refresh all matching User objects as well as the related
# Address objects
... title: Mapped[str]
... summary: Mapped[str] = mapped_column(Text, deferred=True)
... cover_photo: Mapped[bytes] = mapped_column(LargeBinary, deferred=True)
- ...
+ ...
... def __repr__(self) -> str:
... return f"Book(id={self.id!r}, title={self.title!r})"
... cover_photo: Mapped[bytes] = mapped_column(
... LargeBinary, deferred=True, deferred_group="book_attrs"
... )
- ...
+ ...
... def __repr__(self) -> str:
... return f"Book(id={self.id!r}, title={self.title!r})"
... cover_photo: Mapped[bytes] = mapped_column(
... LargeBinary, deferred=True, deferred_raiseload=True
... )
- ...
+ ...
... def __repr__(self) -> str:
... return f"Book(id={self.id!r}, title={self.title!r})"
... name: Mapped[str]
... fullname: Mapped[Optional[str]]
... books: Mapped[List["Book"]] = relationship(back_populates="owner")
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
>>> class Book(Base):
... summary: Mapped[str] = mapped_column(Text)
... cover_photo: Mapped[bytes] = mapped_column(LargeBinary)
... owner: Mapped["User"] = relationship(back_populates="books")
- ...
+ ...
... def __repr__(self) -> str:
... return f"Book(id={self.id!r}, title={self.title!r})"
... title: Mapped[str]
... summary: Mapped[str] = mapped_column(Text)
... cover_photo: Mapped[bytes] = mapped_column(LargeBinary)
- ...
+ ...
... def __repr__(self) -> str:
... return f"Book(id={self.id!r}, title={self.title!r})"
... name: Mapped[str]
... fullname: Mapped[Optional[str]]
... book_count: Mapped[int] = query_expression()
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
... id: Mapped[int] = mapped_column(primary_key=True)
... name: Mapped[str]
... type: Mapped[str]
- ...
+ ...
... def __repr__(self):
... return f"{self.__class__.__name__}({self.name!r})"
- ...
+ ...
... __mapper_args__ = {
... "polymorphic_identity": "employee",
... "polymorphic_on": "type",
:ref:`orm_queryguide_populate_existing` execution option::
# change the options on Parent objects that were already loaded
- stmt = select(Parent).execution_options(populate_existing=True).options(
- lazyload(Parent.children).
- lazyload(Child.subelements)).all()
+ stmt = (
+ select(Parent)
+ .execution_options(populate_existing=True)
+ .options(lazyload(Parent.children).lazyload(Child.subelements))
+ .all()
+ )
If the objects loaded above are fully cleared from the :class:`.Session`,
such as due to garbage collection or that :meth:`.Session.expunge_all`
stmt = (
select(User).
outerjoin(User.addresses.of_type(adalias)).
- options(contains_eager(User.addresses.of_type(adalias))
+ options(contains_eager(User.addresses.of_type(adalias)))
)
# get results normally
>>> class User(Base):
... __tablename__ = "user_account"
- ...
+ ...
... id: Mapped[int] = mapped_column(primary_key=True)
... name: Mapped[str] = mapped_column(String(30))
... fullname: Mapped[Optional[str]]
- ...
+ ...
... addresses: Mapped[list["Address"]] = relationship(
... back_populates="user", cascade="all, delete-orphan"
... )
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
>>> class Address(Base):
... __tablename__ = "address"
- ...
+ ...
... id: Mapped[int] = mapped_column(primary_key=True)
... email_address: Mapped[str]
... user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
- ...
+ ...
... user: Mapped["User"] = relationship(back_populates="addresses")
- ...
+ ...
... def __repr__(self) -> str:
... return f"Address(id={self.id!r}, email_address={self.email_address!r})"
>>> from sqlalchemy.orm import Session
>>> with Session(engine) as session:
- ...
+ ...
... spongebob = User(
... name="spongebob",
... fullname="Spongebob Squarepants",
... ],
... )
... patrick = User(name="patrick", fullname="Patrick Star")
- ...
+ ...
... session.add_all([spongebob, sandy, patrick])
- ...
+ ...
... session.commit()
{opensql}BEGIN (implicit)
INSERT INTO user_account (name, fullname) VALUES (?, ?), (?, ?), (?, ?) RETURNING id
will be refreshed with data from the database::
stmt = (
- select(User).
- execution_options(populate_existing=True).
- where((User.name.in_(['a', 'b', 'c']))
+ select(User)
+ .execution_options(populate_existing=True)
+ .where((User.name.in_(["a", "b", "c"])))
)
for user in session.execute(stmt).scalars():
print(user) # will be refreshed for those columns that came back from the query
session.commit() # commits
# will automatically begin again
- result = session.execute(< some select statement >)
+ result = session.execute("< some select statement >")
session.add_all([more_objects, ...])
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
- session.rollback() # rolls back still_another_object
+ session.rollback() # rolls back still_another_object
The :class:`_orm.Session` itself features a :meth:`_orm.Session.close`
method. If the :class:`_orm.Session` is begun within a transaction that
method to allow both operations to take place at once::
with Session.begin() as session:
- session.add(some_object):
-
-
+ session.add(some_object)
.. _session_begin_nested:
with autocommit_session() as session:
- some_objects = session.execute(<statement>)
- some_other_objects = session.execute(<statement>)
+ some_objects = session.execute("<statement>")
+ some_other_objects = session.execute("<statement>")
# closes connection
-
Setting Isolation for Individual Sessions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
result = conn.execute(text("select x, y from some_table"))
for x, y in result:
- # ...
+ ...
* **Integer Index** - Tuples are Python sequences, so regular integer access is available too:
result = conn.execute(text("select x, y from some_table"))
- for row in result:
- x = row[0]
+ for row in result:
+ x = row[0]
* **Attribute Name** - As these are Python named tuples, the tuples have dynamic attribute names
matching the names of each column. These names are normally the names that the
result = conn.execute(text("select x, y from some_table"))
for dict_row in result.mappings():
- x = dict_row['x']
- y = dict_row['y']
+ x = dict_row["x"]
+ y = dict_row["y"]
..
>>> class User(Base):
... __tablename__ = "user_account"
- ...
+ ...
... id: Mapped[int] = mapped_column(primary_key=True)
... name: Mapped[str] = mapped_column(String(30))
... fullname: Mapped[Optional[str]]
- ...
+ ...
... addresses: Mapped[List["Address"]] = relationship(back_populates="user")
- ...
+ ...
... def __repr__(self) -> str:
... return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
>>> class Address(Base):
... __tablename__ = "address"
- ...
+ ...
... id: Mapped[int] = mapped_column(primary_key=True)
... email_address: Mapped[str]
... user_id = mapped_column(ForeignKey("user_account.id"))
- ...
+ ...
... user: Mapped[User] = relationship(back_populates="addresses")
- ...
+ ...
... def __repr__(self) -> str:
... return f"Address(id={self.id!r}, email_address={self.email_address!r})"
optional. Our mapping above can be written without annotations as::
class User(Base):
- __tablename__ = 'user_account'
+ __tablename__ = "user_account"
- id = mapped_column(Integer, primary_key=True)
- name = mapped_column(String(30), nullable=False)
- fullname = mapped_column(String)
+ id = mapped_column(Integer, primary_key=True)
+ name = mapped_column(String(30), nullable=False)
+ fullname = mapped_column(String)
- addresses = relationship("Address", back_populates="user")
+ addresses = relationship("Address", back_populates="user")
- # ... definition continues
+ # ... definition continues
The above class has an advantage over one that uses :class:`.Column`
directly, in that the ``User`` class as well as instances of ``User``
from pathlib import Path
import re
-from black import DEFAULT_LINE_LENGTH
from black import format_str
-from black import Mode
-from black import parse_pyproject_toml
-from black import TargetVersion
+from black.const import DEFAULT_LINE_LENGTH
+from black.files import parse_pyproject_toml
+from black.mode import Mode
+from black.mode import TargetVersion
home = Path(__file__).parent.parent
def _format_block(
- input_block: _Block, exit_on_error: bool, is_doctest: bool
+ input_block: _Block,
+ exit_on_error: bool,
+ errors: list[tuple[int, str, Exception]],
+ is_doctest: bool,
) -> list[str]:
- code = "\n".join(c for *_, c in input_block)
+ if not is_doctest:
+ # The first line may carry extra padding; strip it here and restore it later
+ add_padding = start_space.match(input_block[0][3]).groups()[0]
+ skip = len(add_padding)
+ code = "\n".join(
+ c[skip:] if c.startswith(add_padding) else c
+ for *_, c in input_block
+ )
+ else:
+ add_padding = None
+ code = "\n".join(c for *_, c in input_block)
try:
formatted = format_str(code, mode=BLACK_MODE)
except Exception as e:
+ start_line = input_block[0][1]
+ errors.append((start_line, code, e))
if is_doctest:
- start_line = input_block[0][1]
print(
"Could not format code block starting at "
f"line {start_line}:\n{code}\nError: {e}"
else:
print("Ignoring error")
elif VERBOSE:
- start_line = input_block[0][1]
print(
"Could not format code block starting at "
f"line {start_line}:\n---\n{code}\n---Error: {e}"
if is_doctest:
formatted_lines = [
f"{padding}>>> {formatted_code_lines[0]}",
- *(f"{padding}... {fcl}" for fcl in formatted_code_lines[1:]),
+ *(
+ f"{padding}...{' ' if fcl else ''}{fcl}"
+ for fcl in formatted_code_lines[1:]
+ ),
]
else:
- # The first line may have additional padding.
- # If it does restore it
- additionalPadding = re.match(
- r"^(\s*)[^ ]?", input_block[0][3]
- ).groups()[0]
formatted_lines = [
- f"{padding}{additionalPadding}{fcl}" if fcl else fcl
+ f"{padding}{add_padding}{fcl}" if fcl else fcl
for fcl in formatted_code_lines
]
if not input_block[-1][0] and formatted_lines[-1]:
return formatted_lines
+format_directive = re.compile(r"^\.\.\s*format\s*:\s*(on|off)\s*$")
+
doctest_code_start = re.compile(r"^(\s+)>>>\s?(.+)")
doctest_code_continue = re.compile(r"^\s+\.\.\.\s?(\s*.*)")
-plain_indent = re.compile(r"^(\s{4})(\s*[^: ].*)")
-format_directive = re.compile(r"^\.\.\s*format\s*:\s*(on|off)\s*$")
-dont_format_under_directive = re.compile(r"^\.\. (?:toctree)::\s*$")
+
+start_code_section = re.compile(
+ r"^(((?!\.\.).+::)|(\.\.\s*sourcecode::(.*py.*)?)|(::))$"
+)
+start_space = re.compile(r"^(\s*)[^ ]?")
def format_file(
file: Path, exit_on_error: bool, check: bool, no_plain: bool
-) -> bool | None:
+) -> tuple[bool, int]:
buffer = []
if not check:
print(f"Running file {file} ..", end="")
original = file.read_text("utf-8")
doctest_block: _Block | None = None
plain_block: _Block | None = None
- last_line = None
+
+ plain_code_section = False
+ plain_padding = None
+ plain_padding_len = None
+
+ errors = []
+
disable_format = False
- non_code_directive = False
for line_no, line in enumerate(original.splitlines(), 1):
- if match := format_directive.match(line):
+ # start_code_section requires no spaces at the start
+ if start_code_section.match(line.strip()):
+ if plain_block:
+ buffer.extend(
+ _format_block(
+ plain_block, exit_on_error, errors, is_doctest=False
+ )
+ )
+ plain_block = None
+ plain_code_section = True
+ plain_padding = start_space.match(line).groups()[0]
+ plain_padding_len = len(plain_padding)
+ buffer.append(line)
+ continue
+ elif (
+ plain_code_section
+ and line.strip()
+ and not line.startswith(" " * (plain_padding_len + 1))
+ ):
+ plain_code_section = False
+ elif match := format_directive.match(line):
disable_format = match.groups()[0] == "off"
- elif match := dont_format_under_directive.match(line):
- non_code_directive = True
if doctest_block:
assert not plain_block
else:
buffer.extend(
_format_block(
- doctest_block, exit_on_error, is_doctest=True
+ doctest_block, exit_on_error, errors, is_doctest=True
)
)
doctest_block = None
-
- if plain_block:
- assert not doctest_block
- if not line:
- plain_block.append((line, line_no, None, line))
- continue
- elif match := plain_indent.match(line):
- plain_block.append((line, line_no, None, match.groups()[1]))
+ elif plain_block:
+ if plain_code_section and not doctest_code_start.match(line):
+ plain_block.append(
+ (line, line_no, None, line[plain_padding_len:])
+ )
continue
else:
- if non_code_directive:
- buffer.extend(line for line, _, _, _ in plain_block)
- else:
- buffer.extend(
- _format_block(
- plain_block, exit_on_error, is_doctest=False
- )
+ buffer.extend(
+ _format_block(
+ plain_block, exit_on_error, errors, is_doctest=False
)
+ )
plain_block = None
- non_code_directive = False
- if match := doctest_code_start.match(line):
+ if line and (match := doctest_code_start.match(line)):
+ plain_code_section = False
if plain_block:
buffer.extend(
- _format_block(plain_block, exit_on_error, is_doctest=False)
+ _format_block(
+ plain_block, exit_on_error, errors, is_doctest=False
+ )
)
plain_block = None
padding, code = match.groups()
doctest_block = [(line, line_no, padding, code)]
elif (
- not no_plain
- and not disable_format
- and not last_line
- and (match := plain_indent.match(line))
+ line and not no_plain and not disable_format and plain_code_section
):
- # print('start plain', line)
assert not doctest_block
# start of a plain block
- padding, code = match.groups()
- plain_block = [(line, line_no, padding, code)]
+ plain_block = [
+ (line, line_no, plain_padding, line[plain_padding_len:])
+ ]
else:
buffer.append(line)
- last_line = line
if doctest_block:
buffer.extend(
- _format_block(doctest_block, exit_on_error, is_doctest=True)
+ _format_block(
+ doctest_block, exit_on_error, errors, is_doctest=True
+ )
)
if plain_block:
- if non_code_directive:
- buffer.extend(line for line, _, _, _ in plain_block)
- else:
- buffer.extend(
- _format_block(plain_block, exit_on_error, is_doctest=False)
- )
+ buffer.extend(
+ _format_block(plain_block, exit_on_error, errors, is_doctest=False)
+ )
if buffer:
# if there is nothing in the buffer something strange happened so
# don't do anything
updated = "\n".join(buffer)
equal = original == updated
if not check:
- print("..done. ", "No changes" if equal else "Changes detected")
+ print(
+ f"..done. {len(errors)} error(s).",
+ "No changes" if equal else "Changes detected",
+ )
if not equal:
# write only if there are changes to write
file.write_text(updated, "utf-8", newline="\n")
if check:
if not equal:
print(f"File {file} would be formatted")
- return equal
- else:
- return None
+ return equal, len(errors)
def iter_files(directory) -> Iterator[Path]:
]
if check:
- if all(result):
+ formatting_error_counts = [e for _, e in result if e]
+ to_reformat = len([b for b, _ in result if not b])
+
+ if not to_reformat and not formatting_error_counts:
print("All files are correctly formatted")
exit(0)
else:
- print("Some file would be reformated")
+ print(
+ f"{to_reformat} file(s) would be reformatted;",
+ (
+ f"{sum(formatting_error_counts)} formatting errors "
+ f"reported in {len(formatting_error_counts)} files"
+ )
+ if formatting_error_counts
+ else "no formatting errors reported",
+ )
+
+ # Interim behavior: still exit successfully when only formatting errors
+ # (no files needing reformat) were found, until all such errors are fixed.
+ if not to_reformat:
+ exit(0)
exit(1)