"""
-import datetime, random, warnings
+import datetime, random, warnings, re
from sqlalchemy import sql, schema, ansisql, exceptions
import sqlalchemy.types as sqltypes
# print "LAST ROW ID", self._last_inserted_ids
self.HASIDENT = False
super(MSSQLExecutionContext, self).post_exec()
+
+ _ms_is_select = re.compile(r'\s*(?:SELECT|sp_columns)',
+ re.I | re.UNICODE)
+
+ def is_select(self):
+ return self._ms_is_select.match(self.statement) is not None
class MSSQLExecutionContext_pyodbc (MSSQLExecutionContext):
def preparer(self):
return MSSQLIdentifierPreparer(self)
- def get_default_schema_name(self):
+ def get_default_schema_name(self, connection):
return self.schema_name
def set_default_schema_name(self, schema_name):
def has_table(self, connection, tablename, schema=None):
import sqlalchemy.databases.information_schema as ischema
- current_schema = schema or self.get_default_schema_name()
+ current_schema = schema or self.get_default_schema_name(connection)
columns = self.uppercase_table(ischema.columns)
s = sql.select([columns],
current_schema
if table.schema is not None:
current_schema = table.schema
else:
- current_schema = self.get_default_schema_name()
+ current_schema = self.get_default_schema_name(connection)
columns = self.uppercase_table(ischema.columns)
s = sql.select([columns],
def visit_alias(self, alias, **kwargs):
# translate for schema-qualified table aliases
self.tablealiases[alias.original] = alias
+ kwargs['mssql_aliased'] = True
return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
def visit_column(self, column):
def filter_by(self, **kwargs):
"""apply the given filtering criterion to the query and return the newly resulting ``Query``."""
- #import properties
-
- joinpoint = self._joinpoint
-
- clauses = [joinpoint.get_property(key, resolve_synonyms=True).compare(operator.eq, value)
+ clauses = [self._joinpoint.get_property(key, resolve_synonyms=True).compare(operator.eq, value)
for key, value in kwargs.iteritems()]
return self.filter(sql.and_(*clauses))