stmt = text("select cast('hi' as char) as hi").columns(hi=Numeric)
assert_raises(exc.InvalidRequestError, connection.execute, stmt)
- @testing.only_on("postgresql+psycopg2")
- def test_serial_integer(self):
- class BITD(TypeDecorator):
- impl = Integer
-
- cache_ok = True
-
- def load_dialect_impl(self, dialect):
- if dialect.name == "postgresql":
- return BigInteger()
- else:
- return Integer()
-
- for version, type_, expected in [
- (None, Integer, "SERIAL"),
- (None, BigInteger, "BIGSERIAL"),
- ((9, 1), SmallInteger, "SMALLINT"),
- ((9, 2), SmallInteger, "SMALLSERIAL"),
- (None, postgresql.INTEGER, "SERIAL"),
- (None, postgresql.BIGINT, "BIGSERIAL"),
- (
- None,
- Integer().with_variant(BigInteger(), "postgresql"),
- "BIGSERIAL",
- ),
- (
- None,
- Integer().with_variant(postgresql.BIGINT, "postgresql"),
- "BIGSERIAL",
- ),
- (
- (9, 2),
- Integer().with_variant(SmallInteger, "postgresql"),
- "SMALLSERIAL",
- ),
- (None, BITD(), "BIGSERIAL"),
- ]:
- m = MetaData()
+ @testing.combinations(
+ (None, Integer, "SERIAL"),
+ (None, BigInteger, "BIGSERIAL"),
+ ((9, 1), SmallInteger, "SMALLINT"),
+ ((9, 2), SmallInteger, "SMALLSERIAL"),
+ (None, SmallInteger, "SMALLSERIAL"),
+ (None, postgresql.INTEGER, "SERIAL"),
+ (None, postgresql.BIGINT, "BIGSERIAL"),
+ (
+ None,
+ Integer().with_variant(BigInteger(), "postgresql"),
+ "BIGSERIAL",
+ ),
+ (
+ None,
+ Integer().with_variant(postgresql.BIGINT, "postgresql"),
+ "BIGSERIAL",
+ ),
+ (
+ (9, 2),
+ Integer().with_variant(SmallInteger, "postgresql"),
+ "SMALLSERIAL",
+ ),
+ (None, "BITD()", "BIGSERIAL"),
+ argnames="version, type_, expected",
+ )
+ def test_serial_integer(self, version, type_, expected, testing_engine):
+ if type_ == "BITD()":
- t = Table("t", m, Column("c", type_, primary_key=True))
+ class BITD(TypeDecorator):
+ impl = Integer
- if version:
- dialect = testing.db.dialect.__class__()
- dialect._get_server_version_info = mock.Mock(
- return_value=version
- )
- dialect.initialize(testing.db.connect())
- else:
- dialect = testing.db.dialect
+ cache_ok = True
- ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
- eq_(
- ddl_compiler.get_column_specification(t.c.c),
- "c %s NOT NULL" % expected,
- )
+ def load_dialect_impl(self, dialect):
+ if dialect.name == "postgresql":
+ return BigInteger()
+ else:
+ return Integer()
+
+ type_ = BITD()
+ t = Table("t", MetaData(), Column("c", type_, primary_key=True))
+
+ if version:
+ engine = testing_engine()
+ dialect = engine.dialect
+ dialect._get_server_version_info = mock.Mock(return_value=version)
+ engine.connect().close() # initialize the dialect
+ else:
+ dialect = testing.db.dialect
+
+ ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
+ eq_(
+ ddl_compiler.get_column_specification(t.c.c),
+ "c %s NOT NULL" % expected,
+ )
@testing.requires.psycopg2_compatibility
def test_initial_transaction_state_psycopg2(self):