callables are of the following form:
- def execute(instance, row, flags):
+ def execute(instance, row, **flags):
# process incoming instance and given row.
# flags is a dictionary containing at least the following attributes:
# isnew - indicates if the instance was newly created as a result of reading this row
# optional attribute:
# ispostselect - indicates if this row resulted from a 'post' select of additional tables/columns
- def post_execute(instance, flags):
+ def post_execute(instance, **flags):
# process instance after all result rows have been processed. this
# function should be used to issue additional selections in order to
# eagerly load additional properties.
if not context.identity_map.has_key(identitykey):
context.identity_map[identitykey] = instance
isnew = True
- if extension.populate_instance(self, context, row, instance, {'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
- self.populate_instance(context, instance, row, {'instancekey':identitykey, 'isnew':isnew})
- if extension.append_result(self, context, row, instance, result, {'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
+ if extension.populate_instance(self, context, row, instance, **{'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
+ self.populate_instance(context, instance, row, **{'instancekey':identitykey, 'isnew':isnew})
+ if extension.append_result(self, context, row, instance, result, **{'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
if result is not None:
result.append(instance)
return instance
# call further mapper properties on the row, to pull further
# instances from the row and possibly populate this item.
- if extension.populate_instance(self, context, row, instance, {'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
- self.populate_instance(context, instance, row, {'instancekey':identitykey, 'isnew':isnew})
- if extension.append_result(self, context, row, instance, result, {'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
+ if extension.populate_instance(self, context, row, instance, **{'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
+ self.populate_instance(context, instance, row, **{'instancekey':identitykey, 'isnew':isnew})
+ if extension.append_result(self, context, row, instance, result, **{'instancekey':identitykey, 'isnew':isnew}) is EXT_PASS:
if result is not None:
result.append(instance)
return instance
newrow[c] = row[c2]
return newrow
- def populate_instance(self, selectcontext, instance, row, flags):
+ def populate_instance(self, selectcontext, instance, row, ispostselect=None, **flags):
"""populate an instance from a result row."""
- populators = selectcontext.attributes.get(('instance_populators', self, flags.get('ispostselect')), None)
+ populators = selectcontext.attributes.get(('instance_populators', self, ispostselect), None)
if populators is None:
populators = []
post_processors = []
if poly_select_loader is not None:
post_processors.append(poly_select_loader)
- selectcontext.attributes[('instance_populators', self, flags.get('ispostselect'))] = populators
- selectcontext.attributes[('post_processors', self, flags.get('ispostselect'))] = post_processors
+ selectcontext.attributes[('instance_populators', self, ispostselect)] = populators
+ selectcontext.attributes[('post_processors', self, ispostselect)] = post_processors
for p in populators:
- p(instance, row, flags)
+ p(instance, row, ispostselect=ispostselect, **flags)
if self.non_primary:
selectcontext.attributes[('populating_mapper', instance)] = self
cond, param_names = self._deferred_inheritance_condition(needs_tables)
statement = sql.select(needs_tables, cond, use_labels=True)
- def post_execute(instance, flags):
+ def post_execute(instance, **flags):
self.__log_debug("Post query loading instance " + mapperutil.instance_str(instance))
identitykey = self.instance_key(instance)
for c in param_names:
params[c.name] = self.get_attr_by_column(instance, c)
row = selectcontext.session.connection(self).execute(statement, **params).fetchone()
- self.populate_instance(selectcontext, instance, row, {'isnew':False, 'instancekey':identitykey, 'ispostselect':True})
+ self.populate_instance(selectcontext, instance, row, **{'isnew':False, 'instancekey':identitykey, 'ispostselect':True})
return post_execute
return EXT_PASS
- def append_result(self, mapper, selectcontext, row, instance, result, flags):
+ def append_result(self, mapper, selectcontext, row, instance, result, **flags):
"""Receive an object instance before that instance is appended
to a result list.
result
List to which results are being appended.
- flags
+ \**flags
extra information about the row, same as criterion in
`create_row_processor()` method of [sqlalchemy.orm.interfaces#MapperProperty]
"""
return EXT_PASS
- def populate_instance(self, mapper, selectcontext, row, instance, flags):
+ def populate_instance(self, mapper, selectcontext, row, instance, **flags):
"""Receive a newly-created instance before that instance has
its attributes populated.
def create_row_processor(self, selectcontext, mapper, row):
if self.columns[0] in row:
- def execute(instance, row, flags):
- if flags['isnew'] or flags.get('ispostselect'):
+ def execute(instance, row, isnew, ispostselect=None, **flags):
+ if isnew or ispostselect:
if self._should_log_debug:
self.logger.debug("populating %s with %s/%s" % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
instance.__dict__[self.key] = row[self.columns[0]]
return (None, None)
if hosted_mapper.polymorphic_fetch == 'deferred':
- def execute(instance, row, flags):
- if flags['isnew']:
+ def execute(instance, row, isnew, **flags):
+ if isnew:
sessionlib.attribute_manager.init_instance_attribute(instance, self.key, False, callable_=self._get_deferred_loader(instance, mapper, needs_tables))
self.logger.debug("Returning deferred column fetcher for %s %s" % (mapper, self.key))
return (execute, None)
def create_row_processor(self, selectcontext, mapper, row):
if not self.is_default or len(selectcontext.options):
- def execute(instance, row, flags):
- if flags['isnew']:
+ def execute(instance, row, isnew, **flags):
+ if isnew:
if self._should_log_debug:
self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
sessionlib.attribute_manager.init_instance_attribute(instance, self.key, False, callable_=self.setup_loader(instance))
return (execute, None)
else:
- def execute(instance, row, flags):
- if flags['isnew']:
+ def execute(instance, row, isnew, **flags):
+ if isnew:
if self._should_log_debug:
self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
def create_row_processor(self, selectcontext, mapper, row):
if not self.is_default or len(selectcontext.options):
- def execute(instance, row, identitykey, isnew):
+ def execute(instance, row, isnew, **flags):
if isnew:
if self._should_log_debug:
self.logger.debug("set instance-level no loader on %s" % mapperutil.attribute_str(instance, self.key))
def create_row_processor(self, selectcontext, mapper, row):
if not self.is_default or len(selectcontext.options):
- def execute(instance, row, flags):
- if flags['isnew']:
+ def execute(instance, row, isnew, **flags):
+ if isnew:
if self._should_log_debug:
self.logger.debug("set instance-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
# we are not the primary manager for this attribute on this class - set up a per-instance lazyloader,
self._init_instance_attribute(instance, callable_=self.setup_loader(instance, selectcontext.options))
return (execute, None)
else:
- def execute(instance, row, flags):
- if flags['isnew']:
+ def execute(instance, row, isnew, **flags):
+ if isnew:
if self._should_log_debug:
self.logger.debug("set class-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
# we are the primary manager for this attribute on this class - reset its per-instance attribute state,
def create_row_processor(self, selectcontext, mapper, row):
row_decorator = self._create_row_decorator(selectcontext, row)
if row_decorator is not None:
- def execute(instance, row, flags):
+ def execute(instance, row, isnew, **flags):
if self in selectcontext.recursion_stack:
return
decorated_row = row_decorator(row)
if not self.uselist:
if self._should_log_debug:
self.logger.debug("eagerload scalar instance on %s" % mapperutil.attribute_str(instance, self.key))
- if flags['isnew']:
+ if isnew:
# set a scalar object instance directly on the parent object,
# bypassing InstrumentedAttribute event handlers.
instance.__dict__[self.key] = self.mapper._instance(selectcontext, decorated_row, None)
# so that we further descend into properties
self.mapper._instance(selectcontext, decorated_row, None)
else:
- if flags['isnew']:
+ if isnew:
if self._should_log_debug:
self.logger.debug("initialize UniqueAppender on %s" % mapperutil.attribute_str(instance, self.key))
]
from sqlalchemy import util, exceptions
-import inspect, weakref
+import inspect
try:
import cPickle as pickle
except:
import pickle
-_impl_cache = weakref.WeakKeyDictionary()
-
class AbstractType(object):
- def _get_impl_dict(self):
- try:
- return _impl_cache[self]
- except KeyError:
- return _impl_cache.setdefault(self, {})
-
- impl_dict = property(_get_impl_dict)
-
+ def __init__(self, *args, **kwargs):
+ pass
+
def copy_value(self, value):
return value
return "%s(%s)" % (self.__class__.__name__, ",".join(["%s=%s" % (k, getattr(self, k)) for k in inspect.getargspec(self.__init__)[0][1:]]))
class TypeEngine(AbstractType):
- def __init__(self, *args, **params):
- pass
-
def dialect_impl(self, dialect):
try:
- return self.impl_dict[dialect]
+ return self._impl_dict[dialect]
+ except AttributeError:
+ self._impl_dict = {}
+ return self._impl_dict.setdefault(dialect, dialect.type_descriptor(self))
except KeyError:
- return self.impl_dict.setdefault(dialect, dialect.type_descriptor(self))
+ return self._impl_dict.setdefault(dialect, dialect.type_descriptor(self))
def get_col_spec(self):
raise NotImplementedError()
def dialect_impl(self, dialect):
try:
- return self.impl_dict[dialect]
- except:
- typedesc = dialect.type_descriptor(self.impl)
- tt = self.copy()
- if not isinstance(tt, self.__class__):
- raise exceptions.AssertionError("Type object %s does not properly implement the copy() method, it must return an object of type %s" % (self, self.__class__))
- tt.impl = typedesc
- self.impl_dict[dialect] = tt
- return tt
+ return self._impl_dict[dialect]
+ except AttributeError:
+ self._impl_dict = {}
+ return self._impl_dict.setdefault(dialect, self._create_dialect_impl(dialect))
+ except KeyError:
+ return self._impl_dict.setdefault(dialect, self._create_dialect_impl(dialect))
+
+ def _create_dialect_impl(self, dialect):
+ typedesc = dialect.type_descriptor(self.impl)
+ tt = self.copy()
+ if not isinstance(tt, self.__class__):
+ raise exceptions.AssertionError("Type object %s does not properly implement the copy() method, it must return an object of type %s" % (self, self.__class__))
+ tt.impl = typedesc
+ return tt
def __getattr__(self, key):
"""Proxy all other undefined accessors to the underlying implementation."""
def testextensionoptions(self):
sess = create_session()
class ext1(MapperExtension):
- def populate_instance(self, mapper, selectcontext, row, instance, flags):
+ def populate_instance(self, mapper, selectcontext, row, instance, **flags):
"""test options at the Mapper._instance level"""
instance.TEST = "hello world"
return EXT_PASS
def select_by(self, *args, **kwargs):
"""test options at the Query level"""
return "HI"
- def populate_instance(self, mapper, selectcontext, row, instance, flags):
+ def populate_instance(self, mapper, selectcontext, row, instance, **flags):
"""test options at the Mapper._instance level"""
instance.TEST_2 = "also hello world"
return EXT_PASS
l = []
for x in range(1,NUM/DIVISOR):
l.append({'item_id':x, 'value':'this is item #%d' % x})
- print l
+ #print l
items.insert().execute(*l)
for x in range(1, NUM/DIVISOR):
l = []
for y in range(1, NUM/(NUM/DIVISOR)):
z = ((x-1) * NUM/(NUM/DIVISOR)) + y
l.append({'sub_id':z,'value':'this is iteim #%d' % z, 'parent_id':x})
- print l
+ #print l
subitems.insert().execute(*l)
def testload(self):
class Item(object):pass