=======
CHANGES
=======
+0.5.4
+=====
+- orm
+    - Fixed the "set collection" function on "dynamic" relations
+      to initiate events correctly.  Previously a collection
+      could only be assigned to a pending parent instance;
+      otherwise, modified events would not fire correctly.
+      Collection assignment is now compatible with merge(),
+      fixes [ticket:1352].
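+
+      A sketch of the now-working pattern (assumes a mapped User
+      class with a dynamic "addresses" relation)::
+
+        u1 = sess.query(User).get(uid)
+        u1.addresses = [a1, a2]  # events fire; also works via merge()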
+
+    - Lazy loader will not use get() if the "lazy load"
+      SQL clause matches the clause used by get() but
+      contains some hardcoded parameters.  Previously
+      the lazy strategy would fail when get() was used.
+      Ideally get() would be used with the hardcoded
+      parameters, but this would require further development.
+      [ticket:1357]
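+
+      e.g. a relation whose primaryjoin embeds a literal value,
+      as in the new test in this change (sketch)::
+
+        mapper(User, users, properties=dict(
+            address=relation(Address, uselist=False,
+                primaryjoin=and_(users.c.id==addresses.c.user_id,
+                    addresses.c.email_address=='ed@bettyboop.com'))))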
+
+- sql
+ - Fixed __repr__() and other _get_colspec() methods on
+ ForeignKey constructed from __clause_element__() style
+ construct (i.e. declarative columns). [ticket:1353]
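+
+      e.g. a declarative column used as the ForeignKey target
+      (sketch)::
+
+        class Address(Base):
+            __tablename__ = 'addresses'
+            id = Column(Integer, primary_key=True)
+            user_id = Column(Integer, ForeignKey(User.id))
+
+        # repr() of the generated ForeignKey no longer fails
+        repr(Address.__table__.c.user_id.foreign_keys[0])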
+
+- mssql
+ - Corrected problem with information schema not working with a
+ binary collation based database. Cleaned up information
+ schema since it is only used by mssql now. [ticket:1343]
+
0.5.3
=====
- orm
might say SELECT A.*, B.* FROM A JOIN X, B JOIN Y.
Eager loading can also tack its joins onto those
multiple FROM clauses. [ticket:1337]
+
+ - Fixed bug in dynamic_loader() where append/remove events
+ after construction time were not being propagated to the
+ UOW to pick up on flush(). [ticket:1347]
- Fixed bug where column_prefix wasn't being checked before
not mapping an attribute that already had class-level
with_polymorphic(), or using from_self().
- sql
+    - An alias() of a select() will convert to a "scalar subquery"
+      when used in an unambiguously scalar context, i.e. when it is
+      used in a comparison operation.  This applies to
+      the ORM when using query.subquery() as well.
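+
+      e.g. (sketch; ``t`` is an existing Table)::
+
+        s = select([t.c.id]).alias()
+        select([t]).where(t.c.id == s)  # s renders as (SELECT t.id FROM t)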
+
- Fixed missing _label attribute on Function object, others
when used in a select() with use_labels (such as when used
in an ORM column_property()). [ticket:1302]
or dialects. There is a small performance penalty
which will be resolved in 0.6. [ticket:1299]
+- sqlite
+    - Fixed SQLite reflection methods so that a non-present
+      cursor.description, which triggers an auto-cursor
+      close, is detected; an empty result no longer
+      fails on recent versions of pysqlite, which raise
+      an error when fetchone() is called with no rows present.
+
- postgres
- Index reflection won't fail when an index with
multiple expressions is encountered.
This will create a new annotated file ./lib/sqlalchemy/sql.py,cover. Pretty
cool!
+BIG COVERAGE TIP !!!  There is an issue where existing .pyc files may
+store incorrect filepaths, which will break the coverage system.  If
+coverage numbers are coming out as low/zero, try deleting all .pyc
+files first (e.g. find lib -name "*.pyc" -exec rm {} \;).
TESTING NEW DIALECTS
--------------------
# You can set these variables from the command line.
SPHINXOPTS =
-SPHINXBUILD = sphinx-build
+SPHINXBUILD = ./bin/sphinx-build
PAPER =
# Internal variables.
addresses_table.c.city=='New York')),
})
+.. _alternate_collection_implementations:
+
Alternate Collection Implementations
-------------------------------------
{sql}>>> metadata.create_all(engine) # doctest:+ELLIPSIS,+NORMALIZE_WHITESPACE
PRAGMA table_info("users")
- {}
+ ()
CREATE TABLE users (
id INTEGER NOT NULL,
name VARCHAR,
password VARCHAR,
PRIMARY KEY (id)
)
- {}
+ ()
COMMIT
Users familiar with the syntax of CREATE TABLE may notice that the VARCHAR columns were generated without a length; on SQLite, this is a valid datatype, but on most databases it's not allowed. So if running this tutorial on a database such as Postgres or MySQL, and you wish to use SQLAlchemy to generate the tables, a "length" may be provided to the ``String`` type as below::
.. sourcecode:: python+sql
- {sql}>>> session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all()
+ {sql}>>> session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all() #doctest: +NORMALIZE_WHITESPACE
UPDATE users SET name=? WHERE users.id = ?
['Edwardo', 1]
INSERT INTO users (name, fullname, password) VALUES (?, ?, ?)
ROLLBACK
{stop}
- {sql}>>> ed_user.name
+ {sql}>>> ed_user.name #doctest: +NORMALIZE_WHITESPACE
BEGIN
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
.. sourcecode:: python+sql
- {sql}>>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all()
+ {sql}>>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all() #doctest: +NORMALIZE_WHITESPACE
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
WHERE users.name IN (?, ?)
.. sourcecode:: python+sql
- {sql}>>> for row in session.query(User, User.name).all():
+ {sql}>>> for row in session.query(User, User.name).all(): #doctest: +NORMALIZE_WHITESPACE
... print row.User, row.name
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')
- {sql}>>> for row in session.query(user_alias, user_alias.name.label('name_label')).all():
+ {sql}>>> for row in session.query(user_alias, user_alias.name.label('name_label')).all(): #doctest: +NORMALIZE_WHITESPACE
... print row.user_alias, row.name_label
SELECT users_1.id AS users_1_id, users_1.name AS users_1_name, users_1.fullname AS users_1_fullname, users_1.password AS users_1_password, users_1.name AS name_label
FROM users AS users_1
.. sourcecode:: python+sql
>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
- {sql}>>> query.all()
+ {sql}>>> query.all() #doctest: +NORMALIZE_WHITESPACE
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
WHERE users.name LIKE ? ORDER BY users.id
.. sourcecode:: python+sql
- {sql}>>> query.first()
+ {sql}>>> query.first() #doctest: +NORMALIZE_WHITESPACE
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
WHERE users.name LIKE ? ORDER BY users.id
.. sourcecode:: python+sql
- {sql}>>> try:
+ {sql}>>> try: #doctest: +NORMALIZE_WHITESPACE
... user = query.one()
... except Exception, e:
... print e
.. sourcecode:: python+sql
- {sql}>>> try:
+ {sql}>>> try: #doctest: +NORMALIZE_WHITESPACE
... user = query.filter(User.id == 99).one()
... except Exception, e:
... print e
.. sourcecode:: python+sql
- {sql}>>> for user in session.query(User).filter("id<224").order_by("id").all():
+ {sql}>>> for user in session.query(User).filter("id<224").order_by("id").all(): #doctest: +NORMALIZE_WHITESPACE
... print user.name
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
{sql}>>> metadata.create_all(engine) # doctest: +NORMALIZE_WHITESPACE
PRAGMA table_info("users")
- {}
+ ()
PRAGMA table_info("addresses")
- {}
+ ()
CREATE TABLE addresses (
id INTEGER NOT NULL,
email_address VARCHAR NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(user_id) REFERENCES users (id)
)
- {}
+ ()
COMMIT
Working with Related Objects
.. sourcecode:: python+sql
- {sql}>>> jack = session.query(User).filter_by(name='jack').one()
+ {sql}>>> jack = session.query(User).filter_by(name='jack').one() #doctest: +NORMALIZE_WHITESPACE
BEGIN
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
.. sourcecode:: python+sql
- {sql}>>> jack.addresses
+ {sql}>>> jack.addresses #doctest: +NORMALIZE_WHITESPACE
SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
FROM addresses
WHERE ? = addresses.user_id ORDER BY addresses.id
>>> from sqlalchemy.orm import join
{sql}>>> session.query(User).select_from(join(User, Address)).\
- ... filter(Address.email_address=='jack@google.com').all()
+ ... filter(Address.email_address=='jack@google.com').all() #doctest: +NORMALIZE_WHITESPACE
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users JOIN addresses ON users.id = addresses.user_id
WHERE addresses.email_address = ?
.. sourcecode:: python+sql
{sql}>>> session.query(User).join(User.addresses).\
- ... filter(Address.email_address=='jack@google.com').all()
+ ... filter(Address.email_address=='jack@google.com').all() #doctest: +NORMALIZE_WHITESPACE
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users JOIN addresses ON users.id = addresses.user_id
WHERE addresses.email_address = ?
<User('fred','Fred Flinstone', 'blah')> None
<User('jack','Jack Bean', 'gjffdd')> 2
+Selecting Entities from Subqueries
+----------------------------------
+
+Above, we just selected a result that included a column from a subquery.  What if we wanted our subquery to map to an entity?  For this, we use ``aliased()`` to associate an "alias" of a mapped class to a subquery:
+
+.. sourcecode:: python+sql
+
+ {sql}>>> stmt = session.query(Address).filter(Address.email_address != 'j25@yahoo.com').subquery()
+ >>> adalias = aliased(Address, stmt)
+ >>> for user, address in session.query(User, adalias).join((adalias, User.addresses)): # doctest: +NORMALIZE_WHITESPACE
+ ... print user, address
+ SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
+ users.password AS users_password, anon_1.id AS anon_1_id,
+ anon_1.email_address AS anon_1_email_address, anon_1.user_id AS anon_1_user_id
+ FROM users JOIN (SELECT addresses.id AS id, addresses.email_address AS email_address, addresses.user_id AS user_id
+ FROM addresses
+ WHERE addresses.email_address != ?) AS anon_1 ON users.id = anon_1.user_id
+ ['j25@yahoo.com']
+ {stop}<User('jack','Jack Bean', 'gjffdd')> <Address('jack@google.com')>
+
Using EXISTS
------------
{stop}
# remove one Address (lazy load fires off)
- {sql}>>> del jack.addresses[1]
+ {sql}>>> del jack.addresses[1] #doctest: +NORMALIZE_WHITESPACE
SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id
FROM addresses
WHERE ? = addresses.user_id
{sql}>>> metadata.create_all(engine) # doctest: +NORMALIZE_WHITESPACE
PRAGMA table_info("users")
- {}
+ ()
PRAGMA table_info("addresses")
- {}
+ ()
PRAGMA table_info("posts")
- {}
+ ()
PRAGMA table_info("keywords")
- {}
+ ()
PRAGMA table_info("post_keywords")
- {}
+ ()
CREATE TABLE posts (
id INTEGER NOT NULL,
user_id INTEGER,
PRIMARY KEY (id),
FOREIGN KEY(user_id) REFERENCES users (id)
)
- {}
+ ()
COMMIT
CREATE TABLE keywords (
id INTEGER NOT NULL,
PRIMARY KEY (id),
UNIQUE (keyword)
)
- {}
+ ()
COMMIT
CREATE TABLE post_keywords (
post_id INTEGER,
FOREIGN KEY(post_id) REFERENCES posts (id),
FOREIGN KEY(keyword_id) REFERENCES keywords (id)
)
- {}
+ ()
COMMIT
Usage is not too different from what we've been doing. Let's give Wendy some blog posts:
.. sourcecode:: python+sql
- {sql}>>> wendy = session.query(User).filter_by(name='wendy').one()
+ {sql}>>> wendy = session.query(User).filter_by(name='wendy').one() #doctest: +NORMALIZE_WHITESPACE
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
WHERE users.name = ?
.. sourcecode:: python+sql
- {sql}>>> session.query(BlogPost).filter(BlogPost.keywords.any(keyword='firstpost')).all()
- INSERT INTO posts (user_id, headline, body) VALUES (?, ?, ?)
- [2, "Wendy's Blog Post", 'This is a test']
+ {sql}>>> session.query(BlogPost).filter(BlogPost.keywords.any(keyword='firstpost')).all() #doctest: +NORMALIZE_WHITESPACE
INSERT INTO keywords (keyword) VALUES (?)
['wendy']
INSERT INTO keywords (keyword) VALUES (?)
['firstpost']
+ INSERT INTO posts (user_id, headline, body) VALUES (?, ?, ?)
+ [2, "Wendy's Blog Post", 'This is a test']
INSERT INTO post_keywords (post_id, keyword_id) VALUES (?, ?)
- [[1, 1], [1, 2]]
- SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
- FROM posts
- WHERE EXISTS (SELECT 1
- FROM post_keywords, keywords
+ [[1, 2], [1, 1]]
+ SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
+ FROM posts
+ WHERE EXISTS (SELECT 1
+ FROM post_keywords, keywords
WHERE posts.id = post_keywords.post_id AND keywords.id = post_keywords.keyword_id AND keywords.keyword = ?)
['firstpost']
{stop}[BlogPost("Wendy's Blog Post", 'This is a test', <User('wendy','Wendy Williams', 'foobar')>)]
.. sourcecode:: python+sql
{sql}>>> session.query(BlogPost).filter(BlogPost.author==wendy).\
- ... filter(BlogPost.keywords.any(keyword='firstpost')).all()
+ ... filter(BlogPost.keywords.any(keyword='firstpost')).all() #doctest: +NORMALIZE_WHITESPACE
SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
FROM posts
WHERE ? = posts.user_id AND (EXISTS (SELECT 1
.. sourcecode:: python+sql
- {sql}>>> wendy.posts.filter(BlogPost.keywords.any(keyword='firstpost')).all()
+ {sql}>>> wendy.posts.filter(BlogPost.keywords.any(keyword='firstpost')).all() #doctest: +NORMALIZE_WHITESPACE
SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body
FROM posts
WHERE ? = posts.user_id AND (EXISTS (SELECT 1
--- /dev/null
+Collection Mapping
+==================
+
+This is an in-depth discussion of collection mechanics. For simple examples, see :ref:`alternate_collection_implementations`.
+
+.. automodule:: sqlalchemy.orm.collections
+
+.. autofunction:: attribute_mapped_collection
+
+.. autoclass:: collection
+
+.. autofunction:: collection_adapter
+
+.. autofunction:: column_mapped_collection
+
+.. autofunction:: mapped_collection
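+
+For example, ``attribute_mapped_collection`` produces a dict-based
+collection keyed on an attribute of the child object (a sketch;
+``Item`` and ``Note`` are hypothetical mapped classes)::
+
+    from sqlalchemy.orm.collections import attribute_mapped_collection
+
+    mapper(Item, items_table, properties={
+        'notes': relation(Note,
+            collection_class=attribute_mapped_collection('keyword'))
+    })
+
+    # each Item's .notes is a dict keyed on Note.keyword
+    item.notes['color'] = Note('color', 'blue')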
+
:glob:
mapping
+ collections
query
sessions
interfaces
from sqlalchemy import types as sqltypes
from decimal import Decimal as _python_Decimal
+import information_schema as ischema
+
MS_2008_VERSION = (10,)
#MS_2005_VERSION = ??
#MS_2000_VERSION = ??
return self.schema_name
def table_names(self, connection, schema):
- from sqlalchemy.dialects import information_schema as ischema
return ischema.table_names(connection, schema)
- def uppercase_table(self, t):
- # convert all names to uppercase -- fixes refs to INFORMATION_SCHEMA for case-senstive DBs, and won't matter for case-insensitive
- t.name = t.name.upper()
- if t.schema:
- t.schema = t.schema.upper()
- for c in t.columns:
- c.name = c.name.upper()
- return t
-
-
def has_table(self, connection, tablename, schema=None):
- import sqlalchemy.dialects.information_schema as ischema
-
current_schema = schema or self.get_default_schema_name(connection)
columns = self.uppercase_table(ischema.columns)
s = sql.select([columns],
@reflection.cache
def get_schema_names(self, connection, info_cache=None):
- import sqlalchemy.dialects.information_schema as ischema
s = sql.select([self.uppercase_table(ischema.schemata).c.schema_name],
order_by=[ischema.schemata.c.schema_name]
)
@reflection.cache
def get_table_names(self, connection, schemaname, info_cache=None):
- import sqlalchemy.dialects.information_schema as ischema
current_schema = schemaname or self.get_default_schema_name(connection)
tables = self.uppercase_table(ischema.tables)
s = sql.select([tables.c.table_name],
@reflection.cache
def get_view_names(self, connection, schemaname=None, info_cache=None):
- import sqlalchemy.dialects.information_schema as ischema
current_schema = schemaname or self.get_default_schema_name(connection)
tables = self.uppercase_table(ischema.tables)
s = sql.select([tables.c.table_name],
@reflection.cache
def get_view_definition(self, connection, viewname, schemaname=None,
info_cache=None):
- import sqlalchemy.dialects.information_schema as ischema
current_schema = schemaname or self.get_default_schema_name(connection)
views = self.uppercase_table(ischema.views)
s = sql.select([views.c.view_definition],
info_cache=None):
# Get base columns
current_schema = schemaname or self.get_default_schema_name(connection)
- import sqlalchemy.dialects.information_schema as ischema
columns = self.uppercase_table(ischema.columns)
s = sql.select([columns],
current_schema
@reflection.cache
def get_primary_keys(self, connection, tablename, schemaname=None,
info_cache=None):
- import sqlalchemy.dialects.information_schema as ischema
current_schema = schemaname or self.get_default_schema_name(connection)
pkeys = []
# Add constraints
@reflection.cache
def get_foreign_keys(self, connection, tablename, schemaname=None,
info_cache=None):
- import sqlalchemy.dialects.information_schema as ischema
current_schema = schemaname or self.get_default_schema_name(connection)
# Add constraints
RR = self.uppercase_table(ischema.ref_constraints) #information_schema.referential_constraints
return fkeys
def reflecttable(self, connection, table, include_columns):
- import sqlalchemy.dialects.information_schema as ischema
# Get base columns
if table.schema is not None:
current_schema = table.schema
--- /dev/null
+from sqlalchemy import Table, MetaData, Column, ForeignKey, String, Integer
+
+ischema = MetaData()
+
+schemata = Table("SCHEMATA", ischema,
+ Column("CATALOG_NAME", String, key="catalog_name"),
+ Column("SCHEMA_NAME", String, key="schema_name"),
+ Column("SCHEMA_OWNER", String, key="schema_owner"),
+ schema="INFORMATION_SCHEMA")
+
+tables = Table("TABLES", ischema,
+ Column("TABLE_CATALOG", String, key="table_catalog"),
+ Column("TABLE_SCHEMA", String, key="table_schema"),
+ Column("TABLE_NAME", String, key="table_name"),
+ Column("TABLE_TYPE", String, key="table_type"),
+ schema="INFORMATION_SCHEMA")
+
+columns = Table("COLUMNS", ischema,
+ Column("TABLE_SCHEMA", String, key="table_schema"),
+ Column("TABLE_NAME", String, key="table_name"),
+ Column("COLUMN_NAME", String, key="column_name"),
+ Column("IS_NULLABLE", Integer, key="is_nullable"),
+ Column("DATA_TYPE", String, key="data_type"),
+ Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
+ Column("CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"),
+ Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
+ Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
+ Column("COLUMN_DEFAULT", Integer, key="column_default"),
+ Column("COLLATION_NAME", String, key="collation_name"),
+ schema="INFORMATION_SCHEMA")
+
+constraints = Table("TABLE_CONSTRAINTS", ischema,
+ Column("TABLE_SCHEMA", String, key="table_schema"),
+ Column("TABLE_NAME", String, key="table_name"),
+ Column("CONSTRAINT_NAME", String, key="constraint_name"),
+ Column("CONSTRAINT_TYPE", String, key="constraint_type"),
+ schema="INFORMATION_SCHEMA")
+
+column_constraints = Table("CONSTRAINT_COLUMN_USAGE", ischema,
+ Column("TABLE_SCHEMA", String, key="table_schema"),
+ Column("TABLE_NAME", String, key="table_name"),
+ Column("COLUMN_NAME", String, key="column_name"),
+ Column("CONSTRAINT_NAME", String, key="constraint_name"),
+ schema="INFORMATION_SCHEMA")
+
+key_constraints = Table("KEY_COLUMN_USAGE", ischema,
+ Column("TABLE_SCHEMA", String, key="table_schema"),
+ Column("TABLE_NAME", String, key="table_name"),
+ Column("COLUMN_NAME", String, key="column_name"),
+ Column("CONSTRAINT_NAME", String, key="constraint_name"),
+ Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
+ schema="INFORMATION_SCHEMA")
+
+ref_constraints = Table("REFERENTIAL_CONSTRAINTS", ischema,
+ Column("CONSTRAINT_CATALOG", String, key="constraint_catalog"),
+ Column("CONSTRAINT_SCHEMA", String, key="constraint_schema"),
+ Column("CONSTRAINT_NAME", String, key="constraint_name"),
+    Column("UNIQUE_CONSTRAINT_CATALOG", String, key="unique_constraint_catalog"),
+ Column("UNIQUE_CONSTRAINT_SCHEMA", String, key="unique_constraint_schema"),
+ Column("UNIQUE_CONSTRAINT_NAME", String, key="unique_constraint_name"),
+ Column("MATCH_OPTION", String, key="match_option"),
+ Column("UPDATE_RULE", String, key="update_rule"),
+ Column("DELETE_RULE", String, key="delete_rule"),
+ schema="INFORMATION_SCHEMA")
+
else:
pragma = "PRAGMA "
qtable = quote(table_name)
- cursor = connection.execute("%stable_info(%s)" % (pragma, qtable))
+ cursor = _pragma_cursor(connection.execute("%stable_info(%s)" % (pragma, qtable)))
row = cursor.fetchone()
# consume remaining rows, to work around
else:
pragma = "PRAGMA "
qtable = quote(table_name)
- c = connection.execute("%stable_info(%s)" % (pragma, qtable))
+ c = _pragma_cursor(connection.execute("%stable_info(%s)" % (pragma, qtable)))
found_table = False
columns = []
while True:
else:
pragma = "PRAGMA "
qtable = quote(table_name)
- c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))
+ c = _pragma_cursor(connection.execute("%sforeign_key_list(%s)" % (pragma, qtable)))
fkeys = []
fks = {}
while True:
else:
pragma = "PRAGMA "
qtable = quote(table_name)
- c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
+ c = _pragma_cursor(connection.execute("%sindex_list(%s)" % (pragma, qtable)))
indexes = []
while True:
row = c.fetchone()
else:
pragma = "PRAGMA "
qtable = quote(table_name)
- c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
+ c = _pragma_cursor(connection.execute("%sindex_list(%s)" % (pragma, qtable)))
unique_indexes = []
while True:
row = c.fetchone()
unique_indexes.append(row[1])
# loop thru unique indexes for one that includes the primary key
for idx in unique_indexes:
- c = connection.execute("%sindex_info(%s)" % (pragma, idx))
+ c = _pragma_cursor(connection.execute("%sindex_info(%s)" % (pragma, idx)))
cols = []
while True:
row = c.fetchone()
# this doesn't do anything ???
unique_indexes = self.get_unique_indexes(connection, table_name,
schema, info_cache=info_cache)
+
+
+def _pragma_cursor(cursor):
+ """work around SQLite issue whereby cursor.description is blank when PRAGMA returns no rows."""
+
+ if cursor.closed:
+ cursor._fetchone_impl = lambda: None
+ return cursor
)
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.util import _state_has_identity, has_identity
-
+from sqlalchemy.orm import attributes, collections
class DynaLoader(strategies.AbstractRelationLoader):
def init_class_attribute(self, mapper):
collection_history = self._modified_event(state)
collection_history.added_items.append(value)
- if self.trackparent and value is not None:
- self.sethasparent(attributes.instance_state(value), True)
for ext in self.extensions:
ext.append(state, value, initiator or self)
+ if self.trackparent and value is not None:
+ self.sethasparent(attributes.instance_state(value), True)
+
def fire_remove_event(self, state, value, initiator):
collection_history = self._modified_event(state)
collection_history.deleted_items.append(value)
ext.remove(state, value, initiator or self)
def _modified_event(self, state):
- state.modified = True
+
if self.key not in state.committed_state:
state.committed_state[self.key] = CollectionHistory(self, state)
+ state.modified_event(self, False, attributes.NEVER_SET, passive=attributes.PASSIVE_NO_INITIALIZE)
+
# this is a hack to allow the _base.ComparableEntity fixture
# to work
state.dict[self.key] = True
if initiator is self:
return
+ self._set_iterable(state, value)
+
+ def _set_iterable(self, state, iterable, adapter=None):
+
collection_history = self._modified_event(state)
+ new_values = list(iterable)
+
if _state_has_identity(state):
old_collection = list(self.get(state))
else:
old_collection = []
- collection_history.replace(old_collection, value)
+
+ collections.bulk_replace(new_values, DynCollectionAdapter(self, state, old_collection), DynCollectionAdapter(self, state, new_values))
def delete(self, *args, **kwargs):
raise NotImplementedError()
if initiator is not self:
self.fire_remove_event(state, value, initiator)
+class DynCollectionAdapter(object):
+ """the dynamic analogue to orm.collections.CollectionAdapter"""
+
+ def __init__(self, attr, owner_state, data):
+ self.attr = attr
+ self.state = owner_state
+ self.data = data
+
+ def __iter__(self):
+ return iter(self.data)
+
+ def append_with_event(self, item, initiator=None):
+ self.attr.append(self.state, item, initiator)
+
+ def remove_with_event(self, item, initiator=None):
+ self.attr.remove(self.state, item, initiator)
+
+    def append_without_event(self, item):
+        # called by collections.bulk_replace() for items present in both
+        # the old and new collections; the dynamic strategy maintains no
+        # loaded collection in memory, so there is nothing to do here.
+        pass
+
+    def remove_without_event(self, item):
+        # no-op, for the same reason as append_without_event()
+        pass
class AppenderMixin(object):
query_class = None
self.deleted_items = []
self.added_items = []
self.unchanged_items = []
-
- def replace(self, olditems, newitems):
- self.added_items = newitems
- self.deleted_items = olditems
not self._deleted and not self._new):
return
+
dirty = self._dirty_states
if not dirty and not self._deleted and not self._new:
self.identity_map.modified = False
raise exc.UnmappedInstanceError(instance)
def class_mapper(class_, compile=True):
- """Given a class (or an object), return the primary Mapper associated with the key.
+    """Given a class, return the primary Mapper associated with it.
Raises UnmappedClassError if no mapping is configured.
manager = attributes.manager_of_class(cls)
return manager and _INSTRUMENTOR in manager.info
return False
-
+
def instance_str(instance):
"""Return a string describing an instance."""
return schema + "." + self.column.table.name + "." + self.column.key
elif isinstance(self._colspec, basestring):
return self._colspec
+ elif hasattr(self._colspec, '__clause_element__'):
+ _column = self._colspec.__clause_element__()
else:
- return "%s.%s" % (self._colspec.table.fullname, self._colspec.key)
+ _column = self._colspec
+
+ return "%s.%s" % (_column.table.fullname, _column.key)
+
target_fullname = property(_get_colspec)
def references(self, table):
return other.__clause_element__()
elif not isinstance(other, ClauseElement):
return self._bind_param(other)
- elif isinstance(other, _SelectBaseMixin):
+ elif isinstance(other, (_SelectBaseMixin, Alias)):
return other.as_scalar()
else:
return other
return Join(self, right, onclause, True)
def alias(self, name=None):
- """return an alias of this ``FromClause`` against another ``FromClause``."""
+ """return an alias of this ``FromClause``.
+
+ For table objects, this has the effect of the table being rendered
+ as ``tablename AS aliasname`` in a SELECT statement.
+ For select objects, the effect is that of creating a named
+ subquery, i.e. ``(select ...) AS aliasname``.
+ The ``alias()`` method is the general way to create
+ a "subquery" out of an existing SELECT.
+
+ The ``name`` parameter is optional, and if left blank an
+ "anonymous" name will be generated at compile time, guaranteed
+ to be unique against other anonymous constructs used in the
+ same statement.
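+
+        A usage sketch (``mytable`` is assumed to be an existing
+        ``Table``)::
+
+            s = select([mytable.c.myid]).alias('sub')
+            stmt = select([mytable]).where(mytable.c.myid == s.c.myid)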
+
+ """
return Alias(self, name)
the same type.
"""
- return isinstance(other, _BindParamClause) and other.type.__class__ == self.type.__class__
+ return isinstance(other, _BindParamClause) and other.type.__class__ == self.type.__class__ and self.value == other.value
def __getstate__(self):
"""execute a deferred value for serialization purposes."""
return self.name.encode('ascii', 'backslashreplace')
# end Py2K
+ def as_scalar(self):
+ try:
+ return self.element.as_scalar()
+ except AttributeError:
+ raise AttributeError("Element %s does not support 'as_scalar()'" % self.element)
+
def is_derived_from(self, fromclause):
if fromclause in self._cloned_set:
return True
# longer the case
sa.orm.compile_mappers()
+ eq_(str(Address.user_id.property.columns[0].foreign_keys[0]), "ForeignKey('users.id')")
+
Base.metadata.create_all()
u1 = User(name='u1', addresses=[
Address(email='one'),
import operator
from sqlalchemy.orm import dynamic_loader, backref
from testlib import testing
-from testlib.sa import Table, Column, Integer, String, ForeignKey, desc
-from testlib.sa.orm import mapper, relation, create_session, Query
+from testlib.sa import Table, Column, Integer, String, ForeignKey, desc, select, func
+from testlib.sa.orm import mapper, relation, create_session, Query, attributes
from testlib.testing import eq_
from testlib.compat import _function_named
from orm import _base, _fixtures
assert type(q).__name__ == 'MyQuery'
-class FlushTest(_fixtures.FixtureTest):
+class SessionTest(_fixtures.FixtureTest):
run_inserts = None
@testing.resolve_artifact_names
- def test_basic(self):
+ def test_events(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ u1 = User(name='jack')
+ a1 = Address(email_address='foo')
+ sess.add_all([u1, a1])
+ sess.flush()
+
+ assert testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)) == 0
+ u1 = sess.query(User).get(u1.id)
+ u1.addresses.append(a1)
+ sess.flush()
+
+ assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
+ (a1.id, u1.id, 'foo')
+ ]
+
+ u1.addresses.remove(a1)
+ sess.flush()
+ assert testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)) == 0
+
+ u1.addresses.append(a1)
+ sess.flush()
+ assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
+ (a1.id, u1.id, 'foo')
+ ]
+
+        a2 = Address(email_address='bar')
+ u1.addresses.remove(a1)
+ u1.addresses.append(a2)
+ sess.flush()
+ assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
+ (a2.id, u1.id, 'bar')
+ ]
+
+
+ @testing.resolve_artifact_names
+ def test_merge(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=addresses.c.email_address)
+ })
+ sess = create_session()
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+ a2 = Address(email_address='a2')
+ a3 = Address(email_address='a3')
+
+ u1.addresses.append(a2)
+ u1.addresses.append(a3)
+
+ sess.add_all([u1, a1])
+ sess.flush()
+
+ u1 = User(id=u1.id, name='jack')
+ u1.addresses.append(a1)
+ u1.addresses.append(a3)
+ u1 = sess.merge(u1)
+ assert attributes.get_history(u1, 'addresses') == (
+ [a1],
+ [a3],
+ [a2]
+ )
+
+ sess.flush()
+
+ eq_(
+ list(u1.addresses),
+ [a1, a3]
+ )
+
+ @testing.resolve_artifact_names
+ def test_flush(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses))
})
assert 'addresses' not in u1.__dict__.keys()
u1.addresses = [Address(email_address='test')]
assert 'addresses' in dir(u1)
+
+ @testing.resolve_artifact_names
+ def test_collection_set(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=addresses.c.email_address)
+ })
+ sess = create_session(autoflush=True, autocommit=False)
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+ a2 = Address(email_address='a2')
+ a3 = Address(email_address='a3')
+ a4 = Address(email_address='a4')
+
+ sess.add(u1)
+ u1.addresses = [a1, a3]
+ assert list(u1.addresses) == [a1, a3]
+ u1.addresses = [a1, a2, a4]
+ assert list(u1.addresses) == [a1, a2, a4]
+ u1.addresses = [a2, a3]
+ assert list(u1.addresses) == [a2, a3]
+ u1.addresses = []
+ assert list(u1.addresses) == []
+
+
+
@testing.resolve_artifact_names
def test_rollback(self):
test_backref = _function_named(
test_backref, "test%s%s" % ((autoflush and "_autoflush" or ""),
(saveuser and "_saveuser" or "_savead")))
- setattr(FlushTest, test_backref.__name__, test_backref)
+ setattr(SessionTest, test_backref.__name__, test_backref)
for autoflush in (False, True):
for saveuser in (False, True):
l = q.filter(users.c.id == 7).all()
assert [User(id=7, address=Address(id=1))] == l
+ @testing.resolve_artifact_names
+ def test_many_to_one_binds(self):
+ mapper(Address, addresses, primary_key=[addresses.c.user_id, addresses.c.email_address])
+
+ mapper(User, users, properties = dict(
+ address = relation(Address, uselist=False,
+ primaryjoin=sa.and_(users.c.id==addresses.c.user_id, addresses.c.email_address=='ed@bettyboop.com')
+ )
+ ))
+ q = create_session().query(User)
+ eq_(
+ [
+ User(id=7, address=None),
+ User(id=8, address=Address(id=3)),
+ User(id=9, address=None),
+ User(id=10, address=None),
+ ],
+ list(q)
+ )
+
+
@testing.resolve_artifact_names
def test_double(self):
"""tests lazy loading with two relations simulatneously, from the same table, using aliases. """
l = list(session.query(User).instances(s.execute(emailad = 'jack@bean.com')))
eq_([User(id=7)], l)
-
+ def test_scalar_subquery(self):
+ session = create_session()
+
+ q = session.query(User.id).filter(User.id==7).subquery()
+
+ q = session.query(User).filter(User.id==q)
+
+ eq_(User(id=7), q.one())
+
+
def test_in(self):
session = create_session()
s = session.query(User.id).join(User.addresses).group_by(User.id).having(func.count(Address.id) > 2)
q2 = q.group_by([User.name.like('%j%')]).order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'), func.count(User.name.like('%j%')))
self.assertEquals(list(q2), [(True, 1), (False, 3)])
- def test_scalar_subquery(self):
+ def test_correlated_subquery(self):
"""test that a subquery constructed from ORM attributes doesn't leak out
those entities to the outermost query.
'profiling.zoomark_orm',
)
alltests = unittest.TestSuite()
+ if testenv.testlib.config.coverage_enabled:
+ return alltests
+
for name in modules_to_test:
mod = __import__(name)
for token in name.split('.')[1:]:
s = select([table1.c.myid]).correlate(None).as_scalar()
self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
+ # test that aliases use as_scalar() when used in an explicitly scalar context
+ s = select([table1.c.myid]).alias()
+ self.assert_compile(select([table1.c.myid]).where(table1.c.myid==s), "SELECT mytable.myid FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable)")
+ self.assert_compile(select([table1.c.myid]).where(s > table1.c.myid), "SELECT mytable.myid FROM mytable WHERE mytable.myid < (SELECT mytable.myid FROM mytable)")
+
+
s = select([table1.c.myid]).as_scalar()
self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
"myothertable.othername = :othername_2 OR myothertable.otherid = :otherid_1) AND sysdate() = today()",
checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
)
+
def test_distinct(self):
self.assert_compile(
options = None
file_config = None
+coverage_enabled = False
base_config = """
[db]
elif opt_str.endswith('-debug'):
logging.getLogger(value).setLevel(logging.DEBUG)
-def _start_coverage(option, opt_str, value, parser):
+def _start_cumulative_coverage(option, opt_str, value, parser):
+ _start_coverage(option, opt_str, value, parser, erase=False)
+
+def _start_coverage(option, opt_str, value, parser, erase=True):
import sys, atexit, coverage
true_out = sys.stdout
-
- def _iter_covered_files():
- import sqlalchemy
- for rec in os.walk(os.path.dirname(sqlalchemy.__file__)):
+
+ global coverage_enabled
+ coverage_enabled = True
+
+    def _iter_covered_files(mod, recursive=True):
+        path = os.path.dirname(mod.__file__)
+
+        if recursive:
+            recs = os.walk(path)
+        else:
+            # os.listdir() yields bare filenames rather than os.walk()-style
+            # records; wrap the single top-level directory in one (dirpath,
+            # dirnames, filenames) tuple so the loop below handles both cases.
+            recs = [(path, [], os.listdir(path))]
+
+        for rec in recs:
for x in rec[2]:
if x.endswith('.py'):
yield os.path.join(rec[0], x)
+
def _stop():
coverage.stop()
true_out.write("\nPreparing coverage report...\n")
- coverage.report(list(_iter_covered_files()),
- show_missing=False, ignore_errors=False,
- file=true_out)
+
+ from sqlalchemy import sql, orm, engine, \
+ ext, databases, log
+
+ import sqlalchemy
+
+ for modset in [
+ _iter_covered_files(sqlalchemy, recursive=False),
+ _iter_covered_files(databases),
+ _iter_covered_files(engine),
+ _iter_covered_files(ext),
+ _iter_covered_files(orm),
+ ]:
+ coverage.report(list(modset),
+ show_missing=False, ignore_errors=False,
+ file=true_out)
atexit.register(_stop)
- coverage.erase()
+ if erase:
+ coverage.erase()
coverage.start()
def _list_dbs(*args):
help="Add a dialect-specific table option, key=value")
opt("--coverage", action="callback", callback=_start_coverage,
help="Dump a full coverage report after running tests")
+opt("--cumulative-coverage", action="callback", callback=_start_cumulative_coverage,
+    help="Like --coverage, but accumulate coverage into the current DB")
opt("--profile", action="append", dest="profile_targets", default=[],
help="Enable a named profile target (multiple OK.)")
opt("--profile-sort", action="store", dest="profile_sort", default=None,