=======
CHANGES
=======
+
0.5.4
=====
+
- orm
- - Fixed the "set collection" function on "dynamic" relations
- to initiate events correctly. Previously a collection
- could only be assigned to a pending parent instance,
- otherwise modified events would not be fired correctly.
- Set collection is now compatible with merge(),
- fixes [ticket:1352].
-
- - Lazy loader will not use get() if the "lazy load"
- SQL clause matches the clause used by get(), but
- contains some parameters hardcoded. Previously
- the lazy strategy would fail with the get(). Ideally
- get() would be used with the hardcoded parameters
- but this would require further development.
+ - Modified query_cls on DynamicAttributeImpl to accept a full
+ mixin version of the AppenderQuery, which allows subclassing
+ the AppenderMixin.
+
+ - Fixed the evaluator not being able to evaluate IS NULL clauses.
+
+ - Fixed the "set collection" function on "dynamic" relations to
+ initiate events correctly. Previously a collection could only
+ be assigned to a pending parent instance, otherwise modified
+ events would not be fired correctly. Set collection is now
+ compatible with merge(), fixes [ticket:1352].
+
+ - Allowed pickling of PropertyOption objects constructed with
+ instrumented descriptors; previously, pickle errors would occur
+ when pickling an object which was loaded with a descriptor-based
+ option, such as query.options(eagerload(MyClass.foo)).
+
+ - Lazy loader will not use get() if the "lazy load" SQL clause
+ matches the clause used by get(), but contains some parameters
+ hardcoded. Previously the lazy strategy would fail with the
+ get(). Ideally get() would be used with the hardcoded
+ parameters but this would require further development.
+ [ticket:1357]
+ - Fixed another location where autoflush was interfering
+ with session.merge(). autoflush is disabled completely
+ for the duration of merge() now. [ticket:1360]
+
+ - Fixed bug in relation(), introduced in 0.5.3,
+ whereby a self referential relation
+ from a base class to a joined-table subclass would
+ not configure correctly.
+
+ - Fixed documentation for session weak_identity_map -
+ the default value is True, indicating a weak
+ referencing map in use.
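The weak-referencing identity map described above can be sketched in plain Python with the standard library's `weakref.WeakValueDictionary`; this is a simplified stand-in for the session's identity map, not SQLAlchemy's actual implementation:

```python
import gc
import weakref

class Obj(object):
    """Stand-in for a mapped instance."""
    def __init__(self, key):
        self.key = key

# A weak identity map: entries vanish once no strong reference remains.
identity_map = weakref.WeakValueDictionary()

o = Obj(('users', 1))
identity_map[o.key] = o
assert identity_map.get(('users', 1)) is o

del o            # drop the only strong reference
gc.collect()     # make collection deterministic for the demo
assert identity_map.get(('users', 1)) is None
```

This is why, with `weak_identity_map=True`, clean instances that fall out of application scope are reclaimed rather than accumulating in the session.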
+
+ - Fixed a unit of work issue whereby the foreign
+ key attribute on an item contained within a collection
+ owned by an object being deleted would not be set to
+ None if the relation() was self-referential. [ticket:1376]
+
+ - Fixed Query.update() and Query.delete() failures with eagerloaded
+ relations. [ticket:1378]
+
+- schema
+ - Added a quote_schema() method to the IdentifierPreparer class
+ so that dialects can override how schemas get handled. This
+ enables the MSSQL dialect to treat schemas as multipart
+ identifiers, such as 'database.owner'. [ticket:594], [ticket:1341]
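A minimal sketch of the idea behind an overridable `quote_schema()`: split a multipart schema name on dots and quote each part separately. The function name mirrors the entry above, but the bracket-quoting body is a hypothetical simplification, not the dialect's real code:

```python
def quote_schema(schema):
    # Treat the schema as a multipart identifier ('database.owner'):
    # quote each dot-separated part with MSSQL-style brackets.
    return '.'.join('[%s]' % part for part in schema.split('.'))

assert quote_schema('mydb.dbo') == '[mydb].[dbo]'
assert quote_schema('dbo') == '[dbo]'
```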
+
- sql
- - Fixed __repr__() and other _get_colspec() methods on
+ - ``sqlalchemy.extract()`` is now dialect sensitive and can
+ extract components of timestamps idiomatically across the
+ supported databases, including SQLite.
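The dialect-sensitive behavior can be sketched in plain Python: a standard `EXTRACT(field FROM expr)` rendering, with per-dialect field-name maps and syntax (the Access dialect's `extract_map`/`DATEPART` hunk further down this patch is the real-world version of this). The function and dialect names here are hypothetical:

```python
ACCESS_FIELD_MAP = {'year': 'yyyy', 'month': 'm', 'day': 'd'}

def compile_extract(field, expr, dialect='ansi'):
    # Sketch only: a real compiler dispatches per-dialect visit methods.
    if dialect == 'access':
        # Access spells EXTRACT as DATEPART with its own field codes.
        return 'DATEPART("%s", %s)' % (ACCESS_FIELD_MAP.get(field, field), expr)
    return 'EXTRACT(%s FROM %s)' % (field, expr)

assert compile_extract('year', 'orders.placed_at') == \
    'EXTRACT(year FROM orders.placed_at)'
assert compile_extract('year', 'orders.placed_at', 'access') == \
    'DATEPART("yyyy", orders.placed_at)'
```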
+
+ - Fixed __repr__() and other _get_colspec() methods on
ForeignKey constructed from __clause_element__() style
construct (i.e. declarative columns). [ticket:1353]
-
+
- mssql
+ - Modified how savepoint logic works to prevent it from
+ stepping on non-savepoint oriented routines. Savepoint
+ support is still very experimental.
+
+ - Added in reserved words for MSSQL that covers version 2008
+ and all prior versions. [ticket:1310]
+
- Corrected problem with information schema not working with a
- binary collation based database. Cleaned up information
- schema since it is only used by mssql now. [ticket:1343]
+ binary collation based database. Cleaned up information schema
+ since it is only used by mssql now. [ticket:1343]
+
+- sqlite
+ - Corrected the float type so that it correctly maps to a
+ SLFloat type when being reflected. [ticket:1273]
+
+- extensions
+
+ - Fixed adding of deferred or other column properties to a
+ declarative class. [ticket:1379]
0.5.3
=====
# You can set these variables from the command line.
SPHINXOPTS =
-SPHINXBUILD = ./bin/sphinx-build
+SPHINXBUILD = sphinx-build
PAPER =
# Internal variables.
# mysql
mysql_db = create_engine('mysql://localhost/foo')
+ # oracle
+ oracle_db = create_engine('oracle://scott:tiger@host:port/dbname?key1=value1&key2=value2')
+
# oracle via TNS name
- oracle_db = create_engine('oracle://scott:tiger@dsn')
+ oracle_db = create_engine('oracle://scott:tiger@tnsname')
+ oracle_db = create_engine('oracle://scott:tiger@tnsname/?key1=value1&key2=value2')
# oracle will feed host/port/SID into cx_oracle.makedsn
oracle_db = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
mssql_db = create_engine('mssql://username:password@localhost/database')
# mssql via a DSN connection
- mssql_db = create_engine('mssql://username:password@/?dsn=mydsn')
+ mssql_db = create_engine('mssql://mydsn')
+ mssql_db = create_engine('mssql://username:password@mydsn')
The :class:`~sqlalchemy.engine.base.Engine` will ask the connection pool for a connection when the ``connect()`` or ``execute()`` methods are called. The default connection pool, :class:`~sqlalchemy.pool.QueuePool`, as well as the default connection pool used with SQLite, :class:`~sqlalchemy.pool.SingletonThreadPool`, will open connections to the database on an as-needed basis. As concurrent statements are executed, :class:`~sqlalchemy.pool.QueuePool` will grow its pool of connections to a default size of five, and will allow a default "overflow" of ten. Since the ``Engine`` is essentially "home base" for the connection pool, it follows that you should keep a single :class:`~sqlalchemy.engine.base.Engine` per database established within an application, rather than creating a new one for each connection.
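The pool-size/overflow arithmetic above (5 + 10 = up to 15 concurrent checkouts by default) can be sketched with a toy stdlib-only pool; the class and method names are hypothetical and this is not QueuePool's actual implementation:

```python
import threading

class TinyPool(object):
    """Toy sketch of QueuePool-style limits: pool_size plus max_overflow
    caps the number of simultaneous checkouts."""
    def __init__(self, creator, pool_size=5, max_overflow=10):
        self.creator = creator
        self._sem = threading.BoundedSemaphore(pool_size + max_overflow)
        self._idle = []

    def connect(self):
        # Non-blocking acquire: fail once pool_size + max_overflow is reached.
        if not self._sem.acquire(False):
            raise RuntimeError("pool limit reached")
        return self._idle.pop() if self._idle else self.creator()

    def release(self, conn):
        self._idle.append(conn)
        self._sem.release()

pool = TinyPool(lambda: object(), pool_size=2, max_overflow=1)
c1, c2, c3 = pool.connect(), pool.connect(), pool.connect()
try:
    pool.connect()
    assert False, "should have hit the pool limit"
except RuntimeError:
    pass
pool.release(c1)
assert pool.connect() is c1   # returned connections are reused
```

This also illustrates why one Engine per database is the right granularity: the Engine owns the pool, so a fresh Engine per connection would defeat the reuse shown here.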
.. sourcecode:: python+sql
- {sql}>>> try: #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> from sqlalchemy.orm.exc import MultipleResultsFound
+ >>> try: #doctest: +NORMALIZE_WHITESPACE
... user = query.one()
- ... except Exception, e:
+ ... except MultipleResultsFound, e:
... print e
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
.. sourcecode:: python+sql
- {sql}>>> try: #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> from sqlalchemy.orm.exc import NoResultFound
+ >>> try: #doctest: +NORMALIZE_WHITESPACE
... user = query.filter(User.id == 99).one()
- ... except Exception, e:
+ ... except NoResultFound, e:
... print e
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
LIMIT 2 OFFSET 0
['jack']
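The `one()` contract exercised in the two doctests above can be sketched as a plain-Python helper (hypothetical, but mirroring what the two exceptions signify):

```python
class NoResultFound(Exception):
    pass

class MultipleResultsFound(Exception):
    pass

def one(rows):
    # Mirrors Query.one(): exactly one row, or raise.
    rows = list(rows)
    if not rows:
        raise NoResultFound("No row was found for one()")
    if len(rows) > 1:
        raise MultipleResultsFound("Multiple rows were found for one()")
    return rows[0]

assert one(['jack']) == 'jack'
for bad, exc in (([], NoResultFound), (['a', 'b'], MultipleResultsFound)):
    try:
        one(bad)
    except exc:
        pass
    else:
        assert False, "expected %s" % exc.__name__
```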
- >>> jack
+ {stop}>>> jack
<User('jack','Jack Bean', 'gjffdd')>
Let's look at the ``addresses`` collection. Watch the SQL:
anon_1.users_fullname AS anon_1_users_fullname, anon_1.users_password AS anon_1_users_password,
addresses_1.id AS addresses_1_id, addresses_1.email_address AS addresses_1_email_address,
addresses_1.user_id AS addresses_1_user_id
- FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
- users.password AS users_password
- FROM users WHERE users.name = ?
- LIMIT 2 OFFSET 0) AS anon_1 LEFT OUTER JOIN addresses AS addresses_1
- ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id
- ['jack']
-
- >>> jack
+ FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
+ users.password AS users_password
+ FROM users WHERE users.name = ?
+ LIMIT 2 OFFSET 0) AS anon_1 LEFT OUTER JOIN addresses AS addresses_1
+ ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id
+ ['jack']
+
+ {stop}>>> jack
<User('jack','Jack Bean', 'gjffdd')>
>>> jack.addresses
WHERE users.name = ?
LIMIT 2 OFFSET 0
['wendy']
-
+ {stop}
>>> post = BlogPost("Wendy's Blog Post", "This is a test", wendy)
>>> session.add(post)
:author: Mike Bayer
-Serializer/Deserializer objects for usage with SQLAlchemy structures.
-
-Any SQLAlchemy structure, including Tables, Columns, expressions, mappers,
-Query objects etc. can be serialized in a minimally-sized format,
-and deserialized when given a Metadata and optional ScopedSession object
-to use as context on the way out.
-
-Usage is nearly the same as that of the standard Python pickle module:
-
-.. sourcecode:: python+sql
-
- from sqlalchemy.ext.serializer import loads, dumps
- metadata = MetaData(bind=some_engine)
- Session = scoped_session(sessionmaker())
-
- # ... define mappers
-
- query = Session.query(MyClass).filter(MyClass.somedata=='foo').order_by(MyClass.sortkey)
-
- # pickle the query
- serialized = dumps(query)
-
- # unpickle. Pass in metadata + scoped_session
- query2 = loads(serialized, metadata, Session)
-
- print query2.all()
-
-Similar restrictions as when using raw pickle apply; mapped classes must be
-themselves be pickleable, meaning they are importable from a module-level
-namespace.
-
-Note that instances of user-defined classes do not require this extension
-in order to be pickled; these contain no references to engines, sessions
-or expression constructs in the typical case and can be serialized directly.
-This module is specifically for ORM and expression constructs.
-
.. automodule:: sqlalchemy.ext.serializer
:members:
:undoc-members:
print "----------------------------"
session = create_session()
-session.save(node)
+session.add(node)
session.flush()
print "\n\n\n----------------------------"
print "tree new where node_id=%d:" % nodeid
print "----------------------------"
-session.clear()
+session.expunge_all()
t = session.query(TreeNode).filter(TreeNode.id==nodeid)[0]
print "\n\n\n----------------------------"
of the price on each Item (since those can change).
"""
-import logging
from datetime import datetime
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (create_engine, MetaData, Table, Column, Integer,
+ String, DateTime, Numeric, ForeignKey, and_)
+from sqlalchemy.orm import mapper, relation, create_session
# Uncomment these to watch database activity.
+#import logging
#logging.basicConfig(format='%(message)s')
#logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
session = create_session()
# create our catalog
-session.save(Item('SA T-Shirt', 10.99))
-session.save(Item('SA Mug', 6.50))
-session.save(Item('SA Hat', 8.99))
-session.save(Item('MySQL Crowbar', 16.99))
+session.add(Item('SA T-Shirt', 10.99))
+session.add(Item('SA Mug', 6.50))
+session.add(Item('SA Hat', 8.99))
+session.add(Item('MySQL Crowbar', 16.99))
session.flush()
# function to return items from the DB
order.order_items.append(OrderItem(item('SA Mug')))
order.order_items.append(OrderItem(item('MySQL Crowbar'), 10.99))
order.order_items.append(OrderItem(item('SA Hat')))
-session.save(order)
+session.add(order)
session.flush()
-session.clear()
+session.expunge_all()
# query the order, print items
order = session.query(Order).filter_by(customer_name='john smith').one()
the usage of the associationproxy extension."""
from datetime import datetime
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (create_engine, MetaData, Table, Column, Integer,
+ String, DateTime, Float, ForeignKey, and_)
+from sqlalchemy.orm import mapper, relation, create_session
from sqlalchemy.ext.associationproxy import AssociationProxy
engine = create_engine('sqlite://')
session.add(order)
session.flush()
-session.clear()
+session.expunge_all()
# query the order, print items
order = session.query(Order).filter_by(customer_name='john smith').one()
presents a more refined version of some of these patterns.
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
+from sqlalchemy.orm import (mapper, relation, create_session, MapperExtension,
+ object_session)
+
meta = MetaData('sqlite://')
meta.bind.echo = True
org.members.append(Member('member two'))
org.members.append(Member('member three'))
-sess.save(org)
+sess.add(org)
print "-------------------------\nflush one - save org + 3 members"
sess.flush()
-sess.clear()
+sess.expunge_all()
# reload. load the org and some child members
print "-------------------------\nload subset of members"
members = org.member_query.filter(member_table.c.name.like('%member t%')).all()
print members
-sess.clear()
+sess.expunge_all()
# reload. create some more members and flush, without loading any of the original members
print "-------------------------\nflush two - save 3 more members"
sess.flush()
-sess.clear()
+sess.expunge_all()
org = sess.query(Organization).get(org.org_id)
# now delete. note that this will explicitly delete members four, five and six because they are in the session,
with a custom attribute system as well.
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, Text,
+ ForeignKey)
+from sqlalchemy.orm import (mapper, relation, create_session,
+ InstrumentationManager)
from sqlalchemy.orm.attributes import set_attribute, get_attribute, del_attribute, is_instrumented
from sqlalchemy.orm.collections import collection_adapter
assert isinstance(a1.bs, MyCollection)
sess = create_session()
- sess.save(a1)
+ sess.add(a1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
a1 = sess.query(A).get(a1.id)
a1.bs.remove(a1.bs[0])
sess.flush()
- sess.clear()
+ sess.expunge_all()
a1 = sess.query(A).get(a1.id)
assert len(a1.bs) == 1
if __name__ == '__main__':
- from sqlalchemy import *
- from sqlalchemy.orm import *
+ from sqlalchemy import Column, Integer, String, ForeignKey
+ from sqlalchemy.orm import relation
from sqlalchemy.ext.declarative import declarative_base
class Base(object):
m1.related.mapped.append(MyMappedClass(data='m2'))
del m1.data
-
\ No newline at end of file
+
### Example code
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer
+from sqlalchemy.orm import mapper, create_session
metadata = MetaData('sqlite://')
metadata.bind.echo = True
intervals = [Interval1(1,4), Interval1(3,15), Interval1(11,16)]
for interval in intervals:
- session.save(interval)
- session.save(Interval2(interval.start, interval.length))
+ session.add(interval)
+ session.add(Interval2(interval.start, interval.length))
session.flush()
print "Clear the cache and do some queries"
-session.clear()
+session.expunge_all()
for Interval in (Interval1, Interval2):
print "Querying using interval class %s" % Interval.__name__
self.collection.append(value)\r
\r
from sqlalchemy.ext.declarative import declarative_base\r
-from sqlalchemy import *\r
-from sqlalchemy.orm import *\r
+from sqlalchemy import create_engine, Column, Integer, String, ForeignKey\r
+from sqlalchemy.orm import sessionmaker, dynamic_loader\r
\r
Base = declarative_base(engine=create_engine('sqlite://'))\r
\r
"""
################################# PART I - Imports/Configuration ###########################################
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (MetaData, Table, Column, Integer, String, ForeignKey,
+ Unicode, and_)
+from sqlalchemy.orm import mapper, relation, create_session, lazyload
import sys, os, StringIO, re
#logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
-from elementtree import ElementTree
-from elementtree.ElementTree import Element, SubElement
+from xml.etree import ElementTree
meta = MetaData()
meta.bind = 'sqlite://'
# get ElementTree documents
for file in ('test.xml', 'test2.xml', 'test3.xml'):
- filename = os.path.join(os.path.dirname(sys.argv[0]), file)
+ filename = os.path.join(os.path.dirname(__file__), file)
doc = ElementTree.parse(filename)
- session.save(Document(file, doc))
+ session.add(Document(file, doc))
print "\nSaving three documents...", line
session.flush()
print "Done."
# clear session (to illustrate a full load), restore
-session.clear()
+session.expunge_all()
print "\nFull text of document 'test.xml':", line
document = session.query(Document).filter_by(filename="test.xml").first()
"""
################################# PART I - Imports/Configuration ###########################################
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (MetaData, Table, Column, Integer, String, ForeignKey,
+ Unicode, and_)
+from sqlalchemy.orm import mapper, relation, create_session, lazyload
import sys, os, StringIO, re
#logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
-from elementtree import ElementTree
-from elementtree.ElementTree import Element, SubElement
+from xml.etree import ElementTree
meta = MetaData()
meta.bind = 'sqlite://'
# get ElementTree documents
for file in ('test.xml', 'test2.xml', 'test3.xml'):
- filename = os.path.join(os.path.dirname(sys.argv[0]), file)
+ filename = os.path.join(os.path.dirname(__file__), file)
doc = ElementTree.parse(filename)
- session.save(Document(file, doc))
+ session.add(Document(file, doc))
print "\nSaving three documents...", line
session.flush()
print "Done."
# clear session (to illustrate a full load), restore
-session.clear()
+session.expunge_all()
print "\nFull text of document 'test.xml':", line
document = session.query(Document).filter_by(filename="test.xml").first()
styles of persistence are identical, as is the structure of the main Document class.
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, String,
+ PickleType)
+from sqlalchemy.orm import mapper, create_session
import sys, os
# uncomment to show SQL statements and result sets
#logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
-from elementtree import ElementTree
+from xml.etree import ElementTree
engine = create_engine('sqlite://')
meta = MetaData(engine)
###### time to test ! #########
# get ElementTree document
-filename = os.path.join(os.path.dirname(sys.argv[0]), "test.xml")
+filename = os.path.join(os.path.dirname(__file__), "test.xml")
doc = ElementTree.parse(filename)
# save to DB
session = create_session()
-session.save(Document("test.xml", doc))
+session.add(Document("test.xml", doc))
session.flush()
# clear session (to illustrate a full load), restore
-session.clear()
+session.expunge_all()
document = session.query(Document).filter_by(filename="test.xml").first()
# print
"""a directed graph example."""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, ForeignKey
+from sqlalchemy.orm import mapper, relation, create_session
import logging
logging.basicConfig()
n1.add_neighbor(n3)
n2.add_neighbor(n1)
-[session.save(x) for x in [n1, n2, n3, n4, n5, n6, n7]]
+[session.add(x) for x in [n1, n2, n3, n4, n5, n6, n7]]
session.flush()
-session.clear()
+session.expunge_all()
n2 = session.query(Node).get(2)
n3 = session.query(Node).get(3)
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy.orm import attributes
+from sqlalchemy import (create_engine, Column, Integer, String, select, case,
+ func)
+from sqlalchemy.orm import sessionmaker, MapperExtension, aliased
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('sqlite://', echo=True)
"""illustrates one way to use a custom pickler that is session-aware."""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, PickleType
+from sqlalchemy.orm import (mapper, create_session, MapperExtension,
+ class_mapper, EXT_CONTINUE)
from sqlalchemy.orm.session import object_session
from cStringIO import StringIO
from pickle import Pickler, Unpickler
if getattr(obj, "id", None) is None:
sess = MyPickler.sessions.current
newsess = create_session(bind=sess.connection(class_mapper(Bar)))
- newsess.save(obj)
+ newsess.add(obj)
newsess.flush()
key = "%s:%s" % (type(obj).__name__, obj.id)
return key
sess = create_session()
f = Foo()
f.bar = Bar('some bar')
-sess.save(f)
+sess.add(f)
sess.flush()
-sess.clear()
+sess.expunge_all()
del MyPickler.sessions.current
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, and_
+from sqlalchemy.orm import (mapper, relation, create_session, class_mapper,
+ backref)
metadata = MetaData('sqlite://')
a3.street = '444 park ave.'
sess = create_session()
-sess.save(u1)
-sess.save(o1)
+sess.add(u1)
+sess.add(o1)
sess.flush()
-sess.clear()
+sess.expunge_all()
# query objects, get their addresses
poly_assoc_generic.py.
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
+from sqlalchemy.orm import mapper, relation, create_session, class_mapper
metadata = MetaData('sqlite://')
o1.address.street = '444 park ave.'
sess = create_session()
-sess.save(u1)
-sess.save(o1)
+sess.add(u1)
+sess.add(o1)
sess.flush()
-sess.clear()
+sess.expunge_all()
# query objects, get their addresses
"interface".
"""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
+from sqlalchemy.orm import mapper, relation, create_session, class_mapper
metadata = MetaData('sqlite://')
o1.address.street = '444 park ave.'
sess = create_session()
-sess.save(u1)
-sess.save(o1)
+sess.add(u1)
+sess.add(o1)
sess.flush()
-sess.clear()
+sess.expunge_all()
# query objects, get their addresses
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
+from sqlalchemy.orm import mapper, create_session, polymorphic_union
metadata = MetaData()
e1 = Engineer("wally", "engineer1")
e2 = Engineer("dilbert", "engineer2")
-session.save(m1)
-session.save(e1)
-session.save(e2)
+session.add(m1)
+session.add(e1)
+session.add(e2)
session.flush()
-employees = session.query(Employee).select()
+employees = session.query(Employee)
print [e for e in employees]
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
+from sqlalchemy.orm import mapper, relation, create_session
import sets
# this example illustrates a polymorphic load of two classes
person_join = people.outerjoin(engineers).outerjoin(managers)
-person_mapper = mapper(Person, people, select_table=person_join,polymorphic_on=people.c.type, polymorphic_identity='person')
+person_mapper = mapper(Person, people, polymorphic_on=people.c.type, polymorphic_identity='person')
mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
mapper(Company, companies, properties={
- 'employees': relation(Person, lazy=False, private=True, backref='company')
+ 'employees': relation(Person, lazy=False, backref='company', cascade="all, delete-orphan")
})
session = create_session(echo_uow=False)
c.employees.append(Person(name='joesmith', status='HHH'))
c.employees.append(Engineer(name='wally', status='CGG', engineer_name='engineer2', primary_language='python'))
c.employees.append(Manager(name='jsmith', status='ABA', manager_name='manager2'))
-session.save(c)
+session.add(c)
print session.new
session.flush()
-session.clear()
+session.expunge_all()
c = session.query(Company).get(1)
for e in c.employees:
- print e, e._instance_key, e.company
+ print e, e._sa_instance_state.key, e.company
assert sets.Set([e.name for e in c.employees]) == sets.Set(['pointy haired boss', 'dilbert', 'joesmith', 'wally', 'jsmith'])
print "\n"
-dilbert = session.query(Person).get_by(name='dilbert')
-dilbert2 = session.query(Engineer).get_by(name='dilbert')
+dilbert = session.query(Person).filter_by(name='dilbert').one()
+dilbert2 = session.query(Engineer).filter_by(name='dilbert').one()
assert dilbert is dilbert2
dilbert.engineer_name = 'hes dibert!'
session.flush()
-session.clear()
+session.expunge_all()
c = session.query(Company).get(1)
for e in c.employees:
- print e, e._instance_key
+ print e, e._sa_instance_state.key
session.delete(c)
session.flush()
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
+from sqlalchemy.orm import mapper, relation, create_session
metadata = MetaData('sqlite://')
metadata.bind.echo = 'debug'
mapper(Company, companies, properties={
- 'employees': relation(Person, lazy=True, private=True, backref='company')
+ 'employees': relation(Person, lazy=True, backref='company')
})
session = create_session()
c.employees.append(Person(name='joesmith', status='HHH'))
c.employees.append(Engineer(name='wally', status='CGG', engineer_name='engineer2', primary_language='python'))
c.employees.append(Manager(name='jsmith', status='ABA', manager_name='manager2'))
-session.save(c)
+session.add(c)
session.flush()
-session.clear()
+session.expunge_all()
c = session.query(Company).get(1)
for e in c.employees:
- print e, e._instance_key, e.company
+ print e, e._sa_instance_state.key, e.company
print "\n"
-dilbert = session.query(Person).get_by(name='dilbert')
-dilbert2 = session.query(Engineer).get_by(name='dilbert')
+dilbert = session.query(Person).filter_by(name='dilbert').one()
+dilbert2 = session.query(Engineer).filter_by(name='dilbert').one()
assert dilbert is dilbert2
dilbert.engineer_name = 'hes dibert!'
session.flush()
-session.clear()
+session.expunge_all()
c = session.query(Company).get(1)
for e in c.employees:
- print e, e._instance_key
+ print e, e._sa_instance_state.key
session.delete(c)
session.flush()
* a standalone operator example.
The implementation is limited to only public, well known
-and simple to use extension points, with the exception
-of one temporary monkeypatch in the DDL extension.
-Future SQLAlchemy expansion points may allow more seamless
-integration of some features.
-
+and simple to use extension points. Future SQLAlchemy
+expansion points may allow more seamless integration of
+some features.
+
"""
from sqlalchemy.orm.interfaces import AttributeExtension
# illustrate usage
if __name__ == '__main__':
- from sqlalchemy import *
- from sqlalchemy.orm import *
+ from sqlalchemy import (create_engine, MetaData, Column, Integer, String,
+ func, literal, select)
+ from sqlalchemy.orm import sessionmaker, column_property
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('postgres://scott:tiger@localhost/gistest', echo=True)
"""
# step 1. imports
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (create_engine, MetaData, Table, Column, Integer,
+ String, ForeignKey, Float, DateTime)
+from sqlalchemy.orm import sessionmaker, mapper, relation
from sqlalchemy.orm.shard import ShardedSession
from sqlalchemy.sql import operators
from sqlalchemy import sql
# step 2. databases
echo = True
-db1 = create_engine('sqlite:///shard1.db', echo=echo)
-db2 = create_engine('sqlite:///shard2.db', echo=echo)
-db3 = create_engine('sqlite:///shard3.db', echo=echo)
-db4 = create_engine('sqlite:///shard4.db', echo=echo)
+db1 = create_engine('sqlite://', echo=echo)
+db2 = create_engine('sqlite://', echo=echo)
+db3 = create_engine('sqlite://', echo=echo)
+db4 = create_engine('sqlite://', echo=echo)
# step 3. create session function. this binds the shard ids
sess = create_session()
for c in [tokyo, newyork, toronto, london, dublin, brasilia, quito]:
- sess.save(c)
+ sess.add(c)
sess.flush()
-sess.clear()
+sess.expunge_all()
t = sess.query(WeatherLocation).get(tokyo.id)
assert t.city == tokyo.city
north_american_cities = sess.query(WeatherLocation).filter(WeatherLocation.continent == 'North America')
assert [c.city for c in north_american_cities] == ['New York', 'Toronto']
-asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_('Europe', 'Asia'))
+asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_(['Europe', 'Asia']))
assert set([c.city for c in asia_and_europe]) == set(['Tokyo', 'London', 'Dublin'])
future version of SQLAlchemy.
"""
-from sqlalchemy.orm.interfaces import PropComparator, MapperProperty
-from sqlalchemy.orm import session as sessionlib, comparable_property
+from sqlalchemy.orm.interfaces import PropComparator
+from sqlalchemy.orm import comparable_property
# Using the VerticalPropertyDictMixin from the base example
from dictlike import VerticalPropertyDictMixin
if __name__ == '__main__':
- from sqlalchemy import *
+ from sqlalchemy import (MetaData, Table, Column, Integer, Unicode,
+ ForeignKey, UnicodeText, and_, not_, or_, String, Boolean, cast, text,
+ null, case)
from sqlalchemy.orm import mapper, relation, create_session
from sqlalchemy.orm.collections import attribute_mapped_collection
stoat[u'cuteness'] = 7
stoat[u'weasel-like'] = True
- session.save(stoat)
+ session.add(stoat)
session.flush()
- session.clear()
+ session.expunge_all()
critter = session.query(Animal).filter(Animal.name == u'stoat').one()
print critter[u'color']
marten[u'cuteness'] = 5
marten[u'weasel-like'] = True
marten[u'poisonous'] = False
- session.save(marten)
+ session.add(marten)
shrew = Animal(u'shrew')
shrew[u'cuteness'] = 5
shrew[u'weasel-like'] = False
shrew[u'poisonous'] = True
- session.save(shrew)
+ session.add(shrew)
session.flush()
q = (session.query(Animal).
if __name__ == '__main__':
- from sqlalchemy import *
+ from sqlalchemy import (MetaData, Table, Column, Integer, Unicode,
+ ForeignKey, UnicodeText, and_, not_)
from sqlalchemy.orm import mapper, relation, create_session
from sqlalchemy.orm.collections import attribute_mapped_collection
# stoat.facts collection:
print stoat.facts[u'color']
- session.save(stoat)
+ session.add(stoat)
session.flush()
- session.clear()
+ session.expunge_all()
critter = session.query(Animal).filter(Animal.name == u'stoat').one()
print critter[u'color']
marten = Animal(u'marten')
marten[u'color'] = u'brown'
marten[u'cuteness'] = u'somewhat'
- session.save(marten)
+ session.add(marten)
shrew = Animal(u'shrew')
shrew[u'cuteness'] = u'somewhat'
shrew[u'poisonous-part'] = u'saliva'
- session.save(shrew)
+ session.add(shrew)
loris = Animal(u'slow loris')
loris[u'cuteness'] = u'fairly'
loris[u'poisonous-part'] = u'elbows'
- session.save(loris)
+ session.add(loris)
session.flush()
q = (session.query(Animal).
represented in distinct database rows. This allows objects to be created with dynamically changing
fields that are all persisted in a normalized fashion."""
-from sqlalchemy import *
-from sqlalchemy.orm import *
+from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, String,
+ ForeignKey, PickleType, DateTime, and_)
+from sqlalchemy.orm import mapper, relation, sessionmaker, scoped_session
from sqlalchemy.orm.collections import mapped_collection
import datetime
class AccessCompiler(compiler.SQLCompiler):
+ extract_map = compiler.SQLCompiler.extract_map.copy()
+ extract_map.update({
+ 'month': 'm',
+ 'day': 'd',
+ 'year': 'yyyy',
+ 'second': 's',
+ 'hour': 'h',
+ 'doy': 'y',
+ 'minute': 'n',
+ 'quarter': 'q',
+ 'dow': 'w',
+ 'week': 'ww'
+ })
+
def visit_select_precolumns(self, select):
"""Access puts TOP, its version of LIMIT, here."""
s = select.distinct and "DISTINCT " or ""
return (self.process(join.left, asfrom=True) + (join.isouter and " LEFT OUTER JOIN " or " INNER JOIN ") + \
self.process(join.right, asfrom=True) + " ON " + self.process(join.onclause))
+ def visit_extract(self, extract):
+ field = self.extract_map.get(extract.field, extract.field)
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
class AccessSchemaGenerator(compiler.SchemaGenerator):
def get_column_specification(self, column, **kwargs):
MS_2005_VERSION = (9,)
MS_2000_VERSION = (8,)
-MSSQL_RESERVED_WORDS = set(['function'])
+RESERVED_WORDS = set(
+ ['add', 'all', 'alter', 'and', 'any', 'as', 'asc', 'authorization',
+ 'backup', 'begin', 'between', 'break', 'browse', 'bulk', 'by', 'cascade',
+ 'case', 'check', 'checkpoint', 'close', 'clustered', 'coalesce',
+ 'collate', 'column', 'commit', 'compute', 'constraint', 'contains',
+ 'containstable', 'continue', 'convert', 'create', 'cross', 'current',
+ 'current_date', 'current_time', 'current_timestamp', 'current_user',
+ 'cursor', 'database', 'dbcc', 'deallocate', 'declare', 'default',
+ 'delete', 'deny', 'desc', 'disk', 'distinct', 'distributed', 'double',
+ 'drop', 'dump', 'else', 'end', 'errlvl', 'escape', 'except', 'exec',
+ 'execute', 'exists', 'exit', 'external', 'fetch', 'file', 'fillfactor',
+ 'for', 'foreign', 'freetext', 'freetexttable', 'from', 'full',
+ 'function', 'goto', 'grant', 'group', 'having', 'holdlock', 'identity',
+ 'identity_insert', 'identitycol', 'if', 'in', 'index', 'inner', 'insert',
+ 'intersect', 'into', 'is', 'join', 'key', 'kill', 'left', 'like',
+ 'lineno', 'load', 'merge', 'national', 'nocheck', 'nonclustered', 'not',
+ 'null', 'nullif', 'of', 'off', 'offsets', 'on', 'open', 'opendatasource',
+ 'openquery', 'openrowset', 'openxml', 'option', 'or', 'order', 'outer',
+ 'over', 'percent', 'pivot', 'plan', 'precision', 'primary', 'print',
+ 'proc', 'procedure', 'public', 'raiserror', 'read', 'readtext',
+ 'reconfigure', 'references', 'replication', 'restore', 'restrict',
+ 'return', 'revert', 'revoke', 'right', 'rollback', 'rowcount',
+ 'rowguidcol', 'rule', 'save', 'schema', 'securityaudit', 'select',
+ 'session_user', 'set', 'setuser', 'shutdown', 'some', 'statistics',
+ 'system_user', 'table', 'tablesample', 'textsize', 'then', 'to', 'top',
+ 'tran', 'transaction', 'trigger', 'truncate', 'tsequal', 'union',
+ 'unique', 'unpivot', 'update', 'updatetext', 'use', 'user', 'values',
+ 'varying', 'view', 'waitfor', 'when', 'where', 'while', 'with',
+ 'writetext',
+ ])
class MSNumeric(sqltypes.Numeric):
}
)
+ extract_map = compiler.SQLCompiler.extract_map.copy()
+ extract_map.update({
+ 'doy': 'dayofyear',
+ 'dow': 'weekday',
+ 'milliseconds': 'millisecond',
+ 'microseconds': 'microsecond'
+ })
+
def __init__(self, *args, **kwargs):
super(MSSQLCompiler, self).__init__(*args, **kwargs)
self.tablealiases = {}
kwargs['mssql_aliased'] = True
return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
- def visit_savepoint(self, savepoint_stmt):
- util.warn("Savepoint support in mssql is experimental and may lead to data loss.")
- return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(savepoint_stmt)
+ def visit_extract(self, extract):
+ field = self.extract_map.get(extract.field, extract.field)
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
def visit_rollback_to_savepoint(self, savepoint_stmt):
return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(savepoint_stmt)
class MSIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = compiler.IdentifierPreparer.reserved_words.union(MSSQL_RESERVED_WORDS)
+ reserved_words = RESERVED_WORDS
def __init__(self, dialect):
super(MSIdentifierPreparer, self).__init__(dialect, initial_quote='[', final_quote=']')
#TODO: determine MSSQL's escaping rules
return value
+ def quote_schema(self, schema, force=True):
+ """Prepare a quoted table and schema name."""
+ result = '.'.join([self.quote(x, force) for x in schema.split('.')])
+ return result
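The per-component behavior can be sketched in plain Python. This is a simplified illustration only: the bracket-quoting rule below is an assumption for the sketch, while the real preparer also quotes reserved words and other special cases.

```python
import re

# Identifiers matching this need no quoting in this simplified sketch;
# the real preparer also considers reserved words and case sensitivity.
_LEGAL = re.compile(r'^[a-z_][a-z0-9_]*$')

def quote_schema(schema):
    """Quote each dot-separated component of a possibly multi-part
    schema name with MSSQL-style brackets when it needs quoting."""
    def quote(ident):
        if _LEGAL.match(ident):
            return ident
        return '[%s]' % ident
    return '.'.join(quote(part) for part in schema.split('.'))
```

With this rule, `'banana split.paj'` renders as `[banana split].paj`, matching the multipart-schema tests later in this patch.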
class MSDialect(default.DefaultDialect):
name = 'mssql'
self.max_identifier_length
super(MSDialect, self).__init__(**opts)
- def do_begin(self, connection):
- cursor = connection.cursor()
- cursor.execute("SET IMPLICIT_TRANSACTIONS OFF")
- cursor.execute("BEGIN TRANSACTION")
+ def do_savepoint(self, connection, name):
+ util.warn("Savepoint support in mssql is experimental and may lead to data loss.")
+ connection.execute("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
+ connection.execute("SAVE TRANSACTION %s" % name)
def do_release_savepoint(self, connection, name):
pass
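The save/rollback-to-savepoint pair the dialect emits follows the standard SQL savepoint pattern. The same pattern can be exercised against the stdlib `sqlite3` module (SQLite syntax shown purely for illustration; the MSSQL dialect emits `SAVE TRANSACTION` / `ROLLBACK TRANSACTION` instead):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.isolation_level = None  # manage transactions explicitly

conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("BEGIN")
conn.execute("INSERT INTO t VALUES (1)")
conn.execute("SAVEPOINT sp1")               # analogous to SAVE TRANSACTION sp1
conn.execute("INSERT INTO t VALUES (2)")
conn.execute("ROLLBACK TO SAVEPOINT sp1")   # undoes only work after the savepoint
conn.execute("COMMIT")

# Only the insert made before the savepoint survives.
rows = conn.execute("SELECT x FROM t").fetchall()
```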
"utc_timestamp":"UTC_TIMESTAMP"
})
+ extract_map = compiler.SQLCompiler.extract_map.copy()
+ extract_map.update({
+ 'milliseconds': 'millisecond',
+ })
+
def visit_typeclause(self, typeclause):
type_ = typeclause.type.dialect_impl(self.dialect)
if isinstance(type_, MSInteger):
else:
return text
+ def visit_extract(self, extract, **kwargs):
+ field = self.extract_map.get(extract.field, extract.field)
+ return "EXTRACT(%s FROM %s::timestamp)" % (
+ field, self.process(extract.expr))
+
class PGDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
colspec = self.preparer.format_column(column)
text += " WHERE " + inlined_clause
return text
+
class PGDefaultRunner(base.DefaultRunner):
def __init__(self, context):
base.DefaultRunner.__init__(self, context)
}
)
+ extract_map = compiler.SQLCompiler.extract_map.copy()
+ extract_map.update({
+ 'month': '%m',
+ 'day': '%d',
+ 'year': '%Y',
+ 'second': '%S',
+ 'hour': '%H',
+ 'doy': '%j',
+ 'minute': '%M',
+ 'epoch': '%s',
+ 'dow': '%w',
+ 'week': '%W'
+ })
+
def visit_cast(self, cast, **kwargs):
if self.dialect.supports_cast:
return super(SQLiteCompiler, self).visit_cast(cast)
else:
return self.process(cast.clause)
+ def visit_extract(self, extract):
+ try:
+ return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
+ self.extract_map[extract.field], self.process(extract.expr))
+ except KeyError:
+ raise exc.ArgumentError(
+ "%s is not a valid extract argument." % extract.field)
+
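The STRFTIME-based form this compiles to can be checked directly with the stdlib `sqlite3` module:

```python
import sqlite3

conn = sqlite3.connect(':memory:')

# extract('year', ...) compiles to CAST(STRFTIME('%Y', ...) AS INTEGER)
year = conn.execute(
    "SELECT CAST(STRFTIME('%Y', '2009-03-10') AS INTEGER)").fetchone()[0]

# extract('doy', ...) uses '%j', SQLite's zero-padded day-of-year
doy = conn.execute(
    "SELECT CAST(STRFTIME('%j', '2009-03-10') AS INTEGER)").fetchone()[0]
```

The CAST is what turns STRFTIME's string result (e.g. `'069'`) into the integer the `extract()` construct is typed to return.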
def limit_clause(self, select):
text = ""
if select._limit is not None:
'drop', 'each', 'else', 'end', 'escape', 'except', 'exclusive',
'explain', 'false', 'fail', 'for', 'foreign', 'from', 'full', 'glob',
'group', 'having', 'if', 'ignore', 'immediate', 'in', 'index',
- 'initially', 'inner', 'insert', 'instead', 'intersect', 'into', 'is',
+ 'indexed', 'initially', 'inner', 'insert', 'instead', 'intersect', 'into', 'is',
'isnull', 'join', 'key', 'left', 'like', 'limit', 'match', 'natural',
'not', 'notnull', 'null', 'of', 'offset', 'on', 'or', 'order', 'outer',
'plan', 'pragma', 'primary', 'query', 'raise', 'references',
sql_operators.mod: lambda x, y: "MOD(%s, %s)" % (x, y),
})
+ extract_map = compiler.SQLCompiler.extract_map.copy()
+ extract_map.update({
+ 'doy': 'dayofyear',
+ 'dow': 'weekday',
+ 'milliseconds': 'millisecond'
+ })
+
def bindparam_string(self, name):
res = super(SybaseSQLCompiler, self).bindparam_string(name)
if name.lower().startswith('literal'):
res = "CAST(%s AS %s)" % (res, self.process(cast.typeclause))
return res
+ def visit_extract(self, extract):
+ field = self.extract_map.get(extract.field, extract.field)
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+
def for_update_clause(self, select):
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
_undefer_column_name(key, value)
cls.__table__.append_column(value)
cls.__mapper__.add_property(key, value)
+ elif isinstance(value, ColumnProperty):
+ for col in value.columns:
+ if isinstance(col, Column) and col.table is None:
+ _undefer_column_name(key, col)
+ cls.__table__.append_column(col)
+ cls.__mapper__.add_property(key, value)
elif isinstance(value, MapperProperty):
cls.__mapper__.add_property(key, _deferred_relation(cls, value))
else:
alternative, see the method `populate_existing()` on
:class:`~sqlalchemy.orm.query.Query`.
- allow_column_override
- If True, allows the usage of a ``relation()`` which has the
- same name as a column in the mapped table. The table column
- will no longer be mapped.
-
allow_null_pks
Indicates that composite primary keys where one or more (but not all)
columns contain NULL is a valid primary key. Primary keys which
strategies._register_attribute(self,
mapper,
useobject=True,
- impl_class=DynamicAttributeImpl,
- target_mapper=self.parent_property.mapper,
- order_by=self.parent_property.order_by,
+ impl_class=DynamicAttributeImpl,
+ target_mapper=self.parent_property.mapper,
+ order_by=self.parent_property.order_by,
query_class=self.parent_property.query_class
)
uses_objects = True
accepts_scalar_loader = False
- def __init__(self, class_, key, typecallable,
+ def __init__(self, class_, key, typecallable,
target_mapper, order_by, query_class=None, **kwargs):
super(DynamicAttributeImpl, self).__init__(class_, key, typecallable, **kwargs)
self.target_mapper = target_mapper
self.order_by = order_by
if not query_class:
self.query_class = AppenderQuery
+ elif AppenderMixin in query_class.mro():
+ self.query_class = query_class
else:
self.query_class = mixin_user_query(query_class)
ext.remove(state, value, initiator or self)
def _modified_event(self, state):
-
+
if self.key not in state.committed_state:
state.committed_state[self.key] = CollectionHistory(self, state)
collection_history = self._modified_event(state)
new_values = list(iterable)
-
+
if _state_has_identity(state):
old_collection = list(self.get(state))
else:
c = state.committed_state[self.key]
else:
c = CollectionHistory(self, state)
-
+
if not passive:
return CollectionHistory(self, state, apply_to=c)
else:
return c
-
+
def append(self, state, value, initiator, passive=False):
if initiator is not self:
self.fire_append_event(state, value, initiator)
-
+
def remove(self, state, value, initiator, passive=False):
if initiator is not self:
self.fire_remove_event(state, value, initiator)
class DynCollectionAdapter(object):
"""the dynamic analogue to orm.collections.CollectionAdapter"""
-
+
def __init__(self, attr, owner_state, data):
self.attr = attr
self.state = owner_state
self.data = data
-
+
def __iter__(self):
return iter(self.data)
-
+
def append_with_event(self, item, initiator=None):
self.attr.append(self.state, item, initiator)
def append_without_event(self, item):
pass
-
+
def remove_without_event(self, item):
pass
-
+
class AppenderMixin(object):
query_class = None
Query.__init__(self, attr.target_mapper, None)
self.instance = state.obj()
self.attr = attr
-
+
def __session(self):
sess = object_session(self.instance)
if sess is not None and self.autoflush and sess.autoflush and self.instance in sess:
return None
else:
return sess
-
+
def session(self):
return self.__session()
session = property(session, lambda s, x:None)
-
+
def __iter__(self):
sess = self.__session()
if sess is None:
passive=True).added_items.__getitem__(index)
else:
return self._clone(sess).__getitem__(index)
-
+
def count(self):
sess = self.__session()
if sess is None:
name = 'Appender' + cls.__name__
return type(name, (AppenderMixin, cls), {'query_class': cls})
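The factory pattern used here, building an `Appender<QueryClass>` type on the fly via `type()`, can be sketched standalone (the class bodies below are placeholder stand-ins, not the real API):

```python
class AppenderMixin(object):
    """Stand-in for the real AppenderMixin: supplies collection methods."""
    query_class = None

    def append(self, item):
        return 'appended %r' % (item,)

class MyQuery(object):
    """Stand-in for a user-supplied Query subclass."""

def mixin_user_query(cls):
    # Compose the mixin with the user's query class dynamically,
    # recording the bare class in query_class for later cloning.
    name = 'Appender' + cls.__name__
    return type(name, (AppenderMixin, cls), {'query_class': cls})

AppenderMyQuery = mixin_user_query(MyQuery)
```

The `query_class` attribute lets `limit()` and friends return a plain `MyQuery` clone without the appender methods, which is exactly what the dynamic-relation tests later in this patch assert.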
-class CollectionHistory(object):
+class CollectionHistory(object):
"""Overrides AttributeHistory to receive append/remove events directly."""
def __init__(self, attr, state, apply_to=None):
self.deleted_items = []
self.added_items = []
self.unchanged_items = []
-
+
if operator is operators.is_:
def evaluate(obj):
return eval_left(obj) == eval_right(obj)
- if operator is operators.isnot:
+ elif operator is operators.isnot:
def evaluate(obj):
return eval_left(obj) != eval_right(obj)
elif operator in _straight_ops:
else:
return None
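The closure-per-operator scheme above reduces IS / IS NOT to Python equality of the evaluated sides, which is what makes IS NULL evaluable in-process. A standalone sketch (the helper name and shapes are hypothetical):

```python
def build_is_evaluator(eval_left, eval_right, negate=False):
    """Build an evaluator for IS (or IS NOT with negate=True): both
    sides are evaluated against the object and compared with == / !=."""
    if negate:
        def evaluate(obj):
            return eval_left(obj) != eval_right(obj)
    else:
        def evaluate(obj):
            return eval_left(obj) == eval_right(obj)
    return evaluate

# "name IS NULL": left evaluates the attribute, right is constant None
is_null = build_is_evaluator(lambda obj: obj.get('name'), lambda obj: None)
```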
+ def __getstate__(self):
+ d = self.__dict__.copy()
+ d['key'] = ret = []
+ for token in util.to_list(self.key):
+ if isinstance(token, PropComparator):
+ ret.append((token.mapper.class_, token.key))
+ else:
+ ret.append(token)
+ return d
+
+ def __setstate__(self, state):
+ ret = []
+ for key in state['key']:
+ if isinstance(key, tuple):
+ cls, propkey = key
+ ret.append(getattr(cls, propkey))
+ else:
+ ret.append(key)
+ state['key'] = tuple(ret)
+ self.__dict__ = state
+
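The reduction above, swapping each unpicklable instrumented descriptor for a `(class, key)` tuple on dump and re-resolving it with `getattr` on load, can be sketched with stand-in classes (all names below are hypothetical illustrations, not the real ORM types):

```python
import pickle

class InstrumentedAttr(object):
    """Stand-in for an instrumented descriptor that refuses to pickle."""
    def __init__(self, cls, key):
        self.cls, self.key = cls, key
    def __reduce__(self):
        raise pickle.PicklingError("instrumented attributes don't pickle")

class MyClass(object):
    pass

MyClass.foo = InstrumentedAttr(MyClass, 'foo')

class Option(object):
    def __init__(self, *tokens):
        self.key = tokens

    def __getstate__(self):
        d = self.__dict__.copy()
        # Replace descriptors with picklable (class, key) pairs.
        d['key'] = [(t.cls, t.key) if isinstance(t, InstrumentedAttr) else t
                    for t in self.key]
        return d

    def __setstate__(self, state):
        rebuilt = []
        for tok in state['key']:
            if isinstance(tok, tuple):
                cls, key = tok
                rebuilt.append(getattr(cls, key))  # re-resolve the descriptor
            else:
                rebuilt.append(tok)
        state['key'] = tuple(rebuilt)
        self.__dict__ = state

opt = pickle.loads(pickle.dumps(Option(MyClass.foo, 'bar')))
```

Classes pickle by reference, so only the `(class, key)` pair crosses the pickle boundary; the descriptor itself is looked up again in the restored process.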
def __get_paths(self, query, raiseerr):
path = None
entity = None
dest_list = []
for current in instances:
_recursive[(current, self)] = True
- obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
+ obj = session._merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
dest_list.append(obj)
if dont_load:
current = instances[0]
if current is not None:
_recursive[(current, self)] = True
- obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
+ obj = session._merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
if dont_load:
dest_state.dict[self.key] = obj
def _refers_to_parent_table(self):
- return self.parent.mapped_table is self.target
+ for c, f in self.synchronize_pairs:
+ if c.table is f.table:
+ return True
+ return False
def _is_self_referential(self):
return self.mapper.common_parent(self.parent)
if synchronize_session not in [False, 'evaluate', 'fetch']:
raise sa_exc.ArgumentError("Valid strategies for session synchronization are False, 'evaluate' and 'fetch'")
+ self = self.enable_eagerloads(False)
+
context = self._compile_context()
if len(context.statement.froms) != 1 or not isinstance(context.statement.froms[0], schema.Table):
raise sa_exc.ArgumentError("Only deletion via a single table query is currently supported")
if synchronize_session not in [False, 'evaluate', 'fetch']:
raise sa_exc.ArgumentError("Valid strategies for session synchronization are False, 'evaluate' and 'fetch'")
+ self = self.enable_eagerloads(False)
+
context = self._compile_context()
if len(context.statement.froms) != 1 or not isinstance(context.statement.froms[0], schema.Table):
raise sa_exc.ArgumentError("Only update via a single table query is currently supported")
before each transaction is committed.
weak_identity_map
- When set to the default value of ``False``, a weak-referencing map is
+ When set to the default value of ``True``, a weak-referencing map is
used; instances which are not externally referenced will be garbage
collected immediately. For dereferenced instances which have pending
changes present, the attribute management system will create a temporary
strong-reference to the object which lasts until the changes are flushed
to the database, at which point it's again dereferenced. Alternatively,
- when using the value ``True``, the identity map uses a regular Python
+ when using the value ``False``, the identity map uses a regular Python
dictionary to store instances. The session will maintain all instances
present until they are removed using expunge(), clear(), or purge().
for state, m, o in cascade_states:
self._delete_impl(state)
- def merge(self, instance, dont_load=False,
- _recursive=None):
+ def merge(self, instance, dont_load=False):
"""Copy the state an instance onto the persistent instance with the same identifier.
If there is no persistent instance currently associated with the
mapped with ``cascade="merge"``.
"""
- if _recursive is None:
- # TODO: this should be an IdentityDict for instances, but will
- # need a separate dict for PropertyLoader tuples
- _recursive = {}
- # Autoflush only on the topmost call
- self._autoflush()
-
+ # TODO: this should be an IdentityDict for instances, but will
+ # need a separate dict for PropertyLoader tuples
+ _recursive = {}
+ self._autoflush()
+ autoflush = self.autoflush
+ try:
+ self.autoflush = False
+ return self._merge(instance, dont_load=dont_load, _recursive=_recursive)
+ finally:
+ self.autoflush = autoflush
+
+ def _merge(self, instance, dont_load=False, _recursive=None):
mapper = _object_mapper(instance)
if instance in _recursive:
return _recursive[instance]
new_instance = False
state = attributes.instance_state(instance)
key = state.key
+
if key is None:
if dont_load:
raise sa_exc.InvalidRequestError(
self._update_impl(merged_state)
new_instance = True
else:
- merged = self.query(mapper.class_).autoflush(False).get(key[1])
+ merged = self.query(mapper.class_).get(key[1])
if merged is None:
merged = mapper.class_manager.new_instance()
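The save/disable/restore dance around `_merge()` is a plain try/finally guard, which is what guarantees autoflush stays off for the whole recursive merge. A minimal sketch with a toy session object (not the real Session API):

```python
class ToySession(object):
    """Toy stand-in illustrating the autoflush guard around merge()."""
    def __init__(self):
        self.autoflush = True

    def merge(self, instance):
        # Flush once up front (elided here), then keep autoflush off
        # so cascaded _merge() calls cannot trigger a flush mid-merge.
        autoflush = self.autoflush
        try:
            self.autoflush = False
            return self._merge(instance, _recursive={})
        finally:
            self.autoflush = autoflush

    def _merge(self, instance, _recursive):
        assert self.autoflush is False  # holds for every recursive call
        return instance

sess = ToySession()
merged = sess.merge('some instance')
```

The finally block restores the caller's setting even if the merge raises, so the guard is invisible to outside code.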
def execute_save_steps(self, trans, task):
self.save_objects(trans, task)
+ for dep in task.polymorphic_cyclical_dependencies:
+ self.execute_dependency(trans, dep, False)
+ for dep in task.polymorphic_cyclical_dependencies:
+ self.execute_dependency(trans, dep, True)
self.execute_cyclical_dependencies(trans, task, False)
self.execute_dependencies(trans, task)
self.execute_dependency(trans, dep, True)
def execute_cyclical_dependencies(self, trans, task, isdelete):
- for dep in task.polymorphic_cyclical_dependencies:
- self.execute_dependency(trans, dep, isdelete)
for t in task.dependent_tasks:
self.execute(trans, [t], isdelete)
raise exc.ArgumentError(
"Parent column '%s' does not descend from a "
"table-attached Column" % str(self.parent))
- m = re.match(r"^(.+?)(?:\.(.+?))?(?:\.(.+?))?$", self._colspec,
- re.UNICODE)
+
+ m = self._colspec.split('.')
+
if m is None:
raise exc.ArgumentError(
"Invalid foreign key column specification: %s" %
self._colspec)
- if m.group(3) is None:
- (tname, colname) = m.group(1, 2)
- schema = None
+
+ # A FK between column 'bar' and table 'foo' can be
+ # specified as 'foo', 'foo.bar', 'dbo.foo.bar',
+ # 'otherdb.dbo.foo.bar'. Once we have the column name and
+ # the table name, treat everything else as the schema
+ # name. Some databases (e.g. Sybase) support
+ # inter-database foreign keys. See ticket #1341 and,
+ # indirectly related, ticket #594. This assumes that '.'
+ # will never appear *within* any component of the FK.
+
+ (schema, tname, colname) = (None, None, None)
+ if len(m) == 1:
+ tname = m.pop()
else:
- (schema, tname, colname) = m.group(1, 2, 3)
+ colname = m.pop()
+ tname = m.pop()
+
+ if len(m) > 0:
+ schema = '.'.join(m)
+
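The split-based parsing reads naturally as a standalone helper (the function name is hypothetical; the logic mirrors the lines above):

```python
def parse_colspec(colspec):
    """Split a foreign key target such as 'otherdb.dbo.foo.bar' into
    (schema, table, column): the last two components are column and
    table, and anything left over is rejoined into a schema name."""
    parts = colspec.split('.')
    schema = tname = colname = None
    if len(parts) == 1:
        tname = parts.pop()
    else:
        colname = parts.pop()
        tname = parts.pop()
    if parts:
        schema = '.'.join(parts)
    return schema, tname, colname
```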
if _get_table_key(tname, schema) not in parenttable.metadata:
raise exc.NoReferencedTableError(
"Could not find table '%s' with which to generate a "
functions.user: 'USER'
}
+EXTRACT_MAP = {
+ 'month': 'month',
+ 'day': 'day',
+ 'year': 'year',
+ 'second': 'second',
+ 'hour': 'hour',
+ 'doy': 'doy',
+ 'minute': 'minute',
+ 'quarter': 'quarter',
+ 'dow': 'dow',
+ 'week': 'week',
+ 'epoch': 'epoch',
+ 'milliseconds': 'milliseconds',
+ 'microseconds': 'microseconds',
+ 'timezone_hour': 'timezone_hour',
+ 'timezone_minute': 'timezone_minute'
+}
class _CompileLabel(visitors.Visitable):
"""lightweight label object which acts as an expression._Label."""
operators = OPERATORS
functions = FUNCTIONS
+ extract_map = EXTRACT_MAP
# class-level defaults which can be set at the instance
# level to define if this Compiled instance represents
return name
else:
if column.table.schema:
- schema_prefix = self.preparer.quote(column.table.schema, column.table.quote_schema) + '.'
+ schema_prefix = self.preparer.quote_schema(column.table.schema, column.table.quote_schema) + '.'
else:
schema_prefix = ''
tablename = column.table.name
def visit_cast(self, cast, **kwargs):
return "CAST(%s AS %s)" % (self.process(cast.clause), self.process(cast.typeclause))
+ def visit_extract(self, extract, **kwargs):
+ field = self.extract_map.get(extract.field, extract.field)
+ return "EXTRACT(%s FROM %s)" % (field, self.process(extract.expr))
+
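Dialect customization of EXTRACT works purely through `extract_map`: the base compiler looks the field up and falls back to the field name itself, and each dialect copies the base map and overrides a few entries. A standalone sketch of that lookup (simplified maps, not the full field list):

```python
# Base mapping: every standard field maps to itself.
BASE_EXTRACT_MAP = dict((f, f) for f in ('year', 'month', 'day', 'doy', 'dow'))

# A dialect copies the base map and overrides a few entries,
# as the Sybase/MSSQL compilers in this patch do.
DIALECT_EXTRACT_MAP = BASE_EXTRACT_MAP.copy()
DIALECT_EXTRACT_MAP.update({'doy': 'dayofyear', 'dow': 'weekday'})

def visit_extract(extract_map, field, expr_sql):
    """Render EXTRACT using the dialect's field mapping, falling back
    to the raw field name for unmapped values."""
    return "EXTRACT(%s FROM %s)" % (extract_map.get(field, field), expr_sql)
```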
def visit_function(self, func, result_map=None, **kwargs):
if result_map is not None:
result_map[func.name.lower()] = (func.name, None, func.type)
def visit_table(self, table, asfrom=False, **kwargs):
if asfrom:
if getattr(table, "schema", None):
- return self.preparer.quote(table.schema, table.quote_schema) + "." + self.preparer.quote(table.name, table.quote)
+ return self.preparer.quote_schema(table.schema, table.quote_schema) + "." + self.preparer.quote(table.name, table.quote)
else:
return self.preparer.quote(table.name, table.quote)
else:
or self.illegal_initial_characters.match(value[0])
or not self.legal_characters.match(unicode(value))
or (lc_value != value))
-
+
+ def quote_schema(self, schema, force):
+ """Quote a schema.
+
+ Subclasses should override this to provide database-dependent
+ quoting behavior.
+ """
+ return self.quote(schema, force)
+
def quote(self, ident, force):
if force is None:
if ident in self._strings:
def format_sequence(self, sequence, use_schema=True):
name = self.quote(sequence.name, sequence.quote)
if not self.omit_schema and use_schema and sequence.schema is not None:
- name = self.quote(sequence.schema, sequence.quote) + "." + name
+ name = self.quote_schema(sequence.schema, sequence.quote) + "." + name
return name
def format_label(self, label, name=None):
name = table.name
result = self.quote(name, table.quote)
if not self.omit_schema and use_schema and getattr(table, "schema", None):
- result = self.quote(table.schema, table.quote_schema) + "." + result
+ result = self.quote_schema(table.schema, table.quote_schema) + "." + result
return result
def format_column(self, column, use_table=False, name=None, table_name=None):
# a longer sequence.
if not self.omit_schema and use_schema and getattr(table, 'schema', None):
- return (self.quote(table.schema, table.quote_schema),
+ return (self.quote_schema(table.schema, table.quote_schema),
self.format_table(table, use_schema=False))
else:
return (self.format_table(table, use_schema=False), )
def extract(field, expr):
"""Return the clause ``extract(field FROM expr)``."""
- expr = _BinaryExpression(text(field), expr, operators.from_)
- return func.extract(expr)
+ return _Extract(field, expr)
def collate(expression, collation):
"""Return the clause ``expression COLLATE collation``."""
whenlist = [(_literal_as_binds(c).self_group(), _literal_as_binds(r)) for (c, r) in whens]
else:
whenlist = [(_no_literals(c).self_group(), _literal_as_binds(r)) for (c, r) in whens]
-
+
if whenlist:
type_ = list(whenlist[-1])[-1].type
else:
type_ = None
-
- self.value = value
+
+ if value is None:
+ self.value = None
+ else:
+ self.value = _literal_as_binds(value)
+
self.type = type_
self.whens = whenlist
if else_ is not None:
@property
def _from_objects(self):
- return itertools.chain(*[x._from_objects for x in self.get_children()])
+ return list(itertools.chain(*[x._from_objects for x in self.get_children()]))
class Function(ColumnElement, FromClause):
"""Describe a SQL function."""
return self.clause._from_objects
+class _Extract(ColumnElement):
+
+ __visit_name__ = 'extract'
+
+ def __init__(self, field, expr, **kwargs):
+ self.type = sqltypes.Integer()
+ self.field = field
+ self.expr = _literal_as_binds(expr, None)
+
+ def _copy_internals(self, clone=_clone):
+ self.field = clone(self.field)
+ self.expr = clone(self.expr)
+
+ def get_children(self, **kwargs):
+ return self.field, self.expr
+
+ @property
+ def _from_objects(self):
+ return self.expr._from_objects
+
+
class _UnaryExpression(ColumnElement):
__visit_name__ = 'unary'
return i, f
else:
return None, None
+
def find_tables(clause, check_columns=False, include_aliases=False, include_joins=False, include_selects=False):
"""locate Table objects within the given expression."""
self._list = []
dict.clear(self)
+ def copy(self):
+ return self.__copy__()
+
+ def __copy__(self):
+ return OrderedDict(self)
+
def sort(self, *arg, **kw):
self._list.sort(*arg, **kw)
import testenv; testenv.configure_for_tests()
-import threading, unittest
+import copy, threading, unittest
from sqlalchemy import util, sql, exc
from testlib import TestBase
from testlib.compat import gc_collect
o = util.OrderedDict([('name', 'jbe'), ('fullname', 'jonathan'), ('password', '')])
eq_(o.keys(), ['name', 'fullname', 'password'])
+ def test_odict_copy(self):
+ o = util.OrderedDict()
+ o["zzz"] = 1
+ o["aaa"] = 2
+ eq_(o.keys(), ['zzz', 'aaa'])
+
+ o2 = o.copy()
+ eq_(o2.keys(), o.keys())
+
+ o3 = copy.copy(o)
+ eq_(o3.keys(), o.keys())
+
class OrderedSetTest(TestBase):
def test_mutators_against_iter(self):
# testing a set modified against an iterator
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
+from sqlalchemy import sql
from sqlalchemy.databases import access
from testlib import *
-class BasicTest(TestBase, AssertsExecutionResults):
- # A simple import of the database/ module should work on all systems.
- def test_import(self):
- # we got this far, right?
- return True
+class CompileTest(TestBase, AssertsCompiledSQL):
+ __dialect__ = access.dialect()
+
+ def test_extract(self):
+ t = sql.table('t', sql.column('col1'))
+
+ mapping = {
+ 'month': 'm',
+ 'day': 'd',
+ 'year': 'yyyy',
+ 'second': 's',
+ 'hour': 'h',
+ 'doy': 'y',
+ 'minute': 'n',
+ 'quarter': 'q',
+ 'dow': 'w',
+ 'week': 'ww'
+ }
+
+ for field, subst in mapping.items():
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % subst)
if __name__ == "__main__":
s = select([tbl.c.id]).where(tbl.c.id==1)
self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM paj.test WHERE paj.test.id IN (SELECT test_1.id FROM paj.test AS test_1 WHERE test_1.id = :id_1)")
+ def test_delete_schema_multipart(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='banana.paj')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM banana.paj.test WHERE banana.paj.test.id = :id_1")
+
+ s = select([tbl.c.id]).where(tbl.c.id==1)
+ self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM banana.paj.test WHERE banana.paj.test.id IN (SELECT test_1.id FROM banana.paj.test AS test_1 WHERE test_1.id = :id_1)")
+
+ def test_delete_schema_multipart_needs_quoting(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='banana split.paj')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM [banana split].paj.test WHERE [banana split].paj.test.id = :id_1")
+
+ s = select([tbl.c.id]).where(tbl.c.id==1)
+ self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM [banana split].paj.test WHERE [banana split].paj.test.id IN (SELECT test_1.id FROM [banana split].paj.test AS test_1 WHERE test_1.id = :id_1)")
+
+ def test_delete_schema_multipart_both_need_quoting(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='banana split.paj with a space')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM [banana split].[paj with a space].test WHERE [banana split].[paj with a space].test.id = :id_1")
+
+ s = select([tbl.c.id]).where(tbl.c.id==1)
+ self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM [banana split].[paj with a space].test WHERE [banana split].[paj with a space].test.id IN (SELECT test_1.id FROM [banana split].[paj with a space].test AS test_1 WHERE test_1.id = :id_1)")
+
def test_union(self):
t1 = table('t1',
column('col1'),
self.assert_compile(func.current_date(), "GETDATE()")
self.assert_compile(func.length(3), "LEN(:length_1)")
+ def test_extract(self):
+ t = table('t', column('col1'))
+
+ for field in 'day', 'month', 'year':
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field)
+
class IdentityInsertTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'mssql'
for type_, expected in specs:
self.assert_compile(cast(t.c.col, type_), expected)
+ def test_extract(self):
+ t = sql.table('t', sql.column('col1'))
+
+ for field in 'year', 'month', 'day':
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ "SELECT EXTRACT(%s FROM t.col1) AS anon_1 FROM t" % field)
+
+ # 'milliseconds' is mapped to 'millisecond'
+ self.assert_compile(
+ select([extract('milliseconds', t.c.col1)]),
+ "SELECT EXTRACT(millisecond FROM t.col1) AS anon_1 FROM t")
+
class RawReflectionTest(TestBase):
def setUp(self):
assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'
class CompileTest(TestBase, AssertsCompiledSQL):
+ __dialect__ = postgres.dialect()
+
def test_update_returning(self):
dialect = postgres.dialect()
table1 = table('mytable',
self.assert_compile(schema.CreateIndex(idx),
"CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10", dialect=postgres.dialect())
+ def test_extract(self):
+ t = table('t', column('col1'))
+
+ for field in 'year', 'month', 'day':
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ "SELECT EXTRACT(%s FROM t.col1::timestamp) AS anon_1 "
+ "FROM t" % field)
class ReturningTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
import testenv; testenv.configure_for_tests()
import datetime
from sqlalchemy import *
-from sqlalchemy import exc
+from sqlalchemy import exc, sql
from sqlalchemy.dialects.sqlite import base as sqlite, pysqlite as pysqlite_dialect
from testlib import *
isolation_level="FOO")
+class SQLTest(TestBase, AssertsCompiledSQL):
+ """Tests SQLite-dialect specific compilation."""
+
+ __dialect__ = sqlite.dialect()
+
+
+ def test_extract(self):
+ t = sql.table('t', sql.column('col1'))
+
+ mapping = {
+ 'month': '%m',
+ 'day': '%d',
+ 'year': '%Y',
+ 'second': '%S',
+ 'hour': '%H',
+ 'doy': '%j',
+ 'minute': '%M',
+ 'epoch': '%s',
+ 'dow': '%w',
+ 'week': '%W',
+ }
+
+ for field, subst in mapping.items():
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ "SELECT CAST(STRFTIME('%s', t.col1) AS INTEGER) AS anon_1 "
+ "FROM t" % subst)
+
+
class InsertTest(TestBase, AssertsExecutionResults):
"""Tests inserts and autoincrement."""
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
+from sqlalchemy import sql
from sqlalchemy.databases import sybase
from testlib import *
-class BasicTest(TestBase, AssertsExecutionResults):
- # A simple import of the database/ module should work on all systems.
- def test_import(self):
- # we got this far, right?
- return True
+class CompileTest(TestBase, AssertsCompiledSQL):
+ __dialect__ = sybase.dialect()
+
+ def test_extract(self):
+ t = sql.table('t', sql.column('col1'))
+
+ mapping = {
+ 'day': 'day',
+ 'doy': 'dayofyear',
+ 'dow': 'weekday',
+ 'milliseconds': 'millisecond',
+ 'millisecond': 'millisecond',
+ 'year': 'year',
+ }
+
+ for field, subst in mapping.items():
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % subst)
+
if __name__ == "__main__":
from sqlalchemy import exc
from testlib import sa, testing
from testlib.sa import MetaData, Table, Column, Integer, String, ForeignKey, ForeignKeyConstraint, asc, Index
-from testlib.sa.orm import relation, create_session, class_mapper, eagerload, compile_mappers, backref, clear_mappers, polymorphic_union
+from testlib.sa.orm import relation, create_session, class_mapper, eagerload, compile_mappers, backref, clear_mappers, polymorphic_union, deferred
from testlib.testing import eq_
from orm._base import ComparableEntity, MappedTest
Engineer(name="vlad", primary_language="cobol")
)
+ def test_add_deferred(self):
+ class Person(Base, ComparableEntity):
+ __tablename__ = 'people'
+ id = Column('id', Integer, primary_key=True)
+
+ Person.name = deferred(Column(String(10)))
+
+ Base.metadata.create_all()
+ sess = create_session()
+ p = Person(name='ratbert')
+
+ sess.add(p)
+ sess.flush()
+ sess.expunge_all()
+ eq_(
+ sess.query(Person).all(),
+ [
+ Person(name='ratbert')
+ ]
+ )
+ person = sess.query(Person).filter(Person.name == 'ratbert').one()
+ assert 'name' not in person.__dict__
+
def test_single_fksonsub(self):
"""test single inheritance with a foreign key-holding column on a subclass.
'orm.assorted_eager',
'orm.naturalpks',
+ 'orm.defaults',
'orm.unitofwork',
'orm.session',
'orm.transaction',
'orm.onetoone',
'orm.dynamic',
+ 'orm.evaluator',
+
'orm.deprecations',
)
alltests = unittest.TestSuite()
sess.delete(a)
sess.flush()
-
+ @testing.resolve_artifact_names
+ def test_setnull_ondelete(self):
+ mapper(C1, t1, properties={
+ 'children':relation(C1)
+ })
+
+ sess = create_session()
+ c1 = C1()
+ c2 = C1()
+ c1.children.append(c2)
+ sess.add(c1)
+ sess.flush()
+ assert c2.parent_c1 == c1.c1
+
+ sess.delete(c1)
+ sess.flush()
+ assert c2.parent_c1 is None
+
+ sess.expire_all()
+ assert c2.parent_c1 is None
+
class SelfReferentialNoPKTest(_base.MappedTest):
"""A self-referential relationship that joins on a column other than the primary key column"""
from testlib import testing
from testlib.sa import Table, Column, Integer, String, ForeignKey, desc, select, func
from testlib.sa.orm import mapper, relation, create_session, Query, attributes
+from sqlalchemy.orm.dynamic import AppenderMixin
from testlib.testing import eq_
from testlib.compat import _function_named
from orm import _base, _fixtures
assert not hasattr(q, 'append')
assert type(q).__name__ == 'MyQuery'
+ @testing.resolve_artifact_names
+ def test_custom_query_with_custom_mixin(self):
+ class MyAppenderMixin(AppenderMixin):
+ def add(self, items):
+ if isinstance(items, list):
+ for item in items:
+ self.append(item)
+ else:
+ self.append(items)
+
+ class MyQuery(Query):
+ pass
+
+ class MyAppenderQuery(MyAppenderMixin, MyQuery):
+ query_class = MyQuery
+
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses),
+ query_class=MyAppenderQuery)
+ })
+ sess = create_session()
+ u = User()
+ sess.add(u)
+
+ col = u.addresses
+ assert isinstance(col, Query)
+ assert isinstance(col, MyQuery)
+ assert hasattr(col, 'append')
+ assert hasattr(col, 'add')
+ assert type(col).__name__ == 'MyAppenderQuery'
+
+ q = col.limit(1)
+ assert isinstance(q, Query)
+ assert isinstance(q, MyQuery)
+ assert not hasattr(q, 'append')
+ assert not hasattr(q, 'add')
+ assert type(q).__name__ == 'MyQuery'
+
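The test above exercises cooperative mixin composition: `MyAppenderQuery` gets `append()`/`add()` from the mixin, while generative methods like `limit()` return a plain `MyQuery` via the `query_class` attribute. A stdlib-only sketch of that pattern (class names here are illustrative stand-ins, not the real AppenderQuery machinery):

```python
class Query:
    query_class = None  # per-class: what generative methods return

    def limit(self, n):
        # generative methods "degrade" to the bare query class,
        # dropping any collection-oriented mixin behavior
        return self.query_class()

Query.query_class = Query

class AppenderMixin:
    def append(self, item):
        self._items = getattr(self, '_items', []) + [item]

class MyQuery(Query):
    pass

MyQuery.query_class = MyQuery

class MyAppenderQuery(AppenderMixin, MyQuery):
    query_class = MyQuery  # limit() falls back to the plain custom query

col = MyAppenderQuery()
col.append('a1')
assert hasattr(col, 'append')
q = col.limit(1)
assert isinstance(q, MyQuery)
assert not hasattr(q, 'append')  # the mixin methods do not survive limit()
```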
class SessionTest(_fixtures.FixtureTest):
run_inserts = None
a1 = Address(email_address='foo')
sess.add_all([u1, a1])
sess.flush()
-
+
assert testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)) == 0
u1 = sess.query(User).get(u1.id)
u1.addresses.append(a1)
assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
(a1.id, u1.id, 'foo')
]
-
+
u1.addresses.remove(a1)
sess.flush()
assert testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)) == 0
-
+
u1.addresses.append(a1)
sess.flush()
assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
(a2.id, u1.id, 'bar')
]
-
+
@testing.resolve_artifact_names
def test_merge(self):
a1 = Address(email_address='a1')
a2 = Address(email_address='a2')
a3 = Address(email_address='a3')
-
+
u1.addresses.append(a2)
u1.addresses.append(a3)
-
+
sess.add_all([u1, a1])
sess.flush()
-
+
u1 = User(id=u1.id, name='jack')
u1.addresses.append(a1)
u1.addresses.append(a3)
u1 = sess.merge(u1)
assert attributes.get_history(u1, 'addresses') == (
- [a1],
- [a3],
+ [a1],
+ [a3],
[a2]
)
sess.flush()
-
+
eq_(
list(u1.addresses),
[a1, a3]
)
-
+
@testing.resolve_artifact_names
def test_flush(self):
mapper(User, users, properties={
u1.addresses.append(Address(email_address='lala@hoho.com'))
sess.add_all((u1, u2))
sess.flush()
-
+
from sqlalchemy.orm import attributes
self.assertEquals(attributes.get_history(attributes.instance_state(u1), 'addresses'), ([], [Address(email_address='lala@hoho.com')], []))
-
+
sess.expunge_all()
# test the test fixture a little bit
User(name='jack', addresses=[Address(email_address='lala@hoho.com')]),
User(name='ed', addresses=[Address(email_address='foo@bar.com')])
] == sess.query(User).all()
-
+
@testing.resolve_artifact_names
def test_hasattr(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses))
})
u1 = User(name='jack')
-
+
assert 'addresses' not in u1.__dict__.keys()
u1.addresses = [Address(email_address='test')]
assert 'addresses' in dir(u1)
-
+
@testing.resolve_artifact_names
def test_collection_set(self):
mapper(User, users, properties={
a2 = Address(email_address='a2')
a3 = Address(email_address='a3')
a4 = Address(email_address='a4')
-
+
sess.add(u1)
u1.addresses = [a1, a3]
assert list(u1.addresses) == [a1, a3]
assert list(u1.addresses) == [a2, a3]
u1.addresses = []
assert list(u1.addresses) == []
-
-
-
+
+
+
@testing.resolve_artifact_names
def test_rollback(self):
mapper(User, users, properties={
n1.children[1].append(Node(data='n123'))
sess.add(n1)
sess.flush()
+ sess.expunge_all()
+ def go():
+ d = sess.query(Node).filter_by(data='n1').all()[0]
+ assert Node(data='n1', children=[
+ Node(data='n11'),
+ Node(data='n12', children=[
+ Node(data='n121'),
+ Node(data='n122'),
+ Node(data='n123')
+ ]),
+ Node(data='n13')
+ ]) == d
+ self.assert_sql_count(testing.db, go, 1)
+
sess.expunge_all()
def go():
d = sess.query(Node).filter_by(data='n1').first()
from sqlalchemy.orm import *
from testlib import *
from testlib import fixtures
+from orm import _base
+from testlib.testing import eq_
class AttrSettable(object):
def __init__(self, **kwargs):
for p in r:
assert p.car_id == p.car.car_id
+class RelationTest8(ORMTest):
+ def define_tables(self, metadata):
+ global taggable, users
+ taggable = Table('taggable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('type', String(30)),
+ Column('owner_id', Integer, ForeignKey('taggable.id')),
+ )
+ users = Table('users', metadata,
+ Column('id', Integer, ForeignKey('taggable.id'), primary_key=True),
+ Column('data', String(50)),
+ )
+
+ def test_selfref_onjoined(self):
+ class Taggable(_base.ComparableEntity):
+ pass
+
+ class User(Taggable):
+ pass
+
+ mapper(Taggable, taggable, polymorphic_on=taggable.c.type, polymorphic_identity='taggable', properties={
+ 'owner': relation(User,
+ primaryjoin=taggable.c.owner_id == taggable.c.id,
+ remote_side=taggable.c.id
+ ),
+ })
+
+
+ mapper(User, users, inherits=Taggable, polymorphic_identity='user',
+ inherit_condition=users.c.id == taggable.c.id,
+ )
+
+
+ u1 = User(data='u1')
+ t1 = Taggable(owner=u1)
+ sess = create_session()
+ sess.add(t1)
+ sess.flush()
+ sess.expunge_all()
+ eq_(
+ sess.query(Taggable).order_by(Taggable.id).all(),
+ [User(data='u1'), Taggable(owner=User(data='u1'))]
+ )
class GenerativeTest(TestBase, AssertsExecutionResults):
def setUpAll(self):
sess.flush()
assert merged_user not in sess.new
+ @testing.resolve_artifact_names
+ def test_cascades_dont_autoflush_2(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address,
+ backref='user',
+ cascade="all, delete-orphan")
+ })
+ mapper(Address, addresses)
+
+ u = User(id=7, name='fred', addresses=[
+ Address(id=1, email_address='fred1'),
+ ])
+ sess = create_session(autoflush=True, autocommit=False)
+ sess.add(u)
+ sess.commit()
+
+ sess.expunge_all()
+
+ u = User(id=7, name='fred', addresses=[
+ Address(id=1, email_address='fred1'),
+ Address(id=2, email_address='fred2'),
+ ])
+ sess.merge(u)
+ assert sess.autoflush
+ sess.commit()
+
+
+
+
if __name__ == "__main__":
testenv.main()
self.assertEquals(ad.email_address, 'ed@bar.com')
self.assertEquals(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')]))
+ @testing.resolve_artifact_names
+ def test_options_with_descriptors(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref="user")
+ })
+ mapper(Address, addresses)
+ sess = create_session()
+ u1 = User(name='ed')
+ u1.addresses.append(Address(email_address='ed@bar.com'))
+ sess.add(u1)
+ sess.flush()
+ sess.expunge_all()
+ for opt in [
+ sa.orm.eagerload(User.addresses),
+ sa.orm.eagerload("addresses"),
+ sa.orm.defer("name"),
+ sa.orm.defer(User.name),
+ sa.orm.defer([User.name]),
+ sa.orm.eagerload("addresses", User.addresses),
+ sa.orm.eagerload(["addresses", User.addresses]),
+ ]:
+ opt2 = pickle.loads(pickle.dumps(opt))
+ self.assertEquals(opt.key, opt2.key)
+
+ u1 = sess.query(User).options(opt).first()
+
+ u2 = pickle.loads(pickle.dumps(u1))
+
+
class PolymorphicDeferredTest(_base.MappedTest):
def define_tables(self, metadata):
Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(32)),
Column('age', Integer))
+
+ Table('documents', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('user_id', None, ForeignKey('users.id')),
+ Column('title', String(32)))
def setup_classes(self):
class User(_base.ComparableEntity):
pass
+
+ class Document(_base.ComparableEntity):
+ pass
@testing.resolve_artifact_names
def insert_data(self):
dict(id=3, name='jill', age=29),
dict(id=4, name='jane', age=37),
])
+
+ @testing.resolve_artifact_names
+ def insert_documents(self):
+ documents.insert().execute([
+ dict(id=1, user_id=1, title='foo'),
+ dict(id=2, user_id=1, title='bar'),
+ dict(id=3, user_id=2, title='baz'),
+ ])
@testing.resolve_artifact_names
def setup_mappers(self):
mapper(User, users)
+ mapper(Document, documents, properties={
+ 'user': relation(User, lazy=False, backref=backref('documents', lazy=True))
+ })
@testing.resolve_artifact_names
def test_delete(self):
rowcount = sess.query(User).filter(User.age > 26).delete(synchronize_session=False)
self.assertEquals(rowcount, 3)
+ @testing.resolve_artifact_names
+ def test_update_with_eager_relations(self):
+ self.insert_documents()
+
+ sess = create_session(bind=testing.db, autocommit=False)
+
+ foo, bar, baz = sess.query(Document).order_by(Document.id).all()
+ sess.query(Document).filter(Document.user_id == 1).update({'title': Document.title+Document.title}, synchronize_session='fetch')
+
+ eq_([foo.title, bar.title, baz.title], ['foofoo','barbar', 'baz'])
+ eq_(sess.query(Document.title).order_by(Document.id).all(), zip(['foofoo','barbar', 'baz']))
+
+ @testing.resolve_artifact_names
+ def test_update_with_explicit_eagerload(self):
+ sess = create_session(bind=testing.db, autocommit=False)
+
+ john, jack, jill, jane = sess.query(User).order_by(User.id).all()
+ sess.query(User).options(eagerload(User.documents)).filter(User.age > 29).update({'age': User.age - 10}, synchronize_session='fetch')
+
+ eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
+ eq_(sess.query(User.age).order_by(User.id).all(), zip([25,37,29,27]))
+
+ @testing.resolve_artifact_names
+ def test_delete_with_eager_relations(self):
+ self.insert_documents()
+
+ sess = create_session(bind=testing.db, autocommit=False)
+
+ sess.query(Document).filter(Document.user_id == 1).delete(synchronize_session=False)
+
+ eq_(sess.query(Document.title).all(), zip(['baz']))
+
if __name__ == '__main__':
testenv.main()
assert x == y == z == w == q == r
+ def test_extract_bind(self):
+ """Basic common denominator execution tests for extract()"""
+
+ date = datetime.date(2010, 5, 1)
+
+ def execute(field):
+ return testing.db.execute(select([extract(field, date)])).scalar()
+
+ assert execute('year') == 2010
+ assert execute('month') == 5
+ assert execute('day') == 1
+
+ date = datetime.datetime(2010, 5, 1, 12, 11, 10)
+
+ assert execute('year') == 2010
+ assert execute('month') == 5
+ assert execute('day') == 1
+
+ def test_extract_expression(self):
+ meta = MetaData(testing.db)
+ table = Table('test', meta,
+ Column('dt', DateTime),
+ Column('d', Date))
+ meta.create_all()
+ try:
+ table.insert().execute(
+ {'dt': datetime.datetime(2010, 5, 1, 12, 11, 10),
+ 'd': datetime.date(2010, 5, 1) })
+ rs = select([extract('year', table.c.dt),
+ extract('month', table.c.d)]).execute()
+ row = rs.fetchone()
+ assert row[0] == 2010
+ assert row[1] == 5
+ rs.close()
+ finally:
+ meta.drop_all()
+
+
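The tests above compile `extract()` to the SQL `EXTRACT(field FROM expr)` function. Its Python-side semantics for the fields exercised here can be sketched with `datetime` alone (a stdlib analogy under the assumption that the field name maps to a date attribute, not the SQLAlchemy implementation):

```python
import datetime

def py_extract(field, value):
    """Stdlib analogue of SQL EXTRACT: pull a date part ('year',
    'month', 'day') out of a date or datetime by attribute name."""
    return getattr(value, field)

d = datetime.date(2010, 5, 1)
assert py_extract('year', d) == 2010
assert py_extract('month', d) == 5
assert py_extract('day', d) == 1

# works identically for datetimes, as in the second half of test_extract_bind
dt = datetime.datetime(2010, 5, 1, 12, 11, 10)
assert py_extract('year', dt) == 2010
```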
def exec_sorted(statement, *args, **kw):
"""Executes a statement and returns a sorted list plain tuple rows."""
"SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1"
)
- def test_extract(self):
- """test the EXTRACT function"""
- self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) AS extract_1 FROM thirdtable")
-
- self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date_1, :to_date_2)) AS extract_1")
-
def test_collate(self):
for expr in (select([table1.c.name.collate('latin1_german2_ci')]),
select([collate(table1.c.name, 'latin1_german2_ci')])):
ncasdec=Decimal("12.4"), fcasdec=Decimal("15.75"))
l = numeric_table.select().execute().fetchall()
- print l
rounded = [
(l[0][0], l[0][1], round(l[0][2], 5), l[0][3], l[0][4]),
(l[1][0], l[1][1], round(l[1][2], 5), l[1][3], l[1][4]),
bool_table.insert().execute(id=5, value=True)
res = bool_table.select(bool_table.c.value==True).execute().fetchall()
- print res
assert(res==[(1, True),(3, True),(4, True),(5, True)])
res2 = bool_table.select(bool_table.c.value==False).execute().fetchall()
- print res2
assert(res2==[(2, False)])
class PickleTest(TestBase):
return _chain_decorators_on(
fn,
# no access to same table
+ no_support('mysql', 'requires SUPER priv'),
exclude('mysql', '<', (5, 0, 10), 'not supported by database'),
no_support('postgres', 'not supported by database: no statements'),
)