base.dialect = pyodbc.dialect
from base import CHAR, VARCHAR, TIME, NCHAR, NVARCHAR,\
- TEXT,DATE,DATETIME, FLOAT, NUMERIC,\
- BIGINT,INT, INTEGER, SMALLINT, BINARY,\
- VARBINARY,UNITEXT,UNICHAR,UNIVARCHAR,\
- IMAGE,BIT,MONEY,SMALLMONEY,TINYINT,\
- dialect
+ TEXT, DATE, DATETIME, FLOAT, NUMERIC,\
+ BIGINT, INT, INTEGER, SMALLINT, BINARY,\
+ VARBINARY, UNITEXT, UNICHAR, UNIVARCHAR,\
+ IMAGE, BIT, MONEY, SMALLMONEY, TINYINT,\
+ dialect
__all__ = (
- 'CHAR', 'VARCHAR', 'TIME', 'NCHAR', 'NVARCHAR',
- 'TEXT','DATE','DATETIME', 'FLOAT', 'NUMERIC',
- 'BIGINT','INT', 'INTEGER', 'SMALLINT', 'BINARY',
- 'VARBINARY','UNITEXT','UNICHAR','UNIVARCHAR',
- 'IMAGE','BIT','MONEY','SMALLMONEY','TINYINT',
- 'dialect'
+ 'CHAR', 'VARCHAR', 'TIME', 'NCHAR', 'NVARCHAR',
+ 'TEXT', 'DATE', 'DATETIME', 'FLOAT', 'NUMERIC',
+ 'BIGINT', 'INT', 'INTEGER', 'SMALLINT', 'BINARY',
+ 'VARBINARY', 'UNITEXT', 'UNICHAR', 'UNIVARCHAR',
+ 'IMAGE', 'BIT', 'MONEY', 'SMALLMONEY', 'TINYINT',
+ 'dialect'
)
def result_processor(self, dialect, coltype):
def process(value):
if value is not None:
- return str(value) #.decode("ucs-2")
+ return str(value) # decode("ucs-2")
else:
return None
return process
+
class UNICHAR(_SybaseUnitypeMixin, sqltypes.Unicode):
__visit_name__ = 'UNICHAR'
+
class UNIVARCHAR(_SybaseUnitypeMixin, sqltypes.Unicode):
__visit_name__ = 'UNIVARCHAR'
+
class UNITEXT(_SybaseUnitypeMixin, sqltypes.UnicodeText):
__visit_name__ = 'UNITEXT'
+
class TINYINT(sqltypes.Integer):
__visit_name__ = 'TINYINT'
+
class BIT(sqltypes.TypeEngine):
__visit_name__ = 'BIT'
+
class MONEY(sqltypes.TypeEngine):
__visit_name__ = "MONEY"
+
class SMALLMONEY(sqltypes.TypeEngine):
__visit_name__ = "SMALLMONEY"
+
class UNIQUEIDENTIFIER(sqltypes.TypeEngine):
__visit_name__ = "UNIQUEIDENTIFIER"
+
class IMAGE(sqltypes.LargeBinary):
__visit_name__ = 'IMAGE'
ischema_names = {
'bigint': BIGINT,
- 'int' : INTEGER,
- 'integer' : INTEGER,
- 'smallint' : SMALLINT,
- 'tinyint' : TINYINT,
- 'unsigned bigint' : BIGINT, # TODO: unsigned flags
- 'unsigned int' : INTEGER, # TODO: unsigned flags
- 'unsigned smallint' : SMALLINT, # TODO: unsigned flags
- 'numeric' : NUMERIC,
- 'decimal' : DECIMAL,
- 'dec' : DECIMAL,
- 'float' : FLOAT,
- 'double' : NUMERIC, # TODO
- 'double precision' : NUMERIC, # TODO
+ 'int': INTEGER,
+ 'integer': INTEGER,
+ 'smallint': SMALLINT,
+ 'tinyint': TINYINT,
+ 'unsigned bigint': BIGINT, # TODO: unsigned flags
+ 'unsigned int': INTEGER, # TODO: unsigned flags
+ 'unsigned smallint': SMALLINT, # TODO: unsigned flags
+ 'numeric': NUMERIC,
+ 'decimal': DECIMAL,
+ 'dec': DECIMAL,
+ 'float': FLOAT,
+ 'double': NUMERIC, # TODO
+ 'double precision': NUMERIC, # TODO
'real': REAL,
'smallmoney': SMALLMONEY,
'money': MONEY,
'datetime': DATETIME,
'date': DATE,
'time': TIME,
- 'char' : CHAR,
- 'character' : CHAR,
- 'varchar' : VARCHAR,
- 'character varying' : VARCHAR,
- 'char varying' : VARCHAR,
- 'unichar' : UNICHAR,
- 'unicode character' : UNIVARCHAR,
+ 'char': CHAR,
+ 'character': CHAR,
+ 'varchar': VARCHAR,
+ 'character varying': VARCHAR,
+ 'char varying': VARCHAR,
+ 'unichar': UNICHAR,
+ 'unicode character': UNIVARCHAR,
'nchar': NCHAR,
'national char': NCHAR,
'national character': NCHAR,
'national character varying': NVARCHAR,
'text': TEXT,
'unitext': UNITEXT,
- 'binary' : BINARY,
- 'varbinary' : VARBINARY,
- 'image' : IMAGE,
+ 'binary': BINARY,
+ 'varbinary': VARBINARY,
+ 'image': IMAGE,
'bit': BIT,
# not in documentation for ASE 15.7
- 'long varchar' : TEXT, # TODO
+ 'long varchar': TEXT, # TODO
'timestamp': TIMESTAMP,
'uniqueidentifier': UNIQUEIDENTIFIER,
cursor.close()
return lastrowid
+
class SybaseSQLCompiler(compiler.SQLCompiler):
ansi_bind_rules = True
# FIXME: sybase doesn't allow an offset without a limit
# so use a huge value for TOP here
s += "TOP 1000000 "
- s += "START AT %s " % (select._offset+1,)
+ s += "START AT %s " % (select._offset + 1,)
return s
def get_from_hint_text(self, table, text):
self._index_identifier(index.name), index.quote)
)
+
class SybaseIdentifierPreparer(compiler.IdentifierPreparer):
reserved_words = RESERVED_WORDS
+
class SybaseDialect(default.DefaultDialect):
name = 'sybase'
supports_unicode_statements = False
def _get_default_schema_name(self, connection):
return connection.scalar(
text("SELECT user_name() as user_name",
- typemap={'user_name':Unicode})
+ typemap={'user_name': Unicode})
)
def initialize(self, connection):
COLUMN_SQL = text("""
SELECT col.name AS name,
- t.name AS type,
+ t.name AS type,
(col.status & 8) AS nullable,
(col.status & 128) AS autoincrement,
com.text AS 'default',
col.prec AS precision,
col.scale AS scale,
col.length AS length
- FROM systypes t, syscolumns col LEFT OUTER JOIN syscomments com ON
+ FROM systypes t, syscolumns col LEFT OUTER JOIN syscomments com ON
col.cdefault = com.id
- WHERE col.usertype = t.usertype
+ WHERE col.usertype = t.usertype
AND col.id = :table_id
ORDER BY col.colid
""")
results = connection.execute(COLUMN_SQL, table_id=table_id)
columns = []
- for (name, type_, nullable, autoincrement, default, precision, scale,
+ for (name, type_, nullable, autoincrement, default, precision, scale,
length) in results:
col_info = self._get_column_info(name, type_, bool(nullable),
bool(autoincrement), default, precision, scale,
default = re.sub("^'(.*)'$", lambda m: m.group(1), default)
else:
default = None
-
+
column_info = dict(name=name, type=coltype, nullable=nullable,
default=default, autoincrement=autoincrement)
return column_info
table_id = self.get_table_id(connection, table_name, schema,
info_cache=kw.get("info_cache"))
-
+
table_cache = {}
column_cache = {}
foreign_keys = []
-
+
table_cache[table_id] = {"name": table_name, "schema": schema}
-
+
COLUMN_SQL = text("""
SELECT c.colid AS id, c.name AS name
FROM syscolumns c
WHERE c.id = :table_id
""")
-
+
results = connection.execute(COLUMN_SQL, table_id=table_id)
columns = {}
for col in results:
columns[col["id"]] = col["name"]
column_cache[table_id] = columns
-
+
REFCONSTRAINT_SQL = text("""
SELECT o.name AS name, r.reftabid AS reftable_id,
r.keycnt AS 'count',
""")
referential_constraints = connection.execute(REFCONSTRAINT_SQL,
table_id=table_id)
-
+
REFTABLE_SQL = text("""
SELECT o.name AS name, u.name AS 'schema'
FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
""")
for r in referential_constraints:
-
reftable_id = r["reftable_id"]
-
+
if reftable_id not in table_cache:
c = connection.execute(REFTABLE_SQL, table_id=reftable_id)
reftable = c.fetchone()
for col in results:
reftable_columns[col["id"]] = col["name"]
column_cache[reftable_id] = reftable_columns
-
+
reftable = table_cache[reftable_id]
reftable_columns = column_cache[reftable_id]
-
+
constrained_columns = []
referred_columns = []
- for i in range(1, r["count"]+1):
+ for i in range(1, r["count"] + 1):
constrained_columns.append(columns[r["fokey%i" % i]])
referred_columns.append(reftable_columns[r["refkey%i" % i]])
-
+
fk_info = {
"constrained_columns": constrained_columns,
"referred_schema": reftable["schema"],
"referred_columns": referred_columns,
"name": r["name"]
}
-
+
foreign_keys.append(fk_info)
-
+
return foreign_keys
@reflection.cache
results = connection.execute(PK_SQL, table_id=table_id)
pks = results.fetchone()
results.close()
-
+
constrained_columns = []
- for i in range(1, pks["count"]+1):
+ for i in range(1, pks["count"] + 1):
constrained_columns.append(pks["pk_%i" % (i,)])
return {"constrained_columns": constrained_columns,
"name": pks["name"]}