def visit_check_constraint(self, constraint):
self.append(", \n\t")
if constraint.name is not None:
- self.append("CONSTRAINT %s " % constraint.name)
+ self.append("CONSTRAINT %s " %
+ self.preparer.format_constraint(constraint))
self.append(" CHECK (%s)" % constraint.sqltext)
def visit_column_check_constraint(self, constraint):
- self.append(" ")
self.append(" CHECK (%s)" % constraint.sqltext)
def visit_primary_key_constraint(self, constraint):
return
self.append(", \n\t")
if constraint.name is not None:
- self.append("CONSTRAINT %s " % constraint.name)
+ self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint))
self.append("PRIMARY KEY ")
self.append("(%s)" % (string.join([self.preparer.format_column(c) for c in constraint],', ')))
self.execute()
def define_foreign_key(self, constraint):
+ preparer = self.preparer
if constraint.name is not None:
- self.append("CONSTRAINT %s " % constraint.name)
+ self.append("CONSTRAINT %s " %
+ preparer.format_constraint(constraint))
self.append("FOREIGN KEY(%s) REFERENCES %s (%s)" % (
- string.join([self.preparer.format_column(f.parent) for f in constraint.elements], ', '),
- self.preparer.format_table(list(constraint.elements)[0].column.table),
- string.join([self.preparer.format_column(f.column) for f in constraint.elements], ', ')
+ string.join([preparer.format_column(f.parent) for f in constraint.elements], ', '),
+ preparer.format_table(list(constraint.elements)[0].column.table),
+ string.join([preparer.format_column(f.column) for f in constraint.elements], ', ')
))
if constraint.ondelete is not None:
self.append(" ON DELETE %s" % constraint.ondelete)
def visit_unique_constraint(self, constraint):
self.append(", \n\t")
if constraint.name is not None:
- self.append("CONSTRAINT %s " % constraint.name)
- self.append(" UNIQUE ")
- self.append("(%s)" % (string.join([self.preparer.format_column(c) for c in constraint],', ')))
+ self.append("CONSTRAINT %s " %
+ self.preparer.format_constraint(constraint))
+ self.append(" UNIQUE (%s)" % (string.join([self.preparer.format_column(c) for c in constraint],', ')))
def visit_column(self, column):
pass
def visit_index(self, index):
+ preparer = self.preparer
self.append('CREATE ')
if index.unique:
self.append('UNIQUE ')
self.append('INDEX %s ON %s (%s)' \
- % (index.name, self.preparer.format_table(index.table),
- string.join([self.preparer.format_column(c) for c in index.columns], ', ')))
+ % (preparer.format_index(index),
+ preparer.format_table(index.table),
+ string.join([preparer.format_column(c) for c in index.columns], ', ')))
self.execute()
class ANSISchemaDropper(ANSISchemaBase):
table.accept_visitor(self)
def visit_index(self, index):
- self.append("\nDROP INDEX " + index.name)
+ self.append("\nDROP INDEX " + self.preparer.format_index(index))
self.execute()
def drop_foreignkey(self, constraint):
- self.append("ALTER TABLE %s DROP CONSTRAINT %s" % (self.preparer.format_table(constraint.table), constraint.name))
+ self.append("ALTER TABLE %s DROP CONSTRAINT %s" % (
+ self.preparer.format_table(constraint.table),
+ self.preparer.format_constraint(constraint)))
self.execute()
def visit_table(self, table):
return value.replace('"', '""')
- def _quote_identifier(self, value):
+ def quote_identifier(self, value):
"""Quote an identifier.
Subclasses should override this to provide database-dependent
def __generic_obj_format(self, obj, ident):
if getattr(obj, 'quote', False):
- return self._quote_identifier(ident)
+ return self.quote_identifier(ident)
if self.dialect.cache_identifiers:
case_sens = getattr(obj, 'case_sensitive', None)
try:
return self.__strings[(ident, case_sens)]
except KeyError:
if self._requires_quotes(ident, getattr(obj, 'case_sensitive', ident == ident.lower())):
- self.__strings[(ident, case_sens)] = self._quote_identifier(ident)
+ self.__strings[(ident, case_sens)] = self.quote_identifier(ident)
else:
self.__strings[(ident, case_sens)] = ident
return self.__strings[(ident, case_sens)]
else:
if self._requires_quotes(ident, getattr(obj, 'case_sensitive', ident == ident.lower())):
- return self._quote_identifier(ident)
+ return self.quote_identifier(ident)
else:
return ident
def format_alias(self, alias):
return self.__generic_obj_format(alias, alias.name)
+ def format_constraint(self, constraint):
+ return self.__generic_obj_format(constraint, constraint.name)
+
+ def format_index(self, index):
+ return self.__generic_obj_format(index, index.name)
+
def format_table(self, table, use_schema=True, name=None):
"""Prepare a quoted table and schema name."""
def is_disconnect(self, e):
return isinstance(e, self.dbapi.OperationalError) and e.args[0] in (2006, 2013, 2014, 2045, 2055)
- def get_default_schema_name(self):
- if not hasattr(self, '_default_schema_name'):
- self._default_schema_name = sql.text("select database()", self).scalar()
- return self._default_schema_name
+ def get_default_schema_name(self, connection):
+ try:
+ return self._default_schema_name
+ except AttributeError:
+ name = self._default_schema_name = \
+ connection.execute('SELECT DATABASE()').scalar()
+ return name
def has_table(self, connection, table_name, schema=None):
# SHOW TABLE STATUS LIKE and SHOW TABLES LIKE do not function properly
else:
st = "DESCRIBE `%s`" % table_name
try:
- return connection.execute(st).rowcount > 0
+ rs = connection.execute(st)
+ have = rs.rowcount > 0
+ rs.close()
+ return have
except exceptions.SQLError, e:
if e.orig.args[0] == 1146:
return False
class MySQLSchemaDropper(ansisql.ANSISchemaDropper):
def visit_index(self, index):
- self.append("\nDROP INDEX " + index.name + " ON " + index.table.name)
+ self.append("\nDROP INDEX %s ON %s" %
+ (self.preparer.format_index(index),
+ self.preparer.format_table(index.table)))
self.execute()
def drop_foreignkey(self, constraint):
- self.append("ALTER TABLE %s DROP FOREIGN KEY %s" % (self.preparer.format_table(constraint.table), constraint.name))
+ self.append("ALTER TABLE %s DROP FOREIGN KEY %s" %
+ (self.preparer.format_table(constraint.table),
+ self.preparer.format_constraint(constraint)))
self.execute()
class MySQLIdentifierPreparer(ansisql.ANSIIdentifierPreparer):
return RESERVED_WORDS
def _escape_identifier(self, value):
- #TODO: determine MySQL's escaping rules
- return value
+ return value.replace('`', '``')
def _fold_identifier_case(self, value):
#TODO: determine MySQL's case folding rules
def testreserved(self):
# check a table that uses an SQL reserved name doesn't cause an error
+ meta = MetaData(testbase.db)
+ table_a = Table('select', meta,
+ Column('not', Integer, primary_key=True),
+ Column('from', String(12), nullable=False),
+ UniqueConstraint('from', name='when'))
+ Index('where', table_a.c['from'])
+
+ quoter = meta.bind.dialect.identifier_preparer.quote_identifier
+
+ table_b = Table('false', meta,
+ Column('create', Integer, primary_key=True),
+ Column('true', Integer, ForeignKey('select.not')),
+ CheckConstraint('%s <> 1' % quoter('true'), name='limit'))
+
+ table_c = Table('is', meta,
+ Column('or', Integer, nullable=False, primary_key=True),
+ Column('join', Integer, nullable=False, primary_key=True),
+ PrimaryKeyConstraint('or', 'join', name='to'))
+
+ index_c = Index('else', table_c.c.join)
+
+ #meta.bind.echo = True
+ meta.create_all()
+
+ index_c.drop()
+
+ meta2 = MetaData(testbase.db)
+ try:
+ table_a2 = Table('select', meta2, autoload=True)
+ table_b2 = Table('false', meta2, autoload=True)
+ table_c2 = Table('is', meta2, autoload=True)
+ finally:
+ meta.drop_all()
+
+
meta = MetaData(testbase.db)
table = Table(
'select', meta,
@testbase.unsupported('sqlite')
def testcreate(self):
- schema = testbase.db.url.database
+ engine = testbase.db
+ schema = engine.dialect.get_default_schema_name(engine)
+
metadata = MetaData(testbase.db)
table1 = Table('table1', metadata,
Column('col1', Integer, primary_key=True),