-
-from sqlalchemy import exc as exceptions
-from sqlalchemy import testing
-from sqlalchemy.testing import engines
-from sqlalchemy import select, MetaData, Integer, or_
+from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_
from sqlalchemy.engine import default
from sqlalchemy.sql import table, column
-from sqlalchemy.testing import assert_raises, eq_
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
-from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.testing import AssertsCompiledSQL, assert_raises, engines,\
+ fixtures
from sqlalchemy.testing.schema import Table, Column
IDENT_LENGTH = 29
class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'DefaultDialect'
table1 = table('some_large_named_table',
column('this_is_the_primarykey_column'),
column('this_is_the_data_column')
)
- __dialect__ = 'DefaultDialect'
-
-
def _length_fixture(self, length=IDENT_LENGTH, positional=False):
dialect = default.DefaultDialect()
dialect.max_identifier_length = length
ta = table2.alias()
on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column
self.assert_compile(
- select([table1, ta]).select_from(table1.join(ta, on)).\
+ select([table1, ta]).select_from(table1.join(ta, on)).
where(ta.c.this_is_the_data_column == 'data3'),
'SELECT '
'some_large_named_table.this_is_the_primarykey_column, '
t = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
m, Column('foo', Integer))
eng = self._engine_fixture()
- for meth in (
- t.create,
- t.drop,
- m.create_all,
- m.drop_all
- ):
- assert_raises(
- exceptions.IdentifierError,
- meth, eng
- )
+ methods = (t.create, t.drop, m.create_all, m.drop_all)
+ for meth in methods:
+ assert_raises(exceptions.IdentifierError, meth, eng)
def _assert_labeled_table1_select(self, s):
table1 = self.table1
dialect=self._length_fixture(positional=True)
)
+
class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'DefaultDialect'
table1 = table('some_large_named_table',
column('this_is_the_primarykey_column'),
column('this_is_the_data_column')
)
- __dialect__ = 'DefaultDialect'
-
def test_adjustable_1(self):
table1 = self.table1
q = table1.select(
'AS _1',
dialect=compile_dialect)
-
def test_adjustable_result_schema_column_1(self):
table1 = self.table1
+
q = table1.select(
table1.c.this_is_the_primarykey_column == 4).apply_labels().\
alias('foo')
- dialect = default.DefaultDialect(label_length=10)
+ dialect = default.DefaultDialect(label_length=10)
compiled = q.compile(dialect=dialect)
+
assert set(compiled.result_map['some_2'][1]).issuperset([
- table1.c.this_is_the_data_column,
- 'some_large_named_table_this_is_the_data_column',
- 'some_2'
+ table1.c.this_is_the_data_column,
+ 'some_large_named_table_this_is_the_data_column',
+ 'some_2'
+ ])
- ])
assert set(compiled.result_map['some_1'][1]).issuperset([
- table1.c.this_is_the_primarykey_column,
- 'some_large_named_table_this_is_the_primarykey_column',
- 'some_1'
-
- ])
+ table1.c.this_is_the_primarykey_column,
+ 'some_large_named_table_this_is_the_primarykey_column',
+ 'some_1'
+ ])
def test_adjustable_result_schema_column_2(self):
table1 = self.table1
x = select([q])
dialect = default.DefaultDialect(label_length=10)
-
compiled = x.compile(dialect=dialect)
+
assert set(compiled.result_map['this_2'][1]).issuperset([
- q.corresponding_column(table1.c.this_is_the_data_column),
- 'this_is_the_data_column',
- 'this_2'
+ q.corresponding_column(table1.c.this_is_the_data_column),
+ 'this_is_the_data_column',
+ 'this_2'])
- ])
assert set(compiled.result_map['this_1'][1]).issuperset([
- q.corresponding_column(table1.c.this_is_the_primarykey_column),
- 'this_is_the_primarykey_column',
- 'this_1'
-
- ])
+ q.corresponding_column(table1.c.this_is_the_primarykey_column),
+ 'this_is_the_primarykey_column',
+ 'this_1'])
def test_table_plus_column_exceeds_length(self):
"""test that the truncation only occurs when tablename + colname are
'other_thirty_characters_table_.thirty_characters_table_id',
dialect=compile_dialect)
-
def test_colnames_longer_than_labels_lowercase(self):
t1 = table('a', column('abcde'))
self._test_colnames_longer_than_labels(t1)
# 'abcde' is longer than 4, but rendered as itself
# needs to have all characters
s = select([a1])
- self.assert_compile(
- select([a1]),
- "SELECT asdf.abcde FROM a AS asdf",
- dialect=dialect
- )
+ self.assert_compile(select([a1]),
+ 'SELECT asdf.abcde FROM a AS asdf',
+ dialect=dialect)
compiled = s.compile(dialect=dialect)
assert set(compiled.result_map['abcde'][1]).issuperset([
- 'abcde',
- a1.c.abcde,
- 'abcde'
- ])
+ 'abcde', a1.c.abcde, 'abcde'])
# column still there, but short label
s = select([a1]).apply_labels()
- self.assert_compile(
- s,
- "SELECT asdf.abcde AS _1 FROM a AS asdf",
- dialect=dialect
- )
+ self.assert_compile(s,
+ 'SELECT asdf.abcde AS _1 FROM a AS asdf',
+ dialect=dialect)
compiled = s.compile(dialect=dialect)
assert set(compiled.result_map['_1'][1]).issuperset([
- 'asdf_abcde',
- a1.c.abcde,
- '_1'
- ])
-
-
+ 'asdf_abcde', a1.c.abcde, '_1'])
-from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
-import datetime
from sqlalchemy import *
-from sqlalchemy import exc, sql, util
-from sqlalchemy.engine import default, base
from sqlalchemy import testing
-from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.dialects import mysql
+from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures
+from sqlalchemy.testing.schema import Table, Column
+
class _UpdateFromTestBase(object):
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(30), nullable=False),
- )
+ test_needs_autoincrement=True),
+ Column('name', String(30), nullable=False))
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
Column('name', String(30), nullable=False),
- Column('email_address', String(50), nullable=False),
- )
+ Column('email_address', String(50), nullable=False))
- Table("dingalings", metadata,
+ Table('dingalings', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('address_id', None, ForeignKey('addresses.id')),
- Column('data', String(30)),
- )
+ Column('data', String(30)))
@classmethod
def fixtures(cls):
return dict(
- users = (
+ users=(
('id', 'name'),
(7, 'jack'),
(8, 'ed'),
(9, 'fred'),
(10, 'chuck')
),
-
addresses = (
('id', 'user_id', 'name', 'email_address'),
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed@wood.com"),
- (3, 8, 'x', "ed@bettyboop.com"),
- (4, 8, 'x', "ed@lala.com"),
- (5, 9, 'x', "fred@fred.com")
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed@wood.com'),
+ (3, 8, 'x', 'ed@bettyboop.com'),
+ (4, 8, 'x', 'ed@lala.com'),
+ (5, 9, 'x', 'fred@fred.com')
),
dingalings = (
('id', 'address_id', 'data'),
)
-class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
+ AssertsCompiledSQL):
__dialect__ = 'default'
run_create_tables = run_inserts = run_deletes = None
def test_render_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==addresses.c.user_id).\
- where(addresses.c.email_address=='e1'),
- "UPDATE users SET name=:name FROM addresses "
- "WHERE users.id = addresses.user_id AND "
- "addresses.email_address = :email_address_1",
- checkparams={u'email_address_1': 'e1', 'name': 'newname'}
- )
+ users.update().
+ values(name='newname').
+ where(users.c.id == addresses.c.user_id).
+ where(addresses.c.email_address == 'e1'),
+ 'UPDATE users '
+ 'SET name=:name FROM addresses '
+ 'WHERE '
+ 'users.id = addresses.user_id AND '
+ 'addresses.email_address = :email_address_1',
+ checkparams={u'email_address_1': 'e1', 'name': 'newname'})
def test_render_multi_table(self):
- users, addresses, dingalings = \
- self.tables.users, \
- self.tables.addresses, \
- self.tables.dingalings
+ users = self.tables.users
+ addresses = self.tables.addresses
+ dingalings = self.tables.dingalings
+
+ checkparams = {
+ u'email_address_1': 'e1',
+ u'id_1': 2,
+ 'name': 'newname'
+ }
+
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==addresses.c.user_id).\
- where(addresses.c.email_address=='e1').\
- where(addresses.c.id==dingalings.c.address_id).\
- where(dingalings.c.id==2),
- "UPDATE users SET name=:name FROM addresses, "
- "dingalings WHERE users.id = addresses.user_id "
- "AND addresses.email_address = :email_address_1 "
- "AND addresses.id = dingalings.address_id AND "
- "dingalings.id = :id_1",
- checkparams={u'email_address_1': 'e1', u'id_1': 2,
- 'name': 'newname'}
- )
+ users.update().
+ values(name='newname').
+ where(users.c.id == addresses.c.user_id).
+ where(addresses.c.email_address == 'e1').
+ where(addresses.c.id == dingalings.c.address_id).
+ where(dingalings.c.id == 2),
+ 'UPDATE users '
+ 'SET name=:name '
+ 'FROM addresses, dingalings '
+ 'WHERE '
+ 'users.id = addresses.user_id AND '
+ 'addresses.email_address = :email_address_1 AND '
+ 'addresses.id = dingalings.address_id AND '
+ 'dingalings.id = :id_1',
+ checkparams=checkparams)
def test_render_table_mysql(self):
users, addresses = self.tables.users, self.tables.addresses
+
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==addresses.c.user_id).\
- where(addresses.c.email_address=='e1'),
- "UPDATE users, addresses SET users.name=%s "
- "WHERE users.id = addresses.user_id AND "
- "addresses.email_address = %s",
+ users.update().
+ values(name='newname').
+ where(users.c.id == addresses.c.user_id).
+ where(addresses.c.email_address == 'e1'),
+ 'UPDATE users, addresses '
+ 'SET users.name=%s '
+ 'WHERE '
+ 'users.id = addresses.user_id AND '
+ 'addresses.email_address = %s',
checkparams={u'email_address_1': 'e1', 'name': 'newname'},
- dialect=mysql.dialect()
- )
+ dialect=mysql.dialect())
def test_render_subquery(self):
users, addresses = self.tables.users, self.tables.addresses
- subq = select([addresses.c.id,
- addresses.c.user_id,
- addresses.c.email_address]).\
- where(addresses.c.id==7).alias()
+
+ checkparams = {
+ u'email_address_1': 'e1',
+ u'id_1': 7,
+ 'name': 'newname'
+ }
+
+ cols = [
+ addresses.c.id,
+ addresses.c.user_id,
+ addresses.c.email_address
+ ]
+
+ subq = select(cols).where(addresses.c.id == 7).alias()
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==subq.c.user_id).\
- where(subq.c.email_address=='e1'),
- "UPDATE users SET name=:name FROM "
- "(SELECT addresses.id AS id, addresses.user_id "
- "AS user_id, addresses.email_address AS "
- "email_address FROM addresses WHERE addresses.id = "
- ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
- "AND anon_1.email_address = :email_address_1",
- checkparams={u'email_address_1': 'e1',
- u'id_1': 7, 'name': 'newname'}
- )
+ users.update().
+ values(name='newname').
+ where(users.c.id == subq.c.user_id).
+ where(subq.c.email_address == 'e1'),
+ 'UPDATE users '
+ 'SET name=:name FROM ('
+ 'SELECT '
+ 'addresses.id AS id, '
+ 'addresses.user_id AS user_id, '
+ 'addresses.email_address AS email_address '
+ 'FROM addresses '
+ 'WHERE addresses.id = :id_1'
+ ') AS anon_1 '
+ 'WHERE users.id = anon_1.user_id '
+ 'AND anon_1.email_address = :email_address_1',
+ checkparams=checkparams)
+
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
@testing.requires.update_from
def test_exec_two_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
testing.db.execute(
- addresses.update().\
- values(email_address=users.c.name).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- testing.db.execute(
- addresses.select().\
- order_by(addresses.c.id)).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed"),
- (4, 8, 'x', "ed"),
- (5, 9, 'x', "fred@fred.com")
- ]
- )
+ addresses.update().
+ values(email_address=users.c.name).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed'),
+ (4, 8, 'x', 'ed'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
@testing.requires.update_from
def test_exec_two_table_plus_alias(self):
users, addresses = self.tables.users, self.tables.addresses
- a1 = addresses.alias()
+ a1 = addresses.alias()
testing.db.execute(
- addresses.update().\
- values(email_address=users.c.name).\
- where(users.c.id==a1.c.user_id).\
- where(users.c.name=='ed').\
- where(a1.c.id==addresses.c.id)
- )
- eq_(
- testing.db.execute(
- addresses.select().\
- order_by(addresses.c.id)).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed"),
- (4, 8, 'x', "ed"),
- (5, 9, 'x', "fred@fred.com")
- ]
+ addresses.update().
+ values(email_address=users.c.name).
+ where(users.c.id == a1.c.user_id).
+ where(users.c.name == 'ed').
+ where(a1.c.id == addresses.c.id)
)
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed'),
+ (4, 8, 'x', 'ed'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
@testing.requires.update_from
def test_exec_three_table(self):
- users, addresses, dingalings = \
- self.tables.users, \
- self.tables.addresses, \
- self.tables.dingalings
+ users = self.tables.users
+ addresses = self.tables.addresses
+ dingalings = self.tables.dingalings
+
testing.db.execute(
- addresses.update().\
- values(email_address=users.c.name).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed').
- where(addresses.c.id==dingalings.c.address_id).\
- where(dingalings.c.id==1),
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)
- ).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed@bettyboop.com"),
- (4, 8, 'x', "ed@lala.com"),
- (5, 9, 'x', "fred@fred.com")
- ]
- )
+ addresses.update().
+ values(email_address=users.c.name).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed').
+ where(addresses.c.id == dingalings.c.address_id).
+ where(dingalings.c.id == 1))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed@bettyboop.com'),
+ (4, 8, 'x', 'ed@lala.com'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
@testing.only_on('mysql', 'Multi table update')
def test_exec_multitable(self):
users, addresses = self.tables.users, self.tables.addresses
+
+ values = {
+ addresses.c.email_address: users.c.name,
+ users.c.name: 'ed2'
+ }
+
testing.db.execute(
- addresses.update().\
- values({
- addresses.c.email_address:users.c.name,
- users.c.name:'ed2'
- }).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed"),
- (4, 8, 'x', "ed"),
- (5, 9, 'x', "fred@fred.com")
- ]
- )
- eq_(
- testing.db.execute(
- users.select().order_by(users.c.id)).fetchall(),
- [
- (7, 'jack'),
- (8, 'ed2'),
- (9, 'fred'),
- (10, 'chuck')
- ]
- )
+ addresses.update().
+ values(values).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed'),
+ (4, 8, 'x', 'ed'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ expected = [
+ (7, 'jack'),
+ (8, 'ed2'),
+ (9, 'fred'),
+ (10, 'chuck')]
+ self._assert_users(users, expected)
+
+ def _assert_addresses(self, addresses, expected):
+ stmt = addresses.select().order_by(addresses.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
+ def _assert_users(self, users, expected):
+ stmt = users.select().order_by(users.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
-class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
+class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
+ fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('name', String(30), nullable=False),
- Column('some_update', String(30), onupdate="im the update")
- )
+ Column('some_update', String(30), onupdate='im the update'))
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
- Column('email_address', String(50), nullable=False),
- )
+ Column('email_address', String(50), nullable=False))
@classmethod
def fixtures(cls):
return dict(
- users = (
+ users=(
('id', 'name', 'some_update'),
(8, 'ed', 'value'),
(9, 'fred', 'value'),
),
-
- addresses = (
+ addresses=(
('id', 'user_id', 'email_address'),
- (2, 8, "ed@wood.com"),
- (3, 8, "ed@bettyboop.com"),
- (4, 9, "fred@fred.com")
+ (2, 8, 'ed@wood.com'),
+ (3, 8, 'ed@bettyboop.com'),
+ (4, 9, 'fred@fred.com')
),
)
@testing.only_on('mysql', 'Multi table update')
def test_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
+ values = {
+ addresses.c.email_address: users.c.name,
+ users.c.name: 'ed2'
+ }
+
ret = testing.db.execute(
- addresses.update().\
- values({
- addresses.c.email_address:users.c.name,
- users.c.name:'ed2'
- }).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- set(ret.prefetch_cols()),
- set([users.c.some_update])
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)).fetchall(),
- [
- (2, 8, "ed"),
- (3, 8, "ed"),
- (4, 9, "fred@fred.com")
- ]
- )
- eq_(
- testing.db.execute(
- users.select().order_by(users.c.id)).fetchall(),
- [
- (8, 'ed2', 'im the update'),
- (9, 'fred', 'value'),
- ]
- )
+ addresses.update().
+ values(values).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ eq_(set(ret.prefetch_cols()), set([users.c.some_update]))
+
+ expected = [
+ (2, 8, 'ed'),
+ (3, 8, 'ed'),
+ (4, 9, 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ expected = [
+ (8, 'ed2', 'im the update'),
+ (9, 'fred', 'value')]
+ self._assert_users(users, expected)
@testing.only_on('mysql', 'Multi table update')
def test_no_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
ret = testing.db.execute(
- addresses.update().\
- values({
- 'email_address':users.c.name,
- }).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- ret.prefetch_cols(),[]
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)).fetchall(),
- [
- (2, 8, "ed"),
- (3, 8, "ed"),
- (4, 9, "fred@fred.com")
- ]
- )
- # users table not actually updated,
- # so no onupdate
- eq_(
- testing.db.execute(
- users.select().order_by(users.c.id)).fetchall(),
- [
- (8, 'ed', 'value'),
- (9, 'fred', 'value'),
- ]
- )
+ addresses.update().
+ values({'email_address': users.c.name}).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ eq_(ret.prefetch_cols(), [])
+
+ expected = [
+ (2, 8, 'ed'),
+ (3, 8, 'ed'),
+ (4, 9, 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ # users table not actually updated, so no onupdate
+ expected = [
+ (8, 'ed', 'value'),
+ (9, 'fred', 'value')]
+ self._assert_users(users, expected)
+
+ def _assert_addresses(self, addresses, expected):
+ stmt = addresses.select().order_by(addresses.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
+ def _assert_users(self, users, expected):
+ stmt = users.select().order_by(users.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)