supports_unicode_binds = True
postfetch_lastrowid = True
_supports_offset_fetch = False
+ _supports_nvarchar_max = False
server_version_info = ()
finally:
cursor.close()
else:
+ # note that the NotImplementedError is caught by
+ # DefaultDialect, so the warning here is all that displays
util.warn(
"Could not fetch transaction isolation level, "
"tried views: %s; final error was: %s" % (views, last_error)
)
-
raise NotImplementedError(
"Can't fetch isolation level on this particular "
- "SQL Server version"
+ "SQL Server version. tried views: %s; final error was: %s"
+ % (views, last_error)
)
def initialize(self, connection):
super(MSDialect, self).initialize(connection)
self._setup_version_attributes()
+ self._setup_supports_nvarchar_max(connection)
def on_connect(self):
if self.isolation_level is not None:
self.server_version_info and self.server_version_info[0] >= 11
)
+ def _setup_supports_nvarchar_max(self, connection):
+ try:
+ connection.scalar(
+ sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
+ )
+ except exc.DBAPIError:
+ self._supports_nvarchar_max = False
+ else:
+ self._supports_nvarchar_max = True
+
def _get_default_schema_name(self, connection):
if self.server_version_info < MS_2005_VERSION:
return self.schema_name
columns.c.table_schema == owner,
)
table_fullname = "%s.%s" % (owner, tablename)
- concat = func.concat(
- columns.c.table_schema, ".", columns.c.table_name
- )
- join_on = computed_cols.c.object_id == func.object_id(concat)
+ full_name = columns.c.table_schema + "." + columns.c.table_name
+ join_on = computed_cols.c.object_id == func.object_id(full_name)
else:
whereclause = columns.c.table_name == tablename
table_fullname = tablename
join_on, columns.c.column_name == computed_cols.c.name
)
join = columns.join(computed_cols, onclause=join_on, isouter=True)
+
+ if self._supports_nvarchar_max:
+ computed_definition = computed_cols.c.definition
+ else:
+ # tds_version 4.2 does not support NVARCHAR(MAX)
+ computed_definition = sql.cast(
+ computed_cols.c.definition, NVARCHAR(4000)
+ )
+
s = sql.select(
- [
- columns,
- computed_cols.c.definition,
- computed_cols.c.is_persisted,
- ],
+ [columns, computed_definition, computed_cols.c.is_persisted],
whereclause,
from_obj=join,
order_by=[columns.c.ordinal_position],
numericscale = row[columns.c.numeric_scale]
default = row[columns.c.column_default]
collation = row[columns.c.collation_name]
- definition = row[computed_cols.c.definition]
+ definition = row[computed_definition]
is_persisted = row[computed_cols.c.is_persisted]
coltype = self.ischema_names.get(type_, None)