import sqlalchemy.ansisql as ansisql
from sqlalchemy.ansisql import *
+colspecs = {
+ schema.INT : "INTEGER",
+ schema.CHAR : "CHAR(%(length)s)",
+ schema.VARCHAR : "VARCHAR(%(length)s)",
+ schema.TEXT : "TEXT",
+ schema.FLOAT : "NUMERIC(%(precision)s, %(length)s)",
+ schema.DECIMAL : "NUMERIC(%(precision)s, %(length)s)",
+ schema.TIMESTAMP : "TIMESTAMP",
+ schema.DATETIME : "TIMESTAMP",
+ schema.CLOB : "TEXT",
+ schema.BLOB : "BLOB",
+ schema.BOOLEAN : "BOOLEAN",
+}
+
+
def engine(**params):
return PGSQLEngine(**params)
-
+
class PGSQLEngine(ansisql.ANSISQLEngine):
def __init__(self, **params):
ansisql.ANSISQLEngine.__init__(self, **params)
- def create_connection(self):
+ def connect_args(self):
+ return [[], {}]
+
+ def compile(self, statement, bindparams):
+ compiler = PGCompiler(self, statement, bindparams)
+ statement.accept_visitor(compiler)
+ return compiler
+
+ def pre_exec(self, connection, cursor, statement, parameters, echo = None, compiled = None, **kwargs):
+ if compiled is None: return
+ if getattr(compiled, "isinsert", False):
+ for primary_key in compiled.statement.table.primary_keys:
+ # pseudocode
+ if echo is True or self._echo:
+ self.log(primary_key.sequence.text)
+ res = cursor.execute(primary_key.sequence.text)
+ parameters[primary_key.key] = res.fetchrow()[0]
+
+ def dbapi(self):
+ return None
+# return psycopg
+
+ def columnimpl(self, column):
+ return PGColumnImpl(column)
+
+ def reflecttable(self, table):
raise NotImplementedError()
-
+
+class PGCompiler(ansisql.ANSICompiler):
+ def visit_insert(self, insert):
+ self.isinsert = True
+ super(self).visit_insert(insert)
+
+class PGColumnImpl(sql.ColumnSelectable):
+ def get_specification(self):
+ coltype = self.column.type
+ if isinstance(coltype, types.ClassType):
+ key = coltype
+ else:
+ key = coltype.__class__
+
+ return self.name + " " + colspecs[key] % {'precision': getattr(coltype, 'precision', None), 'length' : getattr(coltype, 'length', None)}
"SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, mytable.description \
FROM myothertable, mytable WHERE mytable.myid = myothertable.otherid"
)
-
+
self.runtest(
select(
[table],
engine = postgres.engine())
- self.runtest(query,
- "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
-FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
-mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
-myothertable.othername != :myothertable_othername AND EXISTS (select yay from foo where boo = lar)",
- engine = oracle.engine(use_ansi = False))
+# self.runtest(query,
+# "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+#FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
+#mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
+#myothertable.othername != :myothertable_othername AND EXISTS (select yay from foo where boo = lar)",
+# engine = oracle.engine(use_ansi = False))
def testbindparam(self):
self.runtest(select(