return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)])
- def visit_column(self, column, result_map=None, **kwargs):
+ def visit_column(self, column, result_map=None, use_schema=False, **kwargs):
# there is actually somewhat of a ruleset when you would *not* necessarily
# want to truncate a column identifier, if its mapped to the name of a
# physical column. but thats very hard to identify at this point, and
# the identifier length should be greater than the id lengths of any physical
# columns so should not matter.
+
+ if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None):
+ schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.'
+ else:
+ schema_prefix = ''
+
if not column.is_literal:
name = self._truncated_identifier("colident", column.name)
else:
if column.table is None or not column.table.named_with_column:
return n
else:
- return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n
+ return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n
elif len(column.table.primary_key) != 0:
pk = list(column.table.primary_key)[0]
pkname = (pk.is_literal and name or self._truncated_identifier("colident", pk.name))
- return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(pk, pkname)
+ return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(pk, pkname)
else:
return None
elif column.table is None or not column.table.named_with_column:
return self.preparer.quote(column, name)
else:
if getattr(column, "is_literal", False):
- return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.escape_literal_column(name)
+ return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.escape_literal_column(name)
else:
- return self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(column, name)
+ return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(column, name)
def escape_literal_column(self, text):
"""provide escaping for the literal_column() construct."""
self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM mytable JOIN remote_owner.remotetable AS remotetable_1 ON remotetable_1.rem_id = mytable.myid")
+ def test_delete_schema(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='paj')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM paj.test WHERE paj.test.id = :test_id_1")
+
def test_union(self):
t1 = table('t1',
column('col1'),
tbl.drop()
con.execute('drop schema paj')
+ def test_delete_schema(self):
+ meta = MetaData(testing.db)
+ con = testing.db.connect()
+ con.execute('create schema paj')
+ tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj')
+ tbl.create()
+ try:
+ tbl.insert().execute({'id':1})
+ tbl.delete(tbl.c.id == 1).execute()
+ finally:
+ tbl.drop()
+ con.execute('drop schema paj')
+
def test_insertid_reserved(self):
meta = MetaData(testing.db)
table = Table(