}
)
- functions = util.update_copy(
- compiler.SQLCompiler.functions,
- {
- 'TIMESTAMP':lambda x:'TIMESTAMP %s' % x,
- }
- )
-
def post_process_text(self, text):
if '%%' in text:
util.warn("The SQLAlchemy postgres dialect now automatically escapes '%' in text() expressions to '%%'.")
self.do_begin(connection.connection)
def do_prepare_twophase(self, connection, xid):
- connection.execute(sql.text("PREPARE TRANSACTION :tid", bindparams=[sql.bindparam('tid', xid)]))
+ connection.execute("PREPARE TRANSACTION '%s'" % xid)
def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
if is_prepared:
if recover:
#FIXME: ugly hack to get out of transaction context when commiting recoverable transactions
# Must find out a way how to make the dbapi not open a transaction.
- connection.execute(sql.text("ROLLBACK"))
- connection.execute(sql.text("ROLLBACK PREPARED :tid", bindparams=[sql.bindparam('tid', xid)]))
- connection.execute(sql.text("BEGIN"))
+ connection.execute("ROLLBACK")
+ connection.execute("ROLLBACK PREPARED '%s'" % xid)
+ connection.execute("BEGIN")
self.do_rollback(connection.connection)
else:
self.do_rollback(connection.connection)
def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):
if is_prepared:
if recover:
- connection.execute(sql.text("ROLLBACK"))
- connection.execute(sql.text("COMMIT PREPARED :tid", bindparams=[sql.bindparam('tid', xid)]))
- connection.execute(sql.text("BEGIN"))
+ connection.execute("ROLLBACK")
+ connection.execute("COMMIT PREPARED '%s'" % xid)
+ connection.execute("BEGIN")
self.do_rollback(connection.connection)
else:
self.do_commit(connection.connection)
self.assertRaisesMessage(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
self.assertRaisesMessage(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
- try:
- table.insert().execute({'data':'d2'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
- try:
- table.insert().execute({'data':'d2'}, {'data':'d3'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
+ self.assertRaisesMessage(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
+
+ self.assertRaisesMessage(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
Column('strarr', postgres.PGArray(String(convert_unicode=True)), nullable=False)
)
metadata.create_all()
+
+ def tearDown(self):
+ arrtable.delete().execute()
+
def tearDownAll(self):
metadata.drop_all()
self.assertEquals(len(results), 1)
self.assertEquals(results[0]['intarr'], [1,2,3])
self.assertEquals(results[0]['strarr'], ['abc','def'])
- arrtable.delete().execute()
+ @testing.fails_on('postgres+pg8000', 'pg8000 has poor support for PG arrays')
def test_array_where(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
arrtable.insert().execute(intarr=[4,5,6], strarr='ABC')
results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall()
self.assertEquals(len(results), 1)
self.assertEquals(results[0]['intarr'], [1,2,3])
- arrtable.delete().execute()
+ @testing.fails_on('postgres+pg8000', 'pg8000 has poor support for PG arrays')
def test_array_concat(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
self.assertEquals(len(results), 1)
self.assertEquals(results[0][0], [1,2,3,4,5,6])
- arrtable.delete().execute()
+ @testing.fails_on('postgres+pg8000', 'pg8000 has poor support for PG arrays')
def test_array_subtype_resultprocessor(self):
arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']])
arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6'])
self.assertEquals(len(results), 2)
self.assertEquals(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
self.assertEquals(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
- arrtable.delete().execute()
+ @testing.fails_on('postgres+pg8000', 'pg8000 has poor support for PG arrays')
def test_array_mutability(self):
class Foo(object): pass
footable = Table('foo', metadata,
sess.add(foo)
sess.flush()
-class TimeStampTest(TestBase, AssertsExecutionResults):
+class TimestampTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
+
def test_timestamp(self):
engine = testing.db
connection = engine.connect()
- s = select([func.TIMESTAMP("12/25/07").label("ts")])
+
+ s = select(["timestamp '12/25/07'"])
result = connection.execute(s).fetchone()
self.assertEqual(result[0], datetime.datetime(2007, 12, 25, 0, 0))
def tearDownAll(self):
metadata.drop_all()
- def test_expression(self):
+ @testing.fails_on('postgres+pg8000', 'uses positional')
+ def test_expression_pyformat(self):
self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")
+ @testing.fails_on('postgres+psycopg2', 'uses pyformat')
+ def test_expression_positional(self):
+ self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%s)")
+
def test_simple_match(self):
results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
self.assertEquals([2, 5], [r.id for r in results])
# pyformat is supported for mysql, but skipping because a few driver
# versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2)
@testing.skip_if(lambda: testing.against('mysql+mysqldb'), 'db-api flaky')
- @testing.fails_on_everything_except('postgres')
+ @testing.fails_on_everything_except('postgres+psycopg2')
def test_raw_python(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
return execute(clauseelement, *multiparams, **params)
def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
+ print "CE", statement, parameters
cursor_stmts.append(
(statement, parameters, None)
)
if engine.dialect.preexecute_pk_sequences:
cursor = [
- ("CREATE TABLE t1", {}, None),
+ ("CREATE TABLE t1", {}, ()),
("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
("SELECT lower", {'lower_2':'Foo'}, ['Foo']),
("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, [6, 'foo']),
- ("select * from t1", {}, None),
- ("DROP TABLE t1", {}, None)
+ ("select * from t1", {}, ()),
+ ("DROP TABLE t1", {}, ())
]
else:
cursor = [
"""
- __only_on__ = 'postgres'
+ __only_on__ = 'postgres+psycopg2'
__skip_if__ = ((lambda: sys.version_info < (2, 4)), )
def test_baseline_0_setup(self):
"""
- __only_on__ = 'postgres'
+ __only_on__ = 'postgres+psycopg2'
__skip_if__ = ((lambda: sys.version_info < (2, 5)), ) # TODO: get 2.4 support
def test_baseline_0_setup(self):
import __builtin__
-__all__ = '_function_named', 'callable', 'py3k', 'jython'
+__all__ = '_function_named', 'callable', 'py3k', 'jython', 'to_list'
py3k = getattr(sys, 'py3kwarning', False) or sys.version_info >= (3, 0)
fn.func_defaults, fn.func_closure)
return fn
+def to_list(x, default=None):
+ if x is None:
+ return default
+ if not isinstance(x, (list, tuple)):
+ return [x]
+ else:
+ return x
+
if py3k:
def callable(fn):
return hasattr(fn, '__call__')
self.proxy_refs[con_proxy] = True
def _apply_all(self, methods):
- for rec in self.proxy_refs:
+ for rec in list(self.proxy_refs):
if rec is not None and rec.is_valid:
try:
for name in methods:
fn,
no_support('access', 'not supported by database'),
no_support('firebird', 'no SA implementation'),
- no_support('+pg8000', 'FIXME: not sure how to accomplish'),
no_support('maxdb', 'not supported by database'),
no_support('mssql', 'FIXME: guessing, needs confirmation'),
no_support('oracle', 'no SA implementation'),
if check(test_suite)() != 'ok':
# The requirement will perform messaging.
return True
- if (hasattr(cls, '__unsupported_on__') and
- config.db.name in cls.__unsupported_on__):
- print "'%s' unsupported on DB implementation '%s'" % (
- cls.__class__.__name__, config.db.name)
- return True
- if (getattr(cls, '__only_on__', None) not in (None,config.db.name)):
- print "'%s' unsupported on DB implementation '%s'" % (
- cls.__class__.__name__, config.db.name)
- return True
+ if cls.__unsupported_on__:
+ spec = db_spec(*cls.__unsupported_on__)
+ if spec(config.db):
+ print "'%s' unsupported on DB implementation '%s'" % (
+ cls.__class__.__name__, config.db.name)
+ return True
+ if getattr(cls, '__only_on__', None):
+ spec = db_spec(*to_list(cls.__only_on__))
+ if not spec(config.db):
+ print "'%s' unsupported on DB implementation '%s'" % (
+ cls.__class__.__name__, config.db.name)
+ return True
if (getattr(cls, '__skip_if__', False)):
for c in getattr(cls, '__skip_if__'):
if c():