class UnitOfWorkTest(AssertMixin):
def setUpAll(self):
global ctx, assign_mapper
- ctx = SessionContext(create_session)
+ ctx = SessionContext(Session)
def assign_mapper(*args, **kwargs):
return assignmapper.assign_mapper(ctx, *args, **kwargs)
global_extensions.append(ctx.mapper_extension)
def tearDownAll(self):
global_extensions.remove(ctx.mapper_extension)
def tearDown(self):
- ctx.current.clear()
+ Session.close_all()
+ ctx.current.close()
clear_mappers()
class HistoryTest(UnitOfWorkTest):
UnitOfWorkTest.tearDownAll(self)
def testbackref(self):
- s = create_session()
+ s = Session()
class User(object):pass
class Address(object):pass
am = mapper(Address, addresses)
#print repr(a.__class__._attribute_manager.get_history(a, 'user').added_items())
#print repr(u.addresses.added_items())
self.assert_(u.addresses == [a])
- s.flush()
+ s.commit()
- s.clear()
+ s.close()
u = s.query(m).select()[0]
print u.addresses[0].user
-
class VersioningTest(UnitOfWorkTest):
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
- ctx.current.clear()
+ ctx.current.close()
global version_table
version_table = Table('version_test', MetaData(testbase.db),
Column('id', Integer, Sequence('version_test_seq'), primary_key=True ),
)
version_table.create()
def tearDownAll(self):
- version_table.drop()
UnitOfWorkTest.tearDownAll(self)
+ version_table.drop()
def tearDown(self):
- version_table.delete().execute()
UnitOfWorkTest.tearDown(self)
+ version_table.delete().execute()
def testbasic(self):
- s = create_session()
+ s = Session()
class Foo(object):pass
assign_mapper(Foo, version_table, version_id_col=version_table.c.version_id)
f1 =Foo(value='f1', _sa_session=s)
f2 = Foo(value='f2', _sa_session=s)
- s.flush()
+ s.commit()
f1.value='f1rev2'
- s.flush()
- s2 = create_session()
+ s.commit()
+ s2 = Session()
f1_s = s2.query(Foo).get(f1.id)
f1_s.value='f1rev3'
- s2.flush()
+ s2.commit()
f1.value='f1rev3mine'
success = False
try:
# a concurrent session has modified this, should throw
# an exception
- s.flush()
+ s.commit()
+ assert False
except exceptions.ConcurrentModificationError, e:
#print e
success = True
if testbase.db.dialect.supports_sane_rowcount():
assert success
- s.clear()
+ s.close()
f1 = s.query(Foo).get(f1.id)
f2 = s.query(Foo).get(f2.id)
f1_s.value='f1rev4'
- s2.flush()
-
+ s2.commit()
+
s.delete(f1)
s.delete(f2)
success = False
try:
- s.flush()
+ s.commit()
except exceptions.ConcurrentModificationError, e:
#print e
success = True
def testversioncheck(self):
"""test that query.with_lockmode performs a 'version check' on an already loaded instance"""
- s1 = create_session()
+ s1 = Session()
class Foo(object):pass
assign_mapper(Foo, version_table, version_id_col=version_table.c.version_id)
f1s1 =Foo(value='f1', _sa_session=s1)
- s1.flush()
- s2 = create_session()
+ s1.commit()
+ s2 = Session()
f1s2 = s2.query(Foo).get(f1s1.id)
f1s2.value='f1 new value'
- s2.flush()
+ s2.commit()
try:
# load, version is wrong
s1.query(Foo).with_lockmode('read').get(f1s1.id)
s1.query(Foo).with_lockmode('read').get(f1s1.id)
# assert brand new load is OK too
- s1.clear()
+ s1.close()
s1.query(Foo).with_lockmode('read').get(f1s1.id)
def testnoversioncheck(self):
"""test that query.with_lockmode works OK when the mapper has no version id col"""
- s1 = create_session()
+ s1 = Session()
class Foo(object):pass
assign_mapper(Foo, version_table)
f1s1 =Foo(value='f1', _sa_session=s1)
f1s1.version_id=0
- s1.flush()
- s2 = create_session()
+ s1.commit()
+ s2 = Session()
f1s2 = s2.query(Foo).with_lockmode('read').get(f1s1.id)
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value
class UnicodeTest(UnitOfWorkTest):
def setUpAll(self):
- UnitOfWorkTest.setUpAll(self)
global metadata, uni_table, uni_table2
metadata = MetaData(testbase.db)
uni_table = Table('uni_test', metadata,
Column('id', Integer, Sequence("uni2_test_id_seq", optional=True), primary_key=True),
Column('txt', Unicode(50), ForeignKey(uni_table.c.txt)))
metadata.create_all()
+ UnitOfWorkTest.setUpAll(self)
+
def tearDownAll(self):
- metadata.drop_all()
UnitOfWorkTest.tearDownAll(self)
+ metadata.drop_all()
+
def tearDown(self):
+ UnitOfWorkTest.tearDown(self)
clear_mappers()
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
+
def testbasic(self):
class Test(object):
def __init__(self, id, txt):
txt = u"\u0160\u0110\u0106\u010c\u017d"
t1 = Test(id=1, txt = txt)
self.assert_(t1.txt == txt)
- ctx.current.flush()
+ ctx.current.commit()
self.assert_(t1.txt == txt)
def testrelation(self):
class Test(object):
t1 = Test(txt=txt)
t1.t2s.append(Test2())
t1.t2s.append(Test2())
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
t1 = ctx.current.query(Test).get_by(id=t1.id)
assert len(t1.t2s) == 2
mapper(Foo, table)
f1 = Foo()
f1.data = pickleable.Bar(4,5)
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
f2 = ctx.current.query(Foo).get_by(id=f1.id)
assert f2.data == f1.data
f2.data.y = 19
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
f3 = ctx.current.query(Foo).get_by(id=f1.id)
print f2.data, f3.data
assert f3.data != f1.data
f1 = Foo()
f1.data = pickleable.Bar(4,5)
f1.value = unicode('hi')
- ctx.current.flush()
+ ctx.current.commit()
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 0)
f1.value = unicode('someothervalue')
- self.assert_sql(testbase.db, lambda: ctx.current.flush(), [
+ self.assert_sql(testbase.db, lambda: ctx.current.commit(), [
(
"UPDATE mutabletest SET value=:value WHERE mutabletest.id = :mutabletest_id",
{'mutabletest_id': f1.id, 'value': u'someothervalue'}
])
f1.value = unicode('hi')
f1.data.x = 9
- self.assert_sql(testbase.db, lambda: ctx.current.flush(), [
+ self.assert_sql(testbase.db, lambda: ctx.current.commit(), [
(
"UPDATE mutabletest SET data=:data, value=:value WHERE mutabletest.id = :mutabletest_id",
{'mutabletest_id': f1.id, 'value': u'hi', 'data':f1.data}
mapper(Foo, table)
f1 = Foo()
f1.data = pickleable.BarWithoutCompare(4,5)
- ctx.current.flush()
+ ctx.current.commit()
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 0)
- ctx.current.clear()
+ ctx.current.close()
f2 = ctx.current.query(Foo).get_by(id=f1.id)
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 0)
f2.data.y = 19
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 1)
- ctx.current.clear()
+ ctx.current.close()
f3 = ctx.current.query(Foo).get_by(id=f1.id)
print f2.data, f3.data
assert (f3.data.x, f3.data.y) == (4,19)
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 0)
def testunicode(self):
mapper(Foo, table)
f1 = Foo()
f1.value = u'hi'
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
f1 = ctx.current.get(Foo, f1.id)
f1.value = u'hi'
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 0)
e.name = 'entry1'
e.value = 'this is entry 1'
e.multi_rev = 2
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
e2 = Query(Entry).get((e.multi_id, 2))
self.assert_(e is not e2 and e._instance_key == e2._instance_key)
e.pk_col_1 = 'pk1'
e.pk_col_2 = 'pk1_related'
e.data = 'im the data'
- ctx.current.flush()
+ ctx.current.commit()
def testkeypks(self):
import datetime
e.secondary = 'pk2'
e.assigned = datetime.date.today()
e.data = 'some more data'
- ctx.current.flush()
+ ctx.current.commit()
def testpksimmutable(self):
class Entry(object):
e.multi_id=5
e.multi_rev=5
e.name='somename'
- ctx.current.flush()
+ ctx.current.commit()
e.multi_rev=6
e.name = 'someothername'
try:
- ctx.current.flush()
+ ctx.current.commit()
assert False
except exceptions.FlushError, fe:
assert str(fe) == "Can't change the identity of instance Entry@%s in session (existing identity: (%s, (5, 5), None); new identity: (%s, (5, 6), None))" % (hex(id(e)), repr(e.__class__), repr(e.__class__))
ps = PersonSite()
ps.site = 'asdf'
p.sites.append(ps)
- ctx.current.flush()
+ ctx.current.commit()
assert people.count(people.c.person=='im the key').scalar() == peoplesites.count(peoplesites.c.person=='im the key').scalar() == 1
class PassiveDeletesTest(UnitOfWorkTest):
mc.children.append(MyOtherClass())
mc.children.append(MyOtherClass())
sess.save(mc)
- sess.flush()
- sess.clear()
+ sess.commit()
+ sess.close()
assert myothertable.count().scalar() == 4
mc = sess.query(MyClass).get(mc.id)
sess.delete(mc)
- sess.flush()
+ sess.commit()
assert mytable.count().scalar() == 0
assert myothertable.count().scalar() == 0
h3 = Hoho(hoho=self.althohoval, counter=12)
h4 = Hoho()
h5 = Hoho(foober='im the new foober')
- ctx.current.flush()
+ ctx.current.commit()
self.assert_(h1.hoho==self.althohoval)
self.assert_(h3.hoho==self.althohoval)
self.assert_(h5.foober=='im the new foober')
self.assert_sql_count(testbase.db, go, 0)
- ctx.current.clear()
+ ctx.current.close()
l = Query(Hoho).select()
h1 = Hoho(hoho="15", counter="15")
- ctx.current.flush()
+ ctx.current.commit()
def go():
self.assert_(h1.hoho=="15")
self.assert_(h1.counter=="15")
class Hoho(object):pass
assign_mapper(Hoho, default_table)
h1 = Hoho()
- ctx.current.flush()
+ ctx.current.commit()
self.assert_(h1.foober == 'im foober')
h1.counter = 19
- ctx.current.flush()
+ ctx.current.commit()
self.assert_(h1.foober == 'im the update')
class OneToManyTest(UnitOfWorkTest):
a2.email_address = 'lala@test.org'
u.addresses.append(a2)
print repr(u.addresses)
- ctx.current.flush()
+ ctx.current.commit()
usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
a2.email_address = 'somethingnew@foo.com'
- ctx.current.flush()
+ ctx.current.commit()
addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
a3 = Address()
a3.email_address = 'emailaddress3'
- ctx.current.flush()
+ ctx.current.commit()
# modify user2 directly, append an address to user1.
# upon commit, user2 should be updated, user1 should not
u2.user_name = 'user2modified'
u1.addresses.append(a3)
del u1.addresses[0]
- self.assert_sql(testbase.db, lambda: ctx.current.flush(),
+ self.assert_sql(testbase.db, lambda: ctx.current.commit(),
[
(
"UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id",
a = Address()
a.email_address = 'address1'
u1.addresses.append(a)
- ctx.current.flush()
+ ctx.current.commit()
del u1.addresses[0]
u2.addresses.append(a)
ctx.current.delete(u1)
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
u2 = ctx.current.get(User, u2.user_id)
assert len(u2.addresses) == 1
a = Address()
a.email_address = 'address1'
u1.addresses.append(a)
- ctx.current.flush()
+ ctx.current.commit()
del u1.addresses[0]
u2.addresses.append(a)
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
u2 = ctx.current.get(User, u2.user_id)
assert len(u2.addresses) == 1
u.user_name = 'one2onetester'
u.address = a
u.address.email_address = 'myonlyaddress@foo.com'
- ctx.current.flush()
+ ctx.current.commit()
ctx.current.delete(u)
- ctx.current.flush()
+ ctx.current.commit()
self.assert_(a.address_id is not None and a.user_id is None and not ctx.current.identity_map.has_key(u._instance_key) and ctx.current.identity_map.has_key(a._instance_key))
def testonetoone(self):
u.user_name = 'one2onetester'
u.address = Address()
u.address.email_address = 'myonlyaddress@foo.com'
- ctx.current.flush()
+ ctx.current.commit()
u.user_name = 'imnew'
- ctx.current.flush()
+ ctx.current.commit()
u.address.email_address = 'imnew@foo.com'
- ctx.current.flush()
+ ctx.current.commit()
def testbidirectional(self):
m1 = mapper(User, users)
a = Address()
a.email_address = 'testaddress'
a.user = u
- ctx.current.flush()
+ ctx.current.commit()
print repr(u.addresses)
x = False
try:
self.assert_(False, "User addresses element should be scalar based")
ctx.current.delete(u)
- ctx.current.flush()
+ ctx.current.commit()
def testdoublerelation(self):
m2 = mapper(Address, addresses)
u.boston_addresses.append(a)
u.newyork_addresses.append(b)
- ctx.current.flush()
+ ctx.current.commit()
class SaveTest(UnitOfWorkTest):
ctx.current.save(u)
ctx.current.flush([u])
- ctx.current.flush()
+ ctx.current.commit()
# assert the first one retreives the same from the identity map
nu = ctx.current.get(m, u.user_id)
self.assert_(u is nu)
# clear out the identity map, so next get forces a SELECT
- ctx.current.clear()
+ ctx.current.close()
# check it again, identity should be different but ids the same
nu = ctx.current.get(m, u.user_id)
ctx.current.update(u)
u.user_name = 'modifiedname'
assert u in ctx.current.dirty
- ctx.current.flush()
+ ctx.current.commit()
# select both
- #ctx.current.clear()
+ #ctx.current.close()
userlist = Query(m).select(users.c.user_id.in_(u.user_id, u2.user_id), order_by=[users.c.user_name])
print repr(u.user_id), repr(userlist[0].user_id), repr(userlist[0].user_name)
self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname')
u.addresses.append(Address())
u.addresses.append(Address())
u.addresses.append(Address())
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
ulist = ctx.current.query(m1).select()
u1 = ulist[0]
u1.user_name = 'newname'
- ctx.current.flush()
+ ctx.current.commit()
self.assert_(len(u1.addresses) == 4)
def testinherits(self):
)
au = AddressUser()
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
l = ctx.current.query(AddressUser).selectone()
self.assert_(l.user_id == au.user_id and l.address_id == au.address_id)
def testdeferred(self):
- """test that a deferred load within a flush() doesnt screw up the connection"""
+ """test that a deferred load within a commit() doesnt screw up the connection"""
mapper(User, users, properties={
'user_name':deferred(users.c.user_name)
})
u = User()
u.user_id=42
- ctx.current.flush()
+ ctx.current.commit()
def test_dont_update_blanks(self):
mapper(User, users)
u = User()
u.user_name = ""
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
u = ctx.current.query(User).get(u.user_id)
u.user_name = ""
def go():
- ctx.current.flush()
+ ctx.current.commit()
self.assert_sql_count(testbase.db, go, 0)
def testmultitable(self):
u.user_name = 'multitester'
u.email = 'multi@test.org'
- ctx.current.flush()
+ ctx.current.commit()
id = m.primary_key_from_instance(u)
- ctx.current.clear()
+ ctx.current.close()
u = ctx.current.get(User, id)
assert u.user_name == 'multitester'
u.email = 'lala@hey.com'
u.user_name = 'imnew'
- ctx.current.flush()
+ ctx.current.commit()
usertable = users.select(users.c.user_id.in_(u.foo_id)).execute().fetchall()
self.assertEqual(usertable[0].values(), [u.foo_id, 'imnew'])
addresstable = addresses.select(addresses.c.address_id.in_(u.address_id)).execute().fetchall()
self.assertEqual(addresstable[0].values(), [u.address_id, u.foo_id, 'lala@hey.com'])
- ctx.current.clear()
+ ctx.current.close()
u = ctx.current.get(User, id)
assert u.user_name == 'imnew'
u = User()
u.addresses.append(Address())
u.addresses.append(Address())
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
u = ctx.current.query(User).get(u.user_id)
ctx.current.delete(u)
- ctx.current.flush()
+ ctx.current.commit()
assert users.count().scalar() == 0
assert addresses.count().scalar() == 0
u1.username = 'user1'
u2 = User()
u2.username = 'user2'
- ctx.current.flush()
+ ctx.current.commit()
clear_mappers()
u2 = User()
u2.username = 'user2'
try:
- ctx.current.flush()
+ ctx.current.commit()
assert False
except AssertionError:
assert True
a.user.user_name = elem['user_name']
objects.append(a)
- ctx.current.flush()
+ ctx.current.commit()
objects[2].email_address = 'imnew@foo.bar'
objects[3].user = User()
objects[3].user.user_name = 'imnewlyadded'
- self.assert_sql(testbase.db, lambda: ctx.current.flush(), [
+ self.assert_sql(testbase.db, lambda: ctx.current.commit(), [
(
"INSERT INTO users (user_name) VALUES (:user_name)",
{'user_name': 'imnewlyadded'}
u1.user_name='user1'
a1.user = u1
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
a1 = ctx.current.query(Address).get(a1.address_id)
u1 = ctx.current.query(User).get(u1.user_id)
assert a1.user is u1
a1.user = None
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
a1 = ctx.current.query(Address).get(a1.address_id)
u1 = ctx.current.query(User).get(u1.user_id)
assert a1.user is None
u1.user_name='user1'
a1.user = u1
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
a1 = ctx.current.query(Address).get(a1.address_id)
a2 = ctx.current.query(Address).get(a2.address_id)
u1 = ctx.current.query(User).get(u1.user_id)
assert a1.user is u1
a1.user = None
a2.user = u1
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
a1 = ctx.current.query(Address).get(a1.address_id)
a2 = ctx.current.query(Address).get(a2.address_id)
u1 = ctx.current.query(User).get(u1.user_id)
u2.user_name='user2'
a1.user = u1
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
a1 = ctx.current.query(Address).get(a1.address_id)
u1 = ctx.current.query(User).get(u1.user_id)
u2 = ctx.current.query(User).get(u2.user_id)
assert a1.user is u1
a1.user = u2
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
a1 = ctx.current.query(Address).get(a1.address_id)
u1 = ctx.current.query(User).get(u1.user_id)
u2 = ctx.current.query(User).get(u2.user_id)
k.name = kname
item.keywords.append(k)
- ctx.current.flush()
+ ctx.current.commit()
l = ctx.current.query(m).select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name])
self.assert_result(l, *data)
k = Keyword()
k.name = 'yellow'
objects[5].keywords.append(k)
- self.assert_sql(testbase.db, lambda:ctx.current.flush(), [
+ self.assert_sql(testbase.db, lambda:ctx.current.commit(), [
{
"UPDATE items SET item_name=:item_name WHERE items.item_id = :items_item_id":
{'item_name': 'item4updated', 'items_item_id': objects[4].item_id}
objects[2].keywords.append(k)
dkid = objects[5].keywords[1].keyword_id
del objects[5].keywords[1]
- self.assert_sql(testbase.db, lambda:ctx.current.flush(), [
+ self.assert_sql(testbase.db, lambda:ctx.current.commit(), [
(
"DELETE FROM itemkeywords WHERE itemkeywords.item_id = :item_id AND itemkeywords.keyword_id = :keyword_id",
[{'item_id': objects[5].item_id, 'keyword_id': dkid}]
])
ctx.current.delete(objects[3])
- ctx.current.flush()
+ ctx.current.commit()
def testmanytomanyremove(self):
"""tests that setting a list-based attribute to '[]' properly affects the history and allows
k2 = Keyword()
i.keywords.append(k1)
i.keywords.append(k2)
- ctx.current.flush()
+ ctx.current.commit()
assert itemkeywords.count().scalar() == 2
i.keywords = []
- ctx.current.flush()
+ ctx.current.commit()
assert itemkeywords.count().scalar() == 0
def testscalar(self):
))
i = Item()
- ctx.current.flush()
+ ctx.current.commit()
ctx.current.delete(i)
- ctx.current.flush()
+ ctx.current.commit()
item.keywords.append(k1)
item.keywords.append(k2)
item.keywords.append(k3)
- ctx.current.flush()
+ ctx.current.commit()
item.keywords = []
item.keywords.append(k1)
item.keywords.append(k2)
- ctx.current.flush()
+ ctx.current.commit()
- ctx.current.clear()
+ ctx.current.close()
item = ctx.current.query(Item).get(item.item_id)
print [k1, k2]
print item.keywords
ik.keyword = k
item.keywords.append(ik)
- ctx.current.flush()
- ctx.current.clear()
+ ctx.current.commit()
+ ctx.current.close()
l = Query(m).select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name])
self.assert_result(l, *data)
k = KeywordUser()
k.user_name = 'keyworduser'
k.keyword_name = 'a keyword'
- ctx.current.flush()
+ ctx.current.commit()
id = (k.user_id, k.keyword_id)
- ctx.current.clear()
+ ctx.current.close()
k = ctx.current.query(KeywordUser).get(id)
assert k.user_name == 'keyworduser'
assert k.keyword_name == 'a keyword'
class SaveTest2(UnitOfWorkTest):
def setUp(self):
- ctx.current.clear()
+ ctx.current.close()
clear_mappers()
global meta, users, addresses
meta = MetaData(testbase.db)
a.user = User()
a.user.user_name = elem['user_name']
objects.append(a)
- self.assert_sql(testbase.db, lambda: ctx.current.flush(), [
+ self.assert_sql(testbase.db, lambda: ctx.current.commit(), [
(
"INSERT INTO users (user_name) VALUES (:user_name)",
{'user_name': 'thesub'}
k2 = Keyword()
i.keywords.append(k1)
i.keywords.append(k2)
- ctx.current.flush()
+ ctx.current.commit()
assert t2.count().scalar() == 2
i.keywords = []
print i.keywords
- ctx.current.flush()
+ ctx.current.commit()
assert t2.count().scalar() == 0