.. changelog::
:version: 1.0.10
+ .. change::
+ :tags: change, tests
+ :versions: 1.1.0b1
+
+ The ORM and Core tutorials, which have always been in doctest format,
+ are now exercised within the normal unit test suite in both Python
+ 2 and Python 3.
+
.. change::
:tags: bug, sql
:tickets: 3603
.. sourcecode:: pycon+sql
>>> import sqlalchemy
- >>> sqlalchemy.__version__ # doctest:+SKIP
+ >>> sqlalchemy.__version__ # doctest: +SKIP
1.1.0
Connecting
.. sourcecode:: pycon+sql
- {sql}>>> metadata.create_all(engine) #doctest: +NORMALIZE_WHITESPACE
- PRAGMA table_info("users")
- ()
- PRAGMA table_info("addresses")
- ()
+ {sql}>>> metadata.create_all(engine)
+ SE...
CREATE TABLE users (
id INTEGER NOT NULL,
name VARCHAR,
parameters for them. We can peek at this data for now by looking at the
compiled form of the statement::
- >>> ins.compile().params #doctest: +NORMALIZE_WHITESPACE
+ >>> ins.compile().params # doctest: +SKIP
{'fullname': 'Jack Jones', 'name': 'jack'}
Executing
we use the ``connect()`` method::
>>> conn = engine.connect()
- >>> conn #doctest: +ELLIPSIS
+ >>> conn
<sqlalchemy.engine.base.Connection object at 0x...>
The :class:`~sqlalchemy.engine.Connection` object represents an actively
.. sourcecode:: pycon+sql
>>> ins = users.insert()
- >>> conn.execute(ins, id=2, name='wendy', fullname='Wendy Williams') # doctest: +ELLIPSIS
+ >>> conn.execute(ins, id=2, name='wendy', fullname='Wendy Williams')
{opensql}INSERT INTO users (id, name, fullname) VALUES (?, ?, ?)
(2, 'wendy', 'Wendy Williams')
COMMIT
.. sourcecode:: pycon+sql
- >>> conn.execute(addresses.insert(), [ # doctest: +ELLIPSIS
+ >>> conn.execute(addresses.insert(), [
... {'user_id': 1, 'email_address' : 'jack@yahoo.com'},
... {'user_id': 1, 'email_address' : 'jack@msn.com'},
... {'user_id': 2, 'email_address' : 'www@www.org'},
>>> from sqlalchemy.sql import select
>>> s = select([users])
- >>> result = conn.execute(s) # doctest: +NORMALIZE_WHITESPACE
+ >>> result = conn.execute(s)
{opensql}SELECT users.id, users.name, users.fullname
FROM users
()
.. sourcecode:: pycon+sql
>>> for row in result:
- ... print row
+ ... print(row)
(1, u'jack', u'Jack Jones')
(2, u'wendy', u'Wendy Williams')
.. sourcecode:: pycon+sql
- {sql}>>> result = conn.execute(s) # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> result = conn.execute(s)
SELECT users.id, users.name, users.fullname
FROM users
()
{stop}>>> row = result.fetchone()
- >>> print "name:", row['name'], "; fullname:", row['fullname']
+ >>> print("name:", row['name'], "; fullname:", row['fullname'])
name: jack ; fullname: Jack Jones
Integer indexes work as well:
.. sourcecode:: pycon+sql
>>> row = result.fetchone()
- >>> print "name:", row[1], "; fullname:", row[2]
+ >>> print("name:", row[1], "; fullname:", row[2])
name: wendy ; fullname: Wendy Williams
But another way, whose usefulness will become apparent later on, is to use the
.. sourcecode:: pycon+sql
- {sql}>>> for row in conn.execute(s): # doctest: +NORMALIZE_WHITESPACE
- ... print "name:", row[users.c.name], "; fullname:", row[users.c.fullname]
+ {sql}>>> for row in conn.execute(s):
+ ... print("name:", row[users.c.name], "; fullname:", row[users.c.fullname])
SELECT users.id, users.name, users.fullname
FROM users
()
.. sourcecode:: pycon+sql
>>> s = select([users.c.name, users.c.fullname])
- {sql}>>> result = conn.execute(s) # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> result = conn.execute(s)
SELECT users.name, users.fullname
FROM users
()
- {stop}>>> for row in result: #doctest: +NORMALIZE_WHITESPACE
- ... print row
+ {stop}>>> for row in result:
+ ... print(row)
(u'jack', u'Jack Jones')
(u'wendy', u'Wendy Williams')
.. sourcecode:: pycon+sql
{sql}>>> for row in conn.execute(select([users, addresses])):
- ... print row # doctest: +NORMALIZE_WHITESPACE
+ ... print(row)
SELECT users.id, users.name, users.fullname, addresses.id, addresses.user_id, addresses.email_address
FROM users, addresses
()
>>> s = select([users, addresses]).where(users.c.id == addresses.c.user_id)
{sql}>>> for row in conn.execute(s):
- ... print row # doctest: +NORMALIZE_WHITESPACE
+ ... print(row)
SELECT users.id, users.name, users.fullname, addresses.id,
addresses.user_id, addresses.email_address
FROM users, addresses
.. sourcecode:: pycon+sql
- >>> users.c.id == addresses.c.user_id #doctest: +ELLIPSIS
- <sqlalchemy.sql.expression.BinaryExpression object at 0x...>
+ >>> users.c.id == addresses.c.user_id
+ <sqlalchemy.sql.elements.BinaryExpression object at 0x...>
Wow, surprise ! This is neither a ``True`` nor a ``False``. Well what is it ?
.. sourcecode:: pycon+sql
- >>> print users.c.id == addresses.c.user_id
+ >>> print(users.c.id == addresses.c.user_id)
users.id = addresses.user_id
If we use a literal value (a literal meaning, not a SQLAlchemy clause object),
.. sourcecode:: pycon+sql
- >>> print users.c.id == 7
+ >>> print(users.c.id == 7)
users.id = :id_1
The ``7`` literal is embedded the resulting
.. sourcecode:: pycon+sql
- >>> print users.c.id != 7
+ >>> print(users.c.id != 7)
users.id != :id_1
>>> # None converts to IS NULL
- >>> print users.c.name == None
+ >>> print(users.c.name == None)
users.name IS NULL
>>> # reverse works too
- >>> print 'fred' > users.c.name
+ >>> print('fred' > users.c.name)
users.name < :name_1
If we add two integer columns together, we get an addition expression:
.. sourcecode:: pycon+sql
- >>> print users.c.id + addresses.c.id
+ >>> print(users.c.id + addresses.c.id)
users.id + addresses.id
Interestingly, the type of the :class:`~sqlalchemy.schema.Column` is important!
.. sourcecode:: pycon+sql
- >>> print users.c.name + users.c.fullname
+ >>> print(users.c.name + users.c.fullname)
users.name || users.fullname
Where ``||`` is the string concatenation operator used on most databases. But
.. sourcecode:: pycon+sql
- >>> print (users.c.name + users.c.fullname).\
- ... compile(bind=create_engine('mysql://'))
+ >>> print((users.c.name + users.c.fullname).
+ ... compile(bind=create_engine('mysql://'))) # doctest: +SKIP
concat(users.name, users.fullname)
The above illustrates the SQL that's generated for an
.. sourcecode:: pycon+sql
- >>> print users.c.name.op('tiddlywinks')('foo')
+ >>> print(users.c.name.op('tiddlywinks')('foo'))
users.name tiddlywinks :name_1
This function can also be used to make bitwise operators explicit. For example::
.. sourcecode:: pycon+sql
>>> from sqlalchemy.sql import and_, or_, not_
- >>> print and_(
+ >>> print(and_(
... users.c.name.like('j%'),
- ... users.c.id == addresses.c.user_id, #doctest: +NORMALIZE_WHITESPACE
+ ... users.c.id == addresses.c.user_id,
... or_(
... addresses.c.email_address == 'wendy@aol.com',
... addresses.c.email_address == 'jack@yahoo.com'
... ),
... not_(users.c.id > 5)
... )
+ ... )
users.name LIKE :name_1 AND users.id = addresses.user_id AND
(addresses.email_address = :email_address_1
OR addresses.email_address = :email_address_2)
.. sourcecode:: pycon+sql
- >>> print users.c.name.like('j%') & (users.c.id == addresses.c.user_id) & \
+ >>> print(users.c.name.like('j%') & (users.c.id == addresses.c.user_id) &
... (
... (addresses.c.email_address == 'wendy@aol.com') | \
... (addresses.c.email_address == 'jack@yahoo.com')
... ) \
- ... & ~(users.c.id>5) # doctest: +NORMALIZE_WHITESPACE
+ ... & ~(users.c.id>5)
+ ... )
users.name LIKE :name_1 AND users.id = addresses.user_id AND
(addresses.email_address = :email_address_1
OR addresses.email_address = :email_address_2)
... )
... )
... )
- >>> conn.execute(s).fetchall() #doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(s).fetchall()
SELECT users.fullname || ? || addresses.email_address AS title
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN ? AND ? AND
... addresses.c.email_address.like('%@msn.com')
... )
... )
- >>> conn.execute(s).fetchall() #doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(s).fetchall()
SELECT users.fullname || ? || addresses.email_address AS title
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN ? AND ? AND
... "AND users.name BETWEEN :x AND :y "
... "AND (addresses.email_address LIKE :e1 "
... "OR addresses.email_address LIKE :e2)")
- {sql}>>> conn.execute(s, x='m', y='z', e1='%@aol.com', e2='%@msn.com').fetchall() # doctest:+NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s, x='m', y='z', e1='%@aol.com', e2='%@msn.com').fetchall()
SELECT users.fullname || ', ' || addresses.email_address AS title
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN ? AND ? AND
... "OR addresses.email_address LIKE :e2)")
>>> s = s.columns(title=String)
>>> s = s.bindparams(x='m', y='z', e1='%@aol.com', e2='%@msn.com')
- >>> conn.execute(s).fetchall() # doctest:+NORMALIZE_WHITESPACE
+ >>> conn.execute(s).fetchall()
SELECT users.fullname || ', ' || addresses.email_address AS title
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN ? AND ? AND
... "OR addresses.email_address LIKE :y)")
... )
... ).select_from(text('users, addresses'))
- {sql}>>> conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall()
SELECT users.fullname || ', ' || addresses.email_address AS title
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN 'm' AND 'z'
:ref:`orm_tutorial_literal_sql` - integrating ORM-level queries with
:func:`.text`
-.. versionchanged:: 1.0.0
+.. fchanged:: 1.0.0
The :func:`.select` construct emits warnings when string SQL
fragments are coerced to :func:`.text`, and :func:`.text` should
be used explicitly. See :ref:`migration_2992` for background.
>>> from sqlalchemy.sql import table, literal_column
>>> s = select([
... literal_column("users.fullname", String) +
- ... ' , ' +
+ ... ', ' +
... literal_column("addresses.email_address").label("title")
... ]).\
... where(
... )
... ).select_from(table('users')).select_from(table('addresses'))
- {sql}>>> conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall() #doctest: +NORMALIZE_WHITESPACE
- SELECT "users.fullname" || ? || "addresses.email_address" AS anon_1
+ {sql}>>> conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall()
+ SELECT users.fullname || ? || addresses.email_address AS anon_1
FROM users, addresses
- WHERE "users.id" = "addresses.user_id"
+ WHERE users.id = addresses.user_id
AND users.name BETWEEN 'm' AND 'z'
AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)
- (' , ', '%@aol.com', '%@msn.com')
+ (', ', '%@aol.com', '%@msn.com')
{stop}[(u'Wendy Williams, wendy@aol.com',)]
Ordering or Grouping by a Label
... func.count(addresses.c.id).label('num_addresses')]).\
... order_by("num_addresses")
- {sql}>>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(stmt).fetchall()
SELECT addresses.user_id, count(addresses.id) AS num_addresses
FROM addresses ORDER BY num_addresses
()
... func.count(addresses.c.id).label('num_addresses')]).\
... order_by(desc("num_addresses"))
- {sql}>>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(stmt).fetchall()
SELECT addresses.user_id, count(addresses.id) AS num_addresses
FROM addresses ORDER BY num_addresses DESC
()
... where(u1a.c.name > u1b.c.name).\
... order_by(u1a.c.name) # using "name" here would be ambiguous
- {sql}>>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(stmt).fetchall()
SELECT users_1.id, users_1.name, users_1.fullname, users_2.id,
users_2.name, users_2.fullname
FROM users AS users_1, users AS users_2
... a1.c.email_address == 'jack@msn.com',
... a2.c.email_address == 'jack@yahoo.com'
... ))
- {sql}>>> conn.execute(s).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s).fetchall()
SELECT users.id, users.name, users.fullname
FROM users, addresses AS addresses_1, addresses AS addresses_2
WHERE users.id = addresses_1.user_id
>>> a1 = s.correlate(None).alias()
>>> s = select([users.c.name]).where(users.c.id == a1.c.id)
- {sql}>>> conn.execute(s).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s).fetchall()
SELECT users.name
FROM users,
(SELECT users.id AS id, users.name AS name, users.fullname AS fullname
.. sourcecode:: pycon+sql
- >>> print users.join(addresses)
+ >>> print(users.join(addresses))
users JOIN addresses ON users.id = addresses.user_id
The alert reader will see more surprises; SQLAlchemy figured out how to JOIN
.. sourcecode:: pycon+sql
- >>> print users.join(addresses,
+ >>> print(users.join(addresses,
... addresses.c.email_address.like(users.c.name + '%')
... )
+ ... )
users JOIN addresses ON addresses.email_address LIKE (users.name || :name_1)
When we create a :func:`.select` construct, SQLAlchemy looks around at the
... users.join(addresses,
... addresses.c.email_address.like(users.c.name + '%'))
... )
- {sql}>>> conn.execute(s).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s).fetchall()
SELECT users.fullname
FROM users JOIN addresses ON addresses.email_address LIKE (users.name || ?)
('%',)
.. sourcecode:: pycon+sql
>>> s = select([users.c.fullname]).select_from(users.outerjoin(addresses))
- >>> print s # doctest: +NORMALIZE_WHITESPACE
+ >>> print(s)
SELECT users.fullname
FROM users
LEFT OUTER JOIN addresses ON users.id = addresses.user_id
.. sourcecode:: pycon+sql
>>> from sqlalchemy.dialects.oracle import dialect as OracleDialect
- >>> print s.compile(dialect=OracleDialect(use_ansi=False)) # doctest: +NORMALIZE_WHITESPACE
+ >>> print(s.compile(dialect=OracleDialect(use_ansi=False)))
SELECT users.fullname
FROM users, addresses
WHERE users.id = addresses.user_id(+)
>>> from sqlalchemy.sql import bindparam
>>> s = users.select(users.c.name == bindparam('username'))
- {sql}>>> conn.execute(s, username='wendy').fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s, username='wendy').fetchall()
SELECT users.id, users.name, users.fullname
FROM users
WHERE users.name = ?
.. sourcecode:: pycon+sql
>>> s = users.select(users.c.name.like(bindparam('username', type_=String) + text("'%'")))
- {sql}>>> conn.execute(s, username='wendy').fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s, username='wendy').fetchall()
SELECT users.id, users.name, users.fullname
FROM users
WHERE users.name LIKE (? || '%')
... ).\
... select_from(users.outerjoin(addresses)).\
... order_by(addresses.c.id)
- {sql}>>> conn.execute(s, name='jack').fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(s, name='jack').fetchall()
SELECT users.id, users.name, users.fullname, addresses.id,
addresses.user_id, addresses.email_address
FROM users LEFT OUTER JOIN addresses ON users.id = addresses.user_id
.. sourcecode:: pycon+sql
>>> from sqlalchemy.sql import func
- >>> print func.now()
+ >>> print(func.now())
now()
- >>> print func.concat('x', 'y')
- concat(:param_1, :param_2)
+ >>> print(func.concat('x', 'y'))
+ concat(:concat_1, :concat_2)
By "generates", we mean that **any** SQL function is created based on the word
you choose::
- >>> print func.xyz_my_goofy_function() # doctest: +NORMALIZE_WHITESPACE
+ >>> print(func.xyz_my_goofy_function())
xyz_my_goofy_function()
Certain function names are known by SQLAlchemy, allowing special behavioral
.. sourcecode:: pycon+sql
- >>> print func.current_timestamp()
+ >>> print(func.current_timestamp())
CURRENT_TIMESTAMP
Functions are most typically used in the columns clause of a select statement,
... func.max(addresses.c.email_address, type_=String).
... label('maxemail')
... ])
- ... ).scalar() # doctest: +NORMALIZE_WHITESPACE
+ ... ).scalar()
{opensql}SELECT max(addresses.email_address) AS maxemail
FROM addresses
()
... )
... )
>>> calc = calculate.alias()
- >>> print select([users]).where(users.c.id > calc.c.z) # doctest: +NORMALIZE_WHITESPACE
+ >>> print(select([users]).where(users.c.id > calc.c.z))
SELECT users.id, users.name, users.fullname
FROM users, (SELECT q, z, r
FROM calculate(:x, :y)) AS anon_1
>>> calc2 = calculate.alias('c2').unique_params(x=5, y=12)
>>> s = select([users]).\
... where(users.c.id.between(calc1.c.z, calc2.c.z))
- >>> print s # doctest: +NORMALIZE_WHITESPACE
+ >>> print(s)
SELECT users.id, users.name, users.fullname
FROM users,
(SELECT q, z, r FROM calculate(:x_1, :y_1)) AS c1,
(SELECT q, z, r FROM calculate(:x_2, :y_2)) AS c2
WHERE users.id BETWEEN c1.z AND c2.z
- >>> s.compile().params
+ >>> s.compile().params # doctest: +SKIP
{u'x_2': 5, u'y_2': 12, u'y_1': 45, u'x_1': 17}
.. seealso::
... users.c.id,
... func.row_number().over(order_by=users.c.name)
... ])
- >>> print s # doctest: +NORMALIZE_WHITESPACE
+ >>> print(s)
SELECT users.id, row_number() OVER (ORDER BY users.name) AS anon_1
FROM users
... where(addresses.c.email_address.like('%@yahoo.com')),
... ).order_by(addresses.c.email_address)
- {sql}>>> conn.execute(u).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(u).fetchall()
SELECT addresses.id, addresses.user_id, addresses.email_address
FROM addresses
WHERE addresses.email_address = ?
... where(addresses.c.email_address.like('%@msn.com'))
... )
- {sql}>>> conn.execute(u).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(u).fetchall()
SELECT addresses.id, addresses.user_id, addresses.email_address
FROM addresses
WHERE addresses.email_address LIKE ?
... ).alias().select(), # apply subquery here
... addresses.select(addresses.c.email_address.like('%@msn.com'))
... )
- {sql}>>> conn.execute(u).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> conn.execute(u).fetchall()
SELECT anon_1.id, anon_1.user_id, anon_1.email_address
FROM (SELECT addresses.id AS id, addresses.user_id AS user_id,
addresses.email_address AS email_address
.. sourcecode:: pycon+sql
- >>> conn.execute(select([users.c.name, stmt])).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(select([users.c.name, stmt])).fetchall()
{opensql}SELECT users.name, (SELECT count(addresses.id) AS count_1
FROM addresses
WHERE users.id = addresses.user_id) AS anon_1
>>> stmt = select([func.count(addresses.c.id)]).\
... where(users.c.id == addresses.c.user_id).\
... label("address_count")
- >>> conn.execute(select([users.c.name, stmt])).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(select([users.c.name, stmt])).fetchall()
{opensql}SELECT users.name, (SELECT count(addresses.id) AS count_1
FROM addresses
WHERE users.id = addresses.user_id) AS address_count
... where(addresses.c.user_id == users.c.id).\
... where(addresses.c.email_address == 'jack@yahoo.com')
>>> enclosing_stmt = select([users.c.name]).where(users.c.id == stmt)
- >>> conn.execute(enclosing_stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(enclosing_stmt).fetchall()
{opensql}SELECT users.name
FROM users
WHERE users.id = (SELECT addresses.user_id
... [users.c.name, addresses.c.email_address]).\
... select_from(users.join(addresses)).\
... where(users.c.id == stmt)
- >>> conn.execute(enclosing_stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(enclosing_stmt).fetchall()
{opensql}SELECT users.name, addresses.email_address
FROM users JOIN addresses ON users.id = addresses.user_id
WHERE users.id = (SELECT users.id
... correlate(None)
>>> enclosing_stmt = select([users.c.name]).\
... where(users.c.id == stmt)
- >>> conn.execute(enclosing_stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(enclosing_stmt).fetchall()
{opensql}SELECT users.name
FROM users
WHERE users.id = (SELECT users.id
... [users.c.name, addresses.c.email_address]).\
... select_from(users.join(addresses)).\
... where(users.c.id == stmt)
- >>> conn.execute(enclosing_stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(enclosing_stmt).fetchall()
{opensql}SELECT users.name, addresses.email_address
FROM users JOIN addresses ON users.id = addresses.user_id
WHERE users.id = (SELECT users.id
.. sourcecode:: pycon+sql
>>> stmt = select([users.c.name]).order_by(users.c.name)
- >>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(stmt).fetchall()
{opensql}SELECT users.name
FROM users ORDER BY users.name
()
.. sourcecode:: pycon+sql
>>> stmt = select([users.c.name]).order_by(users.c.name.desc())
- >>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(stmt).fetchall()
{opensql}SELECT users.name
FROM users ORDER BY users.name DESC
()
>>> stmt = select([users.c.name, func.count(addresses.c.id)]).\
... select_from(users.join(addresses)).\
... group_by(users.c.name)
- >>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(stmt).fetchall()
{opensql}SELECT users.name, count(addresses.id) AS count_1
FROM users JOIN addresses
ON users.id = addresses.user_id
... select_from(users.join(addresses)).\
... group_by(users.c.name).\
... having(func.length(users.c.name) > 4)
- >>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(stmt).fetchall()
{opensql}SELECT users.name, count(addresses.id) AS count_1
FROM users JOIN addresses
ON users.id = addresses.user_id
... where(addresses.c.email_address.
... contains(users.c.name)).\
... distinct()
- >>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(stmt).fetchall()
{opensql}SELECT DISTINCT users.name
FROM users, addresses
- WHERE addresses.email_address LIKE '%%' || users.name || '%%'
+ WHERE (addresses.email_address LIKE '%%' || users.name || '%%')
()
{stop}[(u'jack',), (u'wendy',)]
>>> stmt = select([users.c.name, addresses.c.email_address]).\
... select_from(users.join(addresses)).\
... limit(1).offset(1)
- >>> conn.execute(stmt).fetchall() # doctest: +NORMALIZE_WHITESPACE
+ >>> conn.execute(stmt).fetchall()
{opensql}SELECT users.name, addresses.email_address
FROM users JOIN addresses ON users.id = addresses.user_id
LIMIT ? OFFSET ?
>>> stmt = users.update().\
... values(fullname="Fullname: " + users.c.name)
- >>> conn.execute(stmt) #doctest: +ELLIPSIS
+ >>> conn.execute(stmt)
{opensql}UPDATE users SET fullname=(? || users.name)
('Fullname: ',)
COMMIT
>>> stmt = users.insert().\
... values(name=bindparam('_name') + " .. name")
- >>> conn.execute(stmt, [ # doctest: +ELLIPSIS
+ >>> conn.execute(stmt, [
... {'id':4, '_name':'name1'},
... {'id':5, '_name':'name2'},
... {'id':6, '_name':'name3'},
... where(users.c.name == 'jack').\
... values(name='ed')
- >>> conn.execute(stmt) #doctest: +ELLIPSIS
+ >>> conn.execute(stmt)
{opensql}UPDATE users SET name=? WHERE users.name = ?
('ed', 'jack')
COMMIT
... {'oldname':'jack', 'newname':'ed'},
... {'oldname':'wendy', 'newname':'mary'},
... {'oldname':'jim', 'newname':'jake'},
- ... ]) #doctest: +ELLIPSIS
+ ... ])
{opensql}UPDATE users SET name=? WHERE users.name = ?
(('ed', 'jack'), ('mary', 'wendy'), ('jake', 'jim'))
COMMIT
>>> stmt = select([addresses.c.email_address]).\
... where(addresses.c.user_id == users.c.id).\
... limit(1)
- >>> conn.execute(users.update().values(fullname=stmt)) #doctest: +ELLIPSIS,+NORMALIZE_WHITESPACE
+ >>> conn.execute(users.update().values(fullname=stmt))
{opensql}UPDATE users SET fullname=(SELECT addresses.email_address
FROM addresses
WHERE addresses.user_id = users.id
.. sourcecode:: pycon+sql
- >>> conn.execute(addresses.delete()) #doctest: +ELLIPSIS
+ >>> conn.execute(addresses.delete())
{opensql}DELETE FROM addresses
()
COMMIT
{stop}<sqlalchemy.engine.result.ResultProxy object at 0x...>
- >>> conn.execute(users.delete().where(users.c.name > 'm')) #doctest: +ELLIPSIS
+ >>> conn.execute(users.delete().where(users.c.name > 'm'))
{opensql}DELETE FROM users WHERE users.name > ?
('m',)
COMMIT
.. sourcecode:: pycon+sql
- >>> result = conn.execute(users.delete()) #doctest: +ELLIPSIS
+ >>> result = conn.execute(users.delete())
{opensql}DELETE FROM users
()
COMMIT
.. sourcecode:: python+sql
- >>> Base.metadata.create_all(engine) # doctest:+ELLIPSIS,+NORMALIZE_WHITESPACE
- {opensql}PRAGMA table_info("users")
+ >>> Base.metadata.create_all(engine)
+ SELECT ...
+ PRAGMA table_info("users")
()
CREATE TABLE users (
- id INTEGER NOT NULL,
- name VARCHAR,
+ id INTEGER NOT NULL, name VARCHAR,
fullname VARCHAR,
password VARCHAR,
PRIMARY KEY (id)
.. sourcecode:: python+sql
- {sql}>>> our_user = session.query(User).filter_by(name='ed').first() # doctest:+ELLIPSIS,+NORMALIZE_WHITESPACE
+ {sql}>>> our_user = session.query(User).filter_by(name='ed').first() # doctest:+NORMALIZE_WHITESPACE
BEGIN (implicit)
INSERT INTO users (name, fullname, password) VALUES (?, ?, ?)
('ed', 'Ed Jones', 'edspassword')
.. sourcecode:: python+sql
- {sql}>>> session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all()
UPDATE users SET name=? WHERE users.id = ?
('Edwardo', 1)
INSERT INTO users (name, fullname, password) VALUES (?, ?, ?)
FROM users
WHERE users.name IN (?, ?)
('Edwardo', 'fakeuser')
- {stop}[<User(name='Edwardo', fullname='Ed Jones', password='f8s7ccs')>, <User(user='fakeuser', fullname='Invalid', password='12345')>]
+ {stop}[<User(name='Edwardo', fullname='Ed Jones', password='f8s7ccs')>, <User(name='fakeuser', fullname='Invalid', password='12345')>]
Rolling back, we can see that ``ed_user``'s name is back to ``ed``, and
``fake_user`` has been kicked out of the session:
ROLLBACK
{stop}
- {sql}>>> ed_user.name #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> ed_user.name
BEGIN (implicit)
SELECT users.id AS users_id,
users.name AS users_name,
.. sourcecode:: python+sql
- {sql}>>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
- {sql}>>> for instance in session.query(User).order_by(User.id): # doctest: +NORMALIZE_WHITESPACE
- ... print instance.name, instance.fullname
+ {sql}>>> for instance in session.query(User).order_by(User.id):
+ ... print(instance.name, instance.fullname)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
- {sql}>>> for name, fullname in session.query(User.name, User.fullname): # doctest: +NORMALIZE_WHITESPACE
- ... print name, fullname
+ {sql}>>> for name, fullname in session.query(User.name, User.fullname):
+ ... print(name, fullname)
SELECT users.name AS users_name,
users.fullname AS users_fullname
FROM users
.. sourcecode:: python+sql
- {sql}>>> for row in session.query(User, User.name).all(): #doctest: +NORMALIZE_WHITESPACE
- ... print row.User, row.name
+ {sql}>>> for row in session.query(User, User.name).all():
+ ... print(row.User, row.name)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
- {sql}>>> for row in session.query(User.name.label('name_label')).all(): #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> for row in session.query(User.name.label('name_label')).all():
... print(row.name_label)
SELECT users.name AS name_label
FROM users
>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')
- {sql}>>> for row in session.query(user_alias, user_alias.name).all(): #doctest: +NORMALIZE_WHITESPACE
- ... print row.user_alias
+ {sql}>>> for row in session.query(user_alias, user_alias.name).all():
+ ... print(row.user_alias)
SELECT user_alias.id AS user_alias_id,
user_alias.name AS user_alias_name,
user_alias.fullname AS user_alias_fullname,
.. sourcecode:: python+sql
- {sql}>>> for u in session.query(User).order_by(User.id)[1:3]: #doctest: +NORMALIZE_WHITESPACE
- ... print u
+ {sql}>>> for u in session.query(User).order_by(User.id)[1:3]:
+ ... print(u)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
{sql}>>> for name, in session.query(User.name).\
- ... filter_by(fullname='Ed Jones'): # doctest: +NORMALIZE_WHITESPACE
- ... print name
+ ... filter_by(fullname='Ed Jones'):
+ ... print(name)
SELECT users.name AS users_name FROM users
WHERE users.fullname = ?
('Ed Jones',)
.. sourcecode:: python+sql
{sql}>>> for name, in session.query(User.name).\
- ... filter(User.fullname=='Ed Jones'): # doctest: +NORMALIZE_WHITESPACE
- ... print name
+ ... filter(User.fullname=='Ed Jones'):
+ ... print(name)
SELECT users.name AS users_name FROM users
WHERE users.fullname = ?
('Ed Jones',)
{sql}>>> for user in session.query(User).\
... filter(User.name=='ed').\
- ... filter(User.fullname=='Ed Jones'): # doctest: +NORMALIZE_WHITESPACE
- ... print user
+ ... filter(User.fullname=='Ed Jones'):
+ ... print(user)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
- {sql}>>> query.all() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> query.all()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
- {sql}>>> query.first() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> query.first()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
{sql}>>> from sqlalchemy.orm.exc import MultipleResultsFound
- >>> try: #doctest: +NORMALIZE_WHITESPACE
+ >>> try:
... user = query.one()
- ... except MultipleResultsFound, e:
- ... print e
+ ... except MultipleResultsFound as e:
+ ... print(e)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
{sql}>>> from sqlalchemy.orm.exc import NoResultFound
- >>> try: #doctest: +NORMALIZE_WHITESPACE
+ >>> try:
... user = query.filter(User.id == 99).one()
- ... except NoResultFound, e:
- ... print e
+ ... except NoResultFound as e:
+ ... print(e)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
>>> query = session.query(User.id).filter(User.name == 'ed').\
... order_by(User.id)
- {sql}>>> query.scalar() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> query.scalar()
SELECT users.id AS users_id
FROM users
WHERE users.name = ? ORDER BY users.id
>>> from sqlalchemy import text
{sql}>>> for user in session.query(User).\
... filter(text("id<224")).\
- ... order_by(text("id")).all(): #doctest: +NORMALIZE_WHITESPACE
- ... print user.name
+ ... order_by(text("id")).all():
+ ... print(user.name)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
{sql}>>> session.query(User).filter(text("id<:value and name=:name")).\
- ... params(value=224, name='fred').order_by(User.id).one() # doctest: +NORMALIZE_WHITESPACE
+ ... params(value=224, name='fred').order_by(User.id).one()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
- {sql}>>> session.query(User).filter(User.name.like('%ed')).count() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(User).filter(User.name.like('%ed')).count()
SELECT count(*) AS count_1
FROM (SELECT users.id AS users_id,
users.name AS users_name,
.. sourcecode:: python+sql
>>> from sqlalchemy import func
- {sql}>>> session.query(func.count(User.name), User.name).group_by(User.name).all() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(func.count(User.name), User.name).group_by(User.name).all()
SELECT count(users.name) AS count_1, users.name AS users_name
FROM users GROUP BY users.name
()
.. sourcecode:: python+sql
- {sql}>>> session.query(func.count('*')).select_from(User).scalar() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(func.count('*')).select_from(User).scalar()
SELECT count(?) AS count_1
FROM users
('*',)
.. sourcecode:: python+sql
- {sql}>>> session.query(func.count(User.id)).scalar() #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(func.count(User.id)).scalar()
SELECT count(users.id) AS count_1
FROM users
()
.. sourcecode:: python+sql
- {sql}>>> Base.metadata.create_all(engine) # doctest: +NORMALIZE_WHITESPACE
- PRAGMA table_info("users")
- ()
- PRAGMA table_info("addresses")
- ()
+ {sql}>>> Base.metadata.create_all(engine)
+ PRAGMA...
CREATE TABLE addresses (
id INTEGER NOT NULL,
email_address VARCHAR NOT NULL,
.. sourcecode:: python+sql
{sql}>>> jack = session.query(User).\
- ... filter_by(name='jack').one() #doctest: +NORMALIZE_WHITESPACE
+ ... filter_by(name='jack').one()
BEGIN (implicit)
SELECT users.id AS users_id,
users.name AS users_name,
.. sourcecode:: python+sql
- {sql}>>> jack.addresses #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> jack.addresses
SELECT addresses.id AS addresses_id,
addresses.email_address AS
addresses_email_address,
{sql}>>> for u, a in session.query(User, Address).\
... filter(User.id==Address.user_id).\
... filter(Address.email_address=='jack@google.com').\
- ... all(): # doctest: +NORMALIZE_WHITESPACE
- ... print u
- ... print a
+ ... all():
+ ... print(u)
+ ... print(a)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
{sql}>>> session.query(User).join(Address).\
... filter(Address.email_address=='jack@google.com').\
- ... all() #doctest: +NORMALIZE_WHITESPACE
+ ... all()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
... join(adalias2, User.addresses).\
... filter(adalias1.email_address=='jack@google.com').\
... filter(adalias2.email_address=='j25@yahoo.com'):
- ... print username, email1, email2 # doctest: +NORMALIZE_WHITESPACE
+ ... print(username, email1, email2)
SELECT users.name AS users_name,
addresses_1.email_address AS addresses_1_email_address,
addresses_2.email_address AS addresses_2_email_address
.. sourcecode:: python+sql
{sql}>>> for u, count in session.query(User, stmt.c.address_count).\
- ... outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id): # doctest: +NORMALIZE_WHITESPACE
- ... print u, count
+ ... outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
+ ... print(u, count)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
... subquery()
>>> adalias = aliased(Address, stmt)
>>> for user, address in session.query(User, adalias).\
- ... join(adalias, User.addresses): # doctest: +NORMALIZE_WHITESPACE
- ... print user
- ... print address
+ ... join(adalias, User.addresses):
+ ... print(user)
+ ... print(address)
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
>>> from sqlalchemy.sql import exists
>>> stmt = exists().where(Address.user_id==User.id)
- {sql}>>> for name, in session.query(User.name).filter(stmt): # doctest: +NORMALIZE_WHITESPACE
- ... print name
+ {sql}>>> for name, in session.query(User.name).filter(stmt):
+ ... print(name)
SELECT users.name AS users_name
FROM users
WHERE EXISTS (SELECT *
.. sourcecode:: python+sql
{sql}>>> for name, in session.query(User.name).\
- ... filter(User.addresses.any()): # doctest: +NORMALIZE_WHITESPACE
- ... print name
+ ... filter(User.addresses.any()):
+ ... print(name)
SELECT users.name AS users_name
FROM users
WHERE EXISTS (SELECT 1
.. sourcecode:: python+sql
{sql}>>> for name, in session.query(User.name).\
- ... filter(User.addresses.any(Address.email_address.like('%google%'))): # doctest: +NORMALIZE_WHITESPACE
- ... print name
+ ... filter(User.addresses.any(Address.email_address.like('%google%'))):
+ ... print(name)
SELECT users.name AS users_name
FROM users
WHERE EXISTS (SELECT 1
.. sourcecode:: python+sql
{sql}>>> session.query(Address).\
- ... filter(~Address.user.has(User.name=='jack')).all() # doctest: +NORMALIZE_WHITESPACE
+ ... filter(~Address.user.has(User.name=='jack')).all()
SELECT addresses.id AS addresses_id,
addresses.email_address AS addresses_email_address,
addresses.user_id AS addresses_user_id
>>> from sqlalchemy.orm import subqueryload
{sql}>>> jack = session.query(User).\
... options(subqueryload(User.addresses)).\
- ... filter_by(name='jack').one() #doctest: +NORMALIZE_WHITESPACE
+ ... filter_by(name='jack').one()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
{sql}>>> jack = session.query(User).\
... options(joinedload(User.addresses)).\
- ... filter_by(name='jack').one() #doctest: +NORMALIZE_WHITESPACE
+ ... filter_by(name='jack').one()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
... join(Address.user).\
... filter(User.name=='jack').\
... options(contains_eager(Address.user)).\
- ... all() #doctest: +NORMALIZE_WHITESPACE
+ ... all()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
.. sourcecode:: python+sql
>>> session.delete(jack)
- {sql}>>> session.query(User).filter_by(name='jack').count() # doctest: +NORMALIZE_WHITESPACE
- UPDATE addresses SET user_id=? WHERE addresses.id = ?
- (None, 1)
+ {sql}>>> session.query(User).filter_by(name='jack').count()
UPDATE addresses SET user_id=? WHERE addresses.id = ?
- (None, 2)
+ ((None, 1), (None, 2))
DELETE FROM users WHERE users.id = ?
(5,)
SELECT count(*) AS count_1
{sql}>>> session.query(Address).filter(
... Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
- ... ).count() # doctest: +NORMALIZE_WHITESPACE
+ ... ).count()
SELECT count(*) AS count_1
FROM (SELECT addresses.id AS addresses_id,
addresses.email_address AS addresses_email_address,
completely and start again - we'll close the :class:`.Session`::
>>> session.close()
+ ROLLBACK
+
and use a new :func:`.declarative_base`::
... cascade="all, delete, delete-orphan")
...
... def __repr__(self):
- ... return "<User(name='%s', fullname='%s', password'%s')>" % (
+ ... return "<User(name='%s', fullname='%s', password='%s')>" % (
... self.name, self.fullname, self.password)
Then we recreate ``Address``, noting that in this case we've created
.. sourcecode:: python+sql
# load Jack by primary key
- {sql}>>> jack = session.query(User).get(5) #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> jack = session.query(User).get(5)
BEGIN (implicit)
SELECT users.id AS users_id,
users.name AS users_name,
{stop}
# remove one Address (lazy load fires off)
- {sql}>>> del jack.addresses[1] #doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> del jack.addresses[1]
SELECT addresses.id AS addresses_id,
addresses.email_address AS addresses_email_address,
addresses.user_id AS addresses_user_id
# only one address remains
{sql}>>> session.query(Address).filter(
... Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
- ... ).count() # doctest: +NORMALIZE_WHITESPACE
+ ... ).count()
DELETE FROM addresses WHERE addresses.id = ?
(2,)
SELECT count(*) AS count_1
>>> session.delete(jack)
- {sql}>>> session.query(User).filter_by(name='jack').count() # doctest: +NORMALIZE_WHITESPACE
+ {sql}>>> session.query(User).filter_by(name='jack').count()
DELETE FROM addresses WHERE addresses.id = ?
(1,)
DELETE FROM users WHERE users.id = ?
{sql}>>> session.query(Address).filter(
... Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
- ... ).count() # doctest: +NORMALIZE_WHITESPACE
+ ... ).count()
SELECT count(*) AS count_1
FROM (SELECT addresses.id AS addresses_id,
addresses.email_address AS addresses_email_address,
>>> from sqlalchemy import Table, Text
>>> # association table
>>> post_keywords = Table('post_keywords', Base.metadata,
- ... Column('post_id', Integer, ForeignKey('posts.id')),
- ... Column('keyword_id', Integer, ForeignKey('keywords.id'))
+ ... Column('post_id', ForeignKey('posts.id'), primary_key=True),
+ ... Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
... )
Above, we can see declaring a :class:`.Table` directly is a little different
.. sourcecode:: python+sql
- {sql}>>> Base.metadata.create_all(engine) # doctest: +NORMALIZE_WHITESPACE
- PRAGMA table_info("users")
- ()
- PRAGMA table_info("addresses")
- ()
- PRAGMA table_info("posts")
- ()
- PRAGMA table_info("keywords")
- ()
- PRAGMA table_info("post_keywords")
- ()
- CREATE TABLE posts (
+ {sql}>>> Base.metadata.create_all(engine)
+ PRAGMA...
+ CREATE TABLE keywords (
id INTEGER NOT NULL,
- user_id INTEGER,
- headline VARCHAR(255) NOT NULL,
- body TEXT,
+ keyword VARCHAR(50) NOT NULL,
PRIMARY KEY (id),
- FOREIGN KEY(user_id) REFERENCES users (id)
+ UNIQUE (keyword)
)
()
COMMIT
- CREATE TABLE keywords (
+ CREATE TABLE posts (
id INTEGER NOT NULL,
- keyword VARCHAR(50) NOT NULL,
+ user_id INTEGER,
+ headline VARCHAR(255) NOT NULL,
+ body TEXT,
PRIMARY KEY (id),
- UNIQUE (keyword)
+ FOREIGN KEY(user_id) REFERENCES users (id)
)
()
COMMIT
CREATE TABLE post_keywords (
- post_id INTEGER,
- keyword_id INTEGER,
- FOREIGN KEY(post_id) REFERENCES posts (id),
- FOREIGN KEY(keyword_id) REFERENCES keywords (id)
+ post_id INTEGER NOT NULL,
+ keyword_id INTEGER NOT NULL,
+ PRIMARY KEY (post_id, keyword_id),
+ FOREIGN KEY(post_id) REFERENCES posts (id),
+ FOREIGN KEY(keyword_id) REFERENCES keywords (id)
)
()
COMMIT
{sql}>>> wendy = session.query(User).\
... filter_by(name='wendy').\
- ... one() #doctest: +NORMALIZE_WHITESPACE
+ ... one()
SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
{sql}>>> session.query(BlogPost).\
... filter(BlogPost.keywords.any(keyword='firstpost')).\
- ... all() #doctest: +NORMALIZE_WHITESPACE
+ ... all()
INSERT INTO keywords (keyword) VALUES (?)
('wendy',)
INSERT INTO keywords (keyword) VALUES (?)
INSERT INTO posts (user_id, headline, body) VALUES (?, ?, ?)
(2, "Wendy's Blog Post", 'This is a test')
INSERT INTO post_keywords (post_id, keyword_id) VALUES (?, ?)
- ((1, 1), (1, 2))
+ (...)
SELECT posts.id AS posts_id,
posts.user_id AS posts_user_id,
posts.headline AS posts_headline,
{sql}>>> session.query(BlogPost).\
... filter(BlogPost.author==wendy).\
... filter(BlogPost.keywords.any(keyword='firstpost')).\
- ... all() #doctest: +NORMALIZE_WHITESPACE
+ ... all()
SELECT posts.id AS posts_id,
posts.user_id AS posts_user_id,
posts.headline AS posts_headline,
{sql}>>> wendy.posts.\
... filter(BlogPost.keywords.any(keyword='firstpost')).\
- ... all() #doctest: +NORMALIZE_WHITESPACE
+ ... all()
SELECT posts.id AS posts_id,
posts.user_id AS posts_user_id,
posts.headline AS posts_headline,
+++ /dev/null
-import sys
-sys.path = ['../../lib', './lib/'] + sys.path
-
-import os
-import re
-import doctest
-import sqlalchemy.util as util
-import sqlalchemy.log as salog
-import logging
-
-rootlogger = logging.getLogger('sqlalchemy.engine.base.Engine')
-class MyStream(object):
- def write(self, string):
- sys.stdout.write(string)
- sys.stdout.flush()
- def flush(self):
- pass
-handler = logging.StreamHandler(MyStream())
-handler.setFormatter(logging.Formatter('%(message)s'))
-rootlogger.addHandler(handler)
-
-
-def teststring(s, name, globs=None, verbose=None, report=True,
- optionflags=0, extraglobs=None, raise_on_error=False,
- parser=doctest.DocTestParser()):
-
- from doctest import DebugRunner, DocTestRunner, master
-
- # Assemble the globals.
- if globs is None:
- globs = {}
- else:
- globs = globs.copy()
- if extraglobs is not None:
- globs.update(extraglobs)
-
- if raise_on_error:
- runner = DebugRunner(verbose=verbose, optionflags=optionflags)
- else:
- runner = DocTestRunner(verbose=verbose, optionflags=optionflags)
-
- test = parser.get_doctest(s, globs, name, name, 0)
- runner.run(test)
-
- if report:
- runner.summarize()
-
- if master is None:
- master = runner
- else:
- master.merge(runner)
-
- return runner.failures, runner.tries
-
-def replace_file(s, newfile):
- engine = r"'(sqlite|postgresql|mysql):///.*'"
- engine = re.compile(engine, re.MULTILINE)
- s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s)
- if not n:
- raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile)
- return s
-
-for filename in 'orm/tutorial','core/tutorial',:
- filename = '%s.rst' % filename
- s = open(filename).read()
- #s = replace_file(s, ':memory:')
- s = re.sub(r'{(?:stop|sql|opensql)}', '', s)
- teststring(s, filename)
-
--- /dev/null
+from __future__ import print_function
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import config
+import doctest
+import logging
+import sys
+import re
+import os
+
+
+class DocTest(fixtures.TestBase):
+ def _setup_logger(self):
+ rootlogger = logging.getLogger('sqlalchemy.engine.base.Engine')
+
+ class MyStream(object):
+ def write(self, string):
+ sys.stdout.write(string)
+ sys.stdout.flush()
+
+ def flush(self):
+ pass
+
+ self._handler = handler = logging.StreamHandler(MyStream())
+ handler.setFormatter(logging.Formatter('%(message)s'))
+ rootlogger.addHandler(handler)
+
+ def _teardown_logger(self):
+ rootlogger = logging.getLogger('sqlalchemy.engine.base.Engine')
+ rootlogger.removeHandler(self._handler)
+
+ def _setup_create_table_patcher(self):
+ from sqlalchemy.sql import ddl
+ self.orig_sort = ddl.sort_tables_and_constraints
+
+ def our_sort(tables, **kw):
+ return self.orig_sort(
+ sorted(tables, key=lambda t: t.key), **kw
+ )
+ ddl.sort_tables_and_constraints = our_sort
+
+ def _teardown_create_table_patcher(self):
+ from sqlalchemy.sql import ddl
+ ddl.sort_tables_and_constraints = self.orig_sort
+
+ def setup(self):
+ self._setup_logger()
+ self._setup_create_table_patcher()
+
+ def teardown(self):
+ self._teardown_create_table_patcher()
+ self._teardown_logger()
+
+
+ def _run_doctest_for_content(self, name, content):
+ optionflags = (
+ doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE |
+ _get_allow_unicode_flag()
+ )
+ runner = doctest.DocTestRunner(
+ verbose=None, optionflags=optionflags,
+ checker=_get_unicode_checker())
+ globs = {
+ 'print_function': print_function}
+ parser = doctest.DocTestParser()
+ test = parser.get_doctest(content, globs, name, name, 0)
+ runner.run(test)
+ runner.summarize()
+ assert not runner.failures
+
+ def _run_doctest(self, fname):
+ here = os.path.dirname(__file__)
+ sqla_base = os.path.normpath(os.path.join(here, "..", ".."))
+ path = os.path.join(sqla_base, "doc/build", fname)
+ if not os.path.exists(path):
+ config.skip_test("Can't find documentation file %r" % path)
+ with open(path) as file_:
+ content = file_.read()
+ content = re.sub(r'{(?:stop|sql|opensql)}', '', content)
+ self._run_doctest_for_content(fname, content)
+
+ def test_orm(self):
+ self._run_doctest("orm/tutorial.rst")
+
+ def test_core(self):
+ self._run_doctest("core/tutorial.rst")
+
+
+# unicode checker courtesy py.test
+
+
+def _get_unicode_checker():
+ """
+ Returns a doctest.OutputChecker subclass that takes in account the
+ ALLOW_UNICODE option to ignore u'' prefixes in strings. Useful
+ when the same doctest should run in Python 2 and Python 3.
+
+ An inner class is used to avoid importing "doctest" at the module
+ level.
+ """
+ if hasattr(_get_unicode_checker, 'UnicodeOutputChecker'):
+ return _get_unicode_checker.UnicodeOutputChecker()
+
+ import doctest
+ import re
+
+ class UnicodeOutputChecker(doctest.OutputChecker):
+ """
+ Copied from doctest_nose_plugin.py from the nltk project:
+ https://github.com/nltk/nltk
+ """
+
+ _literal_re = re.compile(r"(\W|^)[uU]([rR]?[\'\"])", re.UNICODE)
+
+ def check_output(self, want, got, optionflags):
+ res = doctest.OutputChecker.check_output(self, want, got,
+ optionflags)
+ if res:
+ return True
+
+ if not (optionflags & _get_allow_unicode_flag()):
+ return False
+
+ else: # pragma: no cover
+ # the code below will end up executed only in Python 2 in
+ # our tests, and our coverage check runs in Python 3 only
+ def remove_u_prefixes(txt):
+ return re.sub(self._literal_re, r'\1\2', txt)
+
+ want = remove_u_prefixes(want)
+ got = remove_u_prefixes(got)
+ res = doctest.OutputChecker.check_output(self, want, got,
+ optionflags)
+ return res
+
+ _get_unicode_checker.UnicodeOutputChecker = UnicodeOutputChecker
+ return _get_unicode_checker.UnicodeOutputChecker()
+
+
+def _get_allow_unicode_flag():
+ """
+ Registers and returns the ALLOW_UNICODE flag.
+ """
+ import doctest
+ return doctest.register_optionflag('ALLOW_UNICODE')