0.5.3
=====
- orm
+ - The "objects" argument to session.flush() is deprecated.
+ State which represents the linkage between a parent and
+ child object does not support "flushed" status on
+ one side of the link and not the other, so supporting
+ this operation leads to misleading results.
+ [ticket:1315]
+
- Query now implements __clause_element__() which produces
its selectable, which means a Query instance can be accepted
in many SQL expressions, including col.in_(query),
union(query1, query2), select([foo]).select_from(query),
etc.
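+
+ For example, a brief sketch (entity names are illustrative)::
+
+   subq = session.query(User.id).filter(User.name == 'ed')
+   session.query(Address).filter(Address.user_id.in_(subq))
+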
+ - Query.join() can now construct multiple FROM clauses, if
+ needed. For example, query(A, B).join(A.x).join(B.y)
+ might produce SELECT A.*, B.* FROM A JOIN X, B JOIN Y.
+ Eager loading can also tack its joins onto those
+ multiple FROM clauses. [ticket:1337]
+
+ - Fixed bug where column_prefix wasn't being taken into
+ account when deciding to skip mapping an attribute whose
+ name was already present at the class level.
+
- a session.expire() on a particular collection attribute
will clear any pending backref additions as well, so that
the next access correctly returns only what was present
in the database. Presents some degree of a workaround for
[ticket:1315], although we are considering removing the
flush([objects]) feature altogether.
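+
+ For example, a sketch of expiring a single collection
+ (names are illustrative)::
+
+   session.expire(user, ['addresses'])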
+
+ - Session.scalar() now converts raw SQL strings to text()
+ the same way Session.execute() does and accepts the same
+ alternative **kw args.
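+
+ A brief sketch of the string form (table name is illustrative)::
+
+   session.scalar("SELECT count(*) FROM users")
+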
- improvements to the "determine direction" logic of
relation() such that the direction of tricky situations
like mapper(A.join(B)) -> relation -> mapper(B) can be
determined.
- Added "post_configure_attribute" method to InstrumentationManager,
so that the "listen_for_events.py" example works again.
[ticket:1314]
-
+
+ - a forward and complementing backwards reference which are both
+ of the same direction, i.e. ONETOMANY or MANYTOONE,
+ are now detected, and an error message is raised.
+ Saves crazy CircularDependencyErrors later on.
+
- Fixed bugs in Query regarding simultaneous selection of
multiple joined-table inheritance entities with common base
classes:
- Other filterings, like
query(A).join(A.bs).filter(B.foo=='bar'), were erroneously
adapting "B.foo" as though it were an "A".
-
+
+ - Fixed adaptation of EXISTS clauses via any(), has(), etc.
+ in conjunction with an aliased object on the left and
+ of_type() on the right. [ticket:1325]
+
+ - Added an attribute helper method ``set_committed_value`` in
+ sqlalchemy.orm.attributes. Given an object, attribute name,
+ and value, will set the value on the object as part of its
+ "committed" state, i.e. state that is understood to have
+ been loaded from the database. Helps with the creation of
+ homegrown collection loaders and such.
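+
+ A short sketch (object and attribute names are illustrative)::
+
+   from sqlalchemy.orm.attributes import set_committed_value
+
+   # populate the collection as though loaded from the database;
+   # no change history is created
+   set_committed_value(user, 'addresses', [a1, a2])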
+
+ - Query won't fail with a weakref error when a non-mapper/class
+ instrumented descriptor is passed; it instead raises
+ "Invalid column expression".
+
+ - Query.group_by() properly takes into account aliasing applied
+ to the FROM clause, such as with select_from(), using
+ with_polymorphic(), or using from_self().
+
- sql
- Fixed missing _label attribute on Function object, others
when used in a select() with use_labels (such as when used
in an ORM column_property).
- postgres
- Index reflection won't fail when an index with
multiple expressions is encountered.
+
+ - Added PGUuid and PGBit types to
+ sqlalchemy.databases.postgres. [ticket:1327]
+
+ - Reflection of unknown PG types won't crash when those
+ types are specified within a domain. [ticket:1327]
- mssql
- Preliminary support for pymssql 1.0.1
-
+
+ - Corrected issue on mssql where max_identifier_length was
+ not being respected.
+
+- extensions
+
+ - Fixed a recursive pickling issue in serializer, triggered
+ by an EXISTS or other embedded FROM construct.
+
+ - Declarative locates the "inherits" class using a search
+ through __bases__, to skip over mixins that are local
+ to subclasses.
+
+ - Declarative figures out joined-table inheritance primary join
+ condition even if "inherits" mapper argument is given
+ explicitly.
+
+ - Declarative will properly interpret the "foreign_keys" argument
+ on a backref() if it's a string.
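+
+ e.g., a sketch of the string form (names are illustrative;
+ backref() is imported from sqlalchemy.orm)::
+
+   addresses = relation("Address",
+       backref=backref("user", foreign_keys="Address.user_id"))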
+
+ - Declarative will accept a table-bound column as a property
+ when used in conjunction with __table__, if the column is already
+ present in __table__. The column will be remapped to the given
+ key the same way as when added to the mapper() properties dict.
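+
+ A brief sketch of the accepted pattern (names are illustrative)::
+
+   class MyClass(Base):
+       __table__ = mytable
+       some_key = mytable.c.some_column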
+
0.5.2
======
Setting Noload
~~~~~~~~~~~~~~~
-
The opposite of the dynamic relation is simply "noload", specified using ``lazy=None``:
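
A minimal sketch, assuming the ``User`` and ``Address`` mappings used throughout this tutorial::

    mapper(User, users_table, properties={
        'addresses': relation(Address, lazy=None)
    })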
.. sourcecode:: python+sql
>>> import sqlalchemy
>>> sqlalchemy.__version__ # doctest:+SKIP
0.5.0
-
+
Connecting
==========
>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite:///:memory:', echo=True)
-
+
The ``echo`` flag is a shortcut to setting up SQLAlchemy logging, which is accomplished via Python's standard ``logging`` module. With it enabled, we'll see all the generated SQL produced. If you are working through this tutorial and want less output generated, set it to ``False``. This tutorial will format the SQL behind a popup window so it doesn't get in our way; just click the "SQL" links to see what's being generated.
-
-Define and Create a Table
+
+Define and Create a Table
==========================
Next we want to tell SQLAlchemy about our tables. We will start with just a single table called ``users``, which will store records for the end-users using our application (let's assume it's a website). We define our tables within a catalog called ``MetaData``, using the ``Table`` construct, which is used in a manner similar to SQL's CREATE TABLE syntax::
- >>> from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
+ >>> from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
>>> metadata = MetaData()
>>> users_table = Table('users', metadata,
... Column('id', Integer, primary_key=True),
PRAGMA table_info("users")
{}
CREATE TABLE users (
- id INTEGER NOT NULL,
- name VARCHAR,
- fullname VARCHAR,
- password VARCHAR,
+ id INTEGER NOT NULL,
+ name VARCHAR,
+ fullname VARCHAR,
+ password VARCHAR,
PRIMARY KEY (id)
)
{}
Users familiar with the syntax of CREATE TABLE may notice that the VARCHAR columns were generated without a length; on SQLite, this is a valid datatype, but on most databases it's not allowed. So if running this tutorial on a database such as Postgres or MySQL, and you wish to use SQLAlchemy to generate the tables, a "length" may be provided to the ``String`` type as below::
Column('name', String(50))
-
+
The length field on ``String``, as well as similar precision/scale fields available on ``Integer``, ``Numeric``, etc., is not referenced by SQLAlchemy other than when creating tables.
-Define a Python Class to be Mapped
+Define a Python Class to be Mapped
===================================
While the ``Table`` object defines information about our database, it does not say anything about the definition or behavior of the business objects used by our application; SQLAlchemy views this as a separate concern. To correspond to our ``users`` table, let's create a rudimentary ``User`` class. It need only subclass Python's built-in ``object`` class (i.e. it's a new style class)::
>>> from sqlalchemy.orm import mapper
>>> mapper(User, users_table) # doctest:+ELLIPSIS,+NORMALIZE_WHITESPACE
<Mapper at 0x...; User>
-
+
The ``mapper()`` function creates a new ``Mapper`` object and stores it away for future reference, associated with our class. Let's now create and inspect a ``User`` object::
>>> ed_user = User('ed', 'Ed Jones', 'edspassword')
'edspassword'
>>> str(ed_user.id)
'None'
-
+
The ``id`` attribute, while not defined by our ``__init__()`` method, exists due to the ``id`` column present within the ``users_table`` object. By default, the ``mapper`` creates class attributes for all columns present within the ``Table``. These class attributes exist as Python descriptors, and define **instrumentation** for the mapped class. The functionality of this instrumentation is very rich and includes the ability to track modifications and automatically load new data from the database when needed.
Since we have not yet told SQLAlchemy to persist ``Ed Jones`` within the database, its id is ``None``. When we persist the object later, this attribute will be populated with a newly generated value.
-Creating Table, Class and Mapper All at Once Declaratively
+Creating Table, Class and Mapper All at Once Declaratively
===========================================================
The preceding approach to configuration involving a ``Table``, user-defined class, and ``mapper()`` call illustrates classical SQLAlchemy usage, which values the highest separation of concerns possible. A large number of applications don't require this degree of separation, and for those SQLAlchemy offers an alternate "shorthand" configurational style called **declarative**. For many applications, this is the only style of configuration needed. Our above example using this style is as follows::
>>> from sqlalchemy.ext.declarative import declarative_base
-
+
>>> Base = declarative_base()
>>> class User(Base):
... __tablename__ = 'users'
The underlying ``Table`` object created by our ``declarative_base()`` version of ``User`` is accessible via the ``__table__`` attribute::
>>> users_table = User.__table__
-
+
and the owning ``MetaData`` object is available as well::
>>> metadata = Base.metadata
.. sourcecode:: python+sql
>>> Session.configure(bind=engine) # once engine is available
-
+
This custom-made ``Session`` class will create new ``Session`` objects which are bound to our database. Other transactional characteristics may be defined when calling ``sessionmaker()`` as well; these are described in a later chapter. Then, whenever you need to have a conversation with the database, you instantiate a ``Session``::
>>> session = Session()
-
+
The above ``Session`` is associated with our SQLite ``engine``, but it hasn't opened any connections yet. When it's first used, it retrieves a connection from a pool of connections maintained by the ``engine``, and holds onto it until we commit all changes and/or close the session object.
Adding new Objects
>>> ed_user = User('ed', 'Ed Jones', 'edspassword')
>>> session.add(ed_user)
-
+
At this point, the instance is **pending**; no SQL has yet been issued. The ``Session`` will issue the SQL to persist ``Ed Jones`` as soon as is needed, using a process known as a **flush**. If we query the database for ``Ed Jones``, all pending information will first be flushed, and the query is issued afterwards.
For example, below we create a new ``Query`` object which loads instances of ``User``. We "filter by" the ``name`` attribute of ``ed``, and indicate that we'd like only the first result in the full list of rows. A ``User`` instance is returned which is equivalent to that which we've added:
BEGIN
INSERT INTO users (name, fullname, password) VALUES (?, ?, ?)
['ed', 'Ed Jones', 'edspassword']
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.name = ?
LIMIT 1 OFFSET 0
['ed']
>>> session.dirty
IdentitySet([<User('ed','Ed Jones', 'f8s7ccs')>])
-
+
and that three new ``User`` objects are pending:
.. sourcecode:: python+sql
>>> session.new # doctest: +NORMALIZE_WHITESPACE
- IdentitySet([<User('wendy','Wendy Williams', 'foobar')>,
- <User('mary','Mary Contrary', 'xxg527')>,
+ IdentitySet([<User('wendy','Wendy Williams', 'foobar')>,
+ <User('mary','Mary Contrary', 'xxg527')>,
<User('fred','Fred Flinstone', 'blah')>])
-
+
We tell the ``Session`` that we'd like to issue all remaining changes to the database and commit the transaction, which has been in progress throughout. We do this via ``commit()``:
.. sourcecode:: python+sql
{sql}>>> ed_user.id # doctest: +NORMALIZE_WHITESPACE
BEGIN
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.id = ?
[1]
{stop}1
>>> fake_user = User('fakeuser', 'Invalid', '12345')
>>> session.add(fake_user)
-
+
Querying the session, we can see that they're flushed into the current transaction:
.. sourcecode:: python+sql
['Edwardo', 1]
INSERT INTO users (name, fullname, password) VALUES (?, ?, ?)
['fakeuser', 'Invalid', '12345']
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.name IN (?, ?)
['Edwardo', 'fakeuser']
{stop}[<User('Edwardo','Ed Jones', 'f8s7ccs')>, <User('fakeuser','Invalid', '12345')>]
-
+
Rolling back, we can see that ``ed_user``'s name is back to ``ed``, and ``fake_user`` has been kicked out of the session:
.. sourcecode:: python+sql
{sql}>>> ed_user.name
BEGIN
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.id = ?
[1]
{stop}u'ed'
>>> fake_user in session
False
-
+
issuing a SELECT illustrates the changes made to the database:
.. sourcecode:: python+sql
{sql}>>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all()
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.name IN (?, ?)
['ed', 'fakeuser']
{stop}[<User('ed','Ed Jones', 'f8s7ccs')>]
.. sourcecode:: python+sql
{sql}>>> for instance in session.query(User).order_by(User.id): # doctest: +NORMALIZE_WHITESPACE
- ... print instance.name, instance.fullname
- SELECT users.id AS users_id, users.name AS users_name,
- users.fullname AS users_fullname, users.password AS users_password
+ ... print instance.name, instance.fullname
+ SELECT users.id AS users_id, users.name AS users_name,
+ users.fullname AS users_fullname, users.password AS users_password
FROM users ORDER BY users.id
[]
{stop}ed Ed Jones
{sql}>>> for row in session.query(User, User.name).all():
... print row.User, row.name
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
[]
{stop}<User('ed','Ed Jones', 'f8s7ccs')> ed
<User('wendy','Wendy Williams', 'foobar')> wendy
<User('mary','Mary Contrary', 'xxg527')> mary
<User('fred','Fred Flinstone', 'blah')> fred
-
+
You can control the names using the ``label()`` construct for scalar attributes and ``aliased()`` for class constructs:
.. sourcecode:: python+sql
>>> user_alias = aliased(User, name='user_alias')
{sql}>>> for row in session.query(user_alias, user_alias.name.label('name_label')).all():
... print row.user_alias, row.name_label
- SELECT users_1.id AS users_1_id, users_1.name AS users_1_name, users_1.fullname AS users_1_fullname, users_1.password AS users_1_password, users_1.name AS name_label
+ SELECT users_1.id AS users_1_id, users_1.name AS users_1_name, users_1.fullname AS users_1_fullname, users_1.password AS users_1_password, users_1.name AS name_label
FROM users AS users_1
[]
<User('ed','Ed Jones', 'f8s7ccs')> ed
{sql}>>> for u in session.query(User).order_by(User.id)[1:3]: #doctest: +NORMALIZE_WHITESPACE
... print u
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users ORDER BY users.id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users ORDER BY users.id
LIMIT 2 OFFSET 1
[]
{stop}<User('wendy','Wendy Williams', 'foobar')>
{sql}>>> for name, in session.query(User.name).filter_by(fullname='Ed Jones'): # doctest: +NORMALIZE_WHITESPACE
... print name
- SELECT users.name AS users_name FROM users
+ SELECT users.name AS users_name FROM users
WHERE users.fullname = ?
['Ed Jones']
{stop}ed
{sql}>>> for name, in session.query(User.name).filter(User.fullname=='Ed Jones'): # doctest: +NORMALIZE_WHITESPACE
... print name
- SELECT users.name AS users_name FROM users
+ SELECT users.name AS users_name FROM users
WHERE users.fullname = ?
['Ed Jones']
{stop}ed
{sql}>>> for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones'): # doctest: +NORMALIZE_WHITESPACE
... print user
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.name = ? AND users.fullname = ?
['ed', 'Ed Jones']
{stop}<User('ed','Ed Jones', 'f8s7ccs')>
* LIKE::
query.filter(User.name.like('%ed%'))
-
+
* IN::
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
-
+
* IS NULL::
filter(User.name == None)
-
+
* AND::
from sqlalchemy import and_
filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
-
+
# or call filter()/filter_by() multiple times
filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
query.filter(User.name.match('wendy'))
The contents of the match parameter are database backend specific.
-
-Returning Lists and Scalars
+
+Returning Lists and Scalars
---------------------------
The ``all()``, ``one()``, and ``first()`` methods of ``Query`` immediately issue SQL and return a non-iterator value. ``all()`` returns a list:
>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
{sql}>>> query.all()
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.name LIKE ? ORDER BY users.id
['%ed']
{stop}[<User('ed','Ed Jones', 'f8s7ccs')>, <User('fred','Fred Flinstone', 'blah')>]
.. sourcecode:: python+sql
{sql}>>> query.first()
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
- WHERE users.name LIKE ? ORDER BY users.id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
+ WHERE users.name LIKE ? ORDER BY users.id
LIMIT 1 OFFSET 0
['%ed']
{stop}<User('ed','Ed Jones', 'f8s7ccs')>
.. sourcecode:: python+sql
- {sql}>>> try:
- ... user = query.one()
- ... except Exception, e:
+ {sql}>>> try:
+ ... user = query.one()
+ ... except Exception, e:
... print e
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
- WHERE users.name LIKE ? ORDER BY users.id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
+ WHERE users.name LIKE ? ORDER BY users.id
LIMIT 2 OFFSET 0
['%ed']
{stop}Multiple rows were found for one()
.. sourcecode:: python+sql
{sql}>>> try:
- ... user = query.filter(User.id == 99).one()
- ... except Exception, e:
+ ... user = query.filter(User.id == 99).one()
+ ... except Exception, e:
... print e
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
- WHERE users.name LIKE ? AND users.id = ? ORDER BY users.id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
+ WHERE users.name LIKE ? AND users.id = ? ORDER BY users.id
LIMIT 2 OFFSET 0
['%ed', 99]
{stop}No row was found for one()
-Using Literal SQL
+Using Literal SQL
-----------------
Literal strings can be used flexibly with ``Query``. Most methods accept strings in addition to SQLAlchemy clause constructs. For example, ``filter()`` and ``order_by()``:
{sql}>>> for user in session.query(User).filter("id<224").order_by("id").all():
... print user.name
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE id<224 ORDER BY id
[]
{stop}ed
wendy
mary
fred
-
+
Bind parameters can be specified with string-based SQL, using a colon. To specify the values, use the ``params()`` method:
.. sourcecode:: python+sql
{sql}>>> session.query(User).filter("id<:value and name=:name").\
... params(value=224, name='fred').order_by(User.id).one() # doctest: +NORMALIZE_WHITESPACE
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
- WHERE id<? and name=? ORDER BY users.id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
+ WHERE id<? and name=? ORDER BY users.id
LIMIT 2 OFFSET 0
[224, 'fred']
{stop}<User('fred','Fred Flinstone', 'blah')>
['ed']
{stop}[<User('ed','Ed Jones', 'f8s7ccs')>]
-Building a Relation
+Building a Relation
====================
Now let's consider a second table to be dealt with. Users in our system can also store any number of email addresses associated with their username. This implies a basic one-to-many association from the ``users_table`` to a new table which stores email addresses, which we will call ``addresses``. Using declarative, we define this table along with its mapped class, ``Address``:
# ....
addresses = relation(Address, order_by=Address.id, backref="user")
-We are also free to not define a backref, and to define the func:`relation()` only on one class and not the other. It is also possible to define two separate :func:`relation` constructs for either direction, which is generally safe for many-to-one and one-to-many relations, but not for many-to-many relations.
+We are also free to not define a backref, and to define the :func:`relation()` only on one class and not the other. It is also possible to define two separate :func:`relation` constructs for either direction, which is generally safe for many-to-one and one-to-many relations, but not for many-to-many relations.
When using the ``declarative`` extension, ``relation()`` gives us the option to use strings for most arguments that concern the target class, in the case that the target class has not yet been defined. This **only** works in conjunction with ``declarative``:
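
For example, a sketch mirroring the non-string form shown above::

    addresses = relation("Address", order_by="Address.id", backref="user")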
PRAGMA table_info("addresses")
{}
CREATE TABLE addresses (
- id INTEGER NOT NULL,
- email_address VARCHAR NOT NULL,
- user_id INTEGER,
- PRIMARY KEY (id),
+ id INTEGER NOT NULL,
+ email_address VARCHAR NOT NULL,
+ user_id INTEGER,
+ PRIMARY KEY (id),
FOREIGN KEY(user_id) REFERENCES users (id)
)
{}
COMMIT
-Working with Related Objects
+Working with Related Objects
=============================
-Now when we create a ``User``, a blank ``addresses`` collection will be present. By default, the collection is a Python list. Other collection types, such as sets and dictionaries, are available as well:
+Now when we create a ``User``, a blank ``addresses`` collection will be present. Various collection types, such as sets and dictionaries, are possible here (see :ref:`advdatamapping_entitycollections` for details), but by default, the collection is a Python list.
.. sourcecode:: python+sql
>>> jack = User('jack', 'Jack Bean', 'gjffdd')
>>> jack.addresses
[]
-
+
We are free to add ``Address`` objects on our ``User`` object. In this case we just assign a full list directly:
.. sourcecode:: python+sql
>>> jack.addresses[1]
<Address('j25@yahoo.com')>
-
+
>>> jack.addresses[1].user
<User('jack','Jack Bean', 'gjffdd')>
INSERT INTO addresses (email_address, user_id) VALUES (?, ?)
['j25@yahoo.com', 5]
COMMIT
-
+
Querying for Jack, we get just Jack back. No SQL is yet issued for Jack's addresses:
.. sourcecode:: python+sql
{sql}>>> jack = session.query(User).filter_by(name='jack').one()
BEGIN
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
- WHERE users.name = ?
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
+ WHERE users.name = ?
LIMIT 2 OFFSET 0
['jack']
-
+
>>> jack
<User('jack','Jack Bean', 'gjffdd')>
-
+
Let's look at the ``addresses`` collection. Watch the SQL:
.. sourcecode:: python+sql
{sql}>>> jack.addresses
- SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
- FROM addresses
+ SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
+ FROM addresses
WHERE ? = addresses.user_id ORDER BY addresses.id
[5]
{stop}[<Address('jack@google.com')>, <Address('j25@yahoo.com')>]
-
-When we accessed the ``addresses`` collection, SQL was suddenly issued. This is an example of a **lazy loading relation**. The ``addresses`` collection is now loaded and behaves just like an ordinary list.
-
+
+When we accessed the ``addresses`` collection, SQL was suddenly issued. This is an example of a **lazy loading relation**. The ``addresses`` collection is now loaded and behaves just like an ordinary list.
+
If we want to reduce the number of queries (dramatically, in many cases), we can apply an **eager load** to the query operation. With the same query, we may apply an **option** to the query, indicating that we'd like ``addresses`` to load "eagerly". SQLAlchemy then constructs an outer join between the ``users`` and ``addresses`` tables, and loads them at once, populating the ``addresses`` collection on each ``User`` object if it's not already populated:
.. sourcecode:: python+sql
>>> from sqlalchemy.orm import eagerload
-
+
{sql}>>> jack = session.query(User).options(eagerload('addresses')).filter_by(name='jack').one() #doctest: +NORMALIZE_WHITESPACE
- SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name,
- anon_1.users_fullname AS anon_1_users_fullname, anon_1.users_password AS anon_1_users_password,
- addresses_1.id AS addresses_1_id, addresses_1.email_address AS addresses_1_email_address,
- addresses_1.user_id AS addresses_1_user_id
- FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
+ SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name,
+ anon_1.users_fullname AS anon_1_users_fullname, anon_1.users_password AS anon_1_users_password,
+ addresses_1.id AS addresses_1_id, addresses_1.email_address AS addresses_1_email_address,
+ addresses_1.user_id AS addresses_1_user_id
+ FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
users.password AS users_password
- FROM users WHERE users.name = ?
- LIMIT 2 OFFSET 0) AS anon_1 LEFT OUTER JOIN addresses AS addresses_1
+ FROM users WHERE users.name = ?
+ LIMIT 2 OFFSET 0) AS anon_1 LEFT OUTER JOIN addresses AS addresses_1
ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id
['jack']
-
+
>>> jack
<User('jack','Jack Bean', 'gjffdd')>
-
+
>>> jack.addresses
[<Address('jack@google.com')>, <Address('j25@yahoo.com')>]
SQLAlchemy has the ability to control exactly which attributes and how many levels deep should be joined together in a single SQL query. More information on this feature is available in :ref:`advdatamapping_relation`.
-Querying with Joins
+Querying with Joins
====================
While the eager load created a JOIN specifically to populate a collection, we can also work explicitly with joins in many ways. For example, to construct a simple inner join between ``User`` and ``Address``, we can just ``filter()`` their related columns together. Below we load the ``User`` and ``Address`` entities at once using this method:
{sql}>>> for u, a in session.query(User, Address).filter(User.id==Address.user_id).\
... filter(Address.email_address=='jack@google.com').all(): # doctest: +NORMALIZE_WHITESPACE
... print u, a
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
- users.password AS users_password, addresses.id AS addresses_id,
- addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
- FROM users, addresses
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
+ users.password AS users_password, addresses.id AS addresses_id,
+ addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
+ FROM users, addresses
WHERE users.id = addresses.user_id AND addresses.email_address = ?
['jack@google.com']
{stop}<User('jack','Jack Bean', 'gjffdd')> <Address('jack@google.com')>
>>> from sqlalchemy.orm import join
{sql}>>> session.query(User).select_from(join(User, Address)).\
... filter(Address.email_address=='jack@google.com').all()
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users JOIN addresses ON users.id = addresses.user_id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users JOIN addresses ON users.id = addresses.user_id
WHERE addresses.email_address = ?
['jack@google.com']
{stop}[<User('jack','Jack Bean', 'gjffdd')>]
join(User, Address, User.id==Address.user_id) # explicit condition
join(User, Address, User.addresses) # specify relation from left to right
join(User, Address, 'addresses') # same, using a string
-
+
The functionality of ``join()`` is also available generatively from ``Query`` itself using ``Query.join``. This is most easily used with just the "ON" clause portion of the join, such as:
.. sourcecode:: python+sql
{sql}>>> session.query(User).join(User.addresses).\
... filter(Address.email_address=='jack@google.com').all()
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users JOIN addresses ON users.id = addresses.user_id
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users JOIN addresses ON users.id = addresses.user_id
WHERE addresses.email_address = ?
['jack@google.com']
{stop}[<User('jack','Jack Bean', 'gjffdd')>]
.. sourcecode:: python+sql
session.query(User).join((Address, User.addresses))
-
+
Multiple joins can be created by passing a list of arguments:
.. sourcecode:: python+sql
session.query(Foo).join(Foo.bars, Bar.bats, (Bat, 'widgets'))
-
+
The above would produce SQL something like ``foo JOIN bars ON <onclause> JOIN bats ON <onclause> JOIN widgets ON <onclause>``.
-
-Using Aliases
+
+Using Aliases
-------------
When querying across multiple tables, if the same table needs to be referenced more than once, SQL typically requires that the table be *aliased* with another name, so that it can be distinguished from other occurrences of that table. The ``Query`` supports this most explicitly using the ``aliased`` construct. Below we join to the ``Address`` entity twice, to locate a user who has two distinct email addresses at the same time:
... filter(adalias1.email_address=='jack@google.com').\
... filter(adalias2.email_address=='j25@yahoo.com'):
... print username, email1, email2 # doctest: +NORMALIZE_WHITESPACE
- SELECT users.name AS users_name, addresses_1.email_address AS addresses_1_email_address,
- addresses_2.email_address AS addresses_2_email_address
- FROM users JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id
- JOIN addresses AS addresses_2 ON users.id = addresses_2.user_id
+ SELECT users.name AS users_name, addresses_1.email_address AS addresses_1_email_address,
+ addresses_2.email_address AS addresses_2_email_address
+ FROM users JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id
+ JOIN addresses AS addresses_2 ON users.id = addresses_2.user_id
WHERE addresses_1.email_address = ? AND addresses_2.email_address = ?
['jack@google.com', 'j25@yahoo.com']
{stop}jack jack@google.com j25@yahoo.com
-Using Subqueries
+Using Subqueries
----------------
The ``Query`` is suitable for generating statements which can be used as subqueries. Suppose we wanted to load ``User`` objects along with a count of how many ``Address`` records each user has. The best way to generate SQL like this is to get the count of addresses grouped by user ids, and JOIN to the parent. In this case we use a LEFT OUTER JOIN so that we get rows back for those users who don't have any addresses, e.g.::
(SELECT user_id, count(*) AS address_count FROM addresses GROUP BY user_id) AS adr_count
ON users.id=adr_count.user_id
-Using the ``Query``, we build a statement like this from the inside out. The ``statement`` accessor returns a SQL expression representing the statement generated by a particular ``Query`` - this is an instance of a ``select()`` construct, which are described in `sql`::
+Using the ``Query``, we build a statement like this from the inside out. The ``statement`` accessor returns a SQL expression representing the statement generated by a particular ``Query`` - this is an instance of a ``select()`` construct, which are described in :ref:`sql`::
>>> from sqlalchemy.sql import func
>>> stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery()
-
+
The ``func`` keyword generates SQL functions, and the ``subquery()`` method on ``Query`` produces a SQL expression construct representing a SELECT statement embedded within an alias (it's actually shorthand for ``query.statement.alias()``).
Once we have our statement, it behaves like a ``Table`` construct, such as the one we created for ``users`` at the start of this tutorial. The columns on the statement are accessible through an attribute called ``c``:
{sql}>>> for u, count in session.query(User, stmt.c.address_count).\
... outerjoin((stmt, User.id==stmt.c.user_id)).order_by(User.id): # doctest: +NORMALIZE_WHITESPACE
... print u, count
- SELECT users.id AS users_id, users.name AS users_name,
- users.fullname AS users_fullname, users.password AS users_password,
- anon_1.address_count AS anon_1_address_count
- FROM users LEFT OUTER JOIN (SELECT addresses.user_id AS user_id, count(?) AS address_count
- FROM addresses GROUP BY addresses.user_id) AS anon_1 ON users.id = anon_1.user_id
+ SELECT users.id AS users_id, users.name AS users_name,
+ users.fullname AS users_fullname, users.password AS users_password,
+ anon_1.address_count AS anon_1_address_count
+ FROM users LEFT OUTER JOIN (SELECT addresses.user_id AS user_id, count(?) AS address_count
+ FROM addresses GROUP BY addresses.user_id) AS anon_1 ON users.id = anon_1.user_id
ORDER BY users.id
['*']
{stop}<User('ed','Ed Jones', 'f8s7ccs')> None
>>> stmt = exists().where(Address.user_id==User.id)
{sql}>>> for name, in session.query(User.name).filter(stmt): # doctest: +NORMALIZE_WHITESPACE
... print name
- SELECT users.name AS users_name
- FROM users
- WHERE EXISTS (SELECT *
- FROM addresses
+ SELECT users.name AS users_name
+ FROM users
+ WHERE EXISTS (SELECT *
+ FROM addresses
WHERE addresses.user_id = users.id)
[]
{stop}jack
{sql}>>> for name, in session.query(User.name).filter(User.addresses.any()): # doctest: +NORMALIZE_WHITESPACE
... print name
- SELECT users.name AS users_name
- FROM users
- WHERE EXISTS (SELECT 1
- FROM addresses
+ SELECT users.name AS users_name
+ FROM users
+ WHERE EXISTS (SELECT 1
+ FROM addresses
WHERE users.id = addresses.user_id)
[]
{stop}jack
{sql}>>> for name, in session.query(User.name).\
... filter(User.addresses.any(Address.email_address.like('%google%'))): # doctest: +NORMALIZE_WHITESPACE
... print name
- SELECT users.name AS users_name
- FROM users
- WHERE EXISTS (SELECT 1
- FROM addresses
+ SELECT users.name AS users_name
+ FROM users
+ WHERE EXISTS (SELECT 1
+ FROM addresses
WHERE users.id = addresses.user_id AND addresses.email_address LIKE ?)
['%google%']
{stop}jack
.. sourcecode:: python+sql
{sql}>>> session.query(Address).filter(~Address.user.has(User.name=='jack')).all() # doctest: +NORMALIZE_WHITESPACE
- SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address,
- addresses.user_id AS addresses_user_id
- FROM addresses
- WHERE NOT (EXISTS (SELECT 1
- FROM users
+ SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address,
+ addresses.user_id AS addresses_user_id
+ FROM addresses
+ WHERE NOT (EXISTS (SELECT 1
+ FROM users
WHERE users.id = addresses.user_id AND users.name = ?))
['jack']
{stop}[]
-
-Common Relation Operators
+
+Common Relation Operators
-------------------------
Here are all the operators that build on relations:
* any (used for one-to-many and many-to-many collections)::
query.filter(User.addresses.any(Address.email_address == 'bar'))
-
+
# also takes keyword arguments:
query.filter(User.addresses.any(email_address='bar'))
[None, 2]
DELETE FROM users WHERE users.id = ?
[5]
- SELECT count(1) AS count_1
- FROM users
+ SELECT count(1) AS count_1
+ FROM users
WHERE users.name = ?
['jack']
{stop}0
-
+
So far, so good. How about Jack's ``Address`` objects?
.. sourcecode:: python+sql
... Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
... ).count() # doctest: +NORMALIZE_WHITESPACE
SELECT count(1) AS count_1
- FROM addresses
+ FROM addresses
WHERE addresses.email_address IN (?, ?)
['jack@google.com', 'j25@yahoo.com']
{stop}2
-
+
Uh oh, they're still there! Analyzing the flush SQL, we can see that the ``user_id`` column of each address was set to NULL, but the rows weren't deleted. SQLAlchemy doesn't assume that deletes cascade; you have to tell it to do so.
-Configuring delete/delete-orphan Cascade
+Configuring delete/delete-orphan Cascade
----------------------------------------
We will configure **cascade** options on the ``User.addresses`` relation to change the behavior. While SQLAlchemy allows you to add new attributes and relations to mappings at any point in time, in this case the existing relation needs to be removed, so we need to tear down the mappings completely and start again. This is not a typical operation and is here just for illustrative purposes.
>>> session.close() # roll back and close the transaction
>>> from sqlalchemy.orm import clear_mappers
>>> clear_mappers() # clear mappers
-
+
Below, we use ``mapper()`` to reconfigure an ORM mapping for ``User`` and ``Address``, on our existing but currently un-mapped classes. The ``User.addresses`` relation now has ``delete, delete-orphan`` cascade on it, which indicates that DELETE operations will cascade to attached ``Address`` objects as well as ``Address`` objects which are removed from their parent:
.. sourcecode:: python+sql
... 'addresses':relation(Address, backref='user', cascade="all, delete, delete-orphan")
... })
<Mapper at 0x...; User>
-
+
>>> addresses_table = Address.__table__
>>> mapper(Address, addresses_table) # doctest: +ELLIPSIS
<Mapper at 0x...; Address>
# load Jack by primary key
{sql}>>> jack = session.query(User).get(5) #doctest: +NORMALIZE_WHITESPACE
BEGIN
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
WHERE users.id = ?
[5]
{stop}
-
+
# remove one Address (lazy load fires off)
- {sql}>>> del jack.addresses[1]
- SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
- FROM addresses
+ {sql}>>> del jack.addresses[1]
+ SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
+ FROM addresses
WHERE ? = addresses.user_id
[5]
{stop}
DELETE FROM addresses WHERE addresses.id = ?
[2]
SELECT count(1) AS count_1
- FROM addresses
+ FROM addresses
WHERE addresses.email_address IN (?, ?)
['jack@google.com', 'j25@yahoo.com']
{stop}1
-
+
Deleting Jack will delete both Jack and his remaining ``Address``:
.. sourcecode:: python+sql
>>> session.delete(jack)
-
+
{sql}>>> session.query(User).filter_by(name='jack').count() # doctest: +NORMALIZE_WHITESPACE
DELETE FROM addresses WHERE addresses.id = ?
[1]
DELETE FROM users WHERE users.id = ?
[5]
SELECT count(1) AS count_1
- FROM users
+ FROM users
WHERE users.name = ?
['jack']
{stop}0
-
+
{sql}>>> session.query(Address).filter(
... Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
... ).count() # doctest: +NORMALIZE_WHITESPACE
SELECT count(1) AS count_1
- FROM addresses
+ FROM addresses
WHERE addresses.email_address IN (?, ?)
['jack@google.com', 'j25@yahoo.com']
{stop}0
-Building a Many To Many Relation
+Building a Many To Many Relation
=================================
-We're moving into the bonus round here, but lets show off a many-to-many relationship. We'll sneak in some other features too, just to take a tour. We'll make our application a blog application, where users can write ``BlogPost``s, which have ``Keywords`` associated with them.
+We're moving into the bonus round here, but let's show off a many-to-many relationship. We'll sneak in some other features too, just to take a tour. We'll make our application a blog application, where users can write ``BlogPost`` items, which have ``Keyword`` items associated with them.
The declarative setup is as follows:
... def __init__(self, keyword):
... self.keyword = keyword
-Above, the many-to-many relation above is ``BlogPost.keywords``. The defining feature of a many to many relation is the ``secondary`` keyword argument which references a ``Table`` object representing the association table. This table only contains columns which reference the two sides of the relation; if it has *any* other columns, such as its own primary key, or foreign keys to other tables, SQLAlchemy requires a different usage pattern called the "association object", described at `association_pattern`.
+Above, the many-to-many relation is ``BlogPost.keywords``. The defining feature of a many-to-many relation is the ``secondary`` keyword argument which references a ``Table`` object representing the association table. This table only contains columns which reference the two sides of the relation; if it has *any* other columns, such as its own primary key, or foreign keys to other tables, SQLAlchemy requires a different usage pattern called the "association object", described at :ref:`association_pattern`.
The many-to-many relation is also bi-directional using the ``backref`` keyword. This is the one case where usage of ``backref`` is generally required, since if a separate ``posts`` relation were added to the ``Keyword`` entity, both relations would independently add and remove rows from the ``post_keywords`` table and produce conflicts.
PRAGMA table_info("post_keywords")
{}
CREATE TABLE posts (
- id INTEGER NOT NULL,
- user_id INTEGER,
- headline VARCHAR(255) NOT NULL,
- body TEXT,
- PRIMARY KEY (id),
+ id INTEGER NOT NULL,
+ user_id INTEGER,
+ headline VARCHAR(255) NOT NULL,
+ body TEXT,
+ PRIMARY KEY (id),
FOREIGN KEY(user_id) REFERENCES users (id)
)
{}
COMMIT
CREATE TABLE keywords (
- id INTEGER NOT NULL,
- keyword VARCHAR(50) NOT NULL,
- PRIMARY KEY (id),
+ id INTEGER NOT NULL,
+ keyword VARCHAR(50) NOT NULL,
+ PRIMARY KEY (id),
UNIQUE (keyword)
)
{}
COMMIT
CREATE TABLE post_keywords (
- post_id INTEGER,
- keyword_id INTEGER,
- FOREIGN KEY(post_id) REFERENCES posts (id),
+ post_id INTEGER,
+ keyword_id INTEGER,
+ FOREIGN KEY(post_id) REFERENCES posts (id),
FOREIGN KEY(keyword_id) REFERENCES keywords (id)
)
{}
.. sourcecode:: python+sql
{sql}>>> wendy = session.query(User).filter_by(name='wendy').one()
- SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
- FROM users
- WHERE users.name = ?
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
+ FROM users
+ WHERE users.name = ?
LIMIT 2 OFFSET 0
['wendy']
-
+
>>> post = BlogPost("Wendy's Blog Post", "This is a test", wendy)
>>> session.add(post)
-
+
We're storing keywords uniquely in the database, but we know that we don't have any yet, so we can just create them:
.. sourcecode:: python+sql
>>> post.keywords.append(Keyword('wendy'))
>>> post.keywords.append(Keyword('firstpost'))
-
+
We can now look up all blog posts with the keyword 'firstpost'. We'll use the ``any`` operator to locate "blog posts where any of its keywords has the keyword string 'firstpost'":
.. sourcecode:: python+sql
['firstpost']
INSERT INTO post_keywords (post_id, keyword_id) VALUES (?, ?)
[[1, 1], [1, 2]]
- SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
- FROM posts
- WHERE EXISTS (SELECT 1
- FROM post_keywords, keywords
+ SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
+ FROM posts
+ WHERE EXISTS (SELECT 1
+ FROM post_keywords, keywords
WHERE posts.id = post_keywords.post_id AND keywords.id = post_keywords.keyword_id AND keywords.keyword = ?)
['firstpost']
{stop}[BlogPost("Wendy's Blog Post", 'This is a test', <User('wendy','Wendy Williams', 'foobar')>)]
-
+
If we want to look up just Wendy's posts, we can tell the query to narrow down to her as a parent:
.. sourcecode:: python+sql
{sql}>>> session.query(BlogPost).filter(BlogPost.author==wendy).\
... filter(BlogPost.keywords.any(keyword='firstpost')).all()
- SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
- FROM posts
- WHERE ? = posts.user_id AND (EXISTS (SELECT 1
- FROM post_keywords, keywords
+ SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
+ FROM posts
+ WHERE ? = posts.user_id AND (EXISTS (SELECT 1
+ FROM post_keywords, keywords
WHERE posts.id = post_keywords.post_id AND keywords.id = post_keywords.keyword_id AND keywords.keyword = ?))
[2, 'firstpost']
{stop}[BlogPost("Wendy's Blog Post", 'This is a test', <User('wendy','Wendy Williams', 'foobar')>)]
.. sourcecode:: python+sql
{sql}>>> wendy.posts.filter(BlogPost.keywords.any(keyword='firstpost')).all()
- SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
- FROM posts
- WHERE ? = posts.user_id AND (EXISTS (SELECT 1
- FROM post_keywords, keywords
+ SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
+ FROM posts
+ WHERE ? = posts.user_id AND (EXISTS (SELECT 1
+ FROM post_keywords, keywords
WHERE posts.id = post_keywords.post_id AND keywords.id = post_keywords.keyword_id AND keywords.keyword = ?))
[2, 'firstpost']
{stop}[BlogPost("Wendy's Blog Post", 'This is a test', <User('wendy','Wendy Williams', 'foobar')>)]
-Further Reference
+Further Reference
==================
Query Reference: :ref:`query_api_toplevel`
.. autofunction:: clear_mappers
+Attribute Utilities
+-------------------
+.. autofunction:: sqlalchemy.orm.attributes.del_attribute
+
+.. autofunction:: sqlalchemy.orm.attributes.get_attribute
+
+.. autofunction:: sqlalchemy.orm.attributes.get_history
+
+.. autofunction:: sqlalchemy.orm.attributes.init_collection
+
+.. function:: sqlalchemy.orm.attributes.instance_state
+
+ Return the :class:`InstanceState` for a given object.
+
+.. autofunction:: sqlalchemy.orm.attributes.is_instrumented
+
+.. function:: sqlalchemy.orm.attributes.manager_of_class
+
+ Return the :class:`ClassManager` for a given class.
+
+.. autofunction:: sqlalchemy.orm.attributes.set_attribute
+
+.. autofunction:: sqlalchemy.orm.attributes.set_committed_value
+
Internals
---------
self.schema_name = schema_name
self.use_scope_identity = bool(use_scope_identity)
- self.max_identifier_length = int(max_identifier_length or 0) or 128
+ self.max_identifier_length = int(max_identifier_length or 0) or \
+ self.max_identifier_length
super(MSDialect, self).__init__(**opts)
def initialize(self, connection):
class PGInterval(sqltypes.TypeEngine):
__visit_name__ = 'INTERVAL'
+class PGBit(sqltypes.TypeEngine):
+ __visit_name__ = 'BIT'
+
+class PGUuid(sqltypes.TypeEngine):
+ __visit_name__ = 'UUID'
+
class PGArray(sqltypes.MutableType, sqltypes.Concatenable, sqltypes.TypeEngine):
__visit_name__ = 'ARRAY'
'real' : sqltypes.Float,
'inet': PGInet,
'cidr': PGCidr,
+ 'uuid': PGUuid,
+ 'bit':PGBit,
'macaddr': PGMacAddr,
'double precision' : sqltypes.Float,
'timestamp' : sqltypes.TIMESTAMP,
def visit_INTERVAL(self, type_):
return "INTERVAL"
+ def visit_BIT(self, type_):
+ return "BIT"
+
+ def visit_UUID(self, type_):
+ return "UUID"
+
def visit_binary(self, type_):
return self.visit_BYTEA(type_)
)
return [row[0] for row in result]
- def server_version_info(self, connection):
+ def _get_server_version_info(self, connection):
v = connection.execute("select version()").scalar()
m = re.match('PostgreSQL (\d+)\.(\d+)\.(\d+)', v)
if not m:
raise e
return sqlite
- def server_version_info(self, connection):
+ def _get_server_version_info(self, connection):
return self.dbapi.sqlite_version_info
def create_connect_args(self, url):
# Py3K
#self.supports_unicode_statements = True
#self.supports_unicode_binds = True
+
+ def initialize(self, connection):
+ # TODO: all dialects need to implement this
+ if hasattr(self, '_get_server_version_info'):
+ self.server_version_info = self._get_server_version_info(connection)
@classmethod
def type_descriptor(cls, typeobj):
from sqlalchemy.orm import synonym as _orm_synonym, mapper, comparable_property, class_mapper
from sqlalchemy.orm.interfaces import MapperProperty
from sqlalchemy.orm.properties import PropertyLoader, ColumnProperty
+from sqlalchemy.orm.util import _is_mapped_class
from sqlalchemy import util, exceptions
from sqlalchemy.sql import util as sql_util
else:
table = cls.__table__
if cols:
- raise exceptions.ArgumentError("Can't add additional columns when specifying __table__")
+ for c in cols:
+ if not table.c.contains_column(c):
+ raise exceptions.ArgumentError("Can't add additional column %r when specifying __table__" % c.key)
mapper_args = getattr(cls, '__mapper_args__', {})
if 'inherits' not in mapper_args:
- inherits = cls.__mro__[1]
- inherits = cls._decl_class_registry.get(inherits.__name__, None)
- if inherits:
- mapper_args['inherits'] = inherits
- if not mapper_args.get('concrete', False) and table and 'inherit_condition' not in mapper_args:
- # figure out the inherit condition with relaxed rules
- # about nonexistent tables, to allow for ForeignKeys to
- # not-yet-defined tables (since we know for sure that our
- # parent table is defined within the same MetaData)
- mapper_args['inherit_condition'] = sql_util.join_condition(
- inherits.__table__, table,
- ignore_nonexistent_tables=True)
+ for c in cls.__bases__:
+ if _is_mapped_class(c):
+ mapper_args['inherits'] = cls._decl_class_registry.get(c.__name__, None)
+ break
if hasattr(cls, '__mapper_cls__'):
mapper_cls = util.unbound_method_to_callable(cls.__mapper_cls__)
elif 'inherits' in mapper_args and not mapper_args.get('concrete', False):
inherited_mapper = class_mapper(mapper_args['inherits'], compile=False)
inherited_table = inherited_mapper.local_table
+ if 'inherit_condition' not in mapper_args and table:
+ # figure out the inherit condition with relaxed rules
+ # about nonexistent tables, to allow for ForeignKeys to
+ # not-yet-defined tables (since we know for sure that our
+ # parent table is defined within the same MetaData)
+ mapper_args['inherit_condition'] = sql_util.join_condition(
+ mapper_args['inherits'].__table__, table,
+ ignore_nonexistent_tables=True)
if not table:
# single table inheritance.
setattr(prop, attr, resolve_arg(v))
if prop.backref:
- for attr in ('primaryjoin', 'secondaryjoin', 'secondary', '_foreign_keys', 'remote_side', 'order_by'):
+ for attr in ('primaryjoin', 'secondaryjoin', 'secondary', 'foreign_keys', 'remote_side', 'order_by'):
if attr in prop.backref.kwargs and isinstance(prop.backref.kwargs[attr], basestring):
prop.backref.kwargs[attr] = resolve_arg(prop.backref.kwargs[attr])
>>> db.entity(tablename) == db.loans
True
+entity() also takes an optional schema argument. If none is specified, the
+default schema is used.
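+
+For example (the schema name here is illustrative)::
+
+    db.entity('loans', schema='finance')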
+
Extra tests
===========
j = join(*args, **kwargs)
return self.map(j)
- def entity(self, attr):
+ def entity(self, attr, schema=None):
try:
t = self._cache[attr]
except KeyError:
- table = Table(attr, self._metadata, autoload=True, schema=self.schema)
+ table = Table(attr, self._metadata, autoload=True, schema=schema or self.schema)
if not table.primary_key.columns:
raise PKNotFoundError('table %r does not have a primary key defined [columns: %s]' % (attr, ','.join(table.c.keys())))
if table.columns:
about stability. Caveat emptor.
This attribute is consulted by the default SQLAlchemy instrumentation
-resultion code. If custom finders are installed in the global
+resolution code. If custom finders are installed in the global
instrumentation_finders list, they may or may not choose to honor this
attribute.
if self.extensions:
self.fire_remove_event(state, old, None)
- del state.dict[self.key]
- else:
- del state.dict[self.key]
+ del state.dict[self.key]
def get_history(self, state, passive=PASSIVE_OFF):
return History.from_attribute(
if self.extensions:
value = self.fire_replace_event(state, value, old, initiator)
- state.dict[self.key] = value
- else:
- state.dict[self.key] = value
+ state.dict[self.key] = value
def fire_replace_event(self, state, value, previous, initiator):
for ext in self.extensions:
event_registry_factory = Events
instance_state_factory = InstanceState
-
+ deferred_scalar_loader = None
+
def __init__(self, class_):
self.class_ = class_
self.factory = None # where we came from, for inheritance bookkeeping
self.info = {}
self.mapper = None
+ self.new_init = None
self.mutable_attributes = set()
self.local_attrs = {}
self.originals = {}
cls_state = manager_of_class(base)
if cls_state:
self.update(cls_state)
- self.registered = False
- self._instantiable = False
self.events = self.event_registry_factory()
-
- def instantiable(self, boolean):
- # experiment, probably won't stay in this form
- assert boolean ^ self._instantiable, (boolean, self._instantiable)
- if boolean:
- self.events.original_init = self.class_.__init__
- new_init = _generate_init(self.class_, self)
- self.install_member('__init__', new_init)
- else:
+ self.manage()
+ self._instrument_init()
+
+ def _configure_create_arguments(self,
+ _source=None,
+ instance_state_factory=None,
+ deferred_scalar_loader=None):
+ """Accept extra **kw arguments passed to create_manager_for_cls.
+
+ The current contract of ClassManager and other managers is that they
+ take a single "cls" argument in their constructor (as per
+ test/orm/instrumentation.py InstrumentationCollisionTest). This
+ is to provide consistency with the current API of "class manager"
+ callables and such which may return various ClassManager and
+ ClassManager-like instances. So create_manager_for_cls sends
+ in ClassManager-specific arguments via this method once the
+ non-proxied ClassManager is available.
+
+ """
+ if _source:
+ instance_state_factory = _source.instance_state_factory
+ deferred_scalar_loader = _source.deferred_scalar_loader
+
+ if instance_state_factory:
+ self.instance_state_factory = instance_state_factory
+ if deferred_scalar_loader:
+ self.deferred_scalar_loader = deferred_scalar_loader
+
+ def _subclass_manager(self, cls):
+ """Create a new ClassManager for a subclass of this ClassManager's class.
+
+ This is called automatically when attributes are instrumented so that
+ the attributes can be propagated to subclasses against their own
+ class-local manager, without the need for mappers etc. to have already
+ pre-configured managers for the full class hierarchy. Mappers
+ can post-configure the auto-generated ClassManager when needed.
+
+ """
+ manager = manager_of_class(cls)
+ if manager is None:
+ manager = _create_manager_for_cls(cls, _source=self)
+ return manager
+
+ def _instrument_init(self):
+ # TODO: self.class_.__init__ is often the already-instrumented
+ # __init__ from an instrumented superclass. We still need to make
+ # our own wrapper, but it would
+ # be nice to wrap the original __init__ and not our existing wrapper
+ # of such, since this adds method overhead.
+ self.events.original_init = self.class_.__init__
+ self.new_init = _generate_init(self.class_, self)
+ self.install_member('__init__', self.new_init)
+
+ def _uninstrument_init(self):
+ if self.new_init:
self.uninstall_member('__init__')
- self._instantiable = bool(boolean)
- instantiable = property(lambda s: s._instantiable, instantiable)
+ self.new_init = None
def manage(self):
"""Mark this instance as the manager for its class."""
+
setattr(self.class_, self.MANAGER_ATTR, self)
def dispose(self):
- """Dissasociate this instance from its class."""
+ """Dissasociate this manager from its class."""
+
delattr(self.class_, self.MANAGER_ATTR)
def manager_getter(self):
for cls in self.class_.__subclasses__():
if isinstance(cls, types.ClassType):
continue
- manager = manager_of_class(cls)
- if manager is None:
- manager = create_manager_for_cls(cls)
+ manager = self._subclass_manager(cls)
manager.instrument_attribute(key, inst, True)
def post_configure_attribute(self, key):
for cls in self.class_.__subclasses__():
if isinstance(cls, types.ClassType):
continue
- manager = manager_of_class(cls)
- if manager is None:
- manager = create_manager_for_cls(cls)
+ manager = self._subclass_manager(cls)
manager.uninstrument_attribute(key, True)
def unregister(self):
+ """remove all instrumentation established by this ClassManager."""
+
+ self._uninstrument_init()
+
+ self.mapper = self.events = None
+ self.info.clear()
+
for key in list(self):
if key in self.local_attrs:
self.uninstrument_attribute(key)
- self.registered = False
def install_descriptor(self, key, inst):
if key in (self.STATE_ATTR, self.MANAGER_ATTR):
def attributes(self):
return self.itervalues()
- @classmethod
- def deferred_scalar_loader(cls, state, keys):
- """Apply a scalar loader to the given state.
-
- Unimplemented by default, is patched
- by the mapper.
-
- """
-
## InstanceState management
def new_instance(self, state=None):
class _ClassInstrumentationAdapter(ClassManager):
"""Adapts a user-defined InstrumentationManager to a ClassManager."""
- def __init__(self, class_, override):
- ClassManager.__init__(self, class_)
+ def __init__(self, class_, override, **kw):
self._adapted = override
+ ClassManager.__init__(self, class_, **kw)
def manage(self):
self._adapted.manage(self.class_, self)
self.added_items.remove(value)
self.deleted_items.add(value)
+def _conditional_instance_state(obj):
+ if not isinstance(obj, InstanceState):
+ obj = instance_state(obj)
+ return obj
+
+def get_history(obj, key, **kwargs):
+ """Return a History record for the given object and attribute key.
+
+ obj is an instrumented object instance. An InstanceState
+ is accepted directly for backwards compatibility but
+ this usage is deprecated.
+
+ """
+ return get_state_history(_conditional_instance_state(obj), key, **kwargs)
-def get_history(state, key, **kwargs):
+def get_state_history(state, key, **kwargs):
return state.get_history(key, **kwargs)
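A minimal sketch of the object-level form (``User`` here stands in for any
mapped class; a History record unpacks as (added, unchanged, deleted))::

    from sqlalchemy.orm import attributes

    u = User(name='ed')
    u.name = 'fred'
    added, unchanged, deleted = attributes.get_history(u, 'name')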
def has_parent(cls, obj, key, optimistic=False):
state = instance_state(obj)
return manager.has_parent(state, key, optimistic)
-def register_class(class_):
- """TODO"""
-
- # TODO: what's this function for ? why would I call this and not
- # create_manager_for_cls ?
+def register_class(class_, **kw):
+ """Register class instrumentation.
+
+ Returns the existing or newly created class manager.
+ """
manager = manager_of_class(class_)
if manager is None:
- manager = create_manager_for_cls(class_)
- if not manager.instantiable:
- manager.instantiable = True
-
+ manager = _create_manager_for_cls(class_, **kw)
+ return manager
+
def unregister_class(class_):
- """TODO"""
- manager = manager_of_class(class_)
- assert manager
- assert manager.instantiable
- manager.instantiable = False
- manager.unregister()
+ """Unregister class instrumentation."""
+
+ instrumentation_registry.unregister(class_)
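A minimal round trip, as exercised by the new test_uninstrument test further
below::

    class A(object):
        pass

    manager = attributes.register_class(A)
    assert attributes.manager_of_class(A) is manager

    attributes.unregister_class(A)
    assert attributes.manager_of_class(A) is None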
def register_attribute(class_, key, **kw):
if not proxy_property:
register_attribute_impl(class_, key, **kw)
-def register_attribute_impl(class_, key, **kw):
+def register_attribute_impl(class_, key,
+ uselist=False, callable_=None,
+ useobject=False, mutable_scalars=False,
+ impl_class=None, **kw):
manager = manager_of_class(class_)
- uselist = kw.get('uselist', False)
if uselist:
factory = kw.pop('typecallable', None)
typecallable = manager.instrument_collection_class(
key, factory or list)
else:
typecallable = kw.pop('typecallable', None)
-
- manager[key].impl = _create_prop(class_, key, manager, typecallable=typecallable, **kw)
+
+ if impl_class:
+ impl = impl_class(class_, key, typecallable, **kw)
+ elif uselist:
+ impl = CollectionAttributeImpl(class_, key, callable_,
+ typecallable=typecallable, **kw)
+ elif useobject:
+ impl = ScalarObjectAttributeImpl(class_, key, callable_, **kw)
+ elif mutable_scalars:
+ impl = MutableScalarAttributeImpl(class_, key, callable_,
+ class_manager=manager, **kw)
+ else:
+ impl = ScalarAttributeImpl(class_, key, callable_, **kw)
+
+ manager[key].impl = impl
+
manager.post_configure_attribute(key)
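The keyword flags now select the impl class directly; a sketch (assuming
``Foo`` has already been passed to register_class())::

    # ScalarAttributeImpl
    attributes.register_attribute(Foo, 'a', uselist=False, useobject=False)

    # CollectionAttributeImpl, using the default list collection
    attributes.register_attribute(Foo, 'bs', uselist=True, useobject=True)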
def register_descriptor(class_, key, proxy_property=None, comparator=None, parententity=None, property_=None):
def unregister_attribute(class_, key):
manager_of_class(class_).uninstrument_attribute(key)
-def init_collection(state, key):
+def init_collection(obj, key):
+ """Initialize a collection attribute and return the collection adapter.
+
+ This function is used to provide direct access to collection internals
+ for a previously unloaded attribute. e.g.::
+
+ collection_adapter = init_collection(someobject, 'elements')
+ for elem in values:
+ collection_adapter.append_without_event(elem)
+
+ For an easier way to do the above, see :func:`~sqlalchemy.orm.attributes.set_committed_value`.
+
+ obj is an instrumented object instance. An InstanceState
+ is accepted directly for backwards compatibility but
+ this usage is deprecated.
+
+ """
+
+ return init_state_collection(_conditional_instance_state(obj), key)
+
+def init_state_collection(state, key):
"""Initialize a collection attribute and return the collection adapter."""
+
attr = state.get_impl(key)
user_data = attr.initialize(state)
return attr.get_collection(state, user_data)
+def set_committed_value(instance, key, value):
+ """Set the value of an attribute with no history events.
+
+ Cancels any previous history present. The value should be
+ a scalar value for scalar-holding attributes, or
+ an iterable for any collection-holding attribute.
+
+ This is the same underlying method used when a lazy loader
+ fires off and loads additional data from the database.
+ In particular, this method can be used by application code
+ which has loaded additional attributes or collections through
+ separate queries, which can then be attached to an instance
+ as though it were part of its original loaded state.
+
+ """
+ state = instance_state(instance)
+ state.get_impl(key).set_committed_value(instance, key, value)
+
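A sketch of the "homegrown loader" use case described above (``User`` and
``Address`` stand in for any mapped classes)::

    user = session.query(User).get(7)
    addresses = session.query(Address).filter_by(user_id=7).all()

    # attached as committed state: no history events, nothing to flush
    attributes.set_committed_value(user, 'addresses', addresses)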
def set_attribute(instance, key, value):
+ """Set the value of an attribute, firing history events.
+
+ This function may be used regardless of instrumentation
+ applied directly to the class, i.e. no descriptors are required.
+ Custom attribute management schemes will need to make use
+ of this method to establish attribute state as understood
+ by SQLAlchemy.
+
+ """
state = instance_state(instance)
state.get_impl(key).set(state, value, None)
def get_attribute(instance, key):
+ """Get the value of an attribute, firing any callables required.
+
+ This function may be used regardless of instrumentation
+ applied directly to the class, i.e. no descriptors are required.
+ Custom attribute management schemes will need to make use
+ of this method to access attribute state as understood
+ by SQLAlchemy.
+
+ """
state = instance_state(instance)
return state.get_impl(key).get(state)
def del_attribute(instance, key):
+ """Delete the value of an attribute, firing history events.
+
+ This function may be used regardless of instrumentation
+ applied directly to the class, i.e. no descriptors are required.
+ Custom attribute management schemes will need to make use
+ of this method to establish attribute state as understood
+ by SQLAlchemy.
+
+ """
state = instance_state(instance)
state.get_impl(key).delete(state)
def is_instrumented(instance, key):
+ """Return True if the given attribute on the given instance is instrumented
+ by the attributes package.
+
+ This function may be used regardless of instrumentation
+ applied directly to the class, i.e. no descriptors are required.
+
+ """
return manager_of_class(instance.__class__).is_instrumented(key, search=True)
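Taken together, these functions manipulate attribute state with no class-bound
descriptors present; a sketch, ``obj`` being an instance of any instrumented
class::

    from sqlalchemy.orm import attributes

    if attributes.is_instrumented(obj, 'name'):
        attributes.set_attribute(obj, 'name', 'ed')   # fires history events
        assert attributes.get_attribute(obj, 'name') == 'ed'
        attributes.del_attribute(obj, 'name')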
class InstrumentationRegistry(object):
"""Private instrumentation registration singleton."""
- manager_finders = weakref.WeakKeyDictionary()
- state_finders = util.WeakIdentityMapping()
- extended = False
+ _manager_finders = weakref.WeakKeyDictionary()
+ _state_finders = util.WeakIdentityMapping()
+ _extended = False
- def create_manager_for_cls(self, class_):
+ def create_manager_for_cls(self, class_, **kw):
assert class_ is not None
assert manager_of_class(class_) is None
else:
factory = ClassManager
- existing_factories = collect_management_factories_for(class_)
- existing_factories.add(factory)
- if len(existing_factories) > 1:
+ existing_factories = self._collect_management_factories_for(class_).\
+ difference([factory])
+ if existing_factories:
raise TypeError(
"multiple instrumentation implementations specified "
"in %s inheritance hierarchy: %r" % (
manager = factory(class_)
if not isinstance(manager, ClassManager):
manager = _ClassInstrumentationAdapter(class_, manager)
- if factory != ClassManager and not self.extended:
- self.extended = True
+
+ if factory != ClassManager and not self._extended:
+ self._extended = True
_install_lookup_strategy(self)
+
+ manager._configure_create_arguments(**kw)
manager.factory = factory
- manager.manage()
- self.manager_finders[class_] = manager.manager_getter()
- self.state_finders[class_] = manager.state_getter()
+ self._manager_finders[class_] = manager.manager_getter()
+ self._state_finders[class_] = manager.state_getter()
return manager
+ def _collect_management_factories_for(self, cls):
+ """Return a collection of factories in play or specified for a hierarchy.
+
+ Traverses the entire inheritance graph of a cls and returns a collection
+ of instrumentation factories for those classes. Factories are extracted
+ from active ClassManagers, if available, otherwise
+ instrumentation_finders is consulted.
+
+ """
+ hierarchy = util.class_hierarchy(cls)
+ factories = set()
+ for member in hierarchy:
+ manager = manager_of_class(member)
+ if manager is not None:
+ factories.add(manager.factory)
+ else:
+ for finder in instrumentation_finders:
+ factory = finder(member)
+ if factory is not None:
+ break
+ else:
+ factory = None
+ factories.add(factory)
+ factories.discard(None)
+ return factories
+
def manager_of_class(self, cls):
if cls is None:
return None
try:
- finder = self.manager_finders[cls]
+ finder = self._manager_finders[cls]
except KeyError:
return None
else:
if instance is None:
raise AttributeError("None has no persistent state.")
try:
- return self.state_finders[instance.__class__](instance)
+ return self._state_finders[instance.__class__](instance)
except KeyError:
raise AttributeError("%r is not instrumented" % instance.__class__)
if instance is None:
return default
try:
- finder = self.state_finders[instance.__class__]
+ finder = self._state_finders[instance.__class__]
except KeyError:
return default
else:
raise
def unregister(self, class_):
- if class_ in self.manager_finders:
+ if class_ in self._manager_finders:
manager = self.manager_of_class(class_)
+ manager.unregister()
manager.dispose()
- del self.manager_finders[class_]
- del self.state_finders[class_]
-
-# Create a registry singleton and prepare placeholders for lookup functions.
+ del self._manager_finders[class_]
+ del self._state_finders[class_]
instrumentation_registry = InstrumentationRegistry()
-create_manager_for_cls = None
-manager_of_class = None
-instance_state = None
-_lookup_strategy = None
def _install_lookup_strategy(implementation):
- """Switch between native and extended instrumentation modes.
-
- Completely private. Use the instrumentation_finders interface to
- inject global instrumentation behavior.
-
+ """Replace global class/object management functions
+ with either faster or more comprehensive implementations,
+ based on whether or not extended class instrumentation
+ has been detected.
+
+ This function is called only by InstrumentationRegistry()
+ and unit tests specific to this behavior.
+
"""
- global manager_of_class, instance_state, create_manager_for_cls
- global _lookup_strategy
-
- # Using a symbol here to make debugging a little friendlier.
- if implementation is not util.symbol('native'):
- manager_of_class = implementation.manager_of_class
- instance_state = implementation.state_of
- create_manager_for_cls = implementation.create_manager_for_cls
- else:
- def manager_of_class(class_):
- return getattr(class_, ClassManager.MANAGER_ATTR, None)
- manager_of_class = instrumentation_registry.manager_of_class
+ global instance_state
+ if implementation is util.symbol('native'):
instance_state = attrgetter(ClassManager.STATE_ATTR)
- create_manager_for_cls = instrumentation_registry.create_manager_for_cls
- # TODO: maybe log an event when setting a strategy.
- _lookup_strategy = implementation
-
+ else:
+ instance_state = instrumentation_registry.state_of
+
+manager_of_class = instrumentation_registry.manager_of_class
+_create_manager_for_cls = instrumentation_registry.create_manager_for_cls
_install_lookup_strategy(util.symbol('native'))
def find_native_user_instrumentation_hook(cls):
return getattr(cls, INSTRUMENTATION_MANAGER, None)
instrumentation_finders.append(find_native_user_instrumentation_hook)
-def collect_management_factories_for(cls):
- """Return a collection of factories in play or specified for a hierarchy.
-
- Traverses the entire inheritance graph of a cls and returns a collection
- of instrumentation factories for those classes. Factories are extracted
- from active ClassManagers, if available, otherwise
- instrumentation_finders is consulted.
-
- """
- hierarchy = util.class_hierarchy(cls)
- factories = set()
- for member in hierarchy:
- manager = manager_of_class(member)
- if manager is not None:
- factories.add(manager.factory)
- else:
- for finder in instrumentation_finders:
- factory = finder(member)
- if factory is not None:
- break
- else:
- factory = None
- factories.add(factory)
- factories.discard(None)
- return factories
-
-def _create_prop(class_, key, class_manager,
- uselist=False, callable_=None, typecallable=None,
- useobject=False, mutable_scalars=False,
- impl_class=None, **kwargs):
- if impl_class:
- return impl_class(class_, key, typecallable, **kwargs)
- elif uselist:
- return CollectionAttributeImpl(class_, key, callable_,
- typecallable=typecallable,
- **kwargs)
- elif useobject:
- return ScalarObjectAttributeImpl(class_, key, callable_,
- **kwargs)
- elif mutable_scalars:
- return MutableScalarAttributeImpl(class_, key, callable_,
- class_manager=class_manager, **kwargs)
- else:
- return ScalarAttributeImpl(class_, key, callable_, **kwargs)
-
def _generate_init(class_, class_manager):
"""Build an __init__ decorator that triggers ClassManager events."""
+ # TODO: we should use the ClassManager's notion of the
+ # original '__init__' method, once ClassManager is fixed
+ # to always reference that.
original__init__ = class_.__init__
assert original__init__
return
if manager is not None:
- if manager.class_ is not self.class_:
- # An inherited manager. Install one for this subclass.
- # TODO: no coverage here
- manager = None
- elif manager.mapper:
+ assert manager.class_ is self.class_
+ if manager.mapper:
raise sa_exc.ArgumentError(
"Class '%s' already has a primary mapper defined. "
"Use non_primary=True to "
"create a non primary Mapper. clear_mappers() will "
"remove *all* current mappers from all classes." %
self.class_)
-
+ #else:
+ # a ClassManager may already exist as
+ # ClassManager.instrument_attribute() creates
+ # new managers for each subclass if they don't yet exist.
+
_mapper_registry[self] = True
+ self.extension.instrument_class(self, self.class_)
+
if manager is None:
- manager = attributes.create_manager_for_cls(self.class_)
+ manager = attributes.register_class(self.class_,
+ instance_state_factory = IdentityManagedState,
+ deferred_scalar_loader = _load_scalar_attributes
+ )
self.class_manager = manager
if manager.info.get(_INSTRUMENTOR, False):
return
- self.extension.instrument_class(self, self.class_)
-
- manager.instantiable = True
- manager.instance_state_factory = IdentityManagedState
- manager.deferred_scalar_loader = _load_scalar_attributes
-
event_registry = manager.events
event_registry.add_listener('on_init', _event_on_init)
event_registry.add_listener('on_init_failure', _event_on_init_failure)
def dispose(self):
# Disable any attribute-based compilation.
self.compiled = True
- manager = self.class_manager
+
if hasattr(self, '_compile_failed'):
del self._compile_failed
- if not self.non_primary and manager.mapper is self:
- manager.mapper = None
- manager.events.remove_listener('on_init', _event_on_init)
- manager.events.remove_listener('on_init_failure',
- _event_on_init_failure)
- manager.uninstall_member('__init__')
- del manager.info[_INSTRUMENTOR]
+
+ if not self.non_primary and self.class_manager.mapper is self:
attributes.unregister_class(self.class_)
def _configure_pks(self):
# pull properties from the inherited mapper if any.
if self.inherits:
for key, prop in self.inherits._props.iteritems():
- if key not in self._props and not self._should_exclude(key, local=False):
+ if key not in self._props and not self._should_exclude(key, key, local=False):
self._adapt_inherited_property(key, prop, False)
# create properties for each column in the mapped table,
if column in self._columntoproperty:
continue
- if self._should_exclude(column.key, local=self.local_table.c.contains_column(column)):
- continue
-
column_key = (self.column_prefix or '') + column.key
+ if self._should_exclude(column.key, column_key, local=self.local_table.c.contains_column(column)):
+ continue
+
# adjust the "key" used for this column to that
# of the inheriting mapper
for mapper in self.iterate_to_root():
col = self.polymorphic_on
else:
dont_instrument = False
- if self._should_exclude(col.key, local=False):
+ if self._should_exclude(col.key, col.key, local=False):
raise sa_exc.InvalidRequestError("Cannot exclude or override the discriminator column %r" % col.key)
self._configure_property(col.key, ColumnProperty(col, _no_instrument=dont_instrument), init=False, setparent=True)
def _is_userland_descriptor(self, obj):
return not isinstance(obj, (MapperProperty, attributes.InstrumentedAttribute)) and hasattr(obj, '__get__')
- def _should_exclude(self, name, local):
+ def _should_exclude(self, name, assigned_name, local):
"""determine whether a particular property should be implicitly present on the class.
This occurs when properties are propagated from an inherited class, or are
# check for descriptors, either local or from
# an inherited class
if local:
- if self.class_.__dict__.get(name, None)\
- and self._is_userland_descriptor(self.class_.__dict__[name]):
+ if self.class_.__dict__.get(assigned_name, None)\
+ and self._is_userland_descriptor(self.class_.__dict__[assigned_name]):
return True
else:
- if getattr(self.class_, name, None)\
- and self._is_userland_descriptor(getattr(self.class_, name)):
+ if getattr(self.class_, assigned_name, None)\
+ and self._is_userland_descriptor(getattr(self.class_, assigned_name)):
return True
if (self.include_properties is not None and
params[col._label] = mapper._get_state_attr_by_column(state, col)
params[col.key] = params[col._label] + 1
for prop in mapper._columntoproperty.itervalues():
- history = attributes.get_history(state, prop.key, passive=True)
+ history = attributes.get_state_history(state, prop.key, passive=True)
if history.added:
hasdata = True
elif mapper.polymorphic_on and mapper.polymorphic_on.shares_lineage(col):
continue
prop = mapper._columntoproperty[col]
- history = attributes.get_history(state, prop.key, passive=True)
+ history = attributes.get_state_history(state, prop.key, passive=True)
if history.added:
if isinstance(history.added[0], sql.ClauseElement):
value_params[col] = history.added[0]
return op(self, *other, **kwargs)
def of_type(self, cls):
- return RelationProperty.Comparator(self.property, self.mapper, cls)
+ return RelationProperty.Comparator(self.property, self.mapper, cls, adapter=self.adapter)
def in_(self, other):
raise NotImplementedError("in_() not yet supported for relations. For a "
source_selectable = self.__clause_element__()
else:
source_selectable = None
-
+
pj, sj, source, dest, secondary, target_adapter = \
self.property._create_joins(dest_polymorphic=True, dest_selectable=to_selectable, source_selectable=source_selectable)
raise sa_exc.ArgumentError("reverse_property %r on relation %s references "
"relation %s, which does not reference mapper %s" % (key, self, other, self.parent))
+ if self.direction in (ONETOMANY, MANYTOONE) and self.direction == other.direction:
+ raise sa_exc.ArgumentError("%s and back-reference %s are both of the same direction %r."
+ " Did you mean to set remote_side on the many-to-one side ?" % (self, other, self.direction))
+
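The typical trigger is a self-referential relation whose backref doesn't
establish remote_side, leaving both sides ONETOMANY; a sketch per the new
InvalidRemoteSideTest below (the two mappings are alternatives, not meant to
coexist)::

    mapper(T1, t1, properties={
        't1s': relation(T1, backref='parent')   # raises ArgumentError at compile time
    })

    # corrected: pin the backref to the many-to-one side
    mapper(T1, t1, properties={
        't1s': relation(T1, backref=backref('parent', remote_side=t1.c.id))
    })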
def do_init(self):
self._get_target()
self._process_dependent_arguments()
self._current_path = ()
self._only_load_props = None
self._refresh_state = None
- self._from_obj = None
+ self._from_obj = ()
self._polymorphic_adapters = {}
self._filter_aliases = None
self._from_obj_alias = None
if isinstance(from_obj, expression._SelectBaseMixin):
from_obj = from_obj.alias()
- self._from_obj = from_obj
+ self._from_obj = (from_obj,)
equivs = self.__all_equivs()
if isinstance(from_obj, expression.Alias):
- self._from_obj_alias = sql_util.ColumnAdapter(self._from_obj, equivs)
+ self._from_obj_alias = sql_util.ColumnAdapter(from_obj, equivs)
def _get_polymorphic_adapter(self, entity, selectable):
self.__mapper_loads_polymorphically_with(entity.mapper, sql_util.ColumnAdapter(selectable, entity.mapper._equivalent_columns))
self._entities = [entity] + self._entities[1:]
return entity
- def __mapper_zero_from_obj(self):
- if self._from_obj:
- return self._from_obj
- else:
- return self._entity_zero().selectable
-
def __all_equivs(self):
equivs = {}
for ent in self._mapper_entities:
self._group_by:
raise sa_exc.InvalidRequestError("Query.%s() being called on a Query with existing criterion. " % meth)
- self._statement = self._criterion = self._from_obj = None
+ self._from_obj = ()
+ self._statement = self._criterion = None
self._order_by = self._group_by = self._distinct = False
self.__joined_tables = {}
criterion = list(chain(*[_orm_columns(c) for c in criterion]))
+ criterion = [self._adapt_clause(expression._literal_as_text(o), True, True) for o in criterion]
+
if self._group_by is False:
self._group_by = criterion
else:
if not from_joinpoint:
self.__reset_joinpoint()
- # join from our from_obj. This is
- # None unless select_from()/from_self() has been called.
- clause = self._from_obj
-
+ clause = replace_clause_index = None
+
# after the method completes,
# the query's joinpoint will be set to this.
right_entity = None
elif not left_entity:
left_entity = self._joinpoint_zero()
- # if no initial left-hand clause is set, extract
- # this from the left_entity or as a last
- # resort from the onclause argument, if it's
- # a PropComparator.
+ if not clause and self._from_obj:
+ mp, left_selectable, is_aliased_class = _entity_info(left_entity)
+
+ replace_clause_index, clause = sql_util.find_join_source(self._from_obj, left_selectable)
+ if not clause:
+ clause = left_selectable
+
if not clause:
for ent in self._entities:
if ent.corresponds_to(left_entity):
# construct the onclause.
join_to_left = not is_aliased_class or \
onclause is prop or \
- clause is self._from_obj and self._from_obj_alias
+ self._from_obj_alias and clause is self._from_obj[0]
# create the join
clause = orm_join(clause, right_entity, onclause, isouter=outerjoin, join_to_left=join_to_left)
ORMAdapter(right_entity, equivalents=right_mapper._equivalent_columns)
)
- # loop finished. we're selecting from
- # our final clause now
- self._from_obj = clause
+ if replace_clause_index is not None:
+ l = list(self._from_obj)
+ l[replace_clause_index] = clause
+ self._from_obj = tuple(l)
+ else:
+ self._from_obj = self._from_obj + (clause,)
# future joins with from_joinpoint=True join from our established right_entity.
self._joinpoint = right_entity
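At the API level this allows one Query to carry several independent FROM
clauses, each extended by its own joins; a sketch following
test_two_entities_with_joins further below (assumes the usual User/Order
fixture and the aliased() helper)::

    u1, o1 = aliased(User), aliased(Order)

    q = session.query(User, Order, u1, o1).\
        join((Order, User.orders)).\
        join((o1, u1.orders))
    # two FROM elements, each carrying its own JOIN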
`from_obj` is a single table or selectable.
"""
+
if isinstance(from_obj, (tuple, list)):
+ # from_obj is actually a list again as of 0.5.3, so this restriction here
+ # is somewhat artificial, but is still in place since select_from() implies aliasing all further
+ # criterion against what's placed here, and it's less complex to only
+ # keep track of a single aliased FROM element being selected against. This could in theory be opened
+ # up again to more complexity.
util.warn_deprecated("select_from() now accepts a single Selectable as its argument, which replaces any existing FROM criterion.")
from_obj = from_obj[-1]
if not isinstance(from_obj, expression.FromClause):
entity.setup_context(self, context)
if context.from_clause:
- from_obj = [context.from_clause]
+ from_obj = list(context.from_clause)
else:
from_obj = context.froms
eager_joins = context.eager_joins.values()
if context.from_clause:
- froms = [context.from_clause] # "load from a single FROM" mode, i.e. when select_from() or join() is used
+ froms = list(context.from_clause) # "load from explicit FROMs" mode, i.e. when select_from() or join() is used
else:
froms = context.froms # "load from discrete FROMs" mode, i.e. when each _MappedEntity has its own FROM
return self.__connection(engine, close_with_result=True).execute(
clause, params or {})
- def scalar(self, clause, params=None, mapper=None):
+ def scalar(self, clause, params=None, mapper=None, **kw):
"""Like execute() but return a scalar result."""
-
- engine = self.get_bind(mapper, clause=clause)
-
- return self.__connection(engine, close_with_result=True).scalar(
- clause, params or {})
+
+ return self.execute(clause, params=params, mapper=mapper, **kw).scalar()
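Since scalar() now simply delegates to execute(), raw strings and **kw receive
identical treatment; a sketch mirroring the session test further below::

    # the string is coerced to text(), with the same bind parameter
    # handling as execute()
    sess.scalar("select id from users where id=:id", {'id': 7})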
def close(self):
"""Close this Session.
objects
Optional; a list or tuple collection. Restricts the flush operation
to only these objects, rather than all pending changes.
+ Deprecated - this flag prevents the session from properly maintaining
+ accounting among inter-object relations and can cause invalid results.
"""
+
+ if objects:
+ util.warn_deprecated(
+ "The 'objects' argument to session.flush() is deprecated; "
+ "Please do not add objects to the session which should not yet be persisted.")
if self._flushing:
raise sa_exc.InvalidRequestError("Session is already flushing")
if entity in context.eager_joins:
entity_key, default_towrap = entity, entity.selectable
- elif should_nest_selectable or not context.from_clause or not sql_util.search(context.from_clause, entity.selectable):
- # if no from_clause, or a from_clause we can't join to, or a subquery is going to be generated,
+
+ elif should_nest_selectable or not context.from_clause:
+ # if no from_clause, or a subquery is going to be generated,
# store eager joins per _MappedEntity; Query._compile_context will
# add them as separate selectables to the select(), or splice them together
# after the subquery is generated
entity_key, default_towrap = entity, entity.selectable
else:
- # otherwise, create a single eager join from the from clause.
- # Query._compile_context will adapt as needed and append to the
- # FROM clause of the select().
- entity_key, default_towrap = None, context.from_clause
-
+ index, clause = sql_util.find_join_source(context.from_clause, entity.selectable)
+ if clause:
+ # join to an existing FROM clause on the query.
+ # key it to its list index in the eager_joins dict.
+ # Query._compile_context will adapt as needed and append to the
+ # FROM clause of the select().
+ entity_key, default_towrap = index, clause
+ else:
+ # if no from_clause to join to,
+ # store eager joins per _MappedEntity
+ entity_key, default_towrap = entity, entity.selectable
+
towrap = context.eager_joins.setdefault(entity_key, default_towrap)
# create AliasedClauses object to build up the eager query.
# when self-referential eager loading is used; the same instance may be present
# in two distinct sets of result columns
- collection = attributes.init_collection(state, key)
+ collection = attributes.init_state_collection(state, key)
appender = util.UniqueAppender(collection, 'append_without_event')
context.attributes[(state, key)] = appender
# if the cached lookup was "passive" and now we want non-passive, do a non-passive
# lookup and re-cache
if cached_passive and not passive:
- history = attributes.get_history(state, key, passive=False)
+ history = attributes.get_state_history(state, key, passive=False)
self.attributes[hashkey] = (history, passive)
else:
- history = attributes.get_history(state, key, passive=passive)
+ history = attributes.get_state_history(state, key, passive=passive)
self.attributes[hashkey] = (history, passive)
if not history or not state.get_impl(key).uses_objects:
self.merge = "merge" in values or "all" in values
self.expunge = "expunge" in values or "all" in values
self.refresh_expire = "refresh-expire" in values or "all" in values
-
+
if self.delete_orphan and not self.delete:
util.warn("The 'delete-orphan' cascade option requires "
"'delete'. This will raise an error in 0.6.")
-
+
for x in values:
if x not in all_cascades:
raise sa_exc.ArgumentError("Invalid cascade option '%s'" % x)
class Validator(AttributeExtension):
"""Runs a validation method on an attribute value to be set or appended.
-
+
The Validator class is used by the :func:`~sqlalchemy.orm.validates`
decorator, and direct access is usually not needed.
-
+
"""
-
+
def __init__(self, key, validator):
"""Construct a new Validator.
-
+
key - name of the attribute to be validated;
- will be passed as the second argument to
+ will be passed as the second argument to
the validation method (the first is the object instance itself).
-
+
validator - an function or instance method which accepts
three arguments; an instance (usually just 'self' for a method),
the key name of the attribute, and the value. The function should
return the same value given, unless it wishes to modify it.
-
+
"""
self.key = key
self.validator = validator
-
+
def append(self, state, value, initiator):
return self.validator(state.obj(), self.key, value)
def set(self, state, value, oldvalue, initiator):
return self.validator(state.obj(), self.key, value)
-
+
def polymorphic_union(table_map, typecolname, aliasname='p_union'):
"""Create a ``UNION`` statement used by a polymorphic mapper.
% ", ".join(kwargs.keys()))
mapper = object_mapper(instance)
return mapper.identity_key_from_instance(instance)
-
+
class ExtensionCarrier(dict):
"""Fronts an ordered collection of MapperExtension objects.
def _register(self, extension):
"""Register callable fronts for overridden interface methods."""
-
+
for method in self.interface.difference(self):
impl = getattr(extension, method, None)
if impl and impl is not getattr(MapperExtension, method):
def __getattr__(self, key):
"""Delegate MapperExtension methods to bundled fronts."""
-
+
if key not in self.interface:
raise AttributeError(key)
return self.get(key, self._pass)
class ORMAdapter(sql_util.ColumnAdapter):
"""Extends ColumnAdapter to accept ORM entities.
-
+
The selectable is extracted from the given entity,
and the AliasedClass if any is referenced.
-
+
"""
def __init__(self, entity, equivalents=None, chain_to=None):
self.mapper, selectable, is_aliased_class = _entity_info(entity)
class AliasedClass(object):
"""Represents an 'alias'ed form of a mapped class for usage with Query.
-
+
The ORM equivalent of a :class:`~sqlalchemy.sql.expression.Alias`
- object, this object mimics the mapped class using a
+ object, this object mimics the mapped class using a
__getattr__ scheme and maintains a reference to a
- real Alias object. It indicates to Query that the
+ real Alias object. It indicates to Query that the
selectable produced for this class should be aliased,
and also adapts PropComparators produced by the class'
- InstrumentedAttributes so that they adapt the
+ InstrumentedAttributes so that they adapt the
"local" side of SQL expressions against the alias.
-
+
"""
def __init__(self, cls, alias=None, name=None):
self.__mapper = _class_to_mapper(cls)
def __getstate__(self):
return {'mapper':self.__mapper, 'alias':self.__alias, 'name':self._sa_label_name}
-
+
def __setstate__(self, state):
self.__mapper = state['mapper']
self.__target = self.__mapper.class_
name = state['name']
self._sa_label_name = name
self.__name__ = 'AliasedClass_' + str(self.__target)
-
+
def __adapt_element(self, elem):
return self.__adapter.traverse(elem)._annotate({'parententity': self, 'parentmapper':self.__mapper})
-
+
def __adapt_prop(self, prop):
existing = getattr(self.__target, prop.key)
comparator = existing.comparator.adapted(self.__adapt_element)
def _orm_annotate(element, exclude=None):
"""Deep copy the given ClauseElement, annotating each element with the "_orm_adapt" flag.
-
+
Elements within the exclude collection will be cloned but not annotated.
-
+
"""
return sql_util._deep_annotate(element, {'_orm_adapt':True}, exclude)
_orm_deannotate = sql_util._deep_deannotate
-
+
class _ORMJoin(expression.Join):
"""Extend Join to support ORM constructs as input."""
-
+
__visit_name__ = expression.Join.__visit_name__
def __init__(self, left, right, onclause=None, isouter=False, join_to_left=True):
adapt_from = None
-
+
if hasattr(left, '_orm_mappers'):
left_mapper = left._orm_mappers[1]
if join_to_left:
left_mapper, left, left_is_aliased = _entity_info(left)
if join_to_left and (left_is_aliased or not left_mapper):
adapt_from = left
-
+
right_mapper, right, right_is_aliased = _entity_info(right)
if right_is_aliased:
adapt_to = right
if prop:
pj, sj, source, dest, secondary, target_adapter = prop._create_joins(
- source_selectable=adapt_from,
- dest_selectable=adapt_to,
- source_polymorphic=True,
- dest_polymorphic=True,
+ source_selectable=adapt_from,
+ dest_selectable=adapt_to,
+ source_polymorphic=True,
+ dest_polymorphic=True,
of_type=right_mapper)
if sj:
else:
onclause = pj
self._target_adapter = target_adapter
-
+
expression.Join.__init__(self, left, right, onclause, isouter)
def join(self, right, onclause=None, isouter=False, join_to_left=True):
def join(left, right, onclause=None, isouter=False, join_to_left=True):
"""Produce an inner join between left and right clauses.
-
- In addition to the interface provided by
- :func:`~sqlalchemy.sql.expression.join()`, left and right may be mapped
- classes or AliasedClass instances. The onclause may be a
- string name of a relation(), or a class-bound descriptor
+
+ In addition to the interface provided by
+ :func:`~sqlalchemy.sql.expression.join()`, left and right may be mapped
+ classes or AliasedClass instances. The onclause may be a
+ string name of a relation(), or a class-bound descriptor
representing a relation.
-
+
join_to_left indicates to attempt aliasing the ON clause,
in whatever form it is passed, to the selectable
passed as the left side. If False, the onclause
is used as is.
-
+
"""
return _ORMJoin(left, right, onclause, isouter, join_to_left)
def outerjoin(left, right, onclause=None, join_to_left=True):
"""Produce a left outer join between left and right clauses.
-
- In addition to the interface provided by
- :func:`~sqlalchemy.sql.expression.outerjoin()`, left and right may be mapped
- classes or AliasedClass instances. The onclause may be a
- string name of a relation(), or a class-bound descriptor
+
+ In addition to the interface provided by
+ :func:`~sqlalchemy.sql.expression.outerjoin()`, left and right may be mapped
+ classes or AliasedClass instances. The onclause may be a
+ string name of a relation(), or a class-bound descriptor
representing a relation.
-
+
"""
return _ORMJoin(left, right, onclause, True, join_to_left)
instance
a parent instance, which should be persistent or detached.
- property
- a class-attached descriptor, MapperProperty or string property name
- attached to the parent instance.
+ property
+ a class-attached descriptor, MapperProperty or string property name
+ attached to the parent instance.
- \**kwargs
- all extra keyword arguments are propagated to the constructor of
- Query.
+ \**kwargs
+ all extra keyword arguments are propagated to the constructor of
+ Query.
"""
if isinstance(prop, basestring):
def _entity_info(entity, compile=True):
"""Return mapping information given a class, mapper, or AliasedClass.
-
+
Returns 3-tuple of: mapper, mapped selectable, boolean indicating if this
is an aliased() construct.
-
+
If the given entity is not a mapper, mapped class, or aliased construct,
returns None, the entity, False. This is typically used to allow
unmapped selectables through.
-
+
"""
if isinstance(entity, AliasedClass):
return entity._AliasedClass__mapper, entity._AliasedClass__alias, True
def _entity_descriptor(entity, key):
"""Return attribute/property information given an entity and string name.
-
+
Returns a 2-tuple representing InstrumentedAttribute/MapperProperty.
-
+
"""
if isinstance(entity, AliasedClass):
desc = getattr(entity, key)
def object_mapper(instance):
"""Given an object, return the primary Mapper associated with the object instance.
-
+
Raises UnmappedInstanceError if no mapping is configured.
-
+
"""
try:
state = attributes.instance_state(instance)
"""Given a class (or an object), return the primary Mapper associated with the key.
Raises UnmappedClassError if no mapping is configured.
-
+
"""
try:
class_manager = attributes.manager_of_class(class_)
mapper = class_manager.mapper
-
+
# HACK until [ticket:1142] is complete
if mapper is None:
raise AttributeError
-
+
except exc.NO_STATE:
raise exc.UnmappedClassError(class_)
return True
if isinstance(cls, expression.ClauseElement):
return False
- manager = attributes.manager_of_class(cls)
- return manager and _INSTRUMENTOR in manager.info
-
+ if isinstance(cls, type):
+ manager = attributes.manager_of_class(cls)
+ return manager and _INSTRUMENTOR in manager.info
+ return False
+
def instance_str(instance):
"""Return a string describing an instance."""
def state_str(state):
"""Return a string describing an instance via its InstanceState."""
-
+
if state is None:
return "None"
else:
def __getattr__(self, attr):
return getattr(self.element, attr)
+ def __getstate__(self):
+ return {'element':self.element}
+
+ def __setstate__(self, state):
+ self.element = state['element']
+
class _Label(ColumnElement):
"""Represents a column label (AS).
visitors.traverse(table, {'schema_visitor':True}, {'foreign_key':visit_foreign_key})
return topological.sort(tuples, tables)
-def search(clause, target):
- if not clause:
- return False
- for elem in visitors.iterate(clause, {'column_collections':False}):
- if elem is target:
- return True
+def find_join_source(clauses, join_to):
+ """Given a list of FROM clauses and a selectable,
+ return the first index and element from the list of
+ clauses which can be joined against the selectable. Returns
+ None, None if no match is found.
+
+ e.g.::
+
+ clause1 = table1.join(table2)
+ clause2 = table4.join(table5)
+
+ join_to = table2.join(table3)
+
+ find_join_source([clause1, clause2], join_to) == (0, clause1)
+
+ """
+
+ selectables = list(expression._from_objects(join_to))
+ for i, f in enumerate(clauses):
+ for s in selectables:
+ if f.is_derived_from(s):
+ return i, f
else:
- return False
-
+ return None, None
+
def find_tables(clause, check_columns=False, include_aliases=False, include_joins=False, include_selects=False):
"""locate Table objects within the given expression."""
self.assertEqual(result.fetchall(), [(1,)])
- # Multiple inserts only return the last row
- result2 = table.insert(postgres_returning=[table]).execute(
- [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
- self.assertEqual(result2.fetchall(), [(3,3,True)])
-
+ @testing.fails_on('postgres', 'Known limitation of psycopg2')
+ def test_executemany():
+ # return value is documented as failing with psycopg2/executemany
+ result2 = table.insert(postgres_returning=[table]).execute(
+ [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
+ self.assertEqual(result2.fetchall(), [(2, 2, False), (3, 3, True)])
+
+ test_executemany()
+
result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False})
self.assertEqual([dict(row) for row in result3], [{'double_id':8}])
self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+ def test_unknown_types(self):
+ from sqlalchemy.databases import postgres
+
+ ischema_names = postgres.PGDialect.ischema_names
+ postgres.PGDialect.ischema_names = {}
+ try:
+ m2 = MetaData(testing.db)
+ self.assertRaises(exc.SAWarning, Table, "testtable", m2, autoload=True)
+
+ @testing.emits_warning('Did not recognize type')
+ def warns():
+ m3 = MetaData(testing.db)
+ t3 = Table("testtable", m3, autoload=True)
+ assert t3.c.answer.type.__class__ == sa.types.NullType
+
+ warns()
+ finally:
+ postgres.PGDialect.ischema_names = ischema_names
+
+
class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
__only_on__ = 'postgres'
finally:
test_table.drop(checkfirst=True)
+class SpecialTypesTest(TestBase, ComparesTables):
+ """test DDL and reflection of PG-specific types """
+
+ __only_on__ = 'postgres'
+ __excluded_on__ = (('postgres', '<', (8, 3, 0)),)
+
+ def setUpAll(self):
+ global metadata, table
+ metadata = MetaData(testing.db)
+
+ table = Table('sometable', metadata,
+ Column('id', postgres.PGUuid, primary_key=True),
+ Column('flag', postgres.PGBit),
+ Column('addr', postgres.PGInet),
+ Column('addr2', postgres.PGMacAddr),
+ Column('addr3', postgres.PGCidr)
+ )
+
+ metadata.create_all()
+
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ def test_reflection(self):
+ m = MetaData(testing.db)
+ t = Table('sometable', m, autoload=True)
+
+ self.assert_tables_equal(table, t)
+
+
class MatchTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'postgres'
__excluded_on__ = (('postgres', '<', (8, 3, 0)),)
self.assertRaisesMessage(sa.exc.InvalidRequestError, "does not have a __table__", go)
def test_cant_add_columns(self):
- t = Table('t', Base.metadata, Column('id', Integer, primary_key=True))
+ t = Table('t', Base.metadata, Column('id', Integer, primary_key=True), Column('data', String))
def go():
class User(Base):
__table__ = t
foo = Column(Integer, primary_key=True)
- self.assertRaisesMessage(sa.exc.ArgumentError, "add additional columns", go)
+ # can't specify new columns not already in the table
+ self.assertRaisesMessage(sa.exc.ArgumentError, "Can't add additional column 'foo' when specifying __table__", go)
+
+ # regular re-mapping works, though
+ class Bar(Base):
+ __table__ = t
+ some_data = t.c.data
+
+ assert class_mapper(Bar).get_property('some_data').columns[0] is t.c.data
def test_undefer_column_name(self):
# TODO: not sure if there was an explicit
id = Column(Integer, primary_key=True)
name = Column(String(50))
addresses = relation("Address", order_by="desc(Address.email)",
- primaryjoin="User.id==Address.user_id", foreign_keys="[Address.user_id]")
+ primaryjoin="User.id==Address.user_id", foreign_keys="[Address.user_id]",
+ backref=backref('user', primaryjoin="User.id==Address.user_id", foreign_keys="[Address.user_id]")
+ )
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
# compile succeeds because inherit_condition is honored
compile_mappers()
-
+
def test_joined(self):
class Company(Base, ComparableEntity):
__tablename__ = 'companies'
def go():
assert sess.query(Person).filter(Manager.name=='dogbert').one().id
self.assert_sql_count(testing.db, go, 1)
+
+ def test_subclass_mixin(self):
+ class Person(Base, ComparableEntity):
+ __tablename__ = 'people'
+ id = Column('id', Integer, primary_key=True)
+ name = Column('name', String(50))
+ discriminator = Column('type', String(50))
+ __mapper_args__ = {'polymorphic_on':discriminator}
+
+ class MyMixin(object):
+ pass
+
+ class Engineer(MyMixin, Person):
+ __tablename__ = 'engineers'
+ __mapper_args__ = {'polymorphic_identity':'engineer'}
+ id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
+ primary_language = Column('primary_language', String(50))
+
+ assert class_mapper(Engineer).inherits is class_mapper(Person)
def test_with_undefined_foreignkey(self):
class Parent(Base):
eq_(list(q2.values(User.id, User.name)), [(9, u'fred')])
+ @testing.exclude('sqlite', '<=', (3, 5, 9), 'id comparison failing on the buildbot')
def test_aliases(self):
u7, u8, u9, u10 = Session.query(User).order_by(User.id).all()
ualias = aliased(User)
q = Session.query(User, ualias).join((ualias, User.id < ualias.id)).filter(User.id<9).order_by(User.id, ualias.id)
+ eq_(list(q.all()), [(u7, u8), (u7, u9), (u7, u10), (u8, u9), (u8, u10)])
q2 = serializer.loads(serializer.dumps(q), users.metadata, Session)
eq_(list(q2.all()), [(u7, u8), (u7, u9), (u7, u10), (u8, u9), (u8, u10)])
+
+ def test_any(self):
+ r = User.addresses.any(Address.email=='x')
+ ser = serializer.dumps(r)
+ x = serializer.loads(ser, users.metadata)
+ eq_(str(r), str(x))
if __name__ == '__main__':
testing.main()
Column("parent_id", Integer, ForeignKey("parent.id"))
)
-
+ @testing.uses_deprecated()
@testing.resolve_artifact_names
def test_o2m_m2o(self):
class Base(_base.ComparableEntity):
assert c2 in sess and c2 not in sess.new
assert b1 in sess and b1 in sess.new
+ @testing.uses_deprecated()
@testing.resolve_artifact_names
def test_circular_sort(self):
"""test ticket 1306"""
from testlib import sa, testing
from sqlalchemy.orm import eagerload, deferred, undefer
from testlib.sa import Table, Column, Integer, String, Date, ForeignKey, and_, select, func
-from testlib.sa.orm import mapper, relation, create_session, lazyload
+from testlib.sa.orm import mapper, relation, create_session, lazyload, aliased
from testlib.testing import eq_
from testlib.assertsql import CompiledSQL
from orm import _base, _fixtures
closed_mapper = mapper(Order, closedorders, non_primary=True)
mapper(User, users, properties = dict(
- addresses = relation(Address, lazy=False),
+ addresses = relation(Address, lazy=False, order_by=addresses.c.id),
open_orders = relation(
open_mapper,
primaryjoin=sa.and_(openorders.c.isopen == 1,
users.c.id==openorders.c.user_id),
- lazy=False),
+ lazy=False, order_by=openorders.c.id),
closed_orders = relation(
closed_mapper,
primaryjoin=sa.and_(closedorders.c.isopen == 0,
users.c.id==closedorders.c.user_id),
- lazy=False)))
+ lazy=False, order_by=closedorders.c.id)))
- q = create_session().query(User)
+ q = create_session().query(User).order_by(User.id)
def go():
assert [
)
self.assert_sql_count(testing.db, go, 1)
+ @testing.exclude('sqlite', '>', (0, ), "sqlite flat out blows it on the multiple JOINs")
+ @testing.resolve_artifact_names
+ def test_two_entities_with_joins(self):
+ sess = create_session()
+
+ # two FROM clauses where there's a join on each one
+ def go():
+ u1 = aliased(User)
+ o1 = aliased(Order)
+ eq_(
+ [
+ (
+ User(addresses=[Address(email_address=u'fred@fred.com')], name=u'fred'),
+ Order(description=u'order 2', isopen=0, items=[Item(description=u'item 1'), Item(description=u'item 2'), Item(description=u'item 3')]),
+ User(addresses=[Address(email_address=u'jack@bean.com')], name=u'jack'),
+ Order(description=u'order 3', isopen=1, items=[Item(description=u'item 3'), Item(description=u'item 4'), Item(description=u'item 5')])
+ ),
+
+ (
+ User(addresses=[Address(email_address=u'fred@fred.com')], name=u'fred'),
+ Order(description=u'order 2', isopen=0, items=[Item(description=u'item 1'), Item(description=u'item 2'), Item(description=u'item 3')]),
+ User(addresses=[Address(email_address=u'jack@bean.com')], name=u'jack'),
+ Order(address_id=None, description=u'order 5', isopen=0, items=[Item(description=u'item 5')])
+ ),
+
+ (
+ User(addresses=[Address(email_address=u'fred@fred.com')], name=u'fred'),
+ Order(description=u'order 4', isopen=1, items=[Item(description=u'item 1'), Item(description=u'item 5')]),
+ User(addresses=[Address(email_address=u'jack@bean.com')], name=u'jack'),
+ Order(address_id=None, description=u'order 5', isopen=0, items=[Item(description=u'item 5')])
+ ),
+ ],
+ sess.query(User, Order, u1, o1).\
+ join((Order, User.orders)).options(eagerload(User.addresses), eagerload(Order.items)).filter(User.id==9).\
+ join((o1, u1.orders)).options(eagerload(u1.addresses), eagerload(o1.items)).filter(u1.id==7).\
+ filter(Order.id<o1.id).\
+ order_by(User.id, Order.id, u1.id, o1.id).all(),
+ )
+ self.assert_sql_count(testing.db, go, 1)
+
+
+
@testing.resolve_artifact_names
def test_aliased_entity(self):
sess = create_session()
attributes.register_attribute(Foo, 'a', uselist=False, useobject=False)
attributes.register_attribute(Foo, 'b', uselist=False, useobject=False)
- assert Foo in attributes.instrumentation_registry.state_finders
+ assert Foo in attributes.instrumentation_registry._state_finders
f = Foo()
attributes.instance_state(f).expire_attributes(None)
self.assertEquals(f.a, "this is a")
self.assertEquals(sess.query(Person).get(e1.person_id), Engineer(name="dilbert", primary_language="java"))
self.assertEquals(sess.query(Engineer).get(e1.person_id), Engineer(name="dilbert", primary_language="java"))
self.assertEquals(sess.query(Manager).get(b1.person_id), Boss(name="pointy haired boss", golf_swing="fore"))
+
+ def test_multi_join(self):
+ sess = create_session()
+
+ e = aliased(Person)
+ c = aliased(Company)
+
+ q = sess.query(Company, Person, c, e).join((Person, Company.employees)).join((e, c.employees)).\
+ filter(Person.name=='dilbert').filter(e.name=='wally')
+
+ self.assertEquals(q.count(), 1)
+ self.assertEquals(q.all(), [
+ (
+ Company(company_id=1,name=u'MegaCorp, Inc.'),
+ Engineer(status=u'regular engineer',engineer_name=u'dilbert',name=u'dilbert',company_id=1,primary_language=u'java',person_id=1,type=u'engineer'),
+ Company(company_id=1,name=u'MegaCorp, Inc.'),
+ Engineer(status=u'regular engineer',engineer_name=u'wally',name=u'wally',company_id=1,primary_language=u'c++',person_id=2,type=u'engineer')
+ )
+ ])
def test_filter_on_subclass(self):
sess = create_session()
sess.query(Company).filter(Company.employees.of_type(Engineer).any(Engineer.primary_language=='cobol')).one(),
c2
)
+
+ calias = aliased(Company)
+ self.assertEquals(
+ sess.query(calias).filter(calias.employees.of_type(Engineer).any(Engineer.primary_language=='cobol')).one(),
+ c2
+ )
self.assertEquals(
sess.query(Company).filter(Company.employees.of_type(Boss).any(Boss.golf_swing=='fore')).one(),
from testlib import sa
from testlib.sa import MetaData, Table, Column, Integer, ForeignKey
-from testlib.sa.orm import mapper, relation, create_session, attributes
+from testlib.sa.orm import mapper, relation, create_session, attributes, class_mapper
from testlib.testing import eq_, ne_
from testlib.compat import _function_named
from orm import _base
def with_lookup_strategy(strategy):
def decorate(fn):
def wrapped(*args, **kw):
- current = attributes._lookup_strategy
try:
attributes._install_lookup_strategy(strategy)
return fn(*args, **kw)
finally:
- attributes._install_lookup_strategy(current)
+ attributes._install_lookup_strategy(sa.util.symbol('native'))
return _function_named(wrapped, fn.func_name)
return decorate
pass
class C(B):
- def __init__(self):
+ def __init__(self, x):
pass
- mapper(A, self.fixture())
+ m = mapper(A, self.fixture())
a = attributes.instance_state(A())
assert isinstance(a, attributes.InstanceState)
assert isinstance(b, attributes.InstanceState)
assert type(b) is not attributes.InstanceState
- # C is unmanaged
- cobj = C()
- self.assertRaises((AttributeError, TypeError),
- attributes.instance_state, cobj)
+ # B is not mapped in the current implementation
+ self.assertRaises(sa.orm.exc.UnmappedClassError, class_mapper, B)
+
+ # the constructor of C is decorated too.
+ # we don't support unmapped subclasses in any case,
+ # users should not be expecting any particular behavior
+ # from this scenario.
+ c = attributes.instance_state(C(3))
+ assert isinstance(c, attributes.InstanceState)
+ assert type(c) is not attributes.InstanceState
+ # C is not mapped in the current implementation
+ self.assertRaises(sa.orm.exc.UnmappedClassError, class_mapper, C)
class InstrumentationCollisionTest(_base.ORMTest):
def test_none(self):
a = A()
assert not a.bs
-
+
+ def test_uninstrument(self):
+ class A(object):pass
+
+ manager = attributes.register_class(A)
+
+ assert attributes.manager_of_class(A) is manager
+ attributes.unregister_class(A)
+ assert attributes.manager_of_class(A) is None
+
def test_compileonattr_rel_backref_a(self):
m = MetaData()
t1 = Table('t1', m,
users.update().values({User.foobar:User.foobar + 'foo'}).execute()
eq_(sa.select([User.foobar]).where(User.foobar=='name1foo').execute().fetchall(), [('name1foo',)])
+ @testing.resolve_artifact_names
+ def test_utils(self):
+ from sqlalchemy.orm.util import _is_mapped_class, _is_aliased_class
+
+ class Foo(object):
+ x = "something"
+ @property
+ def y(self):
+ return "somethign else"
+ m = mapper(Foo, users)
+ a1 = aliased(Foo)
+
+ f = Foo()
+
+ for fn, arg, ret in [
+ (_is_mapped_class, Foo.x, False),
+ (_is_mapped_class, Foo.y, False),
+ (_is_mapped_class, Foo, True),
+ (_is_mapped_class, f, False),
+ (_is_mapped_class, a1, True),
+ (_is_mapped_class, m, True),
+ (_is_aliased_class, a1, True),
+ (_is_aliased_class, Foo.x, False),
+ (_is_aliased_class, Foo.y, False),
+ (_is_aliased_class, Foo, False),
+ (_is_aliased_class, f, False),
+ (_is_aliased_class, a1, True),
+ (_is_aliased_class, m, False),
+ ]:
+ assert fn(arg) == ret
+
+
@testing.resolve_artifact_names
def test_prop_accessor(self):
class Hoho(object): pass
class Lala(object): pass
+ class HasDef(object):
+ def name(self):
+ pass
+
p_m = mapper(Person, t, polymorphic_on=t.c.type,
include_properties=('id', 'type', 'name'))
e_m = mapper(Employee, inherits=p_m, polymorphic_identity='employee',
properties={
- 'boss': relation(Manager, backref='peon')
+ 'boss': relation(Manager, backref=backref('peon'), remote_side=t.c.id)
},
exclude_properties=('vendor_id',))
l_m = mapper(Lala, t, exclude_properties=('vendor_id', 'boss_id'),
column_prefix="p_")
+ hd_m = mapper(HasDef, t, column_prefix="h_")
+
p_m.compile()
#sa.orm.compile_mappers()
have = set([p.key for p in class_mapper(cls).iterate_properties])
want = set(want)
eq_(have, want)
-
+
+ assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id', 'name', 'h_name', 'h_vendor_id', 'h_type'])
assert_props(Person, ['id', 'name', 'type'])
assert_instrumented(Person, ['id', 'name', 'type'])
assert_props(Employee, ['boss', 'boss_id', 'employee_number',
] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().\
join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
+ def test_group_by(self):
+ eq_(
+ create_session().query(Address.user_id, func.count(Address.id).label('count')).\
+ group_by(Address.user_id).order_by(Address.user_id).all(),
+ [(7, 1), (8, 3), (9, 1)]
+ )
+
+ eq_(
+ create_session().query(Address.user_id, Address.id).\
+ from_self(Address.user_id, func.count(Address.id)).\
+ group_by(Address.user_id).order_by(Address.user_id).all(),
+ [(7, 1), (8, 3), (9, 1)]
+ )
+
def test_no_eagerload(self):
"""test that eagerloads are pushed outwards and not rendered in subqueries."""
mapper(T2, t2)
self.assertRaises(sa.exc.ArgumentError, sa.orm.compile_mappers)
+class InvalidRemoteSideTest(_base.MappedTest):
+ def define_tables(self, metadata):
+ Table('t1', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ Column('t_id', Integer, ForeignKey('t1.id'))
+ )
+ @testing.resolve_artifact_names
+ def setup_classes(self):
+ class T1(_base.ComparableEntity):
+ pass
+
+ @testing.resolve_artifact_names
+ def test_o2m_backref(self):
+ mapper(T1, t1, properties={
+ 't1s':relation(T1, backref='parent')
+ })
+
+ self.assertRaisesMessage(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are "
+ "both of the same direction <symbol 'ONETOMANY>. Did you "
+ "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_m2o_backref(self):
+ mapper(T1, t1, properties={
+ 't1s':relation(T1, backref=backref('parent', remote_side=t1.c.id), remote_side=t1.c.id)
+ })
+
+ self.assertRaisesMessage(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are "
+ "both of the same direction <symbol 'MANYTOONE>. Did you "
+ "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_o2m_explicit(self):
+ mapper(T1, t1, properties={
+ 't1s':relation(T1, back_populates='parent'),
+ 'parent':relation(T1, back_populates='t1s'),
+ })
+
+ # can't be sure of ordering here
+ self.assertRaisesMessage(sa.exc.ArgumentError,
+ "both of the same direction <symbol 'ONETOMANY>. Did you "
+ "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_m2o_explicit(self):
+ mapper(T1, t1, properties={
+ 't1s':relation(T1, back_populates='parent', remote_side=t1.c.id),
+ 'parent':relation(T1, back_populates='t1s', remote_side=t1.c.id)
+ })
+
+ # can't be sure of ordering here
+ self.assertRaisesMessage(sa.exc.ArgumentError,
+ "both of the same direction <symbol 'MANYTOONE>. Did you "
+ "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+
+
class InvalidRelationEscalationTest(_base.MappedTest):
def define_tables(self, metadata):
{'id':7}).fetchall(),
[(7, u'jack')])
+
+ # use :bindparam style
+ eq_(sess.scalar("select id from users where id=:id",
+ {'id':7}),
+ 7)
+
@engines.close_open_connections
@testing.resolve_artifact_names
def test_subtransaction_on_external(self):
def test_profile_2_insert(self):
self.test_baseline_2_insert()
- @profiling.function_call_count(3618, {'2.4': 2347})
+ @profiling.function_call_count(3834, {'2.4': 2347})
def test_profile_3_properties(self):
self.test_baseline_3_properties()