From: Paul Johnston Date: Fri, 8 Feb 2008 16:48:37 +0000 (+0000) Subject: Fix: deletes with schemas on MSSQL 2000 [ticket:967] X-Git-Tag: rel_0_4_3~28 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=21c29768706ff47631ffae7a59858cf802fa3286;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Fix: deletes with schemas on MSSQL 2000 [ticket:967] --- diff --git a/lib/sqlalchemy/databases/mssql.py b/lib/sqlalchemy/databases/mssql.py index 2ec645eeaf..4d24f35fab 100644 --- a/lib/sqlalchemy/databases/mssql.py +++ b/lib/sqlalchemy/databases/mssql.py @@ -919,6 +919,8 @@ class MSSQLCompiler(compiler.DefaultCompiler): t = self._schema_aliased_table(column.table) if t is not None: return self.process(expression._corresponding_column_or_error(t, column)) + else: + kwargs['use_schema'] = True return super(MSSQLCompiler, self).visit_column(column, **kwargs) def visit_binary(self, binary, **kwargs): diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 4e73221c12..43950a9a6e 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -240,12 +240,18 @@ class DefaultCompiler(engine.Compiled): return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)]) - def visit_column(self, column, result_map=None, **kwargs): + def visit_column(self, column, result_map=None, use_schema=False, **kwargs): # there is actually somewhat of a ruleset when you would *not* necessarily # want to truncate a column identifier, if its mapped to the name of a # physical column. but thats very hard to identify at this point, and # the identifier length should be greater than the id lengths of any physical # columns so should not matter. + + if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None): + schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.' + else: + schema_prefix = '' + if not column.is_literal: name = self._truncated_identifier("colident", column.name) else: @@ -260,11 +266,11 @@ class DefaultCompiler(engine.Compiled): if column.table is None or not column.table.named_with_column: return n else: - return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n + return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n elif len(column.table.primary_key) != 0: pk = list(column.table.primary_key)[0] pkname = (pk.is_literal and name or self._truncated_identifier("colident", pk.name)) - return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(pk, pkname) + return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(pk, pkname) else: return None elif column.table is None or not column.table.named_with_column: @@ -274,9 +280,9 @@ class DefaultCompiler(engine.Compiled): return self.preparer.quote(column, name) else: if getattr(column, "is_literal", False): - return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.escape_literal_column(name) + return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.escape_literal_column(name) else: - return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(column, name) + return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(column, name) def escape_literal_column(self, text): """provide escaping for the literal_column() construct.""" diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index 046fede102..5a7707052c 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -56,6 +56,11 @@ class CompileTest(SQLCompileTest): self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM mytable JOIN remote_owner.remotetable AS remotetable_1 ON remotetable_1.rem_id = mytable.myid") + def test_delete_schema(self): + metadata = MetaData() + tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='paj') + self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM paj.test WHERE paj.test.id = :test_id_1") + def test_union(self): t1 = table('t1', column('col1'), @@ -149,6 +154,19 @@ class QueryTest(PersistTest): tbl.drop() con.execute('drop schema paj') + def test_delete_schema(self): + meta = MetaData(testing.db) + con = testing.db.connect() + con.execute('create schema paj') + tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj') + tbl.create() + try: + tbl.insert().execute({'id':1}) + tbl.delete(tbl.c.id == 1).execute() + finally: + tbl.drop() + con.execute('drop schema paj') + def test_insertid_reserved(self): meta = MetaData(testing.db) table = Table(