From: Mike Bayer Date: Thu, 17 Nov 2022 01:11:18 +0000 (-0500) Subject: accommodate NULL format_type() X-Git-Tag: rel_2_0_0b4~51 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=200e70b9745f1f344be4a35bb8f2b5f01b40d467;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git accommodate NULL format_type() Made an adjustment to how the PostgreSQL dialect considers column types when it reflects columns from a table, to accommodate for alternative backends which may return NULL from the PG ``format_type()`` function. Fixes: #8748 Change-Id: I6178287aac567210a76afaa5805b825daa7fa4db --- diff --git a/doc/build/changelog/unreleased_14/8748.rst b/doc/build/changelog/unreleased_14/8748.rst new file mode 100644 index 0000000000..27e0679227 --- /dev/null +++ b/doc/build/changelog/unreleased_14/8748.rst @@ -0,0 +1,7 @@ +.. change:: + :tags: bug postgresql + :tickets: 8748 + + Made an adjustment to how the PostgreSQL dialect considers column types + when it reflects columns from a table, to accommodate for alternative + backends which may return NULL from the PG ``format_type()`` function. diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index a908ed6b78..41064de107 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -3489,12 +3489,19 @@ class PGDialect(default.DefaultDialect): generated = row_dict["generated"] identity = row_dict["identity_options"] - # strip (*) from character varying(5), timestamp(5) - # with time zone, geometry(POLYGON), etc. - attype = attype_pattern.sub("", format_type) + if format_type is None: + no_format_type = True + attype = format_type = "no format_type()" + is_array = False + else: + no_format_type = False + + # strip (*) from character varying(5), timestamp(5) + # with time zone, geometry(POLYGON), etc. + attype = attype_pattern.sub("", format_type) - # strip '[]' from integer[], etc. and check if an array - attype, is_array = _handle_array_type(attype) + # strip '[]' from integer[], etc. and check if an array + attype, is_array = _handle_array_type(attype) # strip quotes from case sensitive enum or domain names enum_or_domain_key = tuple(util.quoted_token_parser(attype)) @@ -3589,6 +3596,12 @@ class PGDialect(default.DefaultDialect): coltype = coltype(*args, **kwargs) if is_array: coltype = self.ischema_names["_array"](coltype) + elif no_format_type: + util.warn( + "PostgreSQL format_type() returned NULL for column '%s'" + % (name,) + ) + coltype = sqltypes.NULLTYPE else: util.warn( "Did not recognize type '%s' of column '%s'" diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index f0893d822b..481924c381 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -43,9 +43,11 @@ from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import ComparesIndexes from sqlalchemy.testing.assertions import eq_ from sqlalchemy.testing.assertions import expect_raises +from sqlalchemy.testing.assertions import expect_warnings from sqlalchemy.testing.assertions import is_ from sqlalchemy.testing.assertions import is_false from sqlalchemy.testing.assertions import is_true +from sqlalchemy.types import NullType class ReflectionFixtures: @@ -2305,6 +2307,35 @@ class CustomTypeReflectionTest(fixtures.TestBase): dialect.ischema_names["my_custom_type"] = self.CustomType self._assert_reflected(dialect) + def test_no_format_type(self): + """test #8748""" + + dialect = postgresql.PGDialect() + dialect.ischema_names = dialect.ischema_names.copy() + dialect.ischema_names["my_custom_type"] = self.CustomType + + with expect_warnings( + r"PostgreSQL format_type\(\) returned NULL for column 'colname'" + ): + row_dict = { + "name": "colname", + "table_name": "tblname", + "format_type": None, + "default": None, + "not_null": False, + "comment": None, + "generated": "", + "identity_options": None, + } + column_info = dialect._get_columns_info( + [row_dict], {}, {}, "public" + ) + assert ("public", "tblname") in column_info + column_info = column_info[("public", "tblname")] + assert len(column_info) == 1 + column_info = column_info[0] + assert isinstance(column_info["type"], NullType) + class IntervalReflectionTest(fixtures.TestBase): __only_on__ = "postgresql"