.. seealso::
- `The InnoDB Storage Engine <http://dev.mysql.com/doc/refman/5.0/en/innodb-storage-engine.html>`_ - on the MySQL website.
+ `The InnoDB Storage Engine
+ <http://dev.mysql.com/doc/refman/5.0/en/innodb-storage-engine.html>`_ -
+ on the MySQL website.
Case Sensitivity and Table Reflection
-------------------------------------
CAST may still not be desirable on an early MySQL version post-4.0.2, as it didn't
add all datatype support until 4.1.1. If your application falls into this
-narrow area, the behavior of CAST can be controlled using the :ref:`sqlalchemy.ext.compiler_toplevel`
-system, as per the recipe below::
+narrow area, the behavior of CAST can be controlled using the
+:ref:`sqlalchemy.ext.compiler_toplevel` system, as per the recipe below::
from sqlalchemy.sql.expression import Cast
from sqlalchemy.ext.compiler import compiles
numeric.
"""
- super(NUMERIC, self).__init__(precision=precision, scale=scale, asdecimal=asdecimal, **kw)
+ super(NUMERIC, self).__init__(precision=precision,
+ scale=scale, asdecimal=asdecimal, **kw)
class DECIMAL(_NumericType, sqltypes.DECIMAL):
# ..some versions convert '' to an empty set
if not value:
value.add('')
- # ..some return sets.Set, even for pythons that have __builtin__.set
+ # ..some return sets.Set, even for pythons
+ # that have __builtin__.set
if not isinstance(value, set):
value = set(value)
return value
return 'SIGNED INTEGER'
elif isinstance(type_, sqltypes.TIMESTAMP):
return 'DATETIME'
- elif isinstance(type_, (sqltypes.DECIMAL, sqltypes.DateTime, sqltypes.Date, sqltypes.Time)):
+ elif isinstance(type_, (sqltypes.DECIMAL, sqltypes.DateTime,
+ sqltypes.Date, sqltypes.Time)):
return self.dialect.type_compiler.process(type_)
elif isinstance(type_, sqltypes.Text):
return 'CHAR'
elif isinstance(type_, sqltypes._Binary):
return 'BINARY'
elif isinstance(type_, sqltypes.NUMERIC):
- return self.dialect.type_compiler.process(type_).replace('NUMERIC', 'DECIMAL')
+ return self.dialect.type_compiler.process(
+ type_).replace('NUMERIC', 'DECIMAL')
else:
return None
class MySQLDDLCompiler(compiler.DDLCompiler):
def create_table_constraints(self, table):
"""Get table constraints."""
- constraint_string = super(MySQLDDLCompiler, self).create_table_constraints(table)
+ constraint_string = super(
+ MySQLDDLCompiler, self).create_table_constraints(table)
engine_key = '%s_engine' % self.dialect.name
is_innodb = table.kwargs.has_key(engine_key) and \
elif column.nullable and is_timestamp and default is None:
colspec.append('NULL')
- if column is column.table._autoincrement_column and column.server_default is None:
+ if column is column.table._autoincrement_column and \
+ column.server_default is None:
colspec.append('AUTO_INCREMENT')
return ' '.join(colspec)
else:
return self._extend_numeric(type_,
"NUMERIC(%(precision)s, %(scale)s)" %
- {'precision': type_.precision, 'scale': type_.scale})
+ {'precision': type_.precision,
+ 'scale': type_.scale})
def visit_DECIMAL(self, type_):
if type_.precision is None:
else:
return self._extend_numeric(type_,
"DECIMAL(%(precision)s, %(scale)s)" %
- {'precision': type_.precision, 'scale': type_.scale})
+ {'precision': type_.precision,
+ 'scale': type_.scale})
def visit_DOUBLE(self, type_):
if type_.precision is not None and type_.scale is not None:
- return self._extend_numeric(type_, "DOUBLE(%(precision)s, %(scale)s)" %
+ return self._extend_numeric(type_,
+ "DOUBLE(%(precision)s, %(scale)s)" %
{'precision': type_.precision,
'scale': type_.scale})
else:
def visit_REAL(self, type_):
if type_.precision is not None and type_.scale is not None:
- return self._extend_numeric(type_, "REAL(%(precision)s, %(scale)s)" %
+ return self._extend_numeric(type_,
+ "REAL(%(precision)s, %(scale)s)" %
{'precision': type_.precision,
'scale': type_.scale})
else:
return self._extend_numeric(type_,
"FLOAT(%s, %s)" % (type_.precision, type_.scale))
elif type_.precision is not None:
- return self._extend_numeric(type_, "FLOAT(%s)" % (type_.precision,))
+ return self._extend_numeric(type_,
+ "FLOAT(%s)" % (type_.precision,))
else:
return self._extend_numeric(type_, "FLOAT")
def visit_TINYINT(self, type_):
if self._mysql_type(type_) and type_.display_width is not None:
- return self._extend_numeric(type_, "TINYINT(%s)" % type_.display_width)
+ return self._extend_numeric(type_,
+ "TINYINT(%s)" % type_.display_width)
else:
return self._extend_numeric(type_, "TINYINT")
self.dialect.name)
def visit_NCHAR(self, type_):
- # We'll actually generate the equiv. "NATIONAL CHAR" instead of "NCHAR".
+ # We'll actually generate the equiv.
+ # "NATIONAL CHAR" instead of "NCHAR".
if type_.length:
return self._extend_string(type_, {'national': True},
"CHAR(%(length)s)" % {'length': type_.length})
quoted_enums = []
for e in type_.enums:
quoted_enums.append("'%s'" % e.replace("'", "''"))
- return self._extend_string(type_, {}, "ENUM(%s)" % ",".join(quoted_enums))
+ return self._extend_string(type_, {}, "ENUM(%s)" %
+ ",".join(quoted_enums))
def visit_SET(self, type_):
- return self._extend_string(type_, {}, "SET(%s)" % ",".join(type_._ddl_values))
+ return self._extend_string(type_, {}, "SET(%s)" %
+ ",".join(type_._ddl_values))
def visit_BOOLEAN(self, type):
return "BOOL"
return False
def _compat_fetchall(self, rp, charset=None):
- """Proxy result rows to smooth over MySQL-Python driver inconsistencies."""
+ """Proxy result rows to smooth over MySQL-Python driver
+ inconsistencies."""
return [_DecodingRowProxy(row, charset) for row in rp.fetchall()]
def _compat_fetchone(self, rp, charset=None):
- """Proxy a result row to smooth over MySQL-Python driver inconsistencies."""
+ """Proxy a result row to smooth over MySQL-Python driver
+ inconsistencies."""
return _DecodingRowProxy(rp.fetchone(), charset)
def _compat_first(self, rp, charset=None):
- """Proxy a result row to smooth over MySQL-Python driver inconsistencies."""
+ """Proxy a result row to smooth over MySQL-Python driver
+ inconsistencies."""
return _DecodingRowProxy(rp.first(), charset)
# if ansiquotes == True, build a new IdentifierPreparer
# with the new setting
self.identifier_preparer = self.preparer(self,
- server_ansiquotes=self._server_ansiquotes)
+ server_ansiquotes=self._server_ansiquotes)
@property
def _supports_cast(self):
if self.server_version_info < (5, 0, 2):
rp = connection.execute("SHOW TABLES FROM %s" %
self.identifier_preparer.quote_identifier(current_schema))
- return [row[0] for row in self._compat_fetchall(rp, charset=charset)]
+ return [row[0] for
+ row in self._compat_fetchall(rp, charset=charset)]
else:
rp = connection.execute("SHOW FULL TABLES FROM %s" %
self.identifier_preparer.quote_identifier(current_schema))
- return [row[0] for row in self._compat_fetchall(rp, charset=charset)\
- if row[1] == 'BASE TABLE']
+ return [row[0]
+ for row in self._compat_fetchall(rp, charset=charset)
+ if row[1] == 'BASE TABLE']
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
charset = self._connection_charset
rp = connection.execute("SHOW FULL TABLES FROM %s" %
self.identifier_preparer.quote_identifier(schema))
- return [row[0] for row in self._compat_fetchall(rp, charset=charset)\
- if row[1] in ('VIEW', 'SYSTEM VIEW')]
+ return [row[0]
+ for row in self._compat_fetchall(rp, charset=charset)
+ if row[1] in ('VIEW', 'SYSTEM VIEW')]
@reflection.cache
def get_table_options(self, connection, table_name, schema=None, **kw):
- parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
return parsed_state.table_options
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
- parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
return parsed_state.columns
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
for key in parsed_state.keys:
if key['type'] == 'PRIMARY':
# There can be only one.
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
default_schema = None
fkeys = []
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
- parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
indexes = []
for spec in parsed_state.keys:
full_name=full_name)
return sql
- def _parsed_state_or_create(self, connection, table_name, schema=None, **kw):
+ def _parsed_state_or_create(self, connection, table_name,
+ schema=None, **kw):
return self._setup_parser(
connection,
table_name,
pass
elif line.startswith('CREATE '):
self._parse_table_name(line, state)
- # Not present in real reflection, but may be if loading from a file.
+ # Not present in real reflection, but may be if
+ # loading from a file.
elif not line:
pass
else:
self._re_partition = _re_compile(r'(?:.*)(?:SUB)?PARTITION(?:.*)')
# Table-level options (COLLATE, ENGINE, etc.)
- # Do the string options first, since they have quoted strings we need to get rid of.
+ # Do the string options first, since they have quoted
+ # strings we need to get rid of.
for option in _options_of_type_string:
self._add_option_string(option)
regex = (r'(?P<directive>%s)%s'
r"'(?P<val>(?:[^']|'')*?)'(?!')" %
(re.escape(directive), self._optional_equals))
- self._pr_options.append(
- _pr_compile(regex, lambda v: v.replace("\\\\", "\\").replace("''", "'")))
+ self._pr_options.append(_pr_compile(regex, lambda v:
+ v.replace("\\\\", "\\").replace("''", "'")))
def _add_option_word(self, directive):
regex = (r'(?P<directive>%s)%s'