return ANSISchemaDropper(self, **params)
def compiler(self, statement, parameters, **kwargs):
- return ANSICompiler(self, statement, parameters, **kwargs)
+ return ANSICompiler(statement, parameters, engine=self, **kwargs)
def connect_args(self):
return ([],{})
class ANSICompiler(sql.Compiled):
"""default implementation of Compiled, which compiles ClauseElements into ANSI-compliant SQL strings."""
- def __init__(self, engine, statement, parameters=None, typemap=None, **kwargs):
+ def __init__(self, statement, parameters=None, typemap=None, engine=None, positional=None, paramstyle=None, **kwargs):
"""constructs a new ANSICompiler object.
engine - SQLEngine to compile against
key/value pairs when the Compiled is executed, and also may affect the
actual compilation, as in the case of an INSERT where the actual columns
inserted will correspond to the keys present in the parameters."""
- sql.Compiled.__init__(self, engine, statement, parameters)
+ sql.Compiled.__init__(self, statement, parameters, engine=engine)
self.binds = {}
self.froms = {}
self.wheres = {}
self.select_stack = []
self.typemap = typemap or {}
self.isinsert = False
+ self.bindtemplate = ":%s"
+ if engine is not None:
+ self.paramstyle = engine.paramstyle
+ self.positional = engine.positional
+ else:
+ self.positional = False
+ self.paramstyle = 'named'
def after_compile(self):
- if self.engine.positional:
+ # this re will search for params like :param
+ # it has a negative lookbehind for an extra ':' so that it doesnt match
+ # postgres '::text' tokens
+ match = r'(?<!:):([\w_]+)'
+ if self.paramstyle=='pyformat':
+ self.strings[self.statement] = re.sub(match, lambda m:'%(' + m.group(1) +')s', self.strings[self.statement])
+ elif self.positional:
self.positiontup = []
- match = r'%\(([\w_]+)\)s'
params = re.finditer(match, self.strings[self.statement])
for p in params:
self.positiontup.append(p.group(1))
- if self.engine.paramstyle=='qmark':
+ if self.paramstyle=='qmark':
self.strings[self.statement] = re.sub(match, '?', self.strings[self.statement])
- elif self.engine.paramstyle=='format':
+ elif self.paramstyle=='format':
self.strings[self.statement] = re.sub(match, '%s', self.strings[self.statement])
- elif self.engine.paramstyle=='numeric':
+ elif self.paramstyle=='numeric':
i = [0]
def getnum(x):
i[0] += 1
bindparams = {}
bindparams.update(params)
- if self.engine.positional:
+ if self.positional:
d = OrderedDict()
for k in self.positiontup:
b = self.binds[k]
- d[k] = b.typeprocess(b.value, self.engine)
+ if self.engine is not None:
+ d[k] = b.typeprocess(b.value, self.engine)
+ else:
+ d[k] = b.value
else:
d = {}
for b in self.binds.values():
- d[b.key] = b.typeprocess(b.value, self.engine)
+ if self.engine is not None:
+ d[b.key] = b.typeprocess(b.value, self.engine)
+ else:
+ d[b.key] = b.value
for key, value in bindparams.iteritems():
try:
b = self.binds[key]
except KeyError:
continue
- d[b.key] = b.typeprocess(value, self.engine)
+ if self.engine is not None:
+ d[b.key] = b.typeprocess(value, self.engine)
+ else:
+ d[b.key] = value
return d
- if self.engine.positional:
- return d.values()
- else:
- return d
def get_named_params(self, parameters):
"""given the results of the get_params method, returns the parameters
same dictionary. For a positional paramstyle, the given parameters are
assumed to be in list format and are converted back to a dictionary.
"""
-# return parameters
- if self.engine.positional:
+ if self.positional:
p = {}
for i in range(0, len(self.positiontup)):
p[self.positiontup[i]] = parameters[i]
self.strings[bindparam] = self.bindparam_string(key)
def bindparam_string(self, name):
- return self.engine.bindtemplate % name
+ return self.bindtemplate % name
def visit_alias(self, alias):
self.froms[alias] = self.get_from_text(alias.original) + " AS " + alias.name
return self.context.last_inserted_ids
def compiler(self, statement, bindparams, **kwargs):
- return FBCompiler(self, statement, bindparams, use_ansi=self._use_ansi, **kwargs)
+ return FBCompiler(statement, bindparams, engine=self, use_ansi=self._use_ansi, **kwargs)
def schemagenerator(self, **params):
return FBSchemaGenerator(self, **params)
coltype = coltype(*args)
colargs= []
if default is not None:
- colargs.append(PassiveDefault(sql.text(default, escape=False)))
+ colargs.append(PassiveDefault(sql.text(default)))
table.append_item(schema.Column(name, coltype, nullable=nullable, *colargs))
s = select([constraints.c.constraint_name, constraints.c.constraint_type, constraints.c.table_name, key_constraints], use_labels=True, from_obj=[constraints.join(column_constraints, column_constraints.c.constraint_name==constraints.c.constraint_name).join(key_constraints, key_constraints.c.constraint_name==column_constraints.c.constraint_name)])
return False
def compiler(self, statement, bindparams, **kwargs):
- return MySQLCompiler(self, statement, bindparams, **kwargs)
+ return MySQLCompiler(statement, bindparams, engine=self, **kwargs)
def schemagenerator(self, **params):
return MySQLSchemaGenerator(self, **params)
colargs = []
if default is not None:
- colargs.append(PassiveDefault(sql.text(default, escape=False)))
+ colargs.append(PassiveDefault(sql.text(default)))
name = name.lower()
def __init__(self, engine, statement, parameters, use_ansi = True, **kwargs):
self._outertable = None
self._use_ansi = use_ansi
- ansisql.ANSICompiler.__init__(self, engine, statement, parameters, **kwargs)
+ ansisql.ANSICompiler.__init__(self, statement, parameters, engine=engine, **kwargs)
def visit_join(self, join):
if self._use_ansi:
return sqltypes.adapt_type(typeobj, pg1_colspecs)
def compiler(self, statement, bindparams, **kwargs):
- return PGCompiler(self, statement, bindparams, **kwargs)
+ return PGCompiler(statement, bindparams, engine=self, **kwargs)
def schemagenerator(self, **params):
return PGSchemaGenerator(self, **params)
return ([self.filename], self.opts)
def compiler(self, statement, bindparams, **kwargs):
- return SQLiteCompiler(self, statement, bindparams, **kwargs)
+ return SQLiteCompiler(statement, bindparams, engine=self, **kwargs)
def dbapi(self):
return sqlite
self._paramstyle = 'named'
if self._paramstyle == 'named':
- self.bindtemplate = ':%s'
self.positional=False
elif self._paramstyle == 'pyformat':
- self.bindtemplate = "%%(%s)s"
self.positional=False
elif self._paramstyle == 'qmark' or self._paramstyle == 'format' or self._paramstyle == 'numeric':
# for positional, use pyformat internally, ANSICompiler will convert
# to appropriate character upon compilation
- self.bindtemplate = "%%(%s)s"
self.positional = True
else:
raise DBAPIError("Unsupported paramstyle '%s'" % self._paramstyle)
instance of this engine's SQLCompiler, compiles the ClauseElement, and returns the
newly compiled object."""
compiler = self.compiler(statement, parameters, **kwargs)
- statement.accept_visitor(compiler)
- compiler.after_compile()
+ compiler.compile()
return compiler
def reflecttable(self, table):
object be dependent on the actual values of those bind parameters, even though it may
reference those values as defaults."""
- def __init__(self, engine, statement, parameters):
+ def __init__(self, statement, parameters, engine=None):
"""constructs a new Compiled object.
- engine - SQLEngine to compile against
-
statement - ClauseElement to be compiled
parameters - optional dictionary indicating a set of bind parameters
will also result in the creation of new BindParamClause objects for each key
and will also affect the generated column list in an INSERT statement and the SET
clauses of an UPDATE statement. The keys of the parameter dictionary can
- either be the string names of columns or actual sqlalchemy.schema.Column objects."""
- self.engine = engine
+ either be the string names of columns or ColumnClause objects.
+
+ engine - optional SQLEngine to compile this statement against"""
self.parameters = parameters
self.statement = statement
+ self.engine = engine
def __str__(self):
"""returns the string text of the generated SQL statement."""
"""
raise NotImplementedError()
+ def compile(self):
+ self.statement.accept_visitor(self)
+ self.after_compile()
+
def execute(self, *multiparams, **params):
"""executes this compiled object using the underlying SQLEngine"""
if len(multiparams):
return None
engine = property(lambda s: s._find_engine(), doc="attempts to locate a SQLEngine within this ClauseElement structure, or returns None if none found.")
-
- def compile(self, engine = None, parameters = None, typemap=None):
+
+
+ def compile(self, engine = None, parameters = None, typemap=None, compiler=None):
"""compiles this SQL expression using its underlying SQLEngine to produce
a Compiled object. If no engine can be found, an ansisql engine is used.
bindparams is a dictionary representing the default bind parameters to be used with
the statement. """
- if engine is None:
- engine = self.engine
-
- if engine is None:
+
+ if compiler is None:
+ if engine is not None:
+ compiler = engine.compiler(self, parameters)
+ elif self.engine is not None:
+ compiler = self.engine.compiler(self, parameters)
+
+ if compiler is None:
import sqlalchemy.ansisql as ansisql
- engine = ansisql.engine()
-
- return engine.compile(self, parameters=parameters, typemap=typemap)
+ compiler = ansisql.ANSICompiler(self, parameters=parameters, typemap=typemap)
+ compiler.compile()
+ return compiler
def __str__(self):
return str(self.compile())
being specified as a bind parameter via the bindparam() method,
since it provides more information about what it is, including an optional
type, as well as providing comparison operations."""
- def __init__(self, text = "", engine=None, bindparams=None, typemap=None, escape=True):
+ def __init__(self, text = "", engine=None, bindparams=None, typemap=None):
self.parens = False
self._engine = engine
self.id = id(self)
typemap[key] = engine.type_descriptor(typemap[key])
def repl(m):
self.bindparams[m.group(1)] = bindparam(m.group(1))
- return self.engine.bindtemplate % m.group(1)
-
- if escape:
- self.text = re.compile(r':([\w_]+)', re.S).sub(repl, text)
- else:
- self.text = text
+ return ":%s" % m.group(1)
+ # scan the string and search for bind parameter names, add them
+ # to the list of bindparams
+ self.text = re.compile(r'(?<!:):([\w_]+)', re.S).sub(repl, text)
if bindparams is not None:
for b in bindparams:
self.bindparams[b.key] = b
class SQLTest(PersistTest):
def runtest(self, clause, result, engine = None, params = None, checkparams = None):
- c = clause.compile(parameters = params, engine=engine)
+ print "TEST with e", engine
+ c = clause.compile(parameters=params, engine=engine)
self.echo("\nSQL String:\n" + str(c) + repr(c.get_params()))
cc = re.sub(r'\n', '', str(c))
self.assert_(cc == result, str(c) + "\n does not match \n" + result)
if checkparams is not None:
if isinstance(checkparams, list):
- self.assert_(c.get_params().values() == checkparams, "params dont match")
+ self.assert_(c.get_params().values() == checkparams, "params dont match ")
else:
- self.assert_(c.get_params() == checkparams, "params dont match")
+ self.assert_(c.get_params() == checkparams, "params dont match" + repr(c.get_params()))
class SelectTest(SQLTest):
def testtableselect(self):