"""
rp = connection.execute(s)
# what about system tables?
-# start Py3K
- schema_names = [row[0] for row in rp \
+
+ if util.py2k:
+ schema_names = [row[0].decode(self.encoding) for row in rp \
+ if not row[0].startswith('pg_')]
+ else:
+ schema_names = [row[0] for row in rp \
if not row[0].startswith('pg_')]
-# end Py3K
-# start Py2K
-# schema_names = [row[0].decode(self.encoding) for row in rp \
-# if not row[0].startswith('pg_')]
-# end Py2K
return schema_names
@reflection.cache
AND '%(schema)s' = (select nspname from pg_namespace n
where n.oid = c.relnamespace)
""" % dict(schema=current_schema)
-# start Py3K
- view_names = [row[0] for row in connection.execute(s)]
-# end Py3K
-# start Py2K
-# view_names = [row[0].decode(self.encoding)
-# for row in connection.execute(s)]
-# end Py2K
+
+ if util.py2k:
+ view_names = [row[0].decode(self.encoding)
+ for row in connection.execute(s)]
+ else:
+ view_names = [row[0] for row in connection.execute(s)]
return view_names
@reflection.cache
rp = connection.execute(sql.text(s),
view_name=view_name, schema=current_schema)
if rp:
-# start Py3K
- view_def = rp.scalar()
-# end Py3K
-# start Py2K
-# view_def = rp.scalar().decode(self.encoding)
-# end Py2K
+ if util.py2k:
+ view_def = rp.scalar().decode(self.encoding)
+ else:
+ view_def = rp.scalar()
return view_def
@reflection.cache
from ... import types as sqltypes
from ...sql import functions as sqlfunc
from ...sql.operators import custom_op
+from ... import util
+
__all__ = ('HSTORE', 'hstore')
def esc(s, position):
if position == 'value' and s is None:
return 'NULL'
- elif isinstance(s, str):
+ elif isinstance(s, util.string_types):
return '"%s"' % s.replace('"', r'\"')
else:
raise ValueError("%r in %s position is not a string." %
meta1.create_all()
meta2 = MetaData(testing.db)
subject = Table('subject', meta2, autoload=True)
- eq_(list(subject.primary_key.columns.keys()), ['p2', 'p1'])
+ eq_(subject.primary_key.columns.keys(), ['p2', 'p1'])
@testing.provide_metadata
def test_pg_weirdchar_reflection(self):
def test_array_subtype_resultprocessor(self):
arrtable = self.tables.arrtable
arrtable.insert().execute(intarr=[4, 5, 6],
- strarr=[[util.u('m\xe4\xe4')], [
- util.u('m\xf6\xf6')]])
+ strarr=[[util.ue('m\xe4\xe4')], [
+ util.ue('m\xf6\xf6')]])
arrtable.insert().execute(intarr=[1, 2, 3], strarr=[
- util.u('m\xe4\xe4'), util.u('m\xf6\xf6')])
+ util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')])
results = \
arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
eq_(len(results), 2)
- eq_(results[0]['strarr'], [util.u('m\xe4\xe4'), util.u('m\xf6\xf6')])
- eq_(results[1]['strarr'], [[util.u('m\xe4\xe4')], [util.u('m\xf6\xf6')]])
+ eq_(results[0]['strarr'], [util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')])
+ eq_(results[1]['strarr'], [[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]])
def test_array_literal(self):
eq_(