return sqltypes.adapt_type(typeobj, colspecs)
def do_begin_twophase(self, connection, xid):
+ # Two phase transactions seem to require that the transaction is explicitly started.
+ # The implicit transactions that usually work aren't enough.
+ connection.execute(sql.text("BEGIN"))
self.do_begin(connection.connection)
def do_prepare_twophase(self, connection, xid):
-from sqlalchemy.util import ScopedRegistry, to_list
+from sqlalchemy.util import ScopedRegistry, to_list, get_cls_kwargs
from sqlalchemy.orm import MapperExtension, EXT_CONTINUE, object_session
from sqlalchemy.orm.session import Session
from sqlalchemy import exceptions
"""return a mapper() function which associates this ScopedSession with the Mapper."""
from sqlalchemy.orm import mapper
- validate = kwargs.pop('validate', False)
+
+ extension_args = dict((arg,kwargs.pop(arg)) for arg in get_cls_kwargs(_ScopedExt) if arg in kwargs)
+
kwargs['extension'] = extension = to_list(kwargs.get('extension', []))
- if validate:
- extension.append(self.extension.validating())
+ if extension_args:
+ extension.append(self.extension.configure(**extension_args))
else:
extension.append(self.extension)
return mapper(*args, **kwargs)
setattr(ScopedSession, prop, clslevel(prop))
class _ScopedExt(MapperExtension):
- def __init__(self, context, validate=False):
+ def __init__(self, context, validate=False, save_on_init=True):
self.context = context
self.validate = validate
+ self.save_on_init = save_on_init
def validating(self):
return _ScopedExt(self.context, validate=True)
-
+
+ def configure(self, **kwargs):
+ return _ScopedExt(self.context, **kwargs)
+
def get_session(self):
return self.context.registry()
if not mapper.get_property(key, resolve_synonyms=False, raiseerr=False):
raise exceptions.ArgumentError("Invalid __init__ argument: '%s'" % key)
setattr(instance, key, value)
- session._save_impl(instance, entity_name=kwargs.pop('_sa_entity_name', None))
+ if self.save_on_init:
+ session._save_impl(instance, entity_name=kwargs.pop('_sa_entity_name', None))
return EXT_CONTINUE
def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
[(1,)]
)
connection2.close()
-
+
+ @testing.supported('postgres', 'mysql')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def testmultipletwophase(self):
+ conn = testbase.db.connect()
+
+ xa = conn.begin_twophase()
+ conn.execute(users.insert(), user_id=1, user_name='user1')
+ xa.prepare()
+ xa.commit()
+
+ xa = conn.begin_twophase()
+ conn.execute(users.insert(), user_id=2, user_name='user2')
+ xa.prepare()
+ xa.rollback()
+
+ xa = conn.begin_twophase()
+ conn.execute(users.insert(), user_id=3, user_name='user3')
+ xa.rollback()
+
+ xa = conn.begin_twophase()
+ conn.execute(users.insert(), user_id=4, user_name='user4')
+ xa.prepare()
+ xa.commit()
+
+ result = conn.execute(select([users.c.user_name]).order_by(users.c.user_id))
+ self.assertEqual(result.fetchall(), [('user1',),('user4',)])
+
+ conn.close()
+
class AutoRollbackTest(PersistTest):
def setUpAll(self):
global metadata