raise "not implemented"
def last_inserted_ids(self):
- return self.context.last_inserted_ids
-
+ table = self.context.last_inserted_table
+ if self.context.lastrowid is not None and table is not None and len(table.primary_keys):
+ row = sql.select(table.primary_keys, table.rowid_column == self.context.lastrowid).execute().fetchone()
+ return [v for v in row]
+ else:
+ return None
+
def pre_exec(self, connection, cursor, statement, parameters, echo = None, compiled = None, **kwargs):
if True: return
# if a sequence was explicitly defined we do it here
def post_exec(self, connection, cursor, statement, parameters, echo = None, compiled = None, **kwargs):
if compiled is None: return
if getattr(compiled, "isinsert", False):
- # psycopg wants to return internal rowids, which I guess is what DBAPI2 really
- # specifies.
- # well then post exec to get the row. I guess this could be genericised to
- # be for all inserts somehow if the "rowid" col could be gotten off a table.
table = compiled.statement.table
- if len(table.primary_keys):
- # TODO: cache this statement against the table to avoid multiple re-compiles
- # TODO: instead of "oid" have the Table object have a "rowid_col" property
- # that gives this col generically
- row = sql.select(table.primary_keys, sql.ColumnClause("oid",table) == bindparam('oid', cursor.lastrowid) ).execute().fetchone()
- self.context.last_inserted_ids = [v for v in row]
-
+ self.context.last_inserted_table = table
+ self.context.lastrowid = cursor.lastrowid
+
def dbapi(self):
return self.module
def compiler(self, statement, bindparams):
raise NotImplementedError()
+ def rowid_column_name(self):
+ """returns the ROWID column name for this engine."""
+ return "oid"
+
def create(self, table, **params):
table.accept_visitor(self.schemagenerator(self.proxy(), **params))
return _mappers[hashkey]
def clear_mappers():
+ """removes all mappers that have been created thus far. when new mappers are
+ created, they will be assigned to their classes as their primary mapper."""
_mappers.clear()
-
+
+def clear_mapper(m):
+ """removes the given mapper from the storage of mappers. when a new mapper is
+ created for the previous mapper's class, it will be used as that classes'
+ new primary mapper."""
+ del _mappers[m.hash_key]
+
def eagerload(name):
"""returns a MapperOption that will convert the property of the given name
into an eager load. Used with mapper.options()"""
return self.hashkey
def _init_class(self):
+ """sets up our classes' overridden __init__ method, this mappers hash key as its '_mapper' property,
+ and our columns as its 'c' property. if the class already had a mapper, the old __init__ method
+ is kept the same."""
+ if not hasattr(self.class_, '_mapper'):
+ oldinit = self.class_.__init__
+ def init(self, *args, **kwargs):
+ nohist = kwargs.pop('_mapper_nohistory', False)
+ if oldinit is not None:
+ oldinit(self, *args, **kwargs)
+ if not nohist:
+ objectstore.uow().register_new(self)
+ self.class_.__init__ = init
self.class_._mapper = self.hashkey
self.class_.c = self.c
- oldinit = self.class_.__init__
- def init(self, *args, **kwargs):
- nohist = kwargs.pop('_mapper_nohistory', False)
- if oldinit is not None:
- oldinit(self, *args, **kwargs)
- if not nohist:
- objectstore.uow().register_new(self)
- self.class_.__init__ = init
def set_property(self, key, prop):
self.props[key] = prop
params = {}
for key in self.lazybinds.keys():
params[key] = row[key]
- result = self.mapper.select(self.lazywhere, **params)
+ if self.secondary is not None:
+ order_by = [self.secondary.rowid_column]
+ else:
+ order_by = [self.target.rowid_column]
+ result = self.mapper.select(self.lazywhere, order_by=order_by,**params)
if self.uselist:
return result
else:
"""
return (class_, table, tuple([row[column.label] for column in primary_keys]))
+def begin():
+ """begins a new UnitOfWork transaction. the next commit will affect only
+ objects that are created, modified, or deleted following the begin statement."""
+ uow().begin()
+
def commit(*obj):
+ """commits the current UnitOfWork transaction. if a transaction was begun
+ via begin(), commits only those objects that were created, modified, or deleted
+ since that begin statement. otherwise commits all objects that have been
+ changed."""
uow().commit(*obj)
def clear():
+ """removes all current UnitOfWorks and IdentityMaps for this thread and
+ establishes a new one. It is probably a good idea to discard all
+ current mapped object instances, as they are no longer in the Identity Map."""
uow.set(UnitOfWork())
def delete(*obj):
+ """registers the given objects as to be deleted upon the next commit"""
uw = uow()
for o in obj:
uw.register_deleted(o)
def has_key(key):
+ """returns True if the current thread-local IdentityMap contains the given instance key"""
return uow().identity_map.has_key(key)
+def has_instance(instance):
+ """returns True if the current thread-local IdentityMap contains the given instance"""
+ return uow().identity_map.has_key(instance_key(instance))
+
+def instance_key(instance):
+ """returns the IdentityMap key for the given instance"""
+ return object_mapper(instance).instance_key(instance)
+
class UOWListElement(attributes.ListElement):
def __init__(self, obj, key, data=None, deleteremoved=False):
attributes.ListElement.__init__(self, obj, key, data=data)
original = property(lambda s: s._orig or s)
engine = property(lambda s: s.table.engine)
-
def _set_parent(self, table):
table.columns[self.key] = self
"""represents a list of clauses joined by an operator"""
def __init__(self, operator, *clauses):
self.operator = operator
- self.fromobj = []
self.clauses = []
self.parens = False
for c in clauses:
elif isinstance(clause, CompoundClause):
clause.parens = True
self.clauses.append(clause)
- self.fromobj += clause._get_from_objects()
def accept_visitor(self, visitor):
for c in self.clauses:
visitor.visit_compound(self)
def _get_from_objects(self):
- return self.fromobj
+ f = []
+ for c in self.clauses:
+ f += c._get_from_objects()
+ return f
def hash_key(self):
return string.join([c.hash_key() for c in self.clauses], self.operator)
def __init__(self, table):
self.table = table
self.id = self.table.name
+ self.rowid_column = schema.Column(self.table.engine.rowid_column_name(), types.Integer)
+ self.rowid_column._set_parent(table)
+ del self.table.c[self.rowid_column.key]
def get_from_text(self):
return self.table.name
+
def group_parenthesized(self):
return False
"""
def __init__(self):
self.__dict__['_list'] = []
-
def keys(self):
return self._list
-
def __iter__(self):
return iter([self[x] for x in self._list])
-
def __setitem__(self, key, object):
setattr(self, key, object)
-
def __getitem__(self, key):
return getattr(self, key)
-
+ def __delitem__(self, key):
+ delattr(self, key)
+ del self._list[self._list.index(key)]
def __setattr__(self, key, object):
if not hasattr(self, key):
self._list.append(key)
l = Item.mapper.select()
self.assert_result(l, Item,
{'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- {'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 3}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
+ {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 7}, {'keyword_id' : 5}])},
+ {'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 6}, {'keyword_id' : 3}, {'keyword_id' : 4}])},
{'item_id' : 4, 'keywords' : (Keyword, [])},
{'item_id' : 5, 'keywords' : (Keyword, [])}
)
l = Item.mapper.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, Item.c.item_id==itemkeywords.c.item_id))
self.assert_result(l, Item,
{'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
+ {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 7}, {'keyword_id' : 5}])},
)
class EagerTest(MapperSuperTest):
import sqlalchemy.objectstore as objectstore
import testbase
-echo = testbase.echo
-#testbase.echo = False
from tables import *
+import tables
-db.connection().commit()
-
-db.echo = echo
-testbase.echo = echo
-
class HistoryTest(AssertMixin):
def testattr(self):
m = mapper(User, users, properties = dict(addresses = relation(Address, addresses)))
class SaveTest(AssertMixin):
+ def setUpAll(self):
+ db.echo = False
+ tables.create()
+ db.echo = testbase.echo
+ def tearDownAll(self):
+ db.echo = False
+ tables.drop()
+ db.echo = testbase.echo
+
def setUp(self):
- e = db.echo
db.echo = False
+ # remove all history/identity maps etc.
objectstore.clear()
+ # remove all mapperes
clear_mappers()
-
- itemkeywords.delete().execute()
- orderitems.delete().execute()
- orders.delete().execute()
- addresses.delete().execute()
- users.delete().execute()
- keywords.delete().execute()
keywords.insert().execute(
dict(name='blue'),
dict(name='red'),
dict(name='round'),
dict(name='square')
)
- db.connection().commit()
- db.echo = e
+ db.commit()
+ db.echo = testbase.echo
+
+ def tearDown(self):
+ db.echo = False
+ tables.delete()
+ db.echo = testbase.echo
def testbasic(self):
# save two users
ECHO = testbase.echo
-DBTYPE = 'sqlite_memory'
-#DBTYPE = 'postgres'
+#DBTYPE = 'sqlite_memory'
+DBTYPE = 'postgres'
#DBTYPE = 'sqlite_file'
if DBTYPE == 'sqlite_memory':
db = sqlalchemy.engine.create_engine('sqlite', ':memory:', {}, echo = testbase.echo)
elif DBTYPE == 'sqlite_file':
import sqlalchemy.databases.sqlite as sqllite
-# if os.access('querytest.db', os.F_OK):
- # os.remove('querytest.db')
db = sqlalchemy.engine.create_engine('sqlite', 'querytest.db', {}, echo = testbase.echo)
elif DBTYPE == 'postgres':
db = sqlalchemy.engine.create_engine('postgres', {'database':'test', 'host':'127.0.0.1', 'user':'scott', 'password':'tiger'}, echo=testbase.echo)
orderitems.create()
keywords.create()
itemkeywords.create()
+ db.commit()
def drop():
itemkeywords.drop()
orders.drop()
addresses.drop()
users.drop()
-
-def data():
+ db.commit()
+
+def delete():
itemkeywords.delete().execute()
keywords.delete().execute()
orderitems.delete().execute()
orders.delete().execute()
addresses.delete().execute()
users.delete().execute()
+
+def data():
+ delete()
+
+ # with SQLITE, the OID column of a table defaults to the primary key, if it has one.
+ # so to database-neutrally get rows back in "insert order" based on OID, we
+ # have to also put the primary keys in order for the purpose of these tests
users.insert().execute(
dict(user_id = 7, user_name = 'jack'),
dict(user_id = 8, user_name = 'ed'),
)
orderitems.insert().execute(
dict(item_id=1, order_id=2, item_name='item 1'),
- dict(item_id=3, order_id=3, item_name='item 3'),
dict(item_id=2, order_id=2, item_name='item 2'),
+ dict(item_id=3, order_id=3, item_name='item 3'),
+ dict(item_id=4, order_id=3, item_name='item 4'),
dict(item_id=5, order_id=3, item_name='item 5'),
- dict(item_id=4, order_id=3, item_name='item 4')
)
keywords.insert().execute(
dict(keyword_id=1, name='blue'),