. FBDialect.table_names() doesn't bring system tables (ticket:796).
. FB now reflects Column's nullable property correctly.
+- Fixed SQL compiler's awareness of top-level column labels as used
+ in result-set processing; nested selects which contain the same column
+ names don't affect the result or conflict with result-column metadata.
+
+- query.get() and related functions (like many-to-one lazyloading)
+ use compile-time-aliased bind parameter names, to prevent
+ name conflicts with bind parameters that already exist in the
+ mapped selectable.
+
- Fixed three- and multi-level select and deferred inheritance loading
(i.e. abc inheritance with no select_table), [ticket:795]
self.__log("Identified primary key columns: " + str(primary_key))
_get_clause = sql.and_()
+ _get_params = {}
for primary_key in self.primary_key:
- _get_clause.clauses.append(primary_key == sql.bindparam(primary_key._label, type_=primary_key.type, unique=True))
- self._get_clause = _get_clause
+ bind = sql.bindparam(None, type_=primary_key.type)
+ _get_params[primary_key] = bind
+ _get_clause.clauses.append(primary_key == bind)
+ self._get_clause = (_get_clause, _get_params)
def _get_equivalent_columns(self):
"""Create a map of all *equivalent* columns, based on
ident = util.to_list(ident)
params = {}
+ (_get_clause, _get_params) = self.select_mapper._get_clause
for i, primary_key in enumerate(self.primary_key_columns):
try:
- params[primary_key._label] = ident[i]
+ params[_get_params[primary_key].key] = ident[i]
except IndexError:
raise exceptions.InvalidRequestError("Could not find enough values to formulate primary key for query.get(); primary key columns are %s" % ', '.join(["'%s'" % str(c) for c in self.primary_key_columns]))
try:
q = self
if lockmode is not None:
q = q.with_lockmode(lockmode)
- q = q.filter(self.select_mapper._get_clause)
+ q = q.filter(_get_clause)
q = q.params(params)._select_context_options(populate_existing=reload, version_check=(lockmode is not None))
# call using all() to avoid LIMIT compilation complexity
return q.all()[0]
raise exceptions.InvalidRequestError("Parent instance %s is not bound to a Session; deferred load operation of attribute '%s' cannot proceed" % (instance.__class__, self.key))
if create_statement is None:
- clause = localparent._get_clause
+ (clause, param_map) = localparent._get_clause
ident = instance._instance_key[1]
params = {}
for i, primary_key in enumerate(localparent.primary_key):
- params[primary_key._label] = ident[i]
+ params[param_map[primary_key].key] = ident[i]
statement = sql.select([p.columns[0] for p in group], clause, from_obj=[localparent.mapped_table], use_labels=True)
else:
statement, params = create_statement()
# determine if our "lazywhere" clause is the same as the mapper's
# get() clause. then we can just use mapper.get()
#from sqlalchemy.orm import query
- self.use_get = not self.uselist and self.mapper._get_clause.compare(self.lazywhere)
+ self.use_get = not self.uselist and self.mapper._get_clause[0].compare(self.lazywhere)
if self.use_get:
self.logger.info(str(self.parent_property) + " will use query.get() to optimize instance loads")
def visit_label(self, label):
labelname = self._truncated_identifier("colident", label.name)
- if self.stack and self.stack[-1].get('select'):
+ if len(self.stack) == 1 and self.stack[-1].get('select'):
self.typemap.setdefault(labelname.lower(), label.obj.type)
if isinstance(label.obj, sql._ColumnClause):
self.column_labels[label.obj._label] = labelname
else:
name = column.name
- if self.stack and self.stack[-1].get('select'):
+ if len(self.stack) == 1 and self.stack[-1].get('select'):
# if we are within a visit to a Select, set up the "typemap"
# for this column which is used to translate result set values
self.typemap.setdefault(name.lower(), column.type)
orderby = str(orders.default_order_by()[0].compile(bind=testbase.db))
self.assert_sql(testbase.db, go, [
("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}),
- ("SELECT orders.description AS orders_description FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id':3})
+ ("SELECT orders.description AS orders_description FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
])
def testunsaved(self):
orderby = str(orders.default_order_by()[0].compile(testbase.db))
self.assert_sql(testbase.db, go, [
("SELECT orders.order_id AS orders_order_id FROM orders ORDER BY %s" % orderby, {}),
- ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id':3})
+ ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
])
o2 = q.select()[2]
orderby = str(orders.default_order_by()[0].compile(testbase.db))
self.assert_sql(testbase.db, go, [
("SELECT orders.order_id AS orders_order_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}),
- ("SELECT orders.user_id AS orders_user_id FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id':3})
+ ("SELECT orders.user_id AS orders_user_id FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
])
sess.clear()
q3 = q2.options(undefer('user_id'))
u2 = s.query(User).get(7)
assert u is not u2
+ def test_unique_param_names(self):
+ class SomeUser(object):
+ pass
+ s = users.select(users.c.id!=12).alias('users')
+ m = mapper(SomeUser, s)
+ print s.primary_key
+ print m.primary_key
+ assert s.primary_key == m.primary_key
+
+ row = s.select(use_labels=True).execute().fetchone()
+ print row[s.primary_key[0]]
+
+ sess = create_session()
+ assert sess.query(SomeUser).get(7).name == 'jack'
+
def test_load(self):
s = create_session()
except exceptions.InvalidRequestError, e:
assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
-
+
+ def test_column_label_targeting(self):
+ users.insert().execute(user_id=7, user_name='ed')
+
+ for s in (
+ users.select().alias('foo'),
+ users.select().alias(users.name),
+ ):
+ row = s.select(use_labels=True).execute().fetchone()
+ assert row[s.c.user_id] == 7
+ assert row[s.c.user_name] == 'ed'
+
def test_keys(self):
users.insert().execute(user_id=1, user_name='foo')
r = users.select().execute().fetchone()