lost their underlying database - the error catching/invalidate
step is totally moved to the connection pool. #516
- sql:
+ - preliminary support for unicode table names, column names and
+ SQL statements added, for databases which can support them.
+ Works with sqlite and postgres so far. Mysql *mostly* works
+ except the has_table() function does not work. Reflection
+ works too.
- the Unicode type is now a direct subclass of String, which now
contains all the "convert_unicode" logic. This helps the variety
of unicode situations that occur in db's such as MS-SQL to be
that those named columns are selected from (part of [ticket:513])
- MS-SQL better detects when a query is a subquery and knows not to
generate ORDER BY phrases for those [ticket:513]
- - preliminary support for unicode table names, column names and
- SQL statements added, for databases which can support them.
- fix for fetchmany() "size" argument being positional in most
dbapis [ticket:505]
- sending None as an argument to func.<something> will produce
except:
pass
opts['client_flag'] = client_flag
-
return [[], opts]
def create_execution_context(self, *args, **kwargs):
rowcount = cursor.executemany(statement, parameters)
if context is not None:
context._rowcount = rowcount
-
+
+ def supports_unicode_statements(self):
+ return True
+
def do_execute(self, cursor, statement, parameters, **kwargs):
cursor.execute(statement, parameters)
return self._default_schema_name
def has_table(self, connection, table_name, schema=None):
+ # TODO: this does not work for table names that contain multibyte characters.
+ # i have tried dozens of approaches here with no luck. statements like
+ # DESCRIBE and SHOW CREATE TABLE work better, but they raise an error when
+ # the table does not exist.
cursor = connection.execute("show table status like %s", [table_name])
- print "CURSOR", cursor, "ROWCOUNT", cursor.rowcount, "REAL RC", cursor.cursor.rowcount
return bool( not not cursor.rowcount )
def reflecttable(self, connection, table):
cs = cs.tostring()
case_sensitive = int(cs) == 0
+ decode_from = connection.execute("show variables like 'character_Set_results'").fetchone()[1]
+
if not case_sensitive:
table.name = table.name.lower()
table.metadata.tables[table.name]= table
found_table = True
# these can come back as unicode if use_unicode=1 in the mysql connection
- (name, type, nullable, primary_key, default) = (str(row[0]), str(row[1]), row[2] == 'YES', row[3] == 'PRI', row[4])
+ (name, type, nullable, primary_key, default) = (row[0], str(row[1]), row[2] == 'YES', row[3] == 'PRI', row[4])
+ if not isinstance(name, unicode):
+ name = name.decode(decode_from)
match = re.match(r'(\w+)(\(.*?\))?\s*(\w+)?\s*(\w+)?', type)
col_type = match.group(1)
c = connection.execute("SHOW CREATE TABLE " + table.fullname, {})
desc_fetched = c.fetchone()[1]
- # this can come back as unicode if use_unicode=1 in the mysql connection
- if type(desc_fetched) is unicode:
- desc_fetched = str(desc_fetched)
- elif type(desc_fetched) is not str:
+ if not isinstance(desc_fetched, basestring):
# may get array.array object here, depending on version (such as mysql 4.1.14 vs. 4.1.11)
desc_fetched = desc_fetched.tostring()
desc = desc_fetched.strip()
def has_table(self, connection, table_name, schema=None):
# seems like case gets folded in pg_class...
if schema is None:
- cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=%(name)s""", {'name':table_name.lower()});
+ cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding)});
else:
- cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and lower(relname)=%(name)s""", {'name':table_name.lower(), 'schema':schema});
+ cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding), 'schema':schema});
return bool( not not cursor.rowcount )
def has_sequence(self, connection, sequence_name):
ORDER BY a.attnum
""" % schema_where_clause
- s = sql.text(SQL_COLS)
+ s = sql.text(SQL_COLS, bindparams=[sql.bindparam('table_name', type=sqltypes.Unicode), sql.bindparam('schema', type=sqltypes.Unicode)], typemap={'attname':sqltypes.Unicode})
c = connection.execute(s, table_name=table.name,
schema=table.schema)
rows = c.fetchall()
AND i.indisprimary = 't')
ORDER BY attnum
"""
- t = sql.text(PK_SQL)
+ t = sql.text(PK_SQL, typemap={'attname':sqltypes.Unicode})
c = connection.execute(t, table=table_oid)
for row in c.fetchall():
pk = row[0]
ORDER BY 1
"""
- t = sql.text(FK_SQL)
+ t = sql.text(FK_SQL, typemap={'conname':sqltypes.Unicode, 'condef':sqltypes.Unicode})
c = connection.execute(t, table=table_oid)
for conname, condef in c.fetchall():
m = re.search('FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()
def create_execution_context(self, **kwargs):
return SQLiteExecutionContext(self, **kwargs)
+ def supports_unicode_statements(self):
+ return True
+
def last_inserted_ids(self):
return self.context.last_inserted_ids
def supports_unicode_statements(self):
"""indicate whether the DBAPI can receive SQL statements as Python unicode strings"""
- return True
+ return False
def max_identifier_length(self):
# TODO: probably raise this and fill out
self.statement = unicode(compiled)
else:
self.typemap = self.column_labels = None
- self.parameters = parameters
+ self.parameters = self._encode_param_keys(parameters)
self.statement = statement
if not dialect.supports_unicode_statements():
- self.statement = self.statement.encode('ascii')
-
+ self.statement = self.statement.encode(self.dialect.encoding)
+
self.cursor = self.create_cursor()
engine = property(lambda s:s.connection.engine)
+ def _encode_param_keys(self, params):
+ """apply string encoding to the keys of dictionary-based bind parameters"""
+ if self.dialect.positional or self.dialect.supports_unicode_statements():
+ return params
+ else:
+ def proc(d):
+ if d is None:
+ return None
+ return dict((k.encode(self.dialect.encoding), d[k]) for k in d)
+ if isinstance(params, list):
+ return [proc(d) for d in params]
+ else:
+ return proc(params)
+
def is_select(self):
return re.match(r'SELECT', self.statement.lstrip(), re.I)
def pre_exec(self):
self._process_defaults()
- self.parameters = self.dialect.convert_compiled_params(self.compiled_parameters)
+ self.parameters = self._encode_param_keys(self.dialect.convert_compiled_params(self.compiled_parameters))
def post_exec(self):
pass
def append(self, clause):
if _is_literal(clause):
- clause = _TextClause(str(clause))
+ clause = _TextClause(unicode(clause))
self.clauses.append(clause)
def get_children(self, **kwargs):
def __init__(self, text, selectable=None, type=None, _is_oid=False, case_sensitive=True, is_literal=False):
self.key = self.name = text
- self.encodedname = self.name.encode('ascii', 'backslashreplace')
+ self.encodedname = isinstance(self.name, unicode) and self.name.encode('ascii', 'backslashreplace') or self.name
self.table = selectable
self.type = sqltypes.to_instance(type)
self._is_oid = _is_oid
for c in columns:
self.append_column(c)
+ if order_by:
+ order_by = util.to_list(order_by)
+ if group_by:
+ group_by = util.to_list(group_by)
self.order_by(*(order_by or [None]))
self.group_by(*(group_by or [None]))
for c in self.order_by_clause:
"""verrrrry basic unicode column name testing"""
class UnicodeSchemaTest(testbase.PersistTest):
- @testbase.unsupported('postgres')
def setUpAll(self):
global metadata, t1, t2
metadata = MetaData(engine=testbase.db)
Column(u'éXXm', Integer, ForeignKey(u'unitable1.méil'), key="b"),
)
-
metadata.create_all()
- @testbase.unsupported('postgres')
+
+ def tearDown(self):
+ t2.delete().execute()
+ t1.delete().execute()
+
def tearDownAll(self):
metadata.drop_all()
-
- @testbase.unsupported('postgres')
+
+ # has_table() doesnt handle the unicode names on mysql
+ if testbase.db.name == 'mysql':
+ t2.drop()
+
def test_insert(self):
t1.insert().execute({u'méil':1, u'éXXm':5})
- t2.insert().execute({'a':1, 'b':5})
+ t2.insert().execute({'a':1, 'b':1})
assert t1.select().execute().fetchall() == [(1, 5)]
- assert t2.select().execute().fetchall() == [(1, 5)]
+ assert t2.select().execute().fetchall() == [(1, 1)]
+
+ def test_reflect(self):
+ t1.insert().execute({u'méil':2, u'éXXm':7})
+ t2.insert().execute({'a':2, 'b':2})
+
+ meta = BoundMetaData(testbase.db)
+ tt1 = Table(t1.name, meta, autoload=True)
+ tt2 = Table(t2.name, meta, autoload=True)
+ tt1.insert().execute({u'méil':1, u'éXXm':5})
+ tt2.insert().execute({u'méil':1, u'éXXm':1})
+
+ assert tt1.select(order_by=desc(u'méil')).execute().fetchall() == [(2, 7), (1, 5)]
+ assert tt2.select(order_by=desc(u'méil')).execute().fetchall() == [(2, 2), (1, 1)]
- @testbase.unsupported('postgres')
def test_mapping(self):
# TODO: this test should be moved to the ORM tests, tests should be
# added to this module testing SQL syntax and joins, etc.