return False
def _get_server_version_info(self, connection):
- # eGenix suggests using conn.dbms_version instead of what we're doing here
+ # eGenix suggests using conn.dbms_version instead
+ # of what we're doing here
dbapi_con = connection.connection
version = []
r = re.compile('[.\-]')
if 'odbc_connect' in keys:
connectors = [urllib.unquote_plus(keys.pop('odbc_connect'))]
else:
- dsn_connection = 'dsn' in keys or ('host' in keys and 'database' not in keys)
+ dsn_connection = 'dsn' in keys or \
+ ('host' in keys and 'database' not in keys)
if dsn_connection:
- connectors= ['dsn=%s' % (keys.pop('host', '') or keys.pop('dsn', ''))]
+                connectors = ['dsn=%s' % (keys.pop('host', '') or
+                              keys.pop('dsn', ''))]
else:
port = ''
if 'port' in keys and not 'port' in query:
port = ',%d' % int(keys.pop('port'))
- connectors = ["DRIVER={%s}" % keys.pop('driver', self.pyodbc_driver_name),
+ connectors = ["DRIVER={%s}" %
+ keys.pop('driver', self.pyodbc_driver_name),
'Server=%s%s' % (keys.pop('host', ''), port),
'Database=%s' % keys.pop('database', '') ]
else:
connectors.append("Trusted_Connection=Yes")
- # if set to 'Yes', the ODBC layer will try to automagically convert
- # textual data from your database encoding to your client encoding
- # This should obviously be set to 'No' if you query a cp1253 encoded
- # database from a latin1 client...
+ # if set to 'Yes', the ODBC layer will try to automagically
+ # convert textual data from your database encoding to your
+ # client encoding. This should obviously be set to 'No' if
+ # you query a cp1253 encoded database from a latin1 client...
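+        # e.g. (hypothetical URL, shown for illustration only):
+        #   mssql+pyodbc://user:pass@mydsn/?odbc_autotranslate=Yes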
if 'odbc_autotranslate' in keys:
- connectors.append("AutoTranslate=%s" % keys.pop("odbc_autotranslate"))
+ connectors.append("AutoTranslate=%s" %
+ keys.pop("odbc_autotranslate"))
connectors.extend(['%s=%s' % (k,v) for k,v in keys.iteritems()])
return [[";".join (connectors)], connect_args]
dbapi_con = connection.connection
- self.freetds = bool(re.match(r".*libtdsodbc.*\.so", dbapi_con.getinfo(pyodbc.SQL_DRIVER_NAME)))
+ self.freetds = bool(re.match(r".*libtdsodbc.*\.so",
+ dbapi_con.getinfo(pyodbc.SQL_DRIVER_NAME)
+ ))
# the "Py2K only" part here is theoretical.
# have not tried pyodbc + python3.1 yet.
def _create_jdbc_url(self, url):
"""Create a JDBC url from a :class:`~sqlalchemy.engine.url.URL`"""
return 'jdbc:%s://%s%s/%s' % (self.jdbc_db_name, url.host,
- url.port is not None and ':%s' % url.port or '',
+ url.port is not None
+ and ':%s' % url.port or '',
url.database)
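    # e.g. (illustrative): with a jdbc_db_name of "jtds:sqlserver", the URL
    # mssql+zxjdbc://host:1433/db yields "jdbc:jtds:sqlserver://host:1433/db"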
def create_connect_args(self, url):
opts = self._driver_kwargs()
opts.update(url.query)
- return [[self._create_jdbc_url(url), url.username, url.password, self.jdbc_driver_name],
+ return [
+ [self._create_jdbc_url(url),
+ url.username, url.password,
+ self.jdbc_driver_name],
opts]
def is_disconnect(self, e):
# __init__.py
-# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Michael Bayer mike_mp@zzzcomputing.com
+# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Michael Bayer
+# mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
+"""Include imports from the sqlalchemy.dialects package for backwards
+compatibility with pre 0.6 versions.
+
+"""
from sqlalchemy.dialects.sqlite import base as sqlite
from sqlalchemy.dialects.postgresql import base as postgresql
postgres = postgresql
__all__ = (
# 'access',
-# 'firebird',
+ 'firebird',
# 'informix',
# 'maxdb',
-# 'mssql',
+ 'mssql',
'mysql',
'oracle',
'postgresql',
'sqlite',
-# 'sybase',
+ 'sybase',
)
class AccessExecutionContext(default.DefaultExecutionContext):
def _has_implicit_sequence(self, column):
if column.primary_key and column.autoincrement:
- if isinstance(column.type, types.Integer) and not column.foreign_keys:
- if column.default is None or (isinstance(column.default, schema.Sequence) and \
- column.default.optional):
+ if isinstance(column.type, types.Integer) and \
+ not column.foreign_keys:
+ if column.default is None or \
+ (isinstance(column.default, schema.Sequence) and \
+ column.default.optional):
return True
return False
if not hasattr(tbl, 'has_sequence'):
tbl.has_sequence = None
for column in tbl.c:
- if getattr(column, 'sequence', False) or self._has_implicit_sequence(column):
+ if getattr(column, 'sequence', False) or \
+ self._has_implicit_sequence(column):
tbl.has_sequence = column
break
if bool(tbl.has_sequence):
# TBD: for some reason _last_inserted_ids doesn't exist here
# (but it does at corresponding point in mssql???)
- #if not len(self._last_inserted_ids) or self._last_inserted_ids[0] is None:
+ #if not len(self._last_inserted_ids) or
+ # self._last_inserted_ids[0] is None:
self.cursor.execute("SELECT @@identity AS lastrowid")
row = self.cursor.fetchone()
- self._last_inserted_ids = [int(row[0])] #+ self._last_inserted_ids[1:]
+ self._last_inserted_ids = [int(row[0])]
+ #+ self._last_inserted_ids[1:]
# print "LAST ROW ID", self._last_inserted_ids
super(AccessExecutionContext, self).post_exec()
self.text_as_varchar = False
self._dtbs = None
+ @classmethod
def dbapi(cls):
import win32com.client, pythoncom
const = win32com.client.constants
for suffix in (".36", ".35", ".30"):
try:
- daoEngine = win32com.client.gencache.EnsureDispatch("DAO.DBEngine" + suffix)
+                daoEngine = win32com.client.gencache.EnsureDispatch(
+                                "DAO.DBEngine" + suffix)
break
except pythoncom.com_error:
pass
else:
- raise exc.InvalidRequestError("Can't find a DB engine. Check http://support.microsoft.com/kb/239114 for details.")
+ raise exc.InvalidRequestError(
+ "Can't find a DB engine. Check "
+ "http://support.microsoft.com/kb/239114 for details.")
import pyodbc as module
return module
- dbapi = classmethod(dbapi)
def create_connect_args(self, url):
opts = url.translate_connect_args()
def do_execute(self, cursor, statement, params, context=None):
if params == {}:
params = ()
- super(AccessDialect, self).do_execute(cursor, statement, params, **kwargs)
+        super(AccessDialect, self).\
+            do_execute(cursor, statement, params, context=context)
def _execute(self, c, statement, parameters):
try:
const.dbLongBinary: AcBinary,
const.dbMemo: AcText,
const.dbBoolean: AcBoolean,
- const.dbText: AcUnicode, # All Access strings are unicode
+            # All Access strings are unicode
+            const.dbText: AcUnicode,
const.dbCurrency: AcNumeric,
}
colargs = \
{
- 'nullable': not(col.Required or col.Attributes & const.dbAutoIncrField),
+        'nullable': not (col.Required or
+                         col.Attributes & const.dbAutoIncrField),
}
default = col.DefaultValue
elif default:
if col.Type == const.dbBoolean:
default = default == 'Yes' and '1' or '0'
- colargs['server_default'] = schema.DefaultClause(sql.text(default))
+ colargs['server_default'] = \
+ schema.DefaultClause(sql.text(default))
- table.append_column(schema.Column(col.Name, coltype, **colargs))
+ table.append_column(
+ schema.Column(col.Name, coltype, **colargs))
# TBD: check constraints
thecol = table.c[col.Name]
table.primary_key.add(thecol)
if isinstance(thecol.type, AcInteger) and \
- not (thecol.default and isinstance(thecol.default.arg, schema.Sequence)):
+ not (thecol.default and
+ isinstance(
+ thecol.default.arg,
+ schema.Sequence
+ )):
thecol.autoincrement = False
# Then add other indexes
continue
scols = [c.ForeignName for c in fk.Fields]
rcols = ['%s.%s' % (fk.Table, c.Name) for c in fk.Fields]
- table.append_constraint(schema.ForeignKeyConstraint(scols, rcols, link_to_name=True))
+ table.append_constraint(
+            schema.ForeignKeyConstraint(scols, rcols,
+                                        link_to_name=True))
finally:
dtbs.Close()
# This is necessary, so we get the latest updates
dtbs = daoEngine.OpenDatabase(connection.engine.url.database)
- names = [t.Name for t in dtbs.TableDefs if t.Name[:4] != "MSys" and t.Name[:4] != "~TMP"]
+ names = [t.Name for t in dtbs.TableDefs
+ if t.Name[:4] != "MSys" and t.Name[:4] != "~TMP"]
dtbs.Close()
return names
if select.limit:
s += "TOP %s " % (select.limit)
if select.offset:
- raise exc.InvalidRequestError('Access does not support LIMIT with an offset')
+ raise exc.InvalidRequestError(
+ 'Access does not support LIMIT with an offset')
return s
def limit_clause(self, select):
if isinstance(column, expression.Function):
return column.label()
else:
- return super(AccessCompiler, self).label_select_column(select, column, asfrom)
+ return super(AccessCompiler, self).\
+ label_select_column(select, column, asfrom)
function_rewrites = {'current_date': 'now',
'current_timestamp': 'now',
'length': 'len',
}
def visit_function(self, func):
- """Access function names differ from the ANSI SQL names; rewrite common ones"""
+ """Access function names differ from the ANSI SQL names;
+ rewrite common ones"""
func.name = self.function_rewrites.get(func.name, func.name)
return super(AccessCompiler, self).visit_function(func)
return ""
def visit_join(self, join, asfrom=False, **kwargs):
- return (self.process(join.left, asfrom=True) + (join.isouter and " LEFT OUTER JOIN " or " INNER JOIN ") + \
- self.process(join.right, asfrom=True) + " ON " + self.process(join.onclause))
+ return (self.process(join.left, asfrom=True) + \
+ (join.isouter and " LEFT OUTER JOIN " or " INNER JOIN ") + \
+ self.process(join.right, asfrom=True) + " ON " + \
+ self.process(join.onclause))
def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
+ return 'DATEPART("%s", %s)' % \
+ (field, self.process(extract.expr, **kw))
class AccessDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column) + " " + column.type.dialect_impl(self.dialect).get_col_spec()
+ colspec = self.preparer.format_column(column) + " " + \
+ column.type.dialect_impl(self.dialect).get_col_spec()
# install a sequence if we have an implicit IDENTITY column
- if (not getattr(column.table, 'has_sequence', False)) and column.primary_key and \
- column.autoincrement and isinstance(column.type, types.Integer) and not column.foreign_keys:
- if column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional):
+ if (not getattr(column.table, 'has_sequence', False)) and \
+ column.primary_key and \
+ column.autoincrement and \
+ isinstance(column.type, types.Integer) and \
+ not column.foreign_keys:
+ if column.default is None or \
+ (isinstance(column.default, schema.Sequence) and
+ column.default.optional):
column.sequence = schema.Sequence(column.name + '_seq')
if not column.nullable:
def visit_drop_index(self, drop):
index = drop.element
- self.append("\nDROP INDEX [%s].[%s]" % (index.table.name, self._validate_identifier(index.name, False)))
+ self.append("\nDROP INDEX [%s].[%s]" % \
+ (index.table.name,
+ self._validate_identifier(index.name, False)))
class AccessIdentifierPreparer(compiler.IdentifierPreparer):
reserved_words = compiler.RESERVED_WORDS.copy()
reserved_words.update(['value', 'text'])
def __init__(self, dialect):
- super(AccessIdentifierPreparer, self).__init__(dialect, initial_quote='[', final_quote=']')
+ super(AccessIdentifierPreparer, self).\
+ __init__(dialect, initial_quote='[', final_quote=']')
dialect = AccessDialect
}
-# TODO: date conversion types (should be implemented as _FBDateTime, _FBDate, etc.
-# as bind/result functionality is required)
+# TODO: date conversion types (should be implemented as _FBDateTime,
+# _FBDate, etc. as bind/result functionality is required)
class FBTypeCompiler(compiler.GenericTypeCompiler):
def visit_boolean(self, type_):
def visit_mod(self, binary, **kw):
# Firebird lacks a builtin modulo operator, but there is
# an equivalent function in the ib_udf library.
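        # e.g. ``somecolumn % 5`` renders as ``mod(somecolumn, 5)``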
- return "mod(%s, %s)" % (self.process(binary.left), self.process(binary.right))
+ return "mod(%s, %s)" % (
+ self.process(binary.left),
+ self.process(binary.right))
def visit_alias(self, alias, asfrom=False, **kwargs):
if self.dialect._version_two:
- return super(FBCompiler, self).visit_alias(alias, asfrom=asfrom, **kwargs)
+ return super(FBCompiler, self).\
+ visit_alias(alias, asfrom=asfrom, **kwargs)
else:
# Override to not use the AS keyword which FB 1.5 does not like
if asfrom:
- alias_name = isinstance(alias.name, expression._generated_label) and \
- self._truncated_identifier("alias", alias.name) or alias.name
-
- return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + \
+ alias_name = isinstance(alias.name,
+ expression._generated_label) and \
+ self._truncated_identifier("alias",
+ alias.name) or alias.name
+
+ return self.process(
+ alias.original, asfrom=asfrom, **kwargs) + \
+ " " + \
self.preparer.format_alias(alias, alias_name)
else:
return self.process(alias.original, **kwargs)
# no syntax for these
# http://www.firebirdsql.org/manual/generatorguide-sqlsyntax.html
if create.element.start is not None:
- raise NotImplemented("Firebird SEQUENCE doesn't support START WITH")
+            raise NotImplementedError(
+ "Firebird SEQUENCE doesn't support START WITH")
if create.element.increment is not None:
- raise NotImplemented("Firebird SEQUENCE doesn't support INCREMENT BY")
+            raise NotImplementedError(
+ "Firebird SEQUENCE doesn't support INCREMENT BY")
if self.dialect._version_two:
- return "CREATE SEQUENCE %s" % self.preparer.format_sequence(create.element)
+ return "CREATE SEQUENCE %s" % \
+ self.preparer.format_sequence(create.element)
else:
- return "CREATE GENERATOR %s" % self.preparer.format_sequence(create.element)
+ return "CREATE GENERATOR %s" % \
+ self.preparer.format_sequence(create.element)
def visit_drop_sequence(self, drop):
"""Generate a ``DROP GENERATOR`` statement for the sequence."""
if self.dialect._version_two:
- return "DROP SEQUENCE %s" % self.preparer.format_sequence(drop.element)
+ return "DROP SEQUENCE %s" % \
+ self.preparer.format_sequence(drop.element)
else:
- return "DROP GENERATOR %s" % self.preparer.format_sequence(drop.element)
+ return "DROP GENERATOR %s" % \
+ self.preparer.format_sequence(drop.element)
class FBIdentifierPreparer(sql.compiler.IdentifierPreparer):
def fire_sequence(self, seq):
"""Get the next value from the sequence using ``gen_id()``."""
- return self._execute_scalar("SELECT gen_id(%s, 1) FROM rdb$database" % \
- self.dialect.identifier_preparer.format_sequence(seq))
+ return self._execute_scalar(
+ "SELECT gen_id(%s, 1) FROM rdb$database" %
+ self.dialect.identifier_preparer.format_sequence(seq)
+ )
class FBDialect(default.DefaultDialect):
return name
def has_table(self, connection, table_name, schema=None):
- """Return ``True`` if the given table exists, ignoring the `schema`."""
+ """Return ``True`` if the given table exists, ignoring
+ the `schema`."""
tblqry = """
SELECT 1 AS has_table FROM rdb$database
return pkfields
@reflection.cache
- def get_column_sequence(self, connection, table_name, column_name, schema=None, **kw):
+ def get_column_sequence(self, connection,
+ table_name, column_name,
+ schema=None, **kw):
tablename = self.denormalize_name(table_name)
colname = self.denormalize_name(column_name)
# Heuristic-query to determine the generator associated to a PK field
ON tabdep.rdb$dependent_name=trigdep.rdb$dependent_name
AND trigdep.rdb$depended_on_type=14
AND trigdep.rdb$dependent_type=2
- JOIN rdb$triggers trig ON trig.rdb$trigger_name=tabdep.rdb$dependent_name
+ JOIN rdb$triggers trig ON
+ trig.rdb$trigger_name=tabdep.rdb$dependent_name
WHERE tabdep.rdb$depended_on_name=?
AND tabdep.rdb$depended_on_type=0
AND trig.rdb$trigger_type=1
AND tabdep.rdb$field_name=?
AND (SELECT count(*)
- FROM rdb$dependencies trigdep2
- WHERE trigdep2.rdb$dependent_name = trigdep.rdb$dependent_name) = 2
+ FROM rdb$dependencies trigdep2
+ WHERE trigdep2.rdb$dependent_name = trigdep.rdb$dependent_name) = 2
"""
genr = connection.execute(genqry, [tablename, colname]).first()
if genr is not None:
r.rdb$null_flag AS null_flag,
t.rdb$type_name AS ftype,
f.rdb$field_sub_type AS stype,
- f.rdb$field_length/COALESCE(cs.rdb$bytes_per_character,1) AS flen,
+ f.rdb$field_length/
+ COALESCE(cs.rdb$bytes_per_character,1) AS flen,
f.rdb$field_precision AS fprec,
f.rdb$field_scale AS fscale,
- COALESCE(r.rdb$default_source, f.rdb$default_source) AS fdefault
+ COALESCE(r.rdb$default_source,
+ f.rdb$default_source) AS fdefault
FROM rdb$relation_fields r
JOIN rdb$fields f ON r.rdb$field_source=f.rdb$field_name
JOIN rdb$types t
- ON t.rdb$type=f.rdb$field_type AND t.rdb$field_name='RDB$FIELD_TYPE'
- LEFT JOIN rdb$character_sets cs ON f.rdb$character_set_id=cs.rdb$character_set_id
+ ON t.rdb$type=f.rdb$field_type AND
+ t.rdb$field_name='RDB$FIELD_TYPE'
+ LEFT JOIN rdb$character_sets cs ON
+ f.rdb$character_set_id=cs.rdb$character_set_id
WHERE f.rdb$system_flag=0 AND r.rdb$relation_name=?
ORDER BY r.rdb$field_position
"""
(colspec, name))
coltype = sqltypes.NULLTYPE
elif colspec == 'INT64':
- coltype = coltype(precision=row['fprec'], scale=row['fscale'] * -1)
+ coltype = coltype(
+ precision=row['fprec'],
+ scale=row['fscale'] * -1)
elif colspec in ('VARYING', 'CSTRING'):
coltype = coltype(row['flen'])
elif colspec == 'TEXT':
# more than one whitespace around the "DEFAULT" keyword
# (see also http://tracker.firebirdsql.org/browse/CORE-356)
defexpr = row['fdefault'].lstrip()
- assert defexpr[:8].rstrip()=='DEFAULT', "Unrecognized default value: %s" % defexpr
+ assert defexpr[:8].rstrip() == \
+ 'DEFAULT', "Unrecognized default value: %s" % \
+ defexpr
defvalue = defexpr[8:].strip()
if defvalue == 'NULL':
# Redundant
FROM rdb$relation_constraints rc
JOIN rdb$indices ix1 ON ix1.rdb$index_name=rc.rdb$index_name
JOIN rdb$indices ix2 ON ix2.rdb$index_name=ix1.rdb$foreign_key
- JOIN rdb$index_segments cse ON cse.rdb$index_name=ix1.rdb$index_name
+ JOIN rdb$index_segments cse ON
+ cse.rdb$index_name=ix1.rdb$index_name
JOIN rdb$index_segments se
ON se.rdb$index_name=ix2.rdb$index_name
AND se.rdb$field_position=cse.rdb$field_position
if not fk['name']:
fk['name'] = cname
fk['referred_table'] = self.normalize_name(row['targetrname'])
- fk['constrained_columns'].append(self.normalize_name(row['fname']))
+ fk['constrained_columns'].append(
+ self.normalize_name(row['fname']))
fk['referred_columns'].append(
- self.normalize_name(row['targetfname']))
+ self.normalize_name(row['targetfname']))
return fks.values()
@reflection.cache
JOIN rdb$index_segments ic
ON ix.rdb$index_name=ic.rdb$index_name
LEFT OUTER JOIN rdb$relation_constraints
- ON rdb$relation_constraints.rdb$index_name = ic.rdb$index_name
+ ON rdb$relation_constraints.rdb$index_name =
+ ic.rdb$index_name
WHERE ix.rdb$relation_name=? AND ix.rdb$foreign_key IS NULL
AND rdb$relation_constraints.rdb$constraint_type IS NULL
ORDER BY index_name, field_name
indexrec['column_names'] = []
indexrec['unique'] = bool(row['unique_flag'])
- indexrec['column_names'].append(self.normalize_name(row['field_name']))
+ indexrec['column_names'].append(
+ self.normalize_name(row['field_name']))
return indexes.values()
Kinterbasdb backend specific keyword arguments are:
-* type_conv - select the kind of mapping done on the types: by default SQLAlchemy
- uses 200 with Unicode, datetime and decimal support (see details__).
+* type_conv - select the kind of mapping done on the types: by default
+ SQLAlchemy uses 200 with Unicode, datetime and decimal support (see
+ details__).
-* concurrency_level - set the backend policy with regards to threading issues: by default
- SQLAlchemy uses policy 1 (see details__).
+* concurrency_level - set the backend policy with regards to threading
+ issues: by default SQLAlchemy uses policy 1 (see details__).
* enable_rowcount - True by default, setting this to False disables
the usage of "cursor.rowcount" with the
the cursor after a non-result-returning statement, rowcount must be
called, if at all, before the result object is returned. Additionally,
cursor.rowcount may not return correct results with older versions
- of Firebird, and setting this flag to False will also cause the SQLAlchemy ORM
- to ignore its usage. The behavior can also be controlled on a per-execution
- basis using the `enable_rowcount` option with :meth:`execution_options()`::
+ of Firebird, and setting this flag to False will also cause the
+ SQLAlchemy ORM to ignore its usage. The behavior can also be controlled on a
+ per-execution basis using the `enable_rowcount` option with
+ :meth:`execution_options()`::
conn = engine.connect().execution_options(enable_rowcount=True)
r = conn.execute(stmt)
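
The same options may also be passed to ``create_engine()`` directly (a
hypothetical sketch; the URL and the values shown are illustrative only)::

    from sqlalchemy import create_engine

    engine = create_engine(
                'firebird+kinterbasdb://user:pass@localhost/test.fdb',
                type_conv=200,
                concurrency_level=1,
                enable_rowcount=False)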
)
- def __init__(self, type_conv=200, concurrency_level=1, enable_rowcount=True, **kwargs):
+ def __init__(self, type_conv=200, concurrency_level=1,
+ enable_rowcount=True, **kwargs):
super(FBDialect_kinterbasdb, self).__init__(**kwargs)
self.enable_rowcount = enable_rowcount
self.type_conv = type_conv
util.coerce_kw_type(opts, 'type_conv', int)
type_conv = opts.pop('type_conv', self.type_conv)
- concurrency_level = opts.pop('concurrency_level', self.concurrency_level)
+ concurrency_level = opts.pop('concurrency_level',
+ self.concurrency_level)
if self.dbapi is not None:
initialized = getattr(self.dbapi, 'initialized', None)
# http://kinterbasdb.cvs.sourceforge.net/viewvc/kinterbasdb/Kinterbasdb-3.0/__init__.py?r1=1.95&r2=1.96
initialized = getattr(self.dbapi, '_initialized', False)
if not initialized:
- self.dbapi.init(type_conv=type_conv, concurrency_level=concurrency_level)
+ self.dbapi.init(type_conv=type_conv,
+ concurrency_level=concurrency_level)
return ([], opts)
def _get_server_version_info(self, connection):
version = fbconn.server_version
m = match('\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+) \w+ (\d+)\.(\d+)', version)
if not m:
- raise AssertionError("Could not determine version from string '%s'" % version)
+ raise AssertionError(
+ "Could not determine version from string '%s'" % version)
return tuple([int(x) for x in m.group(5, 6, 4)])
def is_disconnect(self, e):
- if isinstance(e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)):
+ if isinstance(e, (self.dbapi.OperationalError,
+ self.dbapi.ProgrammingError)):
msg = str(e)
return ('Unable to complete network request to host' in msg or
'Invalid connection state' in msg or
class InfoDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, first_pk=False):
colspec = self.preparer.format_column(column)
- if column.primary_key and len(column.foreign_keys)==0 and column.autoincrement and \
+ if column.primary_key and \
+            len(column.foreign_keys) == 0 and \
+ column.autoincrement and \
isinstance(column.type, sqltypes.Integer) and first_pk:
colspec += " SERIAL"
else:
class InfoIdentifierPreparer(compiler.IdentifierPreparer):
def __init__(self, dialect):
- super(InfoIdentifierPreparer, self).__init__(dialect, initial_quote="'")
+ super(InfoIdentifierPreparer, self).\
+ __init__(dialect, initial_quote="'")
def format_constraint(self, constraint):
        # informix doesn't support names for constraints
return [row[0] for row in connection.execute(s)]
def has_table(self, connection, table_name, schema=None):
- cursor = connection.execute("""select tabname from systables where tabname=?""", table_name.lower())
+ cursor = connection.execute(
+ """select tabname from systables where tabname=?""",
+ table_name.lower())
return cursor.first() is not None
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
- c = connection.execute ("""select colname , coltype , collength , t3.default , t1.colno from
- syscolumns as t1 , systables as t2 , OUTER sysdefaults as t3
- where t1.tabid = t2.tabid and t2.tabname=?
- and t3.tabid = t2.tabid and t3.colno = t1.colno
- order by t1.colno""", table.name.lower())
+ c = connection.execute(
+ """select colname, coltype, collength, t3.default, t1.colno from
+ syscolumns as t1 , systables as t2 , OUTER sysdefaults as t3
+ where t1.tabid = t2.tabid and t2.tabname=?
+ and t3.tabid = t2.tabid and t3.colno = t1.colno
+            order by t1.colno""", table_name.lower())
columns = []
for name, colattr, collength, default, colno in rows:
name = name.lower()
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
# FK
- c = connection.execute("""select t1.constrname as cons_name , t1.constrtype as cons_type ,
+ c = connection.execute(
+ """select t1.constrname as cons_name , t1.constrtype as cons_type ,
t4.colname as local_column , t7.tabname as remote_table ,
t6.colname as remote_column
from sysconstraints as t1 , systables as t2 ,
and t3.tabid = t2.tabid and t3.idxname = t1.idxname
and t4.tabid = t2.tabid and t4.colno = t3.part1
and t5.constrid = t1.constrid and t8.constrid = t5.primary
- and t6.tabid = t5.ptabid and t6.colno = t9.part1 and t9.idxname = t8.idxname
+ and t6.tabid = t5.ptabid and t6.colno = t9.part1 and t9.idxname =
+ t8.idxname
and t7.tabid = t5.ptabid""", table.name.lower())
fkeys = util.defaultdict(fkey_rec)
- for cons_name, cons_type, local_column, remote_table, remote_column in rows:
+ for cons_name, cons_type, local_column, \
+ remote_table, remote_column in rows:
rec = fkeys[cons_name]
rec['name'] = cons_name
- local_cols, remote_cols = rec['constrained_columns'], rec['referred_columns']
+ local_cols, remote_cols = \
+ rec['constrained_columns'], rec['referred_columns']
if not rec['referred_table']:
rec['referred_table'] = remote_table
@reflection.cache
def get_primary_keys(self, connection, table_name, schema=None, **kw):
- c = connection.execute("""select t4.colname as local_column
- from sysconstraints as t1 , systables as t2 ,
- sysindexes as t3 , syscolumns as t4
- where t1.tabid = t2.tabid and t2.tabname=? and t1.constrtype = 'P'
- and t3.tabid = t2.tabid and t3.idxname = t1.idxname
- and t4.tabid = t2.tabid and t4.colno = t3.part1""", table.name.lower())
+ c = connection.execute(
+ """select t4.colname as local_column
+ from sysconstraints as t1 , systables as t2 ,
+ sysindexes as t3 , syscolumns as t4
+ where t1.tabid = t2.tabid and t2.tabname=? and t1.constrtype = 'P'
+ and t3.tabid = t2.tabid and t3.idxname = t1.idxname
+ and t4.tabid = t2.tabid and t4.colno = t3.part1""",
+                table_name.lower())
return [r[0] for r in c.fetchall()]
@reflection.cache
def is_disconnect(self, e):
if isinstance(e, self.dbapi.OperationalError):
- return 'closed the connection' in str(e) or 'connection not open' in str(e)
+ return 'closed the connection' in str(e) \
+ or 'connection not open' in str(e)
else:
return False
'UTCDATE', 'UTCDIFF'])
def visit_mod(self, binary, **kw):
- return "mod(%s, %s)" % (self.process(binary.left), self.process(binary.right))
+ return "mod(%s, %s)" % \
+ (self.process(binary.left), self.process(binary.right))
def default_from(self):
return ' FROM DUAL'
if sequence.optional:
return None
else:
- return (self.dialect.identifier_preparer.format_sequence(sequence) +
- ".NEXTVAL")
+ return (
+ self.dialect.identifier_preparer.format_sequence(sequence) +
+ ".NEXTVAL")
class ColumnSnagger(visitors.ClauseVisitor):
def __init__(self):
def _get_default_schema_name(self, connection):
return self.identifier_preparer._normalize_name(
- connection.execute('SELECT CURRENT_SCHEMA FROM DUAL').scalar())
+ connection.execute(
+ 'SELECT CURRENT_SCHEMA FROM DUAL').scalar())
def has_table(self, connection, table_name, schema=None):
denormalize = self.identifier_preparer._denormalize_name
autoload=True, autoload_with=connection,
**table_kw)
- constraint = schema.ForeignKeyConstraint(columns, referants, link_to_name=True,
- **constraint_kw)
+ constraint = schema.ForeignKeyConstraint(
+ columns, referants, link_to_name=True,
+ **constraint_kw)
table.append_constraint(constraint)
def has_sequence(self, connection, name):
-from sqlalchemy.dialects.mssql import base, pyodbc, adodbapi, pymssql, zxjdbc, mxodbc
+from sqlalchemy.dialects.mssql import base, pyodbc, adodbapi, \
+ pymssql, zxjdbc, mxodbc
base.dialect = pyodbc.dialect
class MSDateTime_adodbapi(MSDateTime):
def result_processor(self, dialect, coltype):
def process(value):
- # adodbapi will return datetimes with empty time values as datetime.date() objects.
+ # adodbapi will return datetimes with empty time
+ # values as datetime.date() objects.
# Promote them back to full datetime.datetime()
if type(value) is datetime.date:
return datetime.datetime(value.year, value.month, value.day)
connectors = ["Provider=SQLOLEDB"]
if 'port' in keys:
- connectors.append ("Data Source=%s, %s" % (keys.get("host"), keys.get("port")))
+        connectors.append("Data Source=%s, %s" %
+ (keys.get("host"), keys.get("port")))
else:
connectors.append ("Data Source=%s" % keys.get("host"))
connectors.append ("Initial Catalog=%s" % keys.get("database"))
return [[";".join (connectors)], {}]
def is_disconnect(self, e):
- return isinstance(e, self.dbapi.adodbapi.DatabaseError) and "'connection failure'" in str(e)
+ return isinstance(e, self.dbapi.adodbapi.DatabaseError) and \
+ "'connection failure'" in str(e)
dialect = MSDialect_adodbapi
if isinstance(value, datetime.datetime):
return value.date()
elif isinstance(value, basestring):
- return datetime.date(*[int(x or 0) for x in self._reg.match(value).groups()])
+ return datetime.date(*[
+ int(x or 0)
+ for x in self._reg.match(value).groups()
+ ])
else:
return value
return process
def bind_processor(self, dialect):
def process(value):
if isinstance(value, datetime.datetime):
- value = datetime.datetime.combine(self.__zero_date, value.time())
+ value = datetime.datetime.combine(
+ self.__zero_date, value.time())
elif isinstance(value, datetime.time):
value = datetime.datetime.combine(self.__zero_date, value)
return value
if isinstance(value, datetime.datetime):
return value.time()
elif isinstance(value, basestring):
- return datetime.time(*[int(x or 0) for x in self._reg.match(value).groups()])
+ return datetime.time(*[
+ int(x or 0)
+ for x in self._reg.match(value).groups()])
else:
return value
return process
-
class _DateTimeBase(object):
def bind_processor(self, dialect):
def process(value):
- # TODO: why ?
if type(value) == datetime.date:
return datetime.datetime(value.year, value.month, value.day)
else:
insert_has_sequence = seq_column is not None
if insert_has_sequence:
- self._enable_identity_insert = seq_column.key in self.compiled_parameters[0]
+ self._enable_identity_insert = \
+ seq_column.key in self.compiled_parameters[0]
else:
self._enable_identity_insert = False
if self._select_lastrowid:
if self.dialect.use_scope_identity:
- self.cursor.execute("SELECT scope_identity() AS lastrowid", ())
+ self.cursor.execute(
+ "SELECT scope_identity() AS lastrowid", ())
else:
self.cursor.execute("SELECT @@identity AS lastrowid", ())
# fetchall() ensures the cursor is consumed without closing it
row = self.cursor.fetchall()[0]
self._lastrowid = int(row[0])
- if (self.isinsert or self.isupdate or self.isdelete) and self.compiled.returning:
+ if (self.isinsert or self.isupdate or self.isdelete) and \
+ self.compiled.returning:
self._result_proxy = base.FullyBufferedResultProxy(self)
if self._enable_identity_insert:
self.cursor.execute(
"SET IDENTITY_INSERT %s OFF" %
- self.dialect.identifier_preparer.
- format_table(self.compiled.statement.table)
+ self.dialect.identifier_preparer.
+ format_table(self.compiled.statement.table)
)
def get_lastrowid(self):
def handle_dbapi_exception(self, e):
if self._enable_identity_insert:
try:
- self.cursor.execute("SET IDENTITY_INSERT %s OFF" %
- self.dialect.\
- identifier_preparer.\
- format_table(self.compiled.statement.table)
- )
+ self.cursor.execute(
+ "SET IDENTITY_INSERT %s OFF" %
+                    self.dialect.identifier_preparer.
+ format_table(self.compiled.statement.table)
+ )
except:
pass
return "LEN%s" % self.function_argspec(fn, **kw)
def visit_concat_op(self, binary, **kw):
- return "%s + %s" % (self.process(binary.left, **kw), self.process(binary.right, **kw))
+ return "%s + %s" % \
+ (self.process(binary.left, **kw),
+ self.process(binary.right, **kw))
def visit_match_op(self, binary, **kw):
return "CONTAINS (%s, %s)" % (
def visit_select(self, select, **kwargs):
"""Look for ``LIMIT`` and OFFSET in a select statement, and if
so tries to wrap it in a subquery with ``row_number()`` criterion.
+
"""
if not getattr(select, '_mssql_visit', None) and select._offset:
# to use ROW_NUMBER(), an ORDER BY is required.
_offset = select._offset
_limit = select._limit
select._mssql_visit = True
- select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)"
- % orderby).label("mssql_rn")
+ select = select.column(
+                sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)"
+ % orderby).label("mssql_rn")
).order_by(None).alias()
- limitselect = sql.select([c for c in select.c if c.key!='mssql_rn'])
+ limitselect = sql.select([c for c in select.c if
+                                        c.key != 'mssql_rn'])
limitselect.append_whereclause("mssql_rn>%d" % _offset)
if _limit is not None:
- limitselect.append_whereclause("mssql_rn<=%d" % (_limit + _offset))
+ limitselect.append_whereclause("mssql_rn<=%d" %
+ (_limit + _offset))
return self.process(limitselect, iswrapper=True, **kwargs)
else:
return compiler.SQLCompiler.visit_select(self, select, **kwargs)
def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
+ return 'DATEPART("%s", %s)' % \
+ (field, self.process(extract.expr, **kw))
def visit_rollback_to_savepoint(self, savepoint_stmt):
return ("ROLLBACK TRANSACTION %s"
# translate for schema-qualified table aliases
t = self._schema_aliased_table(column.table)
if t is not None:
- converted = expression._corresponding_column_or_error(t, column)
+ converted = expression._corresponding_column_or_error(
+ t, column)
if result_map is not None:
- result_map[column.name.lower()] = (column.name, (column, ),
- column.type)
+ result_map[column.name.lower()] = \
+ (column.name, (column, ),
+ column.type)
- return super(MSSQLCompiler, self).visit_column(converted,
- result_map=None,
- **kwargs)
+ return super(MSSQLCompiler, self).\
+ visit_column(converted,
+ result_map=None, **kwargs)
return super(MSSQLCompiler, self).visit_column(column,
result_map=result_map,
and binary.operator == operator.eq
and not isinstance(binary.right, expression._BindParamClause)
):
- return self.process(expression._BinaryExpression(binary.right,
+ return self.process(
+ expression._BinaryExpression(binary.right,
binary.left,
binary.operator),
**kwargs)
else:
if (
-
- (binary.operator is operator.eq or binary.operator is operator.ne)
+ (binary.operator is operator.eq or
+ binary.operator is operator.ne)
and (
(isinstance(binary.left, expression._FromGrouping)
and isinstance(binary.left.element,
or isinstance(binary.left, expression._ScalarSelect)
or isinstance(binary.right, expression._ScalarSelect)
)
-
):
op = binary.operator == operator.eq and "IN" or "NOT IN"
- return self.process(expression._BinaryExpression(binary.left,
- binary.right, op),
+ return self.process(
+ expression._BinaryExpression(binary.left,
+ binary.right, op),
**kwargs)
return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)
if isinstance(column, expression.Function):
return column.label(None)
else:
- return super(MSSQLCompiler, self).label_select_column(select, column, asfrom)
+ return super(MSSQLCompiler, self).\
+ label_select_column(select, column, asfrom)
def for_update_clause(self, select):
- # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
+ # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
+ # SQLAlchemy doesn't use
return ''
def order_by_clause(self, select, **kw):
# SQL Server wants single quotes around the date string.
return "'" + str(value) + "'"
else:
- return super(MSSQLStrictCompiler, self).render_literal_value(value, type_)
+ return super(MSSQLStrictCompiler, self).\
+ render_literal_value(value, type_)
class MSDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
colspec += " NULL"
if column.table is None:
- raise exc.InvalidRequestError("mssql requires Table-bound columns "
- "in order to generate DDL")
+ raise exc.InvalidRequestError(
+ "mssql requires Table-bound columns "
+ "in order to generate DDL")
seq_col = column.table._autoincrement_column
# install a IDENTITY Sequence if we have an implicit IDENTITY column
if seq_col is column:
- sequence = isinstance(column.default, sa_schema.Sequence) and column.default
+ sequence = isinstance(column.default, sa_schema.Sequence) and \
+ column.default
if sequence:
- start, increment = sequence.start or 1, sequence.increment or 1
+ start, increment = sequence.start or 1, \
+ sequence.increment or 1
else:
start, increment = 1, 1
colspec += " IDENTITY(%s,%s)" % (start, increment)
def visit_drop_index(self, drop):
return "\nDROP INDEX %s.%s" % (
self.preparer.quote_identifier(drop.element.table.name),
- self.preparer.quote(self._validate_identifier(drop.element.name, False),
- drop.element.quote)
+ self.preparer.quote(
+ self._validate_identifier(drop.element.name, False),
+ drop.element.quote)
)
columns.c.table_schema==current_schema)
else:
whereclause = columns.c.table_name==tablename
- s = sql.select([columns], whereclause, order_by=[columns.c.ordinal_position])
+ s = sql.select([columns], whereclause,
+ order_by=[columns.c.ordinal_position])
c = connection.execute(s)
cols = []
while True:
row = c.fetchone()
if row is None:
break
- (name, type, nullable, charlen, numericprec, numericscale, default, collation) = (
+ (name, type, nullable, charlen,
+ numericprec, numericscale, default, collation) = (
row[columns.c.column_name],
row[columns.c.data_type],
row[columns.c.is_nullable] == 'YES',
kwargs = {}
if coltype in (MSString, MSChar, MSNVarchar, MSNChar, MSText,
- MSNText, MSBinary, MSVarBinary, sqltypes.LargeBinary):
+ MSNText, MSBinary, MSVarBinary,
+ sqltypes.LargeBinary):
kwargs['length'] = charlen
if collation:
kwargs['collation'] = collation
- if coltype == MSText or (coltype in (MSString, MSNVarchar) and charlen == -1):
+ if coltype == MSText or \
+ (coltype in (MSString, MSNVarchar) and charlen == -1):
kwargs.pop('length')
if coltype is None:
- util.warn("Did not recognize type '%s' of column '%s'" % (type, name))
+ util.warn(
+ "Did not recognize type '%s' of column '%s'" %
+ (type, name))
coltype = sqltypes.NULLTYPE
- if issubclass(coltype, sqltypes.Numeric) and coltype is not MSReal:
+ if issubclass(coltype, sqltypes.Numeric) and \
+ coltype is not MSReal:
kwargs['scale'] = numericscale
kwargs['precision'] = numericprec
def get_primary_keys(self, connection, tablename, schema=None, **kw):
current_schema = schema or self.default_schema_name
pkeys = []
- RR = ischema.ref_constraints # information_schema.referential_constraints
- TC = ischema.constraints # information_schema.table_constraints
- C = ischema.key_constraints.alias('C') # information_schema.constraint_column_usage:
- # the constrained column
- R = ischema.key_constraints.alias('R') # information_schema.constraint_column_usage:
- # the referenced column
+ # information_schema.referential_constraints
+ RR = ischema.ref_constraints
+ # information_schema.table_constraints
+ TC = ischema.constraints
+ # information_schema.constraint_column_usage:
+ # the constrained column
+ C = ischema.key_constraints.alias('C')
+ # information_schema.constraint_column_usage:
+ # the referenced column
+ R = ischema.key_constraints.alias('R')
# Primary key constraints
s = sql.select([C.c.column_name, TC.c.constraint_type],
def get_foreign_keys(self, connection, tablename, schema=None, **kw):
current_schema = schema or self.default_schema_name
# Add constraints
- RR = ischema.ref_constraints #information_schema.referential_constraints
- TC = ischema.constraints #information_schema.table_constraints
- C = ischema.key_constraints.alias('C') # information_schema.constraint_column_usage:
- # the constrained column
- R = ischema.key_constraints.alias('R') # information_schema.constraint_column_usage:
- # the referenced column
+ #information_schema.referential_constraints
+ RR = ischema.ref_constraints
+ # information_schema.table_constraints
+ TC = ischema.constraints
+ # information_schema.constraint_column_usage:
+ # the constrained column
+ C = ischema.key_constraints.alias('C')
+ # information_schema.constraint_column_usage:
+ # the referenced column
+ R = ischema.key_constraints.alias('R')
# Foreign key constraints
s = sql.select([C.c.column_name,
R.c.table_schema, R.c.table_name, R.c.column_name,
- RR.c.constraint_name, RR.c.match_option, RR.c.update_rule,
+ RR.c.constraint_name, RR.c.match_option,
+ RR.c.update_rule,
RR.c.delete_rule],
sql.and_(C.c.table_name == tablename,
C.c.table_schema == current_schema,
C.c.constraint_name == RR.c.constraint_name,
- R.c.constraint_name == RR.c.unique_constraint_name,
+ R.c.constraint_name ==
+ RR.c.unique_constraint_name,
C.c.ordinal_position == R.c.ordinal_position
),
- order_by = [RR.c.constraint_name, R.c.ordinal_position])
+ order_by = [
+ RR.c.constraint_name,
+ R.c.ordinal_position])
# group rows by constraint ID, to handle multi-column FKs
if schema is not None or current_schema != rschema:
rec['referred_schema'] = rschema
- local_cols, remote_cols = rec['constrained_columns'], rec['referred_columns']
+ local_cols, remote_cols = \
+                    rec['constrained_columns'], \
+ rec['referred_columns']
local_cols.append(scol)
remote_cols.append(rcol)
Column("CONSTRAINT_CATALOG", CoerceUnicode, key="constraint_catalog"),
Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"),
Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
- Column("UNIQUE_CONSTRAINT_CATLOG", CoerceUnicode, key="unique_constraint_catalog"), # TODO: is CATLOG misspelled ?
- Column("UNIQUE_CONSTRAINT_SCHEMA", CoerceUnicode, key="unique_constraint_schema"),
- Column("UNIQUE_CONSTRAINT_NAME", CoerceUnicode, key="unique_constraint_name"),
+ # TODO: is CATLOG misspelled ?
+ Column("UNIQUE_CONSTRAINT_CATLOG", CoerceUnicode,
+ key="unique_constraint_catalog"),
+ Column("UNIQUE_CONSTRAINT_SCHEMA", CoerceUnicode,
+ key="unique_constraint_schema"),
+ Column("UNIQUE_CONSTRAINT_NAME", CoerceUnicode,
+ key="unique_constraint_name"),
Column("MATCH_OPTION", String, key="match_option"),
Column("UPDATE_RULE", String, key="update_rule"),
Column("DELETE_RULE", String, key="delete_rule"),
Execution Modes
~~~~~~~~~~~~~~~
-mxODBC features two styles of statement execution, using the ``cursor.execute()``
-and ``cursor.executedirect()`` methods (the second being an extension to the
-DBAPI specification). The former makes use of the native
-parameter binding services of the ODBC driver, while the latter uses string escaping.
-The primary advantage to native parameter binding is that the same statement, when
-executed many times, is only prepared once. Whereas the primary advantage to the
-latter is that the rules for bind parameter placement are relaxed. MS-SQL has very
-strict rules for native binds, including that they cannot be placed within the argument
-lists of function calls, anywhere outside the FROM, or even within subqueries within the
-FROM clause - making the usage of bind parameters within SELECT statements impossible for
-all but the most simplistic statements. For this reason, the mxODBC dialect uses the
-"native" mode by default only for INSERT, UPDATE, and DELETE statements, and uses the
-escaped string mode for all other statements. This behavior can be controlled completely
-via :meth:`~sqlalchemy.sql.expression.Executable.execution_options`
-using the ``native_odbc_execute`` flag with a value of ``True`` or ``False``, where a value of
-``True`` will unconditionally use native bind parameters and a value of ``False`` will
-uncondtionally use string-escaped parameters.
+mxODBC features two styles of statement execution, using the
+``cursor.execute()`` and ``cursor.executedirect()`` methods (the second being
+an extension to the DBAPI specification). The former makes use of a particular
+API call specific to the SQL Server Native Client ODBC driver known as
+SQLDescribeParam, while the latter does not.
+
+mxODBC apparently only makes repeated use of a single prepared statement
+when SQLDescribeParam is used. The advantage to prepared statement reuse is
+one of performance. The disadvantage is that SQLDescribeParam has a limited
+set of scenarios in which bind parameters are understood, including that they
+cannot be placed within the argument lists of function calls, anywhere outside
+the FROM, or even within subqueries within the FROM clause - making the usage
+of bind parameters within SELECT statements impossible for all but the most
+simplistic statements.
+
+For this reason, the mxODBC dialect uses the "native" mode by default only for
+INSERT, UPDATE, and DELETE statements, and uses the escaped string mode for
+all other statements.
+
+This behavior can be controlled via
+:meth:`~sqlalchemy.sql.expression.Executable.execution_options` using the
+``native_odbc_execute`` flag with a value of ``True`` or ``False``, where a
+value of ``True`` will unconditionally use native bind parameters and a value
+of ``False`` will unconditionally use string-escaped parameters.
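+
+For example (a minimal sketch; the URL and the statement are illustrative
+only)::
+
+    from sqlalchemy import create_engine, text
+
+    engine = create_engine('mssql+mxodbc://user:pass@mydsn')
+    stmt = text("SELECT x FROM some_table WHERE id=:id")
+    stmt = stmt.execution_options(native_odbc_execute=False)
+    result = engine.connect().execute(stmt, id=5)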
"""
from sqlalchemy.connectors.mxodbc import MxODBCConnector
from sqlalchemy.dialects.mssql.pyodbc import MSExecutionContext_pyodbc
from sqlalchemy.dialects.mssql.base import (MSExecutionContext, MSDialect,
- MSSQLCompiler, MSSQLStrictCompiler,
+ MSSQLCompiler,
+ MSSQLStrictCompiler,
_MSDateTime, _MSDate, TIME)
def _get_server_version_info(self, connection):
vers = connection.scalar("select @@version")
- m = re.match(r"Microsoft SQL Server.*? - (\d+).(\d+).(\d+).(\d+)", vers)
+ m = re.match(
+            r"Microsoft SQL Server.*? - (\d+)\.(\d+)\.(\d+)\.(\d+)", vers)
if m:
return tuple(int(x) for x in m.group(1, 2, 3, 4))
else:
else:
if (len(value._int) - 1) > value.adjusted():
result = "%s%s.%s" % (
- (value < 0 and '-' or ''),
- "".join([str(s) for s in value._int][0:value.adjusted() + 1]),
- "".join([str(s) for s in value._int][value.adjusted() + 1:]))
+ (value < 0 and '-' or ''),
+ "".join(
+ [str(s) for s in value._int][0:value.adjusted() + 1]),
+ "".join(
+ [str(s) for s in value._int][value.adjusted() + 1:]))
else:
result = "%s%s" % (
- (value < 0 and '-' or ''),
- "".join([str(s) for s in value._int][0:value.adjusted() + 1]))
+ (value < 0 and '-' or ''),
+ "".join(
+ [str(s) for s in value._int][0:value.adjusted() + 1]))
return result
_embedded_scope_identity = False
def pre_exec(self):
- """where appropriate, issue "select scope_identity()" in the same statement.
+ """where appropriate, issue "select scope_identity()" in the same
+ statement.
Background on why "scope_identity()" is preferable to "@@identity":
http://msdn.microsoft.com/en-us/library/ms190315.aspx
super(MSExecutionContext_pyodbc, self).pre_exec()
- # don't embed the scope_identity select into an "INSERT .. DEFAULT VALUES"
+ # don't embed the scope_identity select into an
+ # "INSERT .. DEFAULT VALUES"
if self._select_lastrowid and \
self.dialect.use_scope_identity and \
len(self.parameters[0]):
def post_exec(self):
if self._embedded_scope_identity:
# Fetch the last inserted id from the manipulated statement
- # We may have to skip over a number of result sets with no data (due to triggers, etc.)
+ # We may have to skip over a number of result sets with
+ # no data (due to triggers, etc.)
while True:
try:
# fetchall() ensures the cursor is consumed
def __init__(self, description_encoding='latin-1', **params):
super(MSDialect_pyodbc, self).__init__(**params)
self.description_encoding = description_encoding
- self.use_scope_identity = self.dbapi and hasattr(self.dbapi.Cursor, 'nextset')
+ self.use_scope_identity = self.dbapi and \
+ hasattr(self.dbapi.Cursor, 'nextset')
dialect = MSDialect_pyodbc
self.cursor.nextset()
self._lastrowid = int(row[0])
- if (self.isinsert or self.isupdate or self.isdelete) and self.compiled.returning:
+ if (self.isinsert or self.isupdate or self.isdelete) and \
+ self.compiled.returning:
self._result_proxy = base.FullyBufferedResultProxy(self)
if self._enable_identity_insert:
- table = self.dialect.identifier_preparer.format_table(self.compiled.statement.table)
+ table = self.dialect.identifier_preparer.format_table(
+ self.compiled.statement.table)
self.cursor.execute("SET IDENTITY_INSERT %s OFF" % table)
execution_ctx_cls = MSExecutionContext_zxjdbc
def _get_server_version_info(self, connection):
- return tuple(int(x) for x in connection.connection.dbversion.split('.'))
+ return tuple(
+ int(x)
+ for x in connection.connection.dbversion.split('.')
+ )
dialect = MSDialect_zxjdbc
-from sqlalchemy.dialects.mysql import base, mysqldb, oursql, pyodbc, zxjdbc, mysqlconnector
+from sqlalchemy.dialects.mysql import base, mysqldb, oursql, \
+ pyodbc, zxjdbc, mysqlconnector
# default dialect
base.dialect = mysqldb.dialect
from sqlalchemy.dialects.mysql.base import \
- BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, DOUBLE, ENUM, DECIMAL,\
- FLOAT, INTEGER, INTEGER, LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, \
- NVARCHAR, NUMERIC, SET, SMALLINT, REAL, TEXT, TIME, TIMESTAMP, TINYBLOB, TINYINT, TINYTEXT,\
+ BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, DATETIME, \
+    DECIMAL, DOUBLE, ENUM, \
+    FLOAT, INTEGER, LONGBLOB, LONGTEXT, MEDIUMBLOB, \
+ MEDIUMINT, MEDIUMTEXT, NCHAR, \
+ NVARCHAR, NUMERIC, SET, SMALLINT, REAL, TEXT, TIME, TIMESTAMP, \
+    TINYBLOB, TINYINT, TINYTEXT, \
VARBINARY, VARCHAR, YEAR, dialect
__all__ = (