on all new connections, which allows all String/Text/etc.
types to skip the need to post-process bytestrings into
unicode (an expensive step due to its volume). Other
dialects which return unicode natively (pg8000, zxjdbc)
also skip unicode post-processing.
- String/Text/Unicode types now skip the unicode() check
on each result column value if the dialect has
detected the DBAPI as returning Python unicode objects
natively. This check is issued on first connect
  using "SELECT CAST('some text' AS VARCHAR(10))" or
equivalent, then checking if the returned object
is a Python unicode. This allows vast performance
increases for native-unicode DBAPIs, including
pysqlite/sqlite3, psycopg2, and pg8000.
"postgres_where" names still work with a
deprecation warning.
+ - The psycopg2 dialect now uses psycopg2's "unicode extension"
+ on all new connections, which allows all String/Text/etc.
+ types to skip the need to post-process bytestrings into
+ unicode (an expensive step due to its volume). Other
+ dialects which return unicode natively (pg8000, zxjdbc)
+ also skip unicode post-processing.
+
- Added new ENUM type, which exists as a schema-level
construct and extends the generic Enum type. Automatically
associates itself with tables and their parent metadata
parameters. In particular, Numeric, Float, NUMERIC,
FLOAT, DECIMAL don't generate any length or scale unless
specified.
+
+ - String/Text/Unicode types now skip the unicode() check
+ on each result column value if the dialect has
+ detected the DBAPI as returning Python unicode objects
+ natively. This check is issued on first connect
+ using "SELECT CAST('some text' AS VARCHAR(10))" or
+ equivalent, then checking if the returned object
+ is a Python unicode. This allows vast performance
+ increases for native-unicode DBAPIs, including
+ pysqlite/sqlite3, psycopg2, and pg8000.
- Reflection of types now returns the exact UPPERCASE
type within types.py, or the UPPERCASE type within
class _OracleChar(sqltypes.CHAR):
def get_dbapi_type(self, dbapi):
return dbapi.FIXED_CHAR
-
+
+class _OracleNVarChar(sqltypes.NVARCHAR):
+ def result_processor(self, dialect):
+ if dialect._cx_oracle_native_nvarchar:
+ return None
+ else:
+ return sqltypes.NVARCHAR.result_processor(self, dialect)
+
class _OracleText(_LOBMixin, sqltypes.Text):
def get_dbapi_type(self, dbapi):
return dbapi.CLOB
# it would be nice if we could not use it otherwise.
oracle.NUMBER : oracle.NUMBER, # don't let this get converted
oracle.RAW: _OracleRaw,
-
+ sqltypes.NVARCHAR : _OracleNVarChar,
}
class Oracle_cx_oracleCompiler(OracleCompiler):
if hasattr(self.dbapi, 'version'):
cx_oracle_ver = vers(self.dbapi.version)
self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
-
+ self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0)
+
if self.dbapi is None or not self.auto_convert_lobs or not 'CLOB' in self.dbapi.__dict__:
self.dbapi_type_map = {}
self.ORACLE_BINARY_TYPES = []
return self.mutable
def dialect_impl(self, dialect, **kwargs):
- impl = self.__class__.__new__(self.__class__)
- impl.__dict__.update(self.__dict__)
+ impl = super(ARRAY, self).dialect_impl(dialect, **kwargs)
+ if impl is self:
+ impl = self.__class__.__new__(self.__class__)
+ impl.__dict__.update(self.__dict__)
impl.item_type = self.item_type.dialect_impl(dialect)
return impl
-
+
+ def adapt(self, impltype):
+ return impltype(
+ self.item_type,
+ mutable=self.mutable
+ )
+
def bind_processor(self, dialect):
item_proc = self.item_type.bind_processor(dialect)
def process(value):
on the server and only retrieved as needed. SQLAlchemy's :class:`~sqlalchemy.engine.base.ResultProxy`
uses special row-buffering behavior when this feature is enabled, such that groups of 100 rows
at a time are fetched over the wire to reduce conversational overhead.
-
+* *use_native_unicode* - Enable the usage of Psycopg2 "native unicode" mode per connection. True
+ by default.
* *isolation_level* - Sets the transaction isolation level for each transaction
within the engine. Valid isolation levels are `READ_COMMITTED`,
`READ_UNCOMMITTED`, `REPEATABLE_READ`, and `SERIALIZABLE`.
from sqlalchemy.sql import expression
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy import types as sqltypes
-from sqlalchemy.dialects.postgresql.base import PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext
+from sqlalchemy.dialects.postgresql.base import PGDialect, PGCompiler, \
+ PGIdentifierPreparer, PGExecutionContext, \
+ ENUM, ARRAY
class _PGNumeric(sqltypes.Numeric):
def bind_processor(self, dialect):
return process
+class _PGEnum(ENUM):
+ def __init__(self, *arg, **kw):
+ super(_PGEnum, self).__init__(*arg, **kw)
+ if self.convert_unicode:
+ self.convert_unicode = "force"
+
+class _PGArray(ARRAY):
+ def __init__(self, *arg, **kw):
+ super(_PGArray, self).__init__(*arg, **kw)
+ # FIXME: this check won't work for setups that
+ # have convert_unicode only on their create_engine().
+ if isinstance(self.item_type, sqltypes.String) and \
+ self.item_type.convert_unicode:
+ self.item_type.convert_unicode = "force"
+
# TODO: filter out 'FOR UPDATE' statements
SERVER_SIDE_CURSOR_RE = re.compile(
r'\s*SELECT',
value = value.replace(self.escape_quote, self.escape_to_quote)
return value.replace('%', '%%')
-
class PostgreSQL_psycopg2(PGDialect):
driver = 'psycopg2'
supports_unicode_statements = False
{
sqltypes.Numeric : _PGNumeric,
sqltypes.Float: sqltypes.Float, # prevents _PGNumeric from being used
+ ENUM : _PGEnum, # needs force_unicode
+ sqltypes.Enum : _PGEnum, # needs force_unicode
+ ARRAY : _PGArray, # needs force_unicode
}
)
- def __init__(self, server_side_cursors=False, **kwargs):
+ def __init__(self, server_side_cursors=False, use_native_unicode=True, **kwargs):
PGDialect.__init__(self, **kwargs)
self.server_side_cursors = server_side_cursors
+ self.use_native_unicode = use_native_unicode
@classmethod
def dbapi(cls):
psycopg = __import__('psycopg2')
return psycopg
-
+
+ _unwrap_connection = None
+
+ def visit_pool(self, pool):
+ if self.dbapi and self.use_native_unicode:
+ extensions = __import__('psycopg2.extensions').extensions
+ def connect(conn, rec):
+ if self._unwrap_connection:
+ conn = self._unwrap_connection(conn)
+ if conn is None:
+ return
+ extensions.register_type(extensions.UNICODE, conn)
+ pool.add_listener({'first_connect': connect, 'connect':connect})
+ super(PostgreSQL_psycopg2, self).visit_pool(pool)
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username='user')
if 'port' in opts:
# Py2K
supports_unicode_statements = False
supports_unicode_binds = False
+ returns_unicode_strings = False
# end Py2K
name = 'default'
## work around dialects that might change these values
#self.supports_unicode_statements = True
#self.supports_unicode_binds = True
+ #self.returns_unicode_strings = True
def initialize(self, connection):
if hasattr(self, '_get_server_version_info'):
self.server_version_info = self._get_server_version_info(connection)
if hasattr(self, '_get_default_schema_name'):
self.default_schema_name = self._get_default_schema_name(connection)
+ # Py2K
+ self.returns_unicode_strings = self._check_unicode_returns(connection)
+ # end Py2K
+
+ def _check_unicode_returns(self, connection):
+ cursor = connection.connection.cursor()
+ cursor.execute(
+ str(
+ expression.select(
+ [expression.cast(
+ expression.literal_column("'test unicode returns'"),sqltypes.VARCHAR(60))
+ ]).compile(dialect=self)
+ )
+ )
+
+ row = cursor.fetchone()
+ result = isinstance(row[0], unicode)
+ cursor.close()
+ return result
@classmethod
def type_descriptor(cls, typeobj):
engine = engineclass(pool, dialect, u, **engine_args)
if _initialize:
- # some unit tests pass through _initialize=False
- # to help mock engines work
- class OnInit(object):
- def first_connect(self, conn, rec):
- c = base.Connection(engine, connection=conn)
- dialect.initialize(c)
- pool._on_first_connect.insert(0, OnInit())
-
- dialect.visit_pool(pool)
+ dialect.visit_pool(pool)
+
+ def first_connect(conn, rec):
+ c = base.Connection(engine, connection=conn)
+ dialect.initialize(c)
+ pool.add_listener({'first_connect':first_connect})
return engine
raise AttributeError(key)
else:
return result
+
+def unwrap_connection(conn):
+ if conn.__class__.__name__ == 'Recorder':
+ return conn._subject
+ elif conn.__class__.__name__ == 'Player':
+ return None
+ else:
+ return conn
:param convert_unicode: defaults to False. If True, convert
``unicode`` data sent to the database to a ``str``
bytestring, and convert bytestrings coming back from the
- database into ``unicode``.
+ database into ``unicode``.
Bytestrings are encoded using the dialect's
:attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
If False, may be overridden by
:attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
+
+ If the dialect has been detected as returning unicode
+ strings from a VARCHAR result column, no unicode translation
+ will be done on results. To force unicode translation
+ for individual types regardless of dialect setting,
+ set convert_unicode='force'.
:param assert_unicode:
self.length = length
self.convert_unicode = convert_unicode
self.assert_unicode = assert_unicode
-
+
def adapt(self, impltype):
return impltype(
length=self.length,
return None
def result_processor(self, dialect):
- if self.convert_unicode or dialect.convert_unicode:
+ if (not dialect.returns_unicode_strings or self.convert_unicode == 'force') \
+ and (self.convert_unicode or dialect.convert_unicode):
def process(value):
if value is not None and not isinstance(value, unicode):
return value.decode(dialect.encoding)
def test_string(self):
[tuple(row) for row in t.select().execute().fetchall()]
- @profiling.function_call_count(44406, versions={'2.4':33224})
+ # sqlite3 returns native unicode. so shouldn't be an
+ # increase here.
+ @profiling.function_call_count(14396, versions={'2.4':13214})
def test_unicode(self):
[tuple(row) for row in t2.select().execute().fetchall()]
dbapi_session = engines.ReplayableSession()
metadata = None
+
class ZooMarkTest(TestBase):
"""Runs the ZooMark and squawks if method counts vary from the norm.
global metadata
creator = testing.db.pool._creator
+
recorder = lambda: dbapi_session.recorder(creator())
engine = engines.testing_engine(options={'creator':recorder})
+ engine.dialect._unwrap_connection = engines.unwrap_connection
metadata = MetaData(engine)
def test_baseline_1_create_tables(self):
player = lambda: dbapi_session.player()
engine = create_engine('postgresql:///', creator=player)
+ engine.dialect._unwrap_connection = engines.unwrap_connection
metadata = MetaData(engine)
- @profiling.function_call_count(2991, {'2.4': 1796})
+ @profiling.function_call_count(3178, {'2.4': 1796})
def test_profile_1_create_tables(self):
self.test_baseline_1_create_tables()
def test_profile_3_properties(self):
self.test_baseline_3_properties()
- @profiling.function_call_count(14752, {'2.4': 8434})
+ @profiling.function_call_count(13341, {'2.4': 8434})
def test_profile_4_expressions(self):
self.test_baseline_4_expressions()
- @profiling.function_call_count(1347, {'2.4': 901})
+ @profiling.function_call_count(1241, {'2.4': 901})
def test_profile_5_aggregates(self):
self.test_baseline_5_aggregates()
def test_profile_6_editing(self):
self.test_baseline_6_editing()
- @profiling.function_call_count(2994, {'2.4': 1844})
+ @profiling.function_call_count(2641, {'2.4': 1844})
def test_profile_7_multiview(self):
self.test_baseline_7_multiview()
creator = testing.db.pool._creator
recorder = lambda: dbapi_session.recorder(creator())
engine = engines.testing_engine(options={'creator':recorder})
+ engine.dialect._unwrap_connection = engines.unwrap_connection
metadata = MetaData(engine)
session = sessionmaker()()
player = lambda: dbapi_session.player()
engine = create_engine('postgresql:///', creator=player)
+ engine.dialect._unwrap_connection = engines.unwrap_connection
metadata = MetaData(engine)
session = sessionmaker()()
def test_profile_4_expressions(self):
self.test_baseline_4_expressions()
- @profiling.function_call_count(1331)
+ @profiling.function_call_count(1240)
def test_profile_5_aggregates(self):
self.test_baseline_5_aggregates()
m2 = MetaData(testing.db)
t2 = Table('t', m2, autoload=True)
assert isinstance(t2.c.data.type, sqltypes.NVARCHAR)
+
+ # nvarchar returns unicode natively. cx_oracle
+ # _OracleNVarChar type should be at play
+ # here.
+ assert isinstance(
+ t2.c.data.type.dialect_impl(testing.db.dialect),
+ cx_oracle._OracleNVarChar)
+
data = u'm’a réveillé.'
t2.insert().execute(data=data)
- eq_(t2.select().execute().first()['data'], data)
+ res = t2.select().execute().first()['data']
+ eq_(res, data)
+ assert isinstance(res, unicode)
finally:
metadata.drop_all()
arrtable = Table('arrtable', metadata,
Column('id', Integer, primary_key=True),
Column('intarr', postgresql.PGArray(Integer)),
- Column('strarr', postgresql.PGArray(String(convert_unicode=True)), nullable=False)
+ Column('strarr', postgresql.PGArray(Unicode(assert_unicode=False)), nullable=False)
)
metadata.create_all()
-
+
def teardown(self):
arrtable.delete().execute()
import sqlalchemy as tsa
from sqlalchemy.test import TestBase, testing
from sqlalchemy.test.util import gc_collect, lazy_gc
-
+from sqlalchemy.test.testing import eq_
mcid = 1
class MockDBAPI(object):
self.checked_out = []
self.checked_in = []
def assert_total(innerself, conn, fconn, cout, cin):
- self.assert_(len(innerself.connected) == conn)
- self.assert_(len(innerself.first_connected) == fconn)
- self.assert_(len(innerself.checked_out) == cout)
- self.assert_(len(innerself.checked_in) == cin)
+ eq_(len(innerself.connected), conn)
+ eq_(len(innerself.first_connected), fconn)
+ eq_(len(innerself.checked_out), cout)
+ eq_(len(innerself.checked_in), cin)
def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
self.assert_((item in innerself.first_connected) == in_fconn)
del c
snoop.assert_total(2, 0, 2, 1)
+ # recreated
+ p = p.recreate()
+ snoop.clear()
+
+ c = p.connect()
+ snoop.assert_total(1, 1, 1, 0)
+ c.close()
+ snoop.assert_total(1, 1, 1, 1)
+ c = p.connect()
+ snoop.assert_total(1, 1, 2, 1)
+ c.close()
+ snoop.assert_total(1, 1, 2, 2)
+
def test_listeners_callables(self):
dbapi = MockDBAPI()
eq_(results, [(User(name='jack'), 'jack')])
self.assert_sql_count(testing.db, go, 1)
+ @testing.fails_on('postgresql+pg8000', "'type oid 705 not mapped to py type' (due to literal)")
def test_self_referential(self):
sess = create_session()
# ensure column expressions are taken from inside the subquery, not restated at the top
- q = sess.query(Order.id, Order.description, literal_column("'q'").label('foo')).filter(Order.description == u'order 3').from_self()
+ q = sess.query(Order.id, Order.description, literal_column("'q'").label('foo')).\
+ filter(Order.description == u'order 3').from_self()
self.assert_compile(q,
"SELECT anon_1.orders_id AS anon_1_orders_id, anon_1.orders_description AS anon_1_orders_description, "
"anon_1.foo AS anon_1_foo FROM (SELECT orders.id AS orders_id, orders.description AS orders_description, "
@engines.close_first
def teardown(self):
unicode_table.delete().execute()
-
+
+ def test_native_unicode(self):
+ """assert expected values for 'native unicode' mode"""
+
+ assert testing.db.dialect.returns_unicode_strings == \
+ ((testing.db.name, testing.db.driver) in \
+ (
+ ('postgresql','psycopg2'),
+ ('postgresql','pg8000'),
+ ('postgresql','zxjdbc'),
+ ('mysql','zxjdbc'),
+ ('sqlite','pysqlite'),
+ ))
+
def test_round_trip(self):
unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
]
)
- @testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
- 'but expression is of type character varying')
- @testing.fails_on('mysql', "MySQL seems to issue a 'data truncated' warning.")
+ @testing.fails_on('mysql+mysqldb', "MySQL seems to issue a 'data truncated' warning.")
def test_constraint(self):
assert_raises(exc.DBAPIError,
enum_table.insert().execute,