-from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.testing import eq_, assert_raises_message, assert_raises
import datetime
from sqlalchemy.schema import CreateSequence, DropSequence
-from sqlalchemy.sql import select, text, literal_column
+from sqlalchemy.sql import select, text
import sqlalchemy as sa
from sqlalchemy import testing
from sqlalchemy.testing import engines
Sequence, func, literal, Unicode
from sqlalchemy.types import TypeDecorator, TypeEngine
from sqlalchemy.testing.schema import Table, Column
-from sqlalchemy.testing import eq_
from sqlalchemy.dialects import sqlite
from sqlalchemy.testing import fixtures
+t = f = f2 = ts = currenttime = metadata = default_generator = None
+
class DefaultTest(fixtures.TestBase):
@classmethod
db = testing.db
metadata = MetaData(db)
- default_generator = {'x':50}
+ default_generator = {'x': 50}
def mydefault():
default_generator['x'] += 1
def fn4(x=1, y=2, z=3):
eq_(x, 1)
fn5 = list
- class fn6(object):
+ class fn6a(object):
def __init__(self, x):
eq_(x, "context")
- class fn6(object):
+ class fn6b(object):
def __init__(self, x, y=3):
eq_(x, "context")
class FN7(object):
eq_(x, "context")
fn8 = FN8()
- for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:
+ for fn in fn1, fn2, fn3, fn4, fn5, fn6a, fn6b, fn7, fn8:
c = sa.ColumnDefault(fn)
c.arg("context")
eq_(set(r.context.postfetch_cols),
set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
- eq_(t.select(t.c.col1==54).execute().fetchall(),
+ eq_(t.select(t.c.col1 == 54).execute().fetchall(),
[(54, 'imthedefault', f, ts, ts, ctexec, True, False,
12, today, None)])
def test_insertmany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql+mysqldb') and
- testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
+ testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
return
- r = t.insert().execute({}, {}, {})
+ t.insert().execute({}, {}, {})
ctexec = currenttime.scalar()
l = t.select().execute()
assert_raises_message(exc.StatementError,
"A value is required for bind parameter 'col7', in parameter group 1",
t.insert().execute,
- {'col4':7, 'col7':12, 'col8':19},
- {'col4':7, 'col8':19},
- {'col4':7, 'col7':12, 'col8':19},
+ {'col4': 7, 'col7': 12, 'col8': 19},
+ {'col4': 7, 'col8': 19},
+ {'col4': 7, 'col7': 12, 'col8': 19},
)
def test_insert_values(self):
- t.insert(values={'col3':50}).execute()
+ t.insert(values={'col3': 50}).execute()
l = t.select().execute()
eq_(50, l.first()['col3'])
def test_updatemany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql+mysqldb') and
- testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
+ testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
return
t.insert().execute({}, {}, {})
- t.update(t.c.col1==sa.bindparam('pkval')).execute(
- {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False})
+ t.update(t.c.col1 == sa.bindparam('pkval')).execute(
+ {'pkval': 51, 'col7': None, 'col8': None, 'boolcol1': False})
- t.update(t.c.col1==sa.bindparam('pkval')).execute(
- {'pkval':51,},
- {'pkval':52,},
- {'pkval':53,})
+ t.update(t.c.col1 == sa.bindparam('pkval')).execute(
+ {'pkval': 51},
+ {'pkval': 52},
+ {'pkval': 53})
l = t.select().execute()
ctexec = currenttime.scalar()
def test_update(self):
r = t.insert().execute()
pk = r.inserted_primary_key[0]
- t.update(t.c.col1==pk).execute(col4=None, col5=None)
+ t.update(t.c.col1 == pk).execute(col4=None, col5=None)
ctexec = currenttime.scalar()
- l = t.select(t.c.col1==pk).execute()
+ l = t.select(t.c.col1 == pk).execute()
l = l.first()
eq_(l,
(pk, 'im the update', f2, None, None, ctexec, True, False,
def test_update_values(self):
r = t.insert().execute()
pk = r.inserted_primary_key[0]
- t.update(t.c.col1==pk, values={'col3': 55}).execute()
- l = t.select(t.c.col1==pk).execute()
+ t.update(t.c.col1 == pk, values={'col3': 55}).execute()
+ l = t.select(t.c.col1 == pk).execute()
l = l.first()
eq_(55, l['col3'])
if not returning and not testing.db.dialect.implicit_returning:
engine = testing.db
else:
- engine = engines.testing_engine(options={'implicit_returning':returning})
+ engine = engines.testing_engine(
+ options={'implicit_returning': returning})
engine.execute(t2.insert(), nextid=1)
r = engine.execute(t1.insert(), data='hi')
eq_([1], r.inserted_primary_key)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(values={'int1':func.length('four')}))
+ rs = bind.execute(aitable.insert(values={'int1': func.length('four')}))
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- eq_(ids, set([1,2,3,4]))
+ eq_(ids, set([1, 2, 3, 4]))
eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
[(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
class EmptyInsertTest(fixtures.TestBase):
@testing.exclude('sqlite', '<', (3, 3, 8), 'no empty insert support')
@testing.fails_on('oracle', 'FIXME: unknown')
+ @testing.provide_metadata
def test_empty_insert(self):
- metadata = MetaData(testing.db)
- t1 = Table('t1', metadata,
+ t1 = Table('t1', self.metadata,
Column('is_true', Boolean, server_default=('1')))
- metadata.create_all()
-
- try:
- result = t1.insert().execute()
- eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
- eq_(True, t1.select().scalar())
- finally:
- metadata.drop_all()
+ self.metadata.create_all()
+ t1.insert().execute()
+ eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
+ eq_(True, t1.select().scalar())
class AutoIncrementTest(fixtures.TablesTest):
__requires__ = ('identity',)
def test_autoincrement_ignore_fk(self):
m = MetaData()
- y = Table('y', m,
+ Table('y', m,
Column('id', Integer(), primary_key=True)
)
x = Table('x', m,
def test_autoincrement_fk_disqualifies(self):
m = MetaData()
- y = Table('y', m,
+ Table('y', m,
Column('id', Integer(), primary_key=True)
)
x = Table('x', m,
Column('data', String(20)))
nonai.create()
-
- try:
+ def go():
# postgresql + mysql strict will fail on first row,
# mysql in legacy mode fails on second row
nonai.insert().execute(data='row 1')
nonai.insert().execute(data='row 2')
- assert False
- except sa.exc.DBAPIError, e:
- assert True
+ assert_raises(
+ sa.exc.DBAPIError,
+ go
+ )
nonai.insert().execute(id=1, data='row 1')
@classmethod
def setup_class(cls):
- cls.seq= Sequence("my_sequence")
+ cls.seq = Sequence("my_sequence")
cls.seq.create(testing.db)
@classmethod
Column('x', Integer)
)
t1.create(testing.db)
- testing.db.execute(t1.insert(), [{'x':1}, {'x':300}, {'x':301}])
+ testing.db.execute(t1.insert(), [{'x': 1}, {'x': 300}, {'x': 301}])
s = Sequence("my_sequence")
eq_(
testing.db.execute(
pk_col=next_value(), implicit returning is not used."""
metadata = self.metadata
- e = engines.testing_engine(options={'implicit_returning':False})
+ e = engines.testing_engine(options={'implicit_returning': False})
s = Sequence("my_sequence")
metadata.bind = e
t1 = Table('t', metadata,
pk_col=next_value(), when implicit returning is used."""
metadata = self.metadata
- e = engines.testing_engine(options={'implicit_returning':True})
+ e = engines.testing_engine(options={'implicit_returning': True})
s = Sequence("my_sequence")
metadata.bind = e
t1 = Table('t', metadata,
Sequence("my_seq", optional=True)):
assert str(s.next_value().
compile(dialect=testing.db.dialect)) in (
- "nextval('my_seq')",
- "gen_id(my_seq, 1)",
- "my_seq.nextval",
- )
+ "nextval('my_seq')",
+ "gen_id(my_seq, 1)",
+ "my_seq.nextval",
+ )
def test_nextval_unsupported(self):
"""test next_value() used on non-sequence platform
def test_checkfirst_metadata(self):
m = MetaData()
- s = Sequence("my_sequence", metadata=m)
+ Sequence("my_sequence", metadata=m)
m.create_all(testing.db, checkfirst=False)
assert self._has_sequence('my_sequence')
m.create_all(testing.db, checkfirst=True)
@testing.provide_metadata
def test_table_overrides_metadata_create(self):
metadata = self.metadata
- s1 = Sequence("s1", metadata=metadata)
+ Sequence("s1", metadata=metadata)
s2 = Sequence("s2", metadata=metadata)
s3 = Sequence("s3")
t = Table('t', metadata,
assert not self._has_sequence('s1')
assert not self._has_sequence('s2')
-
+cartitems = sometable = metadata = None
class TableBoundSequenceTest(fixtures.TestBase):
__requires__ = ('sequences',)
Column("description", String(40)),
Column("createdate", sa.DateTime())
)
- sometable = Table( 'Manager', metadata,
- Column('obj_id', Integer, Sequence('obj_id_seq'), ),
- Column('name', String(128)),
- Column('id', Integer, Sequence('Manager_id_seq', optional=True),
+ sometable = Table('Manager', metadata,
+ Column('obj_id', Integer, Sequence('obj_id_seq')),
+ Column('name', String(128)),
+ Column('id', Integer, Sequence('Manager_id_seq', optional=True),
primary_key=True),
)
def test_seq_nonpk(self):
"""test sequences fire off as defaults on non-pk columns"""
- engine = engines.testing_engine(options={'implicit_returning':False})
+ engine = engines.testing_engine(
+ options={'implicit_returning': False})
result = engine.execute(sometable.insert(), name="somename")
assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
sometable.insert().execute(
- {'name':'name3'},
- {'name':'name4'})
+ {'name': 'name3'},
+ {'name': 'name4'})
eq_(sometable.select().order_by(sometable.c.id).execute().fetchall(),
[(1, "somename", 1),
(2, "someother", 2),
r = t.insert().values(data=5).execute()
# we don't pre-fetch 'server_default'.
- if 'server_default' in kw and (not testing.db.dialect.implicit_returning or not implicit_returning):
+ if 'server_default' in kw and (not
+ testing.db.dialect.implicit_returning or
+ not implicit_returning):
eq_(r.inserted_primary_key, [None])
else:
eq_(r.inserted_primary_key, ['INT_1'])
@testing.provide_metadata
def test_int_default_none_on_insert_reflected(self):
metadata = self.metadata
- t = Table('x', metadata,
+ Table('x', metadata,
Column('y', Integer,
server_default='5', primary_key=True),
Column('data', String(10)),
class UnicodeDefaultsTest(fixtures.TestBase):
def test_no_default(self):
- c = Column(Unicode(32))
+ Column(Unicode(32))
def test_unicode_default(self):
# Py3K
# Py2K
default = u'foo'
# end Py2K
- c = Column(Unicode(32), default=default)
+ Column(Unicode(32), default=default)
def test_nonunicode_default(self):