from sqlalchemy.testing import is_
from sqlalchemy.testing import expect_deprecated
from ...engine import test_execute
+from sqlalchemy import dialects
-class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+class DialectTest(fixtures.TestBase):
+ """python-side dialect tests. """
- __only_on__ = 'postgresql'
- __backend__ = True
-
- @testing.provide_metadata
- def test_date_reflection(self):
- metadata = self.metadata
- Table(
- 'pgdate', metadata, Column('date1', DateTime(timezone=True)),
- Column('date2', DateTime(timezone=False)))
- metadata.create_all()
- m2 = MetaData(testing.db)
- t2 = Table('pgdate', m2, autoload=True)
- assert t2.c.date1.type.timezone is True
- assert t2.c.date2.type.timezone is False
-
- @testing.fails_on('+zxjdbc',
- 'The JDBC driver handles the version parsing')
def test_version_parsing(self):
def mock_conn(res):
return Mock(
execute=Mock(return_value=Mock(scalar=Mock(return_value=res))))
+ dialect = postgresql.dialect()
for string, version in [
(
'PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
'compiled by gcc (GCC) 4.8.5 20150623 '
'(Red Hat 4.8.5-11), 64-bit', (10,))
]:
- eq_(testing.db.dialect._get_server_version_info(mock_conn(string)),
+ eq_(dialect._get_server_version_info(mock_conn(string)),
version)
+ def test_deprecated_dialect_name_still_loads(self):
+ dialects.registry.clear()
+ with expect_deprecated(
+ "The 'postgres' dialect name "
+ "has been renamed to 'postgresql'"):
+ dialect = url.URL("postgres").get_dialect()
+ is_(dialect, postgresql.dialect)
+
+ @testing.requires.psycopg2_compatibility
+ def test_pg_dialect_use_native_unicode_from_config(self):
+ config = {
+ 'sqlalchemy.url': testing.db.url,
+ 'sqlalchemy.use_native_unicode': "false"}
+
+ e = engine_from_config(config, _initialize=False)
+ eq_(e.dialect.use_native_unicode, False)
+
+ config = {
+ 'sqlalchemy.url': testing.db.url,
+ 'sqlalchemy.use_native_unicode': "true"}
+
+ e = engine_from_config(config, _initialize=False)
+ eq_(e.dialect.use_native_unicode, True)
+
+
+class MiscBackendTest(
+ fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
+ __only_on__ = 'postgresql'
+ __backend__ = True
+
+ @testing.provide_metadata
+ def test_date_reflection(self):
+ metadata = self.metadata
+ Table(
+ 'pgdate', metadata, Column('date1', DateTime(timezone=True)),
+ Column('date2', DateTime(timezone=False)))
+ metadata.create_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('pgdate', m2, autoload=True)
+ assert t2.c.date1.type.timezone is True
+ assert t2.c.date2.type.timezone is False
+
@testing.requires.psycopg2_compatibility
def test_psycopg2_version(self):
v = testing.db.dialect.psycopg2_version
psycopg2.Error)
assert isinstance(exception, exc.OperationalError)
- def test_deprecated_dialect_name_still_loads(self):
- with expect_deprecated(
- "The 'postgres' dialect name "
- "has been renamed to 'postgresql'"):
- dialect = url.URL("postgres").get_dialect()
- is_(dialect, postgresql.dialect)
-
# currently not passing with pg 9.3 that does not seem to generate
# any notices here, would rather find a way to mock this
@testing.requires.no_coverage
new_encoding = c.execute("show client_encoding").fetchone()[0]
eq_(new_encoding, test_encoding)
- @testing.requires.psycopg2_compatibility
- def test_pg_dialect_use_native_unicode_from_config(self):
- config = {
- 'sqlalchemy.url': testing.db.url,
- 'sqlalchemy.use_native_unicode': "false"}
-
- e = engine_from_config(config, _initialize=False)
- eq_(e.dialect.use_native_unicode, False)
-
- config = {
- 'sqlalchemy.url': testing.db.url,
- 'sqlalchemy.use_native_unicode': "true"}
-
- e = engine_from_config(config, _initialize=False)
- eq_(e.dialect.use_native_unicode, True)
-
@testing.requires.psycopg2_or_pg8000_compatibility
@engines.close_open_connections
def test_autocommit_isolation_level(self):
).scalar()
eq_(r, exp)
+ @testing.provide_metadata
def test_checksfor_sequence(self):
- meta1 = MetaData(testing.db)
+ meta1 = self.metadata
seq = Sequence('fooseq')
t = Table(
'mytable', meta1,
Column('col1', Integer, seq)
)
seq.drop()
- try:
- testing.db.execute('CREATE SEQUENCE fooseq')
- t.create(checkfirst=True)
- finally:
- t.drop(checkfirst=True)
+ testing.db.execute('CREATE SEQUENCE fooseq')
+ t.create(checkfirst=True)
+ @testing.provide_metadata
def test_schema_roundtrips(self):
- meta = MetaData(testing.db)
+ meta = self.metadata
users = Table(
'users', meta, Column(
'id', Integer, primary_key=True), Column(
'name', String(50)), schema='test_schema')
users.create()
- try:
- users.insert().execute(id=1, name='name1')
- users.insert().execute(id=2, name='name2')
- users.insert().execute(id=3, name='name3')
- users.insert().execute(id=4, name='name4')
- eq_(users.select().where(users.c.name == 'name2')
- .execute().fetchall(), [(2, 'name2')])
- eq_(users.select(use_labels=True).where(
- users.c.name == 'name2').execute().fetchall(), [(2, 'name2')])
- users.delete().where(users.c.id == 3).execute()
- eq_(users.select().where(users.c.name == 'name3')
- .execute().fetchall(), [])
- users.update().where(users.c.name == 'name4'
- ).execute(name='newname')
- eq_(users.select(use_labels=True).where(
- users.c.id == 4).execute().fetchall(), [(4, 'newname')])
- finally:
- users.drop()
+ users.insert().execute(id=1, name='name1')
+ users.insert().execute(id=2, name='name2')
+ users.insert().execute(id=3, name='name3')
+ users.insert().execute(id=4, name='name4')
+ eq_(users.select().where(users.c.name == 'name2')
+ .execute().fetchall(), [(2, 'name2')])
+ eq_(users.select(use_labels=True).where(
+ users.c.name == 'name2').execute().fetchall(), [(2, 'name2')])
+ users.delete().where(users.c.id == 3).execute()
+ eq_(users.select().where(users.c.name == 'name3')
+ .execute().fetchall(), [])
+ users.update().where(users.c.name == 'name4'
+ ).execute(name='newname')
+ eq_(users.select(use_labels=True).where(
+ users.c.id == 4).execute().fetchall(), [(4, 'newname')])
def test_preexecute_passivedefault(self):
"""test that when we get a primary key column back from