date/time processors used by SQLite, including
C and Python versions. [ticket:2382]
+ - [bug] Fixed bug whereby a table-bound Column
+ object named "<a>_<b>" which matched a column
+ labeled as "<tablename>_<colname>" could match
+ inappropriately when targeting in a result
+ set row. [ticket:2377]
+
- sqlite
- [bug] the "name" of an FK constraint in SQLite
is reflected as "None", not "0" or other
return NULL;
}
- indexobject = PyTuple_GetItem(record, 1);
+ indexobject = PyTuple_GetItem(record, 2);
if (indexobject == NULL)
return NULL;
def __getitem__(self, key):
try:
- processor, index = self._keymap[key]
+ processor, obj, index = self._keymap[key]
except KeyError:
- processor, index = self._parent._key_fallback(key)
+ processor, obj, index = self._parent._key_fallback(key)
except TypeError:
if isinstance(key, slice):
l = []
processor = type_._cached_result_processor(dialect, coltype)
processors.append(processor)
- rec = (processor, i)
+ rec = (processor, obj, i)
# indexes as keys. This is only needed for the Python version of
# RowProxy (the C version uses a faster path for integer indexes).
# columns colliding by name is not a problem as long as the
# user does not try to access them (ie use an index directly,
# or the more precise ColumnElement)
- keymap[name.lower()] = (processor, None)
+ keymap[name.lower()] = (processor, obj, None)
if dialect.requires_name_normalize:
colname = dialect.normalize_name(colname)
row.
"""
- rec = (processor, i) = self._keymap[origname.lower()]
+ rec = (processor, obj, i) = self._keymap[origname.lower()]
if self._keymap.setdefault(name, rec) is not rec:
- self._keymap[name] = (processor, None)
+ self._keymap[name] = (processor, obj, None)
def _key_fallback(self, key, raiseerr=True):
map = self._keymap
if key._label and key._label.lower() in map:
result = map[key._label.lower()]
elif hasattr(key, 'name') and key.name.lower() in map:
+ # match is only on name. search
+ # extra hard to make sure this isn't a column/
+ # label name overlap
result = map[key.name.lower()]
+
+ if result[1] is not None:
+ for obj in result[1]:
+ if key._compare_name_for_result(obj):
+ break
+ else:
+ result = None
if result is None:
if raiseerr:
raise exc.NoSuchColumnError(
return {
'_pickled_keymap': dict(
(key, index)
- for key, (processor, index) in self._keymap.iteritems()
+ for key, (processor, obj, index) in self._keymap.iteritems()
if isinstance(key, (basestring, int))
),
'keys': self.keys
self._processors = [None for _ in xrange(len(state['keys']))]
self._keymap = keymap = {}
for key, index in state['_pickled_keymap'].iteritems():
- keymap[key] = (None, index)
+ # not preserving "obj" here, unfortunately our
+ # proxy comparison fails with the unpickle
+ keymap[key] = (None, None, index)
self.keys = state['keys']
self._echo = False
# replace the all type processors by None processors.
metadata._processors = [None for _ in xrange(len(metadata.keys))]
keymap = {}
- for k, (func, index) in metadata._keymap.iteritems():
- keymap[k] = (None, index)
+ for k, (func, obj, index) in metadata._keymap.iteritems():
+ keymap[k] = (None, obj, index)
self._metadata._keymap = keymap
def fetchall(self):
self.element = col
self.name = name
+ @property
+ def proxy_set(self):
+ return self.element.proxy_set
+
@property
def type(self):
return self.element.type
return bool(self.proxy_set.intersection(othercolumn.proxy_set))
+ def _compare_name_for_result(self, other):
+ """Return True if the given column element compares to this one
+ when targeting within a result row."""
+
+ return hasattr(other, 'name') and hasattr(self, 'name') and \
+ other.name == self.name
+
def _make_proxy(self, selectable, name=None):
"""Create a new :class:`.ColumnElement` representing this
:class:`.ColumnElement` as it appears in the select list of a
self.type = sqltypes.to_instance(type_)
self.is_literal = is_literal
+ def _compare_name_for_result(self, other):
+ if self.table is not None and hasattr(other, 'proxy_set'):
+ return other.proxy_set.intersection(self.proxy_set)
+ else:
+ return super(ColumnClause, self).\
+ _compare_name_for_result(other)
+
def _get_table(self):
return self.__dict__['table']
def _set_table(self, table):
return list.__getitem__(self.l, i)
proxy = RowProxy(object(), MyList(['value']), [None], {'key'
- : (None, 0), 0: (None, 0)})
+ : (None, None, 0), 0: (None, None, 0)})
eq_(list(proxy), ['value'])
eq_(proxy[0], 'value')
eq_(proxy['key'], 'value')
from sqlalchemy.engine import RowProxy
row = RowProxy(object(), ['value'], [None], {'key'
- : (None, 0), 0: (None, 0)})
+ : (None, None, 0), 0: (None, None, 0)})
assert isinstance(row, collections.Sequence)
@testing.requires.cextensions
"NULL AS c4, t3.c5, 'c' AS q1 FROM t3"
)
+
+class NameConflictTest(fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ content = Table('content', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('type', String(30))
+ )
+ foo = Table('foo', metadata,
+ Column('id', Integer, ForeignKey('content.id'),
+ primary_key=True),
+ Column('content_type', String(30))
+ )
+
+ def test_name_conflict(self):
+ class Content(object):
+ pass
+ class Foo(Content):
+ pass
+ mapper(Content, self.tables.content,
+ polymorphic_on=self.tables.content.c.type)
+ mapper(Foo, self.tables.foo, inherits=Content,
+ polymorphic_identity='foo')
+ sess = create_session()
+ f = Foo()
+ f.content_type = u'bar'
+ sess.add(f)
+ sess.flush()
+ f_id = f.id
+ sess.expunge_all()
+ assert sess.query(Content).get(f_id).content_type == u'bar'
self.assert_(not (rp != equal))
self.assert_(not (equal != equal))
+ @testing.provide_metadata
+ def test_column_label_overlap_fallback(self):
+ content = Table('content', self.metadata,
+ Column('type', String(30)),
+ )
+ bar = Table('bar', self.metadata,
+ Column('content_type', String(30))
+ )
+ self.metadata.create_all(testing.db)
+ testing.db.execute(content.insert().values(type="t1"))
+ row = testing.db.execute(content.select(use_labels=True)).first()
+ assert content.c.type in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') in row
+
+ row = testing.db.execute(select([content.c.type.label("content_type")])).first()
+ assert content.c.type in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') in row
+
+ row = testing.db.execute(select([(content.c.type > 5).label("content_type")])).first()
+ assert content.c.type in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') in row
+
def test_pickled_rows(self):
users.insert().execute(
{'user_id':7, 'user_name':'jack'},
eq_(result[0][users.c.user_id], 7)
eq_(result[0][users.c.user_name], 'jack')
- if use_labels:
+ if not pickle or use_labels:
assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id])
else:
# test with a different table. name resolution is