on postgres. Also, the true labelname is always attached as the
accessor on the parent Selectable so theres no need to be aware
of the genrerated label names [ticket:512].
+ - preliminary support for unicode table and column names added.
- orm:
- improved/fixed custom collection classes when giving it "set"/
"sets.Set" classes or subclasses (was still looking for append()
# this re will search for params like :param
# it has a negative lookbehind for an extra ':' so that it doesnt match
# postgres '::text' tokens
- match = r'(?<!:):([\w_]+)'
+ match = re.compile(r'(?<!:):([\w_]+)', re.UNICODE)
if self.paramstyle=='pyformat':
- self.strings[self.statement] = re.sub(match, lambda m:'%(' + m.group(1) +')s', self.strings[self.statement])
+ self.strings[self.statement] = match.sub(lambda m:'%(' + m.group(1) +')s', self.strings[self.statement])
elif self.positional:
- params = re.finditer(match, self.strings[self.statement])
+ params = match.finditer(self.strings[self.statement])
for p in params:
self.positiontup.append(p.group(1))
if self.paramstyle=='qmark':
- self.strings[self.statement] = re.sub(match, '?', self.strings[self.statement])
+ self.strings[self.statement] = match.sub('?', self.strings[self.statement])
elif self.paramstyle=='format':
- self.strings[self.statement] = re.sub(match, '%s', self.strings[self.statement])
+ self.strings[self.statement] = match.sub('%s', self.strings[self.statement])
elif self.paramstyle=='numeric':
i = [0]
def getnum(x):
i[0] += 1
return str(i[0])
- self.strings[self.statement] = re.sub(match, getnum, self.strings[self.statement])
+ self.strings[self.statement] = match.sub(getnum, self.strings[self.statement])
def get_from_text(self, obj):
return self.froms.get(obj, None)
def get_whereclause(self, obj):
return self.wheres.get(obj, None)
- def get_params(self, **params):
+ def construct_params(self, params):
"""Return a structure of bind parameters for this compiled object.
This includes bind parameters that might be compiled in via
else:
bindparams = {}
bindparams.update(params)
-
d = sql.ClauseParameters(self.dialect, self.positiontup)
for b in self.binds.values():
d.set_parameter(b, b.value)
def to_col(key):
if not isinstance(key, sql._ColumnClause):
- return stmt.table.columns.get(str(key), key)
+ return stmt.table.columns.get(unicode(key), key)
else:
return key
def _requires_quotes(self, value, case_sensitive):
"""Return True if the given identifier requires quoting."""
-
return \
value in self._reserved_words() \
or (value[0] in self._illegal_initial_characters()) \
- or bool(len([x for x in str(value) if x not in self._legal_characters()])) \
+ or bool(len([x for x in unicode(value) if x not in self._legal_characters()])) \
or (case_sensitive and value.lower() != value)
def __generic_obj_format(self, obj, ident):
if not compiled.can_execute:
raise exceptions.ArgumentError("Not an executeable clause: %s" % (str(compiled)))
cursor = self.__engine.dialect.create_cursor(self.connection)
- parameters = [compiled.get_params(**m) for m in self._params_to_listofdicts(*multiparams, **params)]
+ parameters = [compiled.construct_params(m) for m in self._params_to_listofdicts(*multiparams, **params)]
if len(parameters) == 1:
parameters = parameters[0]
def proxy(statement=None, parameters=None):
return cursor
context = self.__engine.dialect.create_execution_context()
context.pre_exec(self.__engine, proxy, compiled, parameters)
- proxy(str(compiled), parameters)
+ proxy(unicode(compiled), parameters)
context.post_exec(self.__engine, proxy, compiled, parameters)
rpargs = self.__engine.dialect.create_result_proxy_args(self, cursor)
return ResultProxy(self.__engine, self, cursor, context, typemap=compiled.typemap, column_labels=compiled.column_labels, **rpargs)
def _execute_raw(self, statement, parameters=None, cursor=None, context=None, **kwargs):
if cursor is None:
cursor = self.__engine.dialect.create_cursor(self.connection)
- try:
- self.__engine.logger.info(statement)
- self.__engine.logger.info(repr(parameters))
- if parameters is not None and isinstance(parameters, list) and len(parameters) > 0 and (isinstance(parameters[0], list) or isinstance(parameters[0], dict)):
- self._executemany(cursor, statement, parameters, context=context)
- else:
- self._execute(cursor, statement, parameters, context=context)
- self._autocommit(statement)
- except:
- raise
+ self.__engine.logger.info(statement)
+ self.__engine.logger.info(repr(parameters))
+ if parameters is not None and isinstance(parameters, list) and len(parameters) > 0 and (isinstance(parameters[0], list) or isinstance(parameters[0], dict)):
+ self._executemany(cursor, statement, parameters, context=context)
+ else:
+ self._execute(cursor, statement, parameters, context=context)
+ self._autocommit(statement)
return cursor
def _execute(self, c, statement, parameters, context=None):
mapper._adapt_inherited_property(key, prop)
def __str__(self):
- return "Mapper|" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.name or str(self.local_table)) + (not self._is_primary_mapper() and "|non-primary" or "")
+ return "Mapper|" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.encodedname or str(self.local_table)) + (not self._is_primary_mapper() and "|non-primary" or "")
def _is_primary_mapper(self):
"""Return True if this mapper is the primary mapper for its class key (class + entity_name)."""
if metadata is None:
metadata = default_metadata
- name = str(name) # in case of incoming unicode
schema = kwargs.get('schema', None)
autoload = kwargs.pop('autoload', False)
autoload_with = kwargs.pop('autoload_with', False)
, ',')
def __str__(self):
- return _get_table_key(self.name, self.schema)
+ return _get_table_key(self.encodedname, self.schema)
def append_column(self, column):
"""Append a ``Column`` to this ``Table``."""
identifier contains mixed case.
"""
- name = str(name) # in case of incoming unicode
super(Column, self).__init__(name, None, type)
self.args = args
self.key = kwargs.pop('key', name)
def __str__(self):
if self.table is not None:
if self.table.named_with_column():
- return self.table.name + "." + self.name
+ return (self.table.encodedname + "." + self.encodedname)
else:
- return self.name
+ return self.encodedname
else:
- return self.name
+ return self.encodedname
def _derived_metadata(self):
return self.table.metadata
self.table = table
if self.index:
- if isinstance(self.index, str):
+ if isinstance(self.index, basestring):
raise exceptions.ArgumentError("The 'index' keyword argument on Column is boolean only. To create indexes with a specific name, append an explicit Index object to the Table's list of elements.")
Index('ix_%s' % self._label, self, unique=self.unique)
elif self.unique:
- if isinstance(self.unique, str):
+ if isinstance(self.unique, basestring):
raise exceptions.ArgumentError("The 'unique' keyword argument on Column is boolean only. To create unique constraints or indexes with a specific name, append an explicit UniqueConstraint or Index object to the Table's list of elements.")
table.append_constraint(UniqueConstraint(self.key))
created and added to the parent table.
"""
- if isinstance(column, unicode):
- column = str(column)
self._colspec = column
self._column = None
self.constraint = constraint
return ForeignKey(self._get_colspec())
def _get_colspec(self):
- if isinstance(self._colspec, str):
+ if isinstance(self._colspec, basestring):
return self._colspec
elif self._colspec.table.schema is not None:
return "%s.%s.%s" % (self._colspec.table.schema, self._colspec.table.name, self._colspec.key)
# ForeignKey inits its remote column as late as possible, so tables can
# be defined without dependencies
if self._column is None:
- if isinstance(self._colspec, str):
+ if isinstance(self._colspec, basestring):
# locate the parent table this foreign key is attached to.
# we use the "original" column which our parent column represents
# (its a list of columns/other ColumnElements if the parent table is a UNION)
break
else:
raise exceptions.ArgumentError("Parent column '%s' does not descend from a table-attached Column" % str(self.parent))
- m = re.match(r"^([\w_-]+)(?:\.([\w_-]+))?(?:\.([\w_-]+))?$", self._colspec)
+ m = re.match(r"^([\w_-]+)(?:\.([\w_-]+))?(?:\.([\w_-]+))?$", self._colspec, re.UNICODE)
if m is None:
raise exceptions.ArgumentError("Invalid foreign key column specification: " + self._colspec)
if m.group(3) is None:
raise NotImplementedError()
def get_params(self, **params):
+ """Deprecated. use construct_params(). (supports unicode names)
+ """
+
+ return self.construct_params(params)
+
+ def construct_params(self, params):
"""Return the bind params for this compiled object.
Will start with the default parameters specified when this
``_BindParamClause`` objects compiled into this object; either
the `key` or `shortname` property of the ``_BindParamClause``.
"""
-
raise NotImplementedError()
-
+
def execute(self, *multiparams, **params):
"""Execute this compiled object."""
return compiler
def __str__(self):
- return str(self.compile())
+ return unicode(self.compile()).encode('ascii', 'backslashreplace')
def __and__(self, other):
return and_(self, other)
def __init__(self, text, selectable=None, type=None, _is_oid=False, case_sensitive=True, is_literal=False):
self.key = self.name = text
+ self.encodedname = self.name.encode('ascii', 'backslashreplace')
self.table = selectable
self.type = sqltypes.to_instance(type)
self._is_oid = _is_oid
def __init__(self, name, *columns):
super(TableClause, self).__init__(name)
self.name = self.fullname = name
+ self.encodedname = self.name.encode('ascii', 'backslashreplace')
self._columns = ColumnCollection()
self._foreign_keys = util.OrderedSet()
self._primary_key = ColumnCollection()
'sql.selectable',
'sql.case_statement',
'sql.labels',
+ 'sql.unicode',
# assorted round-trip tests
'sql.query',
from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird
import unittest, re
+
# the select test now tests almost completely with TableClause/ColumnClause objects,
# which are free-roaming table/column objects not attached to any database.
# so SQLAlchemy's SQL construction engine can be used with no database dependencies at all.
--- /dev/null
+# coding: utf-8
+import testbase
+
+from sqlalchemy import *
+
+"""verrrrry basic unicode column name testing"""
+
+class UnicodeSchemaTest(testbase.PersistTest):
+ @testbase.unsupported('postgres')
+ def setUpAll(self):
+ global metadata, t1, t2
+ metadata = MetaData(engine=testbase.db)
+ t1 = Table('unitable1', metadata,
+ Column(u'méil', Integer, primary_key=True),
+ Column(u'éXXm', Integer),
+
+ )
+ t2 = Table(u'unitéble2', metadata,
+ Column(u'méil', Integer, primary_key=True, key="a"),
+ Column(u'éXXm', Integer, ForeignKey(u'unitable1.méil'), key="b"),
+
+ )
+
+ metadata.create_all()
+ @testbase.unsupported('postgres')
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ @testbase.unsupported('postgres')
+ def test_insert(self):
+ t1.insert().execute({u'méil':1, u'éXXm':5})
+ t2.insert().execute({'a':1, 'b':5})
+
+ assert t1.select().execute().fetchall() == [(1, 5)]
+ assert t2.select().execute().fetchall() == [(1, 5)]
+
+ @testbase.unsupported('postgres')
+ def test_mapping(self):
+ # TODO: this test should be moved to the ORM tests, tests should be
+ # added to this module testing SQL syntax and joins, etc.
+ class A(object):pass
+ class B(object):pass
+
+ mapper(A, t1, properties={
+ 't2s':relation(B),
+ 'a':t1.c[u'méil'],
+ 'b':t1.c[u'éXXm']
+ })
+ mapper(B, t2)
+ sess = create_session()
+ a1 = A()
+ b1 = B()
+ a1.t2s.append(b1)
+ sess.save(a1)
+ sess.flush()
+ sess.clear()
+ new_a1 = sess.query(A).selectone(t1.c[u'méil'] == a1.a)
+ assert new_a1.a == a1.a
+ assert new_a1.t2s[0].a == b1.a
+ sess.clear()
+ new_a1 = sess.query(A).options(eagerload('t2s')).selectone(t1.c[u'méil'] == a1.a)
+ assert new_a1.a == a1.a
+ assert new_a1.t2s[0].a == b1.a
+
+if __name__ == '__main__':
+ testbase.main()
\ No newline at end of file
def post_exec(engine, proxy, compiled, parameters, **kwargs):
ctx = e
self.engine.logger = self.logger
- statement = str(compiled)
+ statement = unicode(compiled)
statement = re.sub(r'\n', '', statement)
if self.assert_list is not None: