def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
opts.update(url.query)
- if opts.has_key('auto_identity_insert'):
+ if 'auto_identity_insert' in opts:
self.auto_identity_insert = bool(int(opts.pop('auto_identity_insert')))
- if opts.has_key('query_timeout'):
+ if 'query_timeout' in opts:
self.query_timeout = int(opts.pop('query_timeout'))
- if opts.has_key('text_as_varchar'):
+ if 'text_as_varchar' in opts:
self.text_as_varchar = bool(int(opts.pop('text_as_varchar')))
- if opts.has_key('use_scope_identity'):
+ if 'use_scope_identity' in opts:
self.use_scope_identity = bool(int(opts.pop('use_scope_identity')))
return self.make_connect_string(opts)
def _schema_aliased_table(self, table):
if getattr(table, 'schema', None) is not None:
- if not self.tablealiases.has_key(table):
+ if table not in self.tablealiases:
self.tablealiases[table] = table.alias()
return self.tablealiases[table]
else:
"""
for c in insert.table.primary_key:
- if not self.parameters.has_key(c.key):
+ if c.key not in self.parameters:
self.parameters[c.key] = None
return ansisql.ANSICompiler.visit_insert(self, insert)
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
- if opts.has_key('port'):
+ if 'port' in opts:
opts['port'] = int(opts['port'])
opts.update(url.query)
return ([], opts)
selectable = self.get_selectable(mapper)
for c in mapper.mapped_table.c:
c2 = selectable.corresponding_column(c, keys_ok=True, raiseerr=False)
- if c2 and row.has_key(c2):
+ if c2 and c2 in row:
newrow[c] = row[c2]
return newrow
def rollback(self, manager, obj):
for attr in manager.managed_attributes(obj.__class__):
- if self.data.has_key(attr.key):
+ if attr.key in self.data:
if not isinstance(attr, InstrumentedCollectionAttribute):
obj.__dict__[attr.key] = self.data[attr.key]
else:
def update(fn):
def update(self, other):
for key in other.keys():
- if not self.has_key(key) or self[key] is not other[key]:
+ if key not in self or self[key] is not other[key]:
self[key] = other[key]
_tidy(update)
return update
if self.inherits is not None:
for key, prop in self.inherits.__props.iteritems():
- if not self.__props.has_key(key):
+ if key not in self.__props:
self._adapt_inherited_property(key, prop)
# load properties from the main table object,
# not overriding those set up in the 'properties' argument
for column in self.mapped_table.columns:
- if self.columntoproperty.has_key(column):
+ if column in self.columntoproperty:
continue
- if not self.columns.has_key(column.key):
+ if column.key not in self.columns:
self.columns[column.key] = self.select_table.corresponding_column(column, keys_ok=True, raiseerr=True)
column_key = (self.column_prefix or '') + column.key
if self.non_primary:
return
- if not self.non_primary and (mapper_registry.has_key(self.class_key)):
+ if not self.non_primary and (self.class_key in mapper_registry):
raise exceptions.ArgumentError("Class '%s' already has a primary mapper defined with entity name '%s'. Use non_primary=True to create a non primary Mapper, or to create a new primary mapper, remove this mapper first via sqlalchemy.orm.clear_mapper(mapper), or preferably sqlalchemy.orm.clear_mappers() to clear all mappers." % (self.class_, self.entity_name))
attribute_manager.reset_class_managed(self.class_)
if not pk:
return False
for k in pk:
- if not self.columntoproperty.has_key(k):
+ if k not in self.columntoproperty:
return False
else:
return True
raise exceptions.ConcurrentModificationError("Instance '%s' version of %s does not match %s" % (instance, self.get_attr_by_column(instance, self.version_id_col), row[self.version_id_col]))
if populate_existing or context.session.is_expired(instance, unexpire=True):
- if not context.identity_map.has_key(identitykey):
+ if identitykey not in context.identity_map:
context.identity_map[identitykey] = instance
isnew = True
if extension.populate_instance(self, context, row, instance, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
newrow = util.DictDecorator(row)
for c in tomapper.mapped_table.c:
c2 = self.mapped_table.corresponding_column(c, keys_ok=True, raiseerr=False)
- if c2 and row.has_key(c2):
+ if c2 and c2 in row:
newrow[c] = row[c2]
return newrow
if self.__parent is not None:
return self.__parent.add(bind)
- if self.__connections.has_key(bind.engine):
+ if bind.engine in self.__connections:
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or ""))
return self.get_or_add(bind)
if not self.nested:
return self.__parent.get_or_add(bind)
- if self.__connections.has_key(bind):
+ if bind in self.__connections:
return self.__connections[bind][0]
if bind in self.__parent._connection_dict():
(conn, trans, autoclose) = self.__parent.__connections[bind]
self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose)
return conn
- elif self.__connections.has_key(bind):
+ elif bind in self.__connections:
return self.__connections[bind][0]
if not isinstance(bind, engine.Connection):
if mapper is not None:
if isinstance(mapper, type):
mapper = _class_mapper(mapper)
- if self.__binds.has_key(mapper.base_mapper):
+ if mapper.base_mapper in self.__binds:
return self.__binds[mapper.base_mapper]
- elif self.__binds.has_key(mapper.compile().mapped_table):
+ elif mapper.compile().mapped_table in self.__binds:
return self.__binds[mapper.mapped_table]
if clause is not None:
for t in clause._table_iterator():
def _save_impl(self, object, **kwargs):
if hasattr(object, '_instance_key'):
- if not self.identity_map.has_key(object._instance_key):
+ if object._instance_key not in self.identity_map:
raise exceptions.InvalidRequestError("Instance '%s' is a detached instance "
"or is already persistent in a "
"different Session" % repr(object))
old_id = getattr(obj, '_sa_session_id', None)
if old_id != self.hash_key:
- if old_id is not None and _sessions.has_key(old_id):
+ if old_id is not None and old_id in _sessions:
raise exceptions.InvalidRequestError("Object '%s' is already attached "
"to session '%s' (this is '%s')" %
(repr(obj), old_id, id(self)))
result of True.
"""
- return self._is_attached(obj) and (obj in self.uow.new or self.identity_map.has_key(obj._instance_key))
+ return self._is_attached(obj) and (obj in self.uow.new or obj._instance_key in self.identity_map)
def __iter__(self):
"""return an iterator of all objects which are pending or persistent within this Session."""
def has_key(self, key):
"""return True if the given identity key is present within this Session's identity map."""
- return self.identity_map.has_key(key)
+ return key in self.identity_map
dirty = property(lambda s:s.uow.locate_dirty(),
doc="A ``Set`` of all objects marked as 'dirty' within this ``Session``")
#print "creating row decorator for path ", "->".join([str(s) for s in path])
# check for a user-defined decorator in the SelectContext (which was set up by the contains_eager() option)
- if selectcontext.attributes.has_key(("eager_row_processor", self.parent_property)):
+ if ("eager_row_processor", self.parent_property) in selectcontext.attributes:
# custom row decoration function, placed in the selectcontext by the
# contains_eager() mapper option
decorator = selectcontext.attributes[("eager_row_processor", self.parent_property)]
pass
def _validate_obj(self, obj):
- if (hasattr(obj, '_instance_key') and not self.identity_map.has_key(obj._instance_key)) or \
+ if (hasattr(obj, '_instance_key') and obj._instance_key not in self.identity_map) or \
(not hasattr(obj, '_instance_key') and obj not in self.new):
raise exceptions.InvalidRequestError("Instance '%s' is not attached or pending within this session" % repr(obj))
def _is_valid(self, obj):
- if (hasattr(obj, '_instance_key') and not self.identity_map.has_key(obj._instance_key)) or \
+ if (hasattr(obj, '_instance_key') and obj._instance_key not in self.identity_map) or \
(not hasattr(obj, '_instance_key') and obj not in self.new):
return False
else:
parenttask.append(None, listonly=False, isdelete=originating_task._objects[node.item].isdelete, childtask=t)
t.append(node.item, originating_task._objects[node.item].listonly, isdelete=originating_task._objects[node.item].isdelete)
- if dependencies.has_key(node.item):
+ if node.item in dependencies:
for depprocessor, deptask in dependencies[node.item].iteritems():
t.cyclical_dependencies.add(depprocessor.branch(deptask))
nd = {}
return super(TranslatingDict, self).__getitem__(self.__translate_col(col))
def has_key(self, col):
- return super(TranslatingDict, self).has_key(self.__translate_col(col))
+ return col in self
def __setitem__(self, col, value):
return super(TranslatingDict, self).__setitem__(self.__translate_col(col), value)
def __contains__(self, col):
- return self.has_key(col)
+ return super(TranslatingDict, self).__contains__(self.__translate_col(col))
def setdefault(self, col, value):
return super(TranslatingDict, self).setdefault(self.__translate_col(col), value)
else:
# TODO: this is squirrely. we shouldnt have to hold onto engines
# in a case like this
- if not self.__engines.has_key(bind):
+ if bind not in self.__engines:
self.__engines[bind] = bind
self.context._engine = bind
# Allow an aliased column to replace an unaliased column of the
# same name.
- if self.has_key(column.name):
+ if column.name in self:
other = self[column.name]
if other.name == other.key:
del self[other.name]
def __contains__(self, other):
if not isinstance(other, basestring):
raise exceptions.ArgumentError("__contains__ requires a string argument")
- return self.has_key(other)
+ return util.OrderedProperties.__contains__(self, other)
def contains_column(self, col):
# have to use a Set here, because it will compare the identity
"""Add an edge to this collection."""
(parentnode, childnode) = edge
- if not self.parent_to_children.has_key(parentnode):
+ if parentnode not in self.parent_to_children:
self.parent_to_children[parentnode] = util.Set()
self.parent_to_children[parentnode].add(childnode)
- if not self.child_to_parents.has_key(childnode):
+ if childnode not in self.child_to_parents:
self.child_to_parents[childnode] = util.Set()
self.child_to_parents[childnode].add(parentnode)
parentnode.dependencies.add(childnode)
return None
def has_parents(self, node):
- return self.child_to_parents.has_key(node) and len(self.child_to_parents[node]) > 0
+ return node in self.child_to_parents and len(self.child_to_parents[node]) > 0
def edges_by_parent(self, node):
- if self.parent_to_children.has_key(node):
+ if node in self.parent_to_children:
return [(node, child) for child in self.parent_to_children[node]]
else:
return []
nodes = {}
edges = _EdgeCollection()
for item in allitems + [t[0] for t in tuples] + [t[1] for t in tuples]:
- if not nodes.has_key(item):
+ if item not in nodes:
node = _Node(item)
nodes[item] = node
cycset = util.Set(cycle)
for x in cycle:
involved_in_cycles.add(x)
- if cycles.has_key(x):
+ if x in cycles:
existing_set = cycles[x]
[existing_set.add(y) for y in cycset]
for y in existing_set:
return key in self._data
def get(self, key, default=None):
- if self.has_key(key):
+ if key in self:
return self[key]
else:
return default
self.update(kwargs)
def setdefault(self, key, value):
- if not self.has_key(key):
+ if key not in self:
self.__setitem__(key, value)
return value
else:
return iter(self.items())
def __setitem__(self, key, object):
- if not self.has_key(key):
+ if key not in self:
self._list.append(key)
dict.__setitem__(self, key, object)
for k in kwargs:
setattr(self, k, kwargs[k])
- def __repr__(self):
- return "%s(%s)" % (
- (self.__class__.__name__),
- ','.join(["%s=%s" % (key, repr(getattr(self, key))) for key in self.__dict__ if not key.startswith('_')])
- )
+ # TODO: add recursion checks to this
+ #def __repr__(self):
+ # return "%s(%s)" % (
+ # (self.__class__.__name__),
+ # ','.join(["%s=%s" % (key, repr(getattr(self, key))) for key in self.__dict__ if not key.startswith('_')])
+ # )
+
def __ne__(self, other):
return not self.__eq__(other)
Session.commit()
Session.delete(u)
Session.commit()
- self.assert_(a.address_id is not None and a.user_id is None and not Session.identity_map.has_key(u._instance_key) and Session.identity_map.has_key(a._instance_key))
+ self.assert_(a.address_id is not None and a.user_id is None and u._instance_key not in Session.identity_map and a._instance_key in Session.identity_map)
def test_onetoone(self):
m = mapper(User, users, properties = dict(
def testreflect(self):
meta2 = MetaData(testbase.db)
t2 = Table('WorstCase2', meta2, autoload=True, quote=True)
- assert t2.c.has_key('MixedCase')
+ assert 'MixedCase' in t2.c
def testlabels(self):
table1.insert().execute({'lowercase':1,'UPPERCASE':2,'MixedCase':3,'a123':4},
# asserting a dictionary of statements->parameters
# this is to specify query assertions where the queries can be in
# multiple orderings
- if not item.has_key('_converted'):
+ if '_converted' not in item:
for key in item.keys():
ckey = self.convert_statement(key)
item[ckey] = item[key]