return self._extend_string(type_, basic)
-
class FBCompiler(sql.compiler.SQLCompiler):
"""Firebird specific idiosyncrasies"""
# get primary key fields
c = connection.execute(keyqry, ["PRIMARY KEY", tablename])
pkfields = [self.normalize_name(r['fname']) for r in c.fetchall()]
- return {'constrained_columns':pkfields, 'name':None}
+ return {'constrained_columns': pkfields, 'name': None}
@reflection.cache
def get_column_sequence(self, connection,
# Redundant
defvalue = None
col_d = {
- 'name' : name,
- 'type' : coltype,
- 'nullable' : not bool(row['null_flag']),
- 'default' : defvalue,
- 'autoincrement':defvalue is None
+ 'name': name,
+ 'type': coltype,
+ 'nullable': not bool(row['null_flag']),
+ 'default': defvalue,
+ 'autoincrement': defvalue is None
}
if orig_colname.lower() == orig_colname:
# if the PK is a single field, check whether it is linked to
# a sequence through a trigger
- if len(pkey_cols)==1 and name==pkey_cols[0]:
+ if len(pkey_cols) == 1 and name == pkey_cols[0]:
seq_d = self.get_column_sequence(connection, tablename, name)
if seq_d is not None:
col_d['sequence'] = seq_d
tablename = self.denormalize_name(table_name)
c = connection.execute(fkqry, ["FOREIGN KEY", tablename])
- fks = util.defaultdict(lambda:{
- 'name' : None,
- 'constrained_columns' : [],
- 'referred_schema' : None,
- 'referred_table' : None,
- 'referred_columns' : []
+ fks = util.defaultdict(lambda: {
+ 'name': None,
+ 'constrained_columns': [],
+ 'referred_schema': None,
+ 'referred_table': None,
+ 'referred_columns': []
})
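The dict-factory defaultdict just built is what lets the row loop that follows fold multi-column foreign keys together by constraint name without any membership checks. A minimal, standalone sketch of the same pattern (the row data here is made up; util.defaultdict is a compat alias for the stdlib class):

    from collections import defaultdict

    rows = [('fk_parent', 'parent_id'), ('fk_parent', 'parent_kind'),
            ('fk_owner', 'owner_id')]      # (constraint name, local column)
    fks = defaultdict(lambda: {'name': None, 'constrained_columns': []})
    for cname, col in rows:
        fks[cname]['name'] = cname
        fks[cname]['constrained_columns'].append(col)
    assert fks['fk_parent']['constrained_columns'] == ['parent_id', 'parent_kind']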
for row in c:
colspecs = util.update_copy(
FBDialect.colspecs,
{
- sqltypes.Numeric:_FBNumeric_kinterbasdb,
+ sqltypes.Numeric: _FBNumeric_kinterbasdb,
}
)
ischema_names = {
- 0 : sqltypes.CHAR, # CHAR
- 1 : sqltypes.SMALLINT, # SMALLINT
- 2 : sqltypes.INTEGER, # INT
- 3 : sqltypes.FLOAT, # Float
- 3 : sqltypes.Float, # SmallFloat
- 5 : sqltypes.DECIMAL, # DECIMAL
- 6 : sqltypes.Integer, # Serial
- 7 : sqltypes.DATE, # DATE
- 8 : sqltypes.Numeric, # MONEY
- 10 : sqltypes.DATETIME, # DATETIME
- 11 : sqltypes.LargeBinary, # BYTE
- 12 : sqltypes.TEXT, # TEXT
- 13 : sqltypes.VARCHAR, # VARCHAR
- 15 : sqltypes.NCHAR, # NCHAR
- 16 : sqltypes.NVARCHAR, # NVARCHAR
- 17 : sqltypes.Integer, # INT8
- 18 : sqltypes.Integer, # Serial8
- 43 : sqltypes.String, # LVARCHAR
- -1 : sqltypes.BLOB, # BLOB
- -1 : sqltypes.CLOB, # CLOB
+ 0: sqltypes.CHAR, # CHAR
+ 1: sqltypes.SMALLINT, # SMALLINT
+ 2: sqltypes.INTEGER, # INT
+ 3: sqltypes.FLOAT, # Float
+ 4: sqltypes.Float, # SmallFloat
+ 5: sqltypes.DECIMAL, # DECIMAL
+ 6: sqltypes.Integer, # Serial
+ 7: sqltypes.DATE, # DATE
+ 8: sqltypes.Numeric, # MONEY
+ 10: sqltypes.DATETIME, # DATETIME
+ 11: sqltypes.LargeBinary, # BYTE
+ 12: sqltypes.TEXT, # TEXT
+ 13: sqltypes.VARCHAR, # VARCHAR
+ 15: sqltypes.NCHAR, # NCHAR
+ 16: sqltypes.NVARCHAR, # NVARCHAR
+ 17: sqltypes.Integer, # INT8
+ 18: sqltypes.Integer, # Serial8
+ 43: sqltypes.String, # LVARCHAR
+ -1: sqltypes.CLOB, # BLOB, CLOB (duplicate key; only this last mapping took effect)
}
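One thing the old block above hides: a Python dict literal silently keeps only the last value bound to a repeated key, so its 3: sqltypes.FLOAT and -1: sqltypes.BLOB entries were dead code:

    dupes = {3: 'FLOAT', 3: 'SmallFloat', -1: 'BLOB', -1: 'CLOB'}
    # dupes == {3: 'SmallFloat', -1: 'CLOB'}

Informix's syscolumns catalog reports SMALLFLOAT as coltype 4, hence the renumbered entry in the new block; the BLOB/CLOB pair is collapsed to the mapping that actually took effect.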
t8.idxname
and t7.tabid = t5.ptabid""", table_name, schema_sel)
-
def fkey_rec():
return {
'name' : None,
colpositions = set()
for row in data:
- colpos = set([getattr(row, 'part%d' % x) for x in range(1,16)])
+ colpos = set([getattr(row, 'part%d' % x) for x in range(1, 16)])
colpositions |= colpos
if not len(colpositions):
- return {'constrained_columns':[], 'name':None}
+ return {'constrained_columns': [], 'name': None}
# Select the column names using the columnpositions
# TODO: maybe cache some of this column info (e.g. select all colnames for one table at once)
- place_holder = ','.join('?'*len(colpositions))
+ place_holder = ','.join('?' * len(colpositions))
c = connection.execute(
"""select t1.colname
from syscolumns as t1, systables as t2
table_name, *colpositions
).fetchall()
- cols = reduce(lambda x,y: list(x)+list(y), c, [])
- return {'constrained_columns':cols, 'name':None}
+ cols = reduce(lambda x, y: list(x) + list(y), c, [])
+ return {'constrained_columns': cols, 'name': None}
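The placeholder string above is the standard DB-API idiom for a variable-length IN clause (one '?' per bound value), and the reduce call flattens the one-column rows that fetchall() returns. For example:

    from functools import reduce   # a builtin on the Python 2 this code targets

    colpositions = {1, 3, 7}
    place_holder = ','.join('?' * len(colpositions))         # '?,?,?'

    rows = [('colname_a',), ('colname_b',)]                  # fetchall() shape
    cols = reduce(lambda x, y: list(x) + list(y), rows, [])  # ['colname_a', 'colname_b']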
@reflection.cache
def get_indexes(self, connection, table_name, schema, **kw):
indexes = []
for row in c.fetchall():
- colnames = [getattr(row, 'part%d' % x) for x in range(1,16)]
+ colnames = [getattr(row, 'part%d' % x) for x in range(1, 16)]
colnames = [x for x in colnames if x]
- place_holder = ','.join('?'*len(colnames))
+ place_holder = ','.join('?' * len(colnames))
c = connection.execute(
"""select t1.colname
from syscolumns as t1, systables as t2
t1.colno in (%s)""" % place_holder,
table_name, *colnames
).fetchall()
- c = reduce(lambda x,y: list(x)+list(y), c, [])
+ c = reduce(lambda x, y: list(x) + list(y), c, [])
indexes.append({
'name': row.idxname,
'unique': row.idxtype.lower() == 'u',
colspecs = util.update_copy(
MSDialect.colspecs,
{
- sqltypes.DateTime:MSDateTime_adodbapi
+ sqltypes.DateTime: MSDateTime_adodbapi
}
)
'type' : coltype,
'nullable' : nullable,
'default' : default,
- 'autoincrement':False,
+ 'autoincrement': False,
}
cols.append(cdict)
# autoincrement and identity
RR.c.constraint_name,
R.c.ordinal_position])
-
# group rows by constraint ID, to handle multi-column FKs
fkeys = []
fknm, scols, rcols = (None, [], [])
sqltypes.Time : _MSTime_mxodbc,
}
-
def __init__(self, description_encoding=None, **params):
super(MSDialect_mxodbc, self).__init__(**params)
self.description_encoding = description_encoding
colspecs = util.update_copy(
MSDialect.colspecs,
{
- sqltypes.Numeric:_MSNumeric_pymssql,
- sqltypes.Float:sqltypes.Float,
+ sqltypes.Numeric: _MSNumeric_pymssql,
+ sqltypes.Float: sqltypes.Float,
}
)
@classmethod
result = "%s%s%s" % (
(value < 0 and '-' or ''),
"".join([str(s) for s in _int]),
- "0" * (value.adjusted() - (len(_int)-1)))
+ "0" * (value.adjusted() - (len(_int) - 1)))
else:
if (len(_int) - 1) > value.adjusted():
result = "%s%s.%s" % (
colspecs = util.update_copy(
MSDialect.colspecs,
{
- sqltypes.Numeric:_MSNumeric_pyodbc
+ sqltypes.Numeric: _MSNumeric_pyodbc
}
)
return constraint_string
-
def get_column_specification(self, column, **kw):
"""Builds column DDL."""
table_opts.append(joiner.join((opt, arg)))
return ' '.join(table_opts)
-
def visit_create_index(self, create):
index = create.element
preparer = self.preparer
def _get_default_schema_name(self, connection):
return connection.execute('SELECT DATABASE()').scalar()
-
def has_table(self, connection, table_name, schema=None):
# SHOW TABLE STATUS LIKE and SHOW TABLES LIKE do not function properly
# on Mac OS X (and possibly Windows) with multibyte table names.
# full_name = self.identifier_preparer.format_table(table,
# use_schema=True)
-
full_name = '.'.join(self.identifier_preparer._quote_free_identifiers(
schema, table_name))
if key['type'] == 'PRIMARY':
# There can be only one.
cols = [s[0] for s in key['columns']]
- return {'constrained_columns':cols, 'name':None}
- return {'constrained_columns':[], 'name':None}
+ return {'constrained_columns': cols, 'name': None}
+ return {'constrained_columns': [], 'name': None}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
# 123 or 123,456
self._re_csv_int = _re_compile(r'\d+')
-
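A pattern like \d+ pulls each integer out of a comma-separated spec such as the ones in the comment above; applied with findall (plain re.compile here, standing in for the module's _re_compile helper):

    import re
    _re_csv_int = re.compile(r'\d+')
    print([int(v) for v in _re_csv_int.findall('123,456')])   # [123, 456]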
# `colname` <type> [type opts]
# (NOT NULL | NULL)
# DEFAULT ('value' | CURRENT_TIMESTAMP...)
r"'(?P<val>(?:[^']|'')*?)'(?!')" %
(re.escape(directive), self._optional_equals))
self._pr_options.append(
- _pr_compile(regex, lambda v: v.replace("\\\\","\\").replace("''", "'")))
+ _pr_compile(regex, lambda v: v.replace("\\\\", "\\").replace("''", "'")))
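The post-processing lambda reverses the escaping MySQL applies inside quoted option values, collapsing doubled backslashes and doubled single quotes. A quick standalone check of the same expression, wrapped in a named function:

    def unescape(v):
        return v.replace("\\\\", "\\").replace("''", "'")

    print(unescape("it''s"))     # it's
    print(unescape("a\\\\b"))    # a\b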
def _add_option_word(self, directive):
regex = (r'(?P<directive>%s)%s'
def is_disconnect(self, e, connection, cursor):
errnos = (2006, 2013, 2014, 2045, 2055, 2048)
- exceptions = (self.dbapi.OperationalError,self.dbapi.InterfaceError)
+ exceptions = (self.dbapi.OperationalError, self.dbapi.InterfaceError)
if isinstance(e, exceptions):
return e.errno in errnos
else:
from ... import types as sqltypes, util
-
class _oursqlBIT(BIT):
def result_processor(self, dialect, coltype):
"""oursql already converts mysql bits, so."""
**kw
)
-
def get_columns(self, connection, table_name, schema=None, **kw):
return MySQLDialect.get_columns(self,
connection.connect().\
if c:
return int(c)
- def _get_server_version_info(self,connection):
+ def _get_server_version_info(self, connection):
dbapi_con = connection.connection
version = []
r = re.compile('[.\-]')
__visit_name__ = 'ROWID'
-
class _OracleBoolean(sqltypes.Boolean):
def get_dbapi_type(self, dbapi):
return dbapi.NUMBER
if precision is None:
return name
elif scale is None:
- return "%(name)s(%(precision)s)" % {'name':name,'precision': precision}
+ n = "%(name)s(%(precision)s)"
+ return n % {'name': name, 'precision': precision}
else:
- return "%(name)s(%(precision)s, %(scale)s)" % {'name':name,'precision': precision, 'scale' : scale}
+ n = "%(name)s(%(precision)s, %(scale)s)"
+ return n % {'name': name, 'precision': precision, 'scale': scale}
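Pulling the long format strings out into a local keeps these lines at a readable width without changing the rendered DDL; the template expands exactly as before:

    fmt = "%(name)s(%(precision)s, %(scale)s)"
    fmt % {'name': 'NUMBER', 'precision': 10, 'scale': 2}   # 'NUMBER(10, 2)'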
def visit_string(self, type_):
return self.visit_VARCHAR2(type_)
def _visit_varchar(self, type_, n, num):
if not n and self.dialect._supports_char_length:
- return "VARCHAR%(two)s(%(length)s CHAR)" % {
- 'length' : type_.length,
- 'two':num}
+ varchar = "VARCHAR%(two)s(%(length)s CHAR)"
+ return varchar % {'length': type_.length, 'two': num}
else:
- return "%(n)sVARCHAR%(two)s(%(length)s)" % {'length' : type_.length,
- 'two':num, 'n':n}
+ varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
+ return varchar % {'length': type_.length, 'two': num, 'n': n}
def visit_text(self, type_):
return self.visit_CLOB(type_)
elif binary.right.table is join.right:
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(join.onclause, {},
- {'binary':visit_binary}))
+ {'binary': visit_binary}))
else:
clauses.append(join.onclause)
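visitors.cloned_traverse walks a copy of the ON clause and hands each BinaryExpression to the visit_binary hook, which wraps columns of the joined table so the compiler can render Oracle's old-style (+) operator. A rough, self-contained illustration of the same hook mechanism (column names made up, and a plain literal standing in for _OuterJoinColumn):

    from sqlalchemy import column, literal
    from sqlalchemy.sql import visitors

    onclause = column('parent_id') == column('id')

    def visit_binary(binary):
        binary.right = literal('X')   # the dialect wraps this in _OuterJoinColumn instead

    cloned = visitors.cloned_traverse(onclause, {}, {'binary': visit_binary})
    # the original onclause is left untouched; only the traversed copy is rewritten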
cursor = connection.execute(s, owner=schema)
return [self.normalize_name(row[0]) for row in cursor]
-
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
schema = self.denormalize_name(schema or self.default_schema_name)
"SELECT column_name, data_type, %(char_length_col)s, data_precision, data_scale, "
"nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "
"WHERE table_name = :table_name AND owner = :owner "
- "ORDER BY column_id" % {'dblink': dblink, 'char_length_col':char_length_col}),
+ "ORDER BY column_id" % {'dblink': dblink, 'char_length_col': char_length_col}),
table_name=table_name, owner=schema)
for row in c:
(colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
- (self.normalize_name(row[0]), row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
+ (self.normalize_name(row[0]), row[0], row[1], row[2], row[3], row[4], row[5] == 'Y', row[6])
if coltype == 'NUMBER' :
coltype = NUMBER(precision, scale)
'type': coltype,
'nullable': nullable,
'default': default,
- 'autoincrement':default is None
+ 'autoincrement': default is None
}
if orig_colname.lower() == orig_colname:
cdict['quote'] = True
def get_indexes(self, connection, table_name, schema=None,
resolve_synonyms=False, dblink='', **kw):
-
info_cache = kw.get('info_cache')
(table_name, schema, dblink, synonym) = \
self._prepare_reflection_args(connection, table_name, schema,
if constraint_name is None:
constraint_name = self.normalize_name(cons_name)
pkeys.append(local_column)
- return {'constrained_columns':pkeys, 'name':constraint_name}
+ return {'constrained_columns': pkeys, 'name': constraint_name}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
util.warn(
("Got 'None' querying 'table_name' from "
"all_cons_columns%(dblink)s - does the user have "
- "proper rights to the table?") % {'dblink':dblink})
+ "proper rights to the table?") % {'dblink': dblink})
continue
rec = fkeys[cons_name]
return None
-
class _OuterJoinColumn(sql.ClauseElement):
__visit_name__ = 'outer_join_column'
def __init__(self, column):
self.column = column
-
-
-
oracle.ROWID: _OracleRowid,
}
-
execute_sequence_format = list
def __init__(self,
# expect encoded strings or unicodes, etc.
self.dbapi_type_map = {
self.dbapi.CLOB: oracle.CLOB(),
- self.dbapi.NCLOB:oracle.NCLOB(),
+ self.dbapi.NCLOB: oracle.NCLOB(),
self.dbapi.BLOB: oracle.BLOB(),
self.dbapi.BINARY: oracle.RAW(),
}