from sqlalchemy import types as sqltypes
-class InfoDateTime(sqltypes.DateTime ):
+class InfoDateTime(sqltypes.DateTime):
def bind_processor(self, dialect):
def process(value):
if value is not None:
if value.microsecond:
- value = value.replace( microsecond = 0 )
+ value = value.replace(microsecond=0)
return value
return process
-class InfoTime(sqltypes.Time ):
+class InfoTime(sqltypes.Time):
def bind_processor(self, dialect):
def process(value):
if value is not None:
if value.microsecond:
- value = value.replace( microsecond = 0 )
+ value = value.replace(microsecond=0)
return value
return process
def result_processor(self, dialect):
def process(value):
- if isinstance( value , datetime.datetime ):
+ if isinstance(value, datetime.datetime):
return value.time()
else:
return value
class InfoTypeCompiler(compiler.GenericTypeCompiler):
def visit_DATETIME(self, type_):
return "DATETIME YEAR TO SECOND"
-
+
def visit_TIME(self, type_):
return "DATETIME HOUR TO SECOND"
-
+
def visit_binary(self, type_):
return "BYTE"
-
+
def visit_boolean(self, type_):
return "SMALLINT"
-
+
class InfoSQLCompiler(compiler.SQLCompiler):
def __init__(self, *args, **kwargs):
self.limit = 0
self.offset = 0
- compiler.SQLCompiler.__init__( self , *args, **kwargs )
+ compiler.SQLCompiler.__init__(self, *args, **kwargs)
def default_from(self):
return " from systables where tabname = 'systables' "
- def get_select_precolumns( self , select ):
+ def get_select_precolumns(self, select):
s = select._distinct and "DISTINCT " or ""
# only has limit
if select._limit:
off = select._offset or 0
- s += " FIRST %s " % ( select._limit + off )
+ s += " FIRST %s " % (select._limit + off)
else:
s += ""
return s
self.limit = select._limit or 0
# the column in order by clause must in select too
- def __label( c ):
+ def __label(c):
try:
return c._label.lower()
except:
return ''
# TODO: dont modify the original select, generate a new one
- a = [ __label(c) for c in select._raw_columns ]
+ a = [__label(c) for c in select._raw_columns]
for c in select._order_by_clause.clauses:
- if ( __label(c) not in a ):
- select.append_column( c )
+ if __label(c) not in a:
+ select.append_column(c)
return compiler.SQLCompiler.visit_select(self, select)
def limit_clause(self, select):
return ""
- def visit_function( self , func ):
+ def visit_function(self, func):
if func.name.lower() == 'current_date':
return "today"
elif func.name.lower() == 'current_time':
return "CURRENT HOUR TO SECOND"
- elif func.name.lower() in ( 'current_timestamp' , 'now' ):
+ elif func.name.lower() in ('current_timestamp', 'now'):
return "CURRENT YEAR TO SECOND"
else:
- return compiler.SQLCompiler.visit_function( self , func )
+ return compiler.SQLCompiler.visit_function(self, func)
def visit_clauselist(self, list, **kwargs):
return ', '.join([s for s in [self.process(c) for c in list.clauses] if s is not None])
def get_column_specification(self, column, first_pk=False):
colspec = self.preparer.format_column(column)
if column.primary_key and len(column.foreign_keys)==0 and column.autoincrement and \
- isinstance(column.type, sqltypes.Integer) and not getattr( self , 'has_serial' , False ) and first_pk:
+ isinstance(column.type, sqltypes.Integer) and not getattr(self, 'has_serial', False) and first_pk:
colspec += " SERIAL"
self.has_serial = True
else:
return colspec
def post_create_table(self, table):
- if hasattr( self , 'has_serial' ):
+ if hasattr(self, 'has_serial'):
del self.has_serial
return ''
def format_constraint(self, constraint):
# informix doesnt support names for constraints
return ''
-
+
def _requires_quotes(self, value):
return False
colspecs = colspecs
ischema_names = ischema_names
- def do_begin(self , connect ):
+ def do_begin(self, connect):
cu = connect.cursor()
- cu.execute( 'SET LOCK MODE TO WAIT' )
- #cu.execute( 'SET ISOLATION TO REPEATABLE READ' )
+ cu.execute('SET LOCK MODE TO WAIT')
+ #cu.execute('SET ISOLATION TO REPEATABLE READ')
def table_names(self, connection, schema):
s = "select tabname from systables"
return [row[0] for row in connection.execute(s)]
def has_table(self, connection, table_name, schema=None):
- cursor = connection.execute("""select tabname from systables where tabname=?""", table_name.lower() )
+ cursor = connection.execute("""select tabname from systables where tabname=?""", table_name.lower())
return cursor.first() is not None
def reflecttable(self, connection, table, include_columns):
- c = connection.execute ("select distinct OWNER from systables where tabname=?", table.name.lower() )
+ c = connection.execute ("select distinct OWNER from systables where tabname=?", table.name.lower())
rows = c.fetchall()
if not rows :
raise exc.NoSuchTableError(table.name)
c = connection.execute ("""select colname , coltype , collength , t3.default , t1.colno from syscolumns as t1 , systables as t2 , OUTER sysdefaults as t3
where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=?
and t3.tabid = t2.tabid and t3.colno = t1.colno
- order by t1.colno""", table.name.lower(), owner )
+ order by t1.colno""", table.name.lower(), owner)
rows = c.fetchall()
if not rows:
raise exc.NoSuchTableError(table.name)
- for name , colattr , collength , default , colno in rows:
+ for name, colattr, collength, default, colno in rows:
name = name.lower()
if include_columns and name not in include_columns:
continue
# in 7.31, coltype = 0x000
# ^^-- column type
- # ^-- 1 not null , 0 null
- nullable , coltype = divmod( colattr , 256 )
- if coltype not in ( 0 , 13 ) and default:
+ # ^-- 1 not null, 0 null
+ nullable, coltype = divmod(colattr, 256)
+ if coltype not in (0, 13) and default:
default = default.split()[-1]
- if coltype == 0 or coltype == 13: # char , varchar
+ if coltype == 0 or coltype == 13: # char, varchar
coltype = ischema_names.get(coltype, InfoString)(collength)
if default:
default = "'%s'" % default
elif coltype == 5: # decimal
- precision , scale = ( collength & 0xFF00 ) >> 8 , collength & 0xFF
+ precision, scale = (collength & 0xFF00) >> 8, collength & 0xFF
if scale == 255:
scale = 0
coltype = InfoNumeric(precision, scale)
and t4.tabid = t2.tabid and t4.colno = t3.part1
and t5.constrid = t1.constrid and t8.constrid = t5.primary
and t6.tabid = t5.ptabid and t6.colno = t9.part1 and t9.idxname = t8.idxname
- and t7.tabid = t5.ptabid""", table.name.lower(), owner )
+ and t7.tabid = t5.ptabid""", table.name.lower(), owner)
rows = c.fetchall()
fks = {}
for cons_name, cons_type, local_column, remote_table, remote_column in rows:
fk[1].append(refspec)
for name, value in fks.iteritems():
- table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1] , None, link_to_name=True ))
+ table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], None, link_to_name=True))
# PK
c = connection.execute("""select t1.constrname as cons_name , t1.constrtype as cons_type ,
sysindexes as t3 , syscolumns as t4
where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'P'
and t3.tabid = t2.tabid and t3.idxname = t1.idxname
- and t4.tabid = t2.tabid and t4.colno = t3.part1""", table.name.lower(), owner )
+ and t4.tabid = t2.tabid and t4.colno = t3.part1""", table.name.lower(), owner)
rows = c.fetchall()
for cons_name, cons_type, local_column in rows:
- table.primary_key.add( table.c[local_column] )
+ table.primary_key.add(table.c[local_column])
# for offset
class informix_cursor(object):
- def __init__( self , con ):
+ def __init__(self, con):
self.__cursor = con.cursor()
self.rowcount = 0
- def offset( self , n ):
+ def offset(self, n):
if n > 0:
- self.fetchmany( n )
+ self.fetchmany(n)
self.rowcount = self.__cursor.rowcount - n
if self.rowcount < 0:
self.rowcount = 0
else:
self.rowcount = self.__cursor.rowcount
- def execute( self , sql , params ):
- if params is None or len( params ) == 0:
+ def execute(self, sql, params):
+ if params is None or len(params) == 0:
params = []
- return self.__cursor.execute( sql , params )
+ return self.__cursor.execute(sql, params)
- def __getattr__( self , name ):
- if name not in ( 'offset' , '__cursor' , 'rowcount' , '__del__' , 'execute' ):
- return getattr( self.__cursor , name )
+ def __getattr__(self, name):
+ if name not in ('offset', '__cursor', 'rowcount', '__del__', 'execute'):
+ return getattr(self.__cursor, name)
class InfoExecutionContext(default.DefaultExecutionContext):
def post_exec(self):
if getattr(self.compiled, "isinsert", False) and self.inserted_primary_key is None:
self._last_inserted_ids = [self.cursor.sqlerrd[1]]
- elif hasattr( self.compiled , 'offset' ):
- self.cursor.offset( self.compiled.offset )
+ elif hasattr(self.compiled, 'offset'):
+ self.cursor.offset(self.compiled.offset)
- def create_cursor( self ):
- return informix_cursor( self.connection.connection )
+ def create_cursor(self):
+ return informix_cursor(self.connection.connection)
class Informix_informixdb(InformixDialect):
driver = 'informixdb'
default_paramstyle = 'qmark'
execution_context_cls = InfoExecutionContext
-
+
@classmethod
def dbapi(cls):
return __import__('informixdb')
def create_connect_args(self, url):
if url.host:
- dsn = '%s@%s' % ( url.database , url.host )
+ dsn = '%s@%s' % (url.database, url.host)
else:
dsn = url.database
if url.username:
- opt = { 'user':url.username , 'password': url.password }
+ opt = {'user': url.username, 'password': url.password}
else:
opt = {}
return False
-dialect = Informix_informixdb
\ No newline at end of file
+dialect = Informix_informixdb