return bool(cursor.scalar())
def _get_server_version_info(self, connection):
- v = connection.exec_driver_sql("select version()").scalar()
+ v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
m = re.match(
r".*(?:PostgreSQL|EnterpriseDB) "
r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import DateTime
+from sqlalchemy import DDL
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import extract
]:
eq_(dialect._get_server_version_info(mock_conn(string)), version)
+ @testing.only_on("postgresql")
+ def test_ensure_version_is_qualified(
+ self, future_connection, testing_engine, metadata
+ ):
+
+ default_schema_name = future_connection.dialect.default_schema_name
+ event.listen(
+ metadata,
+ "after_create",
+ DDL(
+ """
+CREATE OR REPLACE FUNCTION %s.version() RETURNS integer AS $$
+BEGIN
+ return 0;
+END;
+$$ LANGUAGE plpgsql;"""
+ % (default_schema_name,)
+ ),
+ )
+ event.listen(
+ metadata,
+ "before_drop",
+ DDL("DROP FUNCTION %s.version" % (default_schema_name,)),
+ )
+
+ metadata.create_all(future_connection)
+ future_connection.commit()
+
+ e = testing_engine()
+
+ @event.listens_for(e, "do_connect")
+ def receive_do_connect(dialect, conn_rec, cargs, cparams):
+ conn = dialect.dbapi.connect(*cargs, **cparams)
+ cursor = conn.cursor()
+ cursor.execute(
+ "set search_path = %s,pg_catalog" % (default_schema_name,)
+ )
+ cursor.close()
+ return conn
+
+ with e.connect():
+ pass
+ eq_(
+ e.dialect.server_version_info,
+ future_connection.dialect.server_version_info,
+ )
+
@testing.requires.python3
@testing.requires.psycopg2_compatibility
def test_pg_dialect_no_native_unicode_in_python3(self, testing_engine):