From: Jason Kirtland Date: Tue, 31 Mar 2009 19:49:13 +0000 (+0000) Subject: - Added support for create_engine(isolation_level=...); postgres & X-Git-Tag: rel_0_6_6~245 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5f6b770b9c10458ab2c293fa1a646ced05f6d673;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Added support for create_engine(isolation_level=...); postgres & sqlite initially [ticket:443] - Dialects gained visit_pool - Pools gained a first_connect event Patch from Adam Lowry. Thank you Adam! --- diff --git a/06CHANGES b/06CHANGES index d8a4884636..70834b35c1 100644 --- a/06CHANGES +++ b/06CHANGES @@ -7,11 +7,18 @@ There is no implicit fallback onto "fetch". Failure of evaluation is based on the structure of criteria, so success/failure is deterministic based on code structure. - + +- engines + - transaction isolation level may be specified with + create_engine(... isolation_level="..."); available on + postgresql and sqlite. [ticket:443] + - dialect refactor - server_version_info becomes a static attribute. - - create_engine() now establishes an initial connection immediately upon - creation, which is passed to the dialect to determine connection properties. + - dialects receive an initialize() event on initial connection to + determine connection properties. + - dialects receive a visit_pool event have an opportunity to + establish pool listeners. - cached TypeEngine classes are cached per-dialect class instead of per-dialect. - mysql diff --git a/lib/sqlalchemy/dialects/postgres/base.py b/lib/sqlalchemy/dialects/postgres/base.py index 7ab1ac7a48..c541cff95e 100644 --- a/lib/sqlalchemy/dialects/postgres/base.py +++ b/lib/sqlalchemy/dialects/postgres/base.py @@ -445,7 +445,25 @@ class PGDialect(default.DefaultDialect): preparer = PGIdentifierPreparer defaultrunner = PGDefaultRunner inspector = PGInspector - + isolation_level = None + + def __init__(self, isolation_level=None, **kwargs): + default.DefaultDialect.__init__(self, **kwargs) + self.isolation_level = isolation_level + + def visit_pool(self, pool): + if self.isolation_level is not None: + class SetIsolationLevel(object): + def __init__(self, isolation_level): + self.isolation_level = isolation_level + + def connect(self, conn, rec): + cursor = conn.cursor() + cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL %s" + % self.isolation_level) + cursor.execute("COMMIT") + cursor.close() + pool.add_listener(SetIsolationLevel(self.isolation_level)) def do_begin_twophase(self, connection, xid): self.do_begin(connection.connection) diff --git a/lib/sqlalchemy/dialects/postgres/psycopg2.py b/lib/sqlalchemy/dialects/postgres/psycopg2.py index 2a3eef47f0..c865c9decf 100644 --- a/lib/sqlalchemy/dialects/postgres/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgres/psycopg2.py @@ -24,6 +24,10 @@ psycopg2-specific keyword arguments which are accepted by :func:`~sqlalchemy.cre uses special row-buffering behavior when this feature is enabled, such that groups of 100 rows at a time are fetched over the wire to reduce conversational overhead. +* *isolation_level* - Sets the transaction isolation level for each transaction + within the engine. Valid isolation levels are `READ_COMMITTED`, + `READ_UNCOMMITTED`, `REPEATABLE_READ`, and `SERIALIZABLE`. + Transactions ------------ @@ -116,7 +120,6 @@ class Postgres_psycopg2(PGDialect): } ) - def __init__(self, server_side_cursors=False, **kwargs): PGDialect.__init__(self, **kwargs) self.server_side_cursors = server_side_cursors @@ -145,4 +148,4 @@ class Postgres_psycopg2(PGDialect): return False dialect = Postgres_psycopg2 - \ No newline at end of file + diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index fb5e6c2218..a884221bd3 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -390,6 +390,9 @@ class Dialect(object): raise NotImplementedError() + def visit_pool(self, pool): + """Executed after a pool is created.""" + class ExecutionContext(object): """A messenger object for a Dialect that corresponds to a single execution. diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 6b2e2a6834..d938c644f5 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -102,7 +102,10 @@ class DefaultDialect(base.Dialect): def validate_identifier(self, ident): if len(ident) > self.max_identifier_length: raise exc.IdentifierError("Identifier '%s' exceeds maximum length of %d characters" % (ident, self.max_identifier_length)) - + + def connect(self, *cargs, **cparams): + return self.dbapi.connect(*cargs, **cparams) + def do_begin(self, connection): """Implementations might want to put logic here for turning autocommit on/off, etc. diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py index d31f785545..c57acf242f 100644 --- a/lib/sqlalchemy/engine/strategies.py +++ b/lib/sqlalchemy/engine/strategies.py @@ -13,7 +13,6 @@ from operator import attrgetter from sqlalchemy.engine import base, threadlocal, url from sqlalchemy import util, exc from sqlalchemy import pool as poollib -from sqlalchemy import interfaces strategies = {} @@ -75,7 +74,7 @@ class DefaultEngineStrategy(EngineStrategy): if pool is None: def connect(): try: - return dbapi.connect(*cargs, **cparams) + return dialect.connect(*cargs, **cparams) except Exception, e: import sys raise exc.DBAPIError.instance(None, None, e), None, sys.exc_info()[2] @@ -126,13 +125,14 @@ class DefaultEngineStrategy(EngineStrategy): engine = engineclass(pool, dialect, u, **engine_args) if _initialize: - class OnInit(interfaces.PoolListener): - def connect(self, conn, rec): + class OnInit(object): + def first_connect(self, conn, rec): c = base.Connection(engine, connection=conn) dialect.initialize(c) - pool._on_connect.remove(self) - pool._on_connect.insert(0, OnInit()) - + pool._on_first_connect.insert(0, OnInit()) + + dialect.visit_pool(pool) + return engine def pool_threadlocal(self): diff --git a/lib/sqlalchemy/interfaces.py b/lib/sqlalchemy/interfaces.py index dfceffe445..e4a9adee1f 100644 --- a/lib/sqlalchemy/interfaces.py +++ b/lib/sqlalchemy/interfaces.py @@ -71,6 +71,18 @@ class PoolListener(object): """ + def first_connect(self, dbapi_con, con_record): + """Called exactly once for the first DB-API connection. + + dbapi_con + A newly connected raw DB-API connection (not a SQLAlchemy + ``Connection`` wrapper). + + con_record + The ``_ConnectionRecord`` that persistently manages the connection + + """ + def checkout(self, dbapi_con, con_record, con_proxy): """Called when a connection is retrieved from the Pool. diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index aca4663774..c39b4b225b 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -108,6 +108,7 @@ class Pool(object): self.echo = echo self.listeners = [] self._on_connect = [] + self._on_first_connect = [] self._on_checkout = [] self._on_checkin = [] @@ -178,12 +179,14 @@ class Pool(object): """ - listener = as_interface( - listener, methods=('connect', 'checkout', 'checkin')) + listener = as_interface(listener, + methods=('connect', 'first_connect', 'checkout', 'checkin')) self.listeners.append(listener) if hasattr(listener, 'connect'): self._on_connect.append(listener) + if hasattr(listener, 'first_connect'): + self._on_first_connect.append(listener) if hasattr(listener, 'checkout'): self._on_checkout.append(listener) if hasattr(listener, 'checkin'): @@ -197,6 +200,10 @@ class _ConnectionRecord(object): self.__pool = pool self.connection = self.__connect() self.info = {} + ls = pool.__dict__.pop('_on_first_connect', None) + if ls is not None: + for l in ls: + l.first_connect(self.connection, self) if pool._on_connect: for l in pool._on_connect: l.connect(self.connection, self) diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py index bb4a10e605..b25270f3a9 100644 --- a/test/dialect/postgres.py +++ b/test/dialect/postgres.py @@ -708,6 +708,20 @@ class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): warnings.warn = capture_warnings._orig_showwarning m1.drop_all() + def test_set_isolation_level(self): + """Test setting the isolation level with create_engine""" + eng = create_engine(testing.db.url) + self.assertEquals( + eng.execute("show transaction isolation level").scalar(), + 'read committed') + eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE") + self.assertEquals( + eng.execute("show transaction isolation level").scalar(), + 'serializable') + eng = create_engine(testing.db.url, isolation_level="FOO") + self.assertRaises(eng.dialect.dbapi.ProgrammingError, eng.execute, + "show transaction isolation level") + class TimezoneTest(TestBase, AssertsExecutionResults): """Test timezone-aware datetimes. diff --git a/test/engine/pool.py b/test/engine/pool.py index b712e24128..fdac6c0d85 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -1,8 +1,8 @@ import testenv; testenv.configure_for_tests() import threading, time, gc -from sqlalchemy import pool, interfaces +from sqlalchemy import pool, interfaces, create_engine import testlib.sa as tsa -from testlib import TestBase +from testlib import TestBase, testing mcid = 1 @@ -164,6 +164,8 @@ class PoolTest(PoolTestBase): def __init__(self): if hasattr(self, 'connect'): self.connect = self.inst_connect + if hasattr(self, 'first_connect'): + self.first_connect = self.inst_first_connect if hasattr(self, 'checkout'): self.checkout = self.inst_checkout if hasattr(self, 'checkin'): @@ -171,14 +173,17 @@ class PoolTest(PoolTestBase): self.clear() def clear(self): self.connected = [] + self.first_connected = [] self.checked_out = [] self.checked_in = [] - def assert_total(innerself, conn, cout, cin): + def assert_total(innerself, conn, fconn, cout, cin): self.assert_(len(innerself.connected) == conn) + self.assert_(len(innerself.first_connected) == fconn) self.assert_(len(innerself.checked_out) == cout) self.assert_(len(innerself.checked_in) == cin) - def assert_in(innerself, item, in_conn, in_cout, in_cin): + def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin): self.assert_((item in innerself.connected) == in_conn) + self.assert_((item in innerself.first_connected) == in_fconn) self.assert_((item in innerself.checked_out) == in_cout) self.assert_((item in innerself.checked_in) == in_cin) def inst_connect(self, con, record): @@ -186,6 +191,11 @@ class PoolTest(PoolTestBase): assert con is not None assert record is not None self.connected.append(con) + def inst_first_connect(self, con, record): + print "first_connect(%s, %s)" % (con, record) + assert con is not None + assert record is not None + self.first_connected.append(con) def inst_checkout(self, con, record, proxy): print "checkout(%s, %s, %s)" % (con, record, proxy) assert con is not None @@ -203,6 +213,9 @@ class PoolTest(PoolTestBase): class ListenConnect(InstrumentingListener): def connect(self, con, record): pass + class ListenFirstConnect(InstrumentingListener): + def first_connect(self, con, record): + pass class ListenCheckOut(InstrumentingListener): def checkout(self, con, record, proxy, num): pass @@ -214,40 +227,44 @@ class PoolTest(PoolTestBase): return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), use_threadlocal=False, **kw) - def assert_listeners(p, total, conn, cout, cin): + def assert_listeners(p, total, conn, fconn, cout, cin): for instance in (p, p.recreate()): self.assert_(len(instance.listeners) == total) self.assert_(len(instance._on_connect) == conn) + self.assert_(len(instance._on_first_connect) == fconn) self.assert_(len(instance._on_checkout) == cout) self.assert_(len(instance._on_checkin) == cin) p = _pool() - assert_listeners(p, 0, 0, 0, 0) + assert_listeners(p, 0, 0, 0, 0, 0) p.add_listener(ListenAll()) - assert_listeners(p, 1, 1, 1, 1) + assert_listeners(p, 1, 1, 1, 1, 1) p.add_listener(ListenConnect()) - assert_listeners(p, 2, 2, 1, 1) + assert_listeners(p, 2, 2, 1, 1, 1) + + p.add_listener(ListenFirstConnect()) + assert_listeners(p, 3, 2, 2, 1, 1) p.add_listener(ListenCheckOut()) - assert_listeners(p, 3, 2, 2, 1) + assert_listeners(p, 4, 2, 2, 2, 1) p.add_listener(ListenCheckIn()) - assert_listeners(p, 4, 2, 2, 2) + assert_listeners(p, 5, 2, 2, 2, 2) del p print "----" snoop = ListenAll() p = _pool(listeners=[snoop]) - assert_listeners(p, 1, 1, 1, 1) + assert_listeners(p, 1, 1, 1, 1, 1) c = p.connect() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 1, 1, 0) cc = c.connection - snoop.assert_in(cc, True, True, False) + snoop.assert_in(cc, True, True, True, False) c.close() - snoop.assert_in(cc, True, True, True) + snoop.assert_in(cc, True, True, True, True) del c, cc snoop.clear() @@ -255,10 +272,10 @@ class PoolTest(PoolTestBase): # this one depends on immediate gc c = p.connect() cc = c.connection - snoop.assert_in(cc, False, True, False) - snoop.assert_total(0, 1, 0) + snoop.assert_in(cc, False, False, True, False) + snoop.assert_total(0, 0, 1, 0) del c, cc - snoop.assert_total(0, 1, 1) + snoop.assert_total(0, 0, 1, 1) p.dispose() snoop.clear() @@ -266,44 +283,44 @@ class PoolTest(PoolTestBase): c = p.connect() c.close() c = p.connect() - snoop.assert_total(1, 2, 1) + snoop.assert_total(1, 0, 2, 1) c.close() - snoop.assert_total(1, 2, 2) + snoop.assert_total(1, 0, 2, 2) # invalidation p.dispose() snoop.clear() c = p.connect() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c.invalidate() - snoop.assert_total(1, 1, 1) + snoop.assert_total(1, 0, 1, 1) c.close() - snoop.assert_total(1, 1, 1) + snoop.assert_total(1, 0, 1, 1) del c - snoop.assert_total(1, 1, 1) + snoop.assert_total(1, 0, 1, 1) c = p.connect() - snoop.assert_total(2, 2, 1) + snoop.assert_total(2, 0, 2, 1) c.close() del c - snoop.assert_total(2, 2, 2) + snoop.assert_total(2, 0, 2, 2) # detached p.dispose() snoop.clear() c = p.connect() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c.detach() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c.close() del c - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c = p.connect() - snoop.assert_total(2, 2, 0) + snoop.assert_total(2, 0, 2, 0) c.close() del c - snoop.assert_total(2, 2, 1) + snoop.assert_total(2, 0, 2, 1) def test_listeners_callables(self): dbapi = MockDBAPI() @@ -362,6 +379,18 @@ class PoolTest(PoolTestBase): c.close() assert counts == [1, 2, 3] + def test_listener_after_oninit(self): + """Test that listeners are called after OnInit is removed""" + called = [] + def listener(*args): + called.append(True) + listener.connect = listener + engine = create_engine(testing.db.url) + engine.pool.add_listener(listener) + engine.execute('select 1') + assert called, "Listener not called on connect" + + class QueuePoolTest(PoolTestBase): def testqueuepool_del(self):