from sqlalchemy import bindparam
- def search_for_user(session, username, email=None):
+ def search_for_user(session, username, email=None):
baked_query = bakery(lambda session: session.query(User))
- baked_query += lambda q: q.filter(User.name == bindparam('username'))
+ baked_query += lambda q: q.filter(User.name == bindparam("username"))
baked_query += lambda q: q.order_by(User.id)
if email:
- baked_query += lambda q: q.filter(User.email == bindparam('email'))
+ baked_query += lambda q: q.filter(User.email == bindparam("email"))
result = baked_query(session).params(username=username, email=email).all()
s = Session(bind=engine)
for id_ in random.sample(ids, n):
q = bakery(lambda s: s.query(Customer))
- q += lambda q: q.filter(Customer.id == bindparam('id'))
+ q += lambda q: q.filter(Customer.id == bindparam("id"))
q(s).params(id=id_).one()
The difference in Python function call count for an iteration of 10000
my_simple_cache = {}
+
def lookup(session, id_argument):
if "my_key" not in my_simple_cache:
- query = session.query(Model).filter(Model.id == bindparam('id'))
+ query = session.query(Model).filter(Model.id == bindparam("id"))
my_simple_cache["my_key"] = query.with_session(None)
else:
query = my_simple_cache["my_key"].with_session(session)
my_simple_cache = {}
def lookup(session, id_argument):
-
if "my_key" not in my_simple_cache:
- query = session.query(Model).filter(Model.id == bindparam('id'))
+ query = session.query(Model).filter(Model.id == bindparam("id"))
my_simple_cache["my_key"] = query.with_session(None).bake()
else:
query = my_simple_cache["my_key"].with_session(session)
bakery = baked.bakery()
+
def lookup(session, id_argument):
def create_model_query(session):
- return session.query(Model).filter(Model.id == bindparam('id'))
+ return session.query(Model).filter(Model.id == bindparam("id"))
parameterized_query = bakery.bake(create_model_query)
return parameterized_query(session).params(id=id_argument).all()
my_simple_cache = {}
+
def lookup(session, id_argument, include_frobnizzle=False):
if include_frobnizzle:
cache_key = "my_key_with_frobnizzle"
cache_key = "my_key_without_frobnizzle"
if cache_key not in my_simple_cache:
- query = session.query(Model).filter(Model.id == bindparam('id'))
+ query = session.query(Model).filter(Model.id == bindparam("id"))
if include_frobnizzle:
query = query.filter(Model.frobnizzle == True)
bakery = baked.bakery()
+
def lookup(session, id_argument, include_frobnizzle=False):
def create_model_query(session):
- return session.query(Model).filter(Model.id == bindparam('id'))
+ return session.query(Model).filter(Model.id == bindparam("id"))
parameterized_query = bakery.bake(create_model_query)
return query.filter(Model.frobnizzle == True)
parameterized_query = parameterized_query.with_criteria(
- include_frobnizzle_in_query)
+ include_frobnizzle_in_query
+ )
return parameterized_query(session).params(id=id_argument).all()
bakery = baked.bakery()
+
def lookup(session, id_argument, include_frobnizzle=False):
parameterized_query = bakery.bake(
- lambda s: s.query(Model).filter(Model.id == bindparam('id'))
- )
+ lambda s: s.query(Model).filter(Model.id == bindparam("id"))
+ )
if include_frobnizzle:
parameterized_query += lambda q: q.filter(Model.frobnizzle == True)
baked_query = bakery(lambda session: session.query(User))
baked_query += lambda q: q.filter(
- User.name.in_(bindparam('username', expanding=True)))
+ User.name.in_(bindparam("username", expanding=True))
+ )
- result = baked_query.with_session(session).params(
- username=['ed', 'fred']).all()
+ result = baked_query.with_session(session).params(username=["ed", "fred"]).all()
.. seealso::
# select a correlated subquery in the top columns list,
# we have the "session" argument, pass that
- my_q = bakery(
- lambda s: s.query(Address.id, my_subq.to_query(s).as_scalar()))
+ my_q = bakery(lambda s: s.query(Address.id, my_subq.to_query(s).as_scalar()))
# use a correlated subquery in some of the criteria, we have
# the "query" argument, pass that.
still to allow the result to be cached, the event can be registered
passing the ``bake_ok=True`` flag::
- @event.listens_for(
- Query, "before_compile", retval=True, bake_ok=True)
+ @event.listens_for(Query, "before_compile", retval=True, bake_ok=True)
def my_event(query):
for desc in query.column_descriptions:
- if desc['type'] is User:
- entity = desc['entity']
+ if desc["type"] is User:
+ entity = desc["entity"]
query = query.filter(entity.deleted == False)
return query