class SQLiteDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
- if column.computed is not None:
- raise exc.CompileError("SQLite does not support computed columns")
coltype = self.dialect.type_compiler.process(
column.type, type_expression=column
colspec += " AUTOINCREMENT"
+ if column.computed is not None:
+ colspec += " " + self.process(column.computed)
+
return colspec
def visit_primary_key_constraint(self, constraint):
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
+ pragma = "table_info"
+ # computed columns are threaded as hidden, they require table_xinfo
+ if self.server_version_info >= (3, 31):
+ pragma = "table_xinfo"
info = self._get_table_pragma(
- connection, "table_info", table_name, schema=schema
+ connection, pragma, table_name, schema=schema
)
-
columns = []
+ tablesql = None
for row in info:
- (name, type_, nullable, default, primary_key) = (
- row[1],
- row[2].upper(),
- not row[3],
- row[4],
- row[5],
- )
+ name = row[1]
+ type_ = row[2].upper()
+ nullable = not row[3]
+ default = row[4]
+ primary_key = row[5]
+ hidden = row[6] if pragma == "table_xinfo" else 0
+
+ # hidden has value 0 for normal columns, 1 for hidden columns,
+ # 2 for computed virtual columns and 3 for computed stored columns
+ # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
+ if hidden == 1:
+ continue
+
+ generated = bool(hidden)
+ persisted = hidden == 3
+
+ if tablesql is None and generated:
+ tablesql = self._get_table_sql(
+ connection, table_name, schema, **kw
+ )
columns.append(
self._get_column_info(
- name, type_, nullable, default, primary_key
+ name,
+ type_,
+ nullable,
+ default,
+ primary_key,
+ generated,
+ persisted,
+ tablesql,
)
)
return columns
- def _get_column_info(self, name, type_, nullable, default, primary_key):
+ def _get_column_info(
+ self,
+ name,
+ type_,
+ nullable,
+ default,
+ primary_key,
+ generated,
+ persisted,
+ tablesql,
+ ):
+
+ if generated:
+ # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
+ # somehow is "INTEGER GENERATED ALWAYS"
+ type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
+ type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
+
coltype = self._resolve_type_affinity(type_)
if default is not None:
default = util.text_type(default)
- return {
+ colspec = {
"name": name,
"type": coltype,
"nullable": nullable,
"autoincrement": "auto",
"primary_key": primary_key,
}
+ if generated:
+ sqltext = ""
+ if tablesql:
+ pattern = r"[^,]*\s+AS\s+\(([^,]*)\)\s*(?:virtual|stored)?"
+ match = re.search(
+ re.escape(name) + pattern, tablesql, re.IGNORECASE
+ )
+ if match:
+ sqltext = match.group(1)
+ colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
+ return colspec
def _resolve_type_affinity(self, type_):
"""Return a data type from a reflected column, using affinity tules.
"""test non-quoted integer value on older sqlite pragma"""
dialect = sqlite.dialect()
- info = dialect._get_column_info("foo", "INTEGER", False, 3, False)
+ info = dialect._get_column_info(
+ "foo", "INTEGER", False, 3, False, False, False, None
+ )
eq_(info["default"], "3")
-class DialectTest(fixtures.TestBase, AssertsExecutionResults):
+class DialectTest(
+ fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
+):
__only_on__ = "sqlite"
eq_(d.create_connect_args(url), expected)
@testing.combinations(
- ("no_persisted", "ignore"),
- ("persisted_none", None),
- ("persisted_true", True),
- ("persisted_false", False),
- id_="ia",
+ ("no_persisted", "", "ignore"),
+ ("persisted_none", "", None),
+ ("persisted_true", " STORED", True),
+ ("persisted_false", " VIRTUAL", False),
+ id_="iaa",
)
- def test_column_computed(self, persisted):
+ def test_column_computed(self, text, persisted):
m = MetaData()
kwargs = {"persisted": persisted} if persisted != "ignore" else {}
t = Table(
Column("x", Integer),
Column("y", Integer, Computed("x + 2", **kwargs)),
)
- assert_raises_message(
- exc.CompileError,
- "SQLite does not support computed columns",
- schema.CreateTable(t).compile,
- dialect=sqlite.dialect(),
+ self.assert_compile(
+ schema.CreateTable(t),
+ "CREATE TABLE t (x INTEGER,"
+ " y INTEGER GENERATED ALWAYS AS (x + 2)%s)" % text,
)