key attribute on an item contained within a collection
owned by an object being deleted would not be set to
None if the relation() was self-referential. [ticket:1376]
-
+
+- schema
+ - Added a quote_schema() method to the IdentifierPreparer class
+ so that dialects can override how schemas get handled. This
+ enables the MSSQL dialect to treat schemas as multipart
+ identifiers, such as 'database.owner'. [ticket: 594, 1341]
+
- sql
- ``sqlalchemy.extract()`` is now dialect sensitive and can
extract components of timestamps idiomatically across the
#TODO: determine MSSQL's escaping rules
return value
+ def quote_schema(self, schema, force=True):
+ """Prepare a quoted table and schema name."""
+ result = '.'.join([self.quote(x, force) for x in schema.split('.')])
+ return result
+
dialect = MSSQLDialect
dialect.statement_compiler = MSSQLCompiler
dialect.schemagenerator = MSSQLSchemaGenerator
raise exc.ArgumentError(
"Parent column '%s' does not descend from a "
"table-attached Column" % str(self.parent))
- m = re.match(r"^(.+?)(?:\.(.+?))?(?:\.(.+?))?$", self._colspec,
- re.UNICODE)
+
+ m = self._colspec.split('.')
+
if m is None:
raise exc.ArgumentError(
"Invalid foreign key column specification: %s" %
self._colspec)
- if m.group(3) is None:
- (tname, colname) = m.group(1, 2)
- schema = None
+
+ # A FK between column 'bar' and table 'foo' can be
+ # specified as 'foo', 'foo.bar', 'dbo.foo.bar',
+ # 'otherdb.dbo.foo.bar'. Once we have the column name and
+ # the table name, treat everything else as the schema
+ # name. Some databases (e.g. Sybase) support
+ # inter-database foreign keys. See tickets#1341 and --
+ # indirectly related -- Ticket #594. This assumes that '.'
+ # will never appear *within* any component of the FK.
+
+ (schema, tname, colname) = (None, None, None)
+ if (len(m) == 1):
+ tname = m.pop()
else:
- (schema, tname, colname) = m.group(1, 2, 3)
+ colname = m.pop()
+ tname = m.pop()
+
+ if (len(m) > 0):
+ schema = '.'.join(m)
+
if _get_table_key(tname, schema) not in parenttable.metadata:
raise exc.NoReferencedTableError(
"Could not find table '%s' with which to generate a "
return name
else:
if column.table.schema:
- schema_prefix = self.preparer.quote(column.table.schema, column.table.quote_schema) + '.'
+ schema_prefix = self.preparer.quote_schema(column.table.schema, column.table.quote_schema) + '.'
else:
schema_prefix = ''
tablename = column.table.name
def visit_table(self, table, asfrom=False, **kwargs):
if asfrom:
if getattr(table, "schema", None):
- return self.preparer.quote(table.schema, table.quote_schema) + "." + self.preparer.quote(table.name, table.quote)
+ return self.preparer.quote_schema(table.schema, table.quote_schema) + "." + self.preparer.quote(table.name, table.quote)
else:
return self.preparer.quote(table.name, table.quote)
else:
or self.illegal_initial_characters.match(value[0])
or not self.legal_characters.match(unicode(value))
or (lc_value != value))
-
+
+ def quote_schema(self, schema, force):
+ """Quote a schema.
+
+ Subclasses should override this to provide database-dependent
+ quoting behavior.
+ """
+ return self.quote(schema, force)
+
def quote(self, ident, force):
if force is None:
if ident in self._strings:
def format_sequence(self, sequence, use_schema=True):
name = self.quote(sequence.name, sequence.quote)
if not self.omit_schema and use_schema and sequence.schema is not None:
- name = self.quote(sequence.schema, sequence.quote) + "." + name
+ name = self.quote_schema(sequence.schema, sequence.quote) + "." + name
return name
def format_label(self, label, name=None):
name = table.name
result = self.quote(name, table.quote)
if not self.omit_schema and use_schema and getattr(table, "schema", None):
- result = self.quote(table.schema, table.quote_schema) + "." + result
+ result = self.quote_schema(table.schema, table.quote_schema) + "." + result
return result
def format_column(self, column, use_table=False, name=None, table_name=None):
# a longer sequence.
if not self.omit_schema and use_schema and getattr(table, 'schema', None):
- return (self.quote(table.schema, table.quote_schema),
+ return (self.quote_schema(table.schema, table.quote_schema),
self.format_table(table, use_schema=False))
else:
return (self.format_table(table, use_schema=False), )
s = select([tbl.c.id]).where(tbl.c.id==1)
self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM paj.test WHERE paj.test.id IN (SELECT test_1.id FROM paj.test AS test_1 WHERE test_1.id = :id_1)")
+ def test_delete_schema_multipart(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='banana.paj')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM banana.paj.test WHERE banana.paj.test.id = :id_1")
+
+ s = select([tbl.c.id]).where(tbl.c.id==1)
+ self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM banana.paj.test WHERE banana.paj.test.id IN (SELECT test_1.id FROM banana.paj.test AS test_1 WHERE test_1.id = :id_1)")
+
+ def test_delete_schema_multipart_needs_quoting(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='banana split.paj')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM [banana split].paj.test WHERE [banana split].paj.test.id = :id_1")
+
+ s = select([tbl.c.id]).where(tbl.c.id==1)
+ self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM [banana split].paj.test WHERE [banana split].paj.test.id IN (SELECT test_1.id FROM [banana split].paj.test AS test_1 WHERE test_1.id = :id_1)")
+
+ def test_delete_schema_multipart_both_need_quoting(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='banana split.paj with a space')
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM [banana split].[paj with a space].test WHERE [banana split].[paj with a space].test.id = :id_1")
+
+ s = select([tbl.c.id]).where(tbl.c.id==1)
+ self.assert_compile(tbl.delete().where(tbl.c.id==(s)), "DELETE FROM [banana split].[paj with a space].test WHERE [banana split].[paj with a space].test.id IN (SELECT test_1.id FROM [banana split].[paj with a space].test AS test_1 WHERE test_1.id = :id_1)")
+
def test_union(self):
t1 = table('t1',
column('col1'),