from elementtree.ElementTree import Element, SubElement
meta = MetaData()
-meta.engine = 'sqlite://'
+meta.bind = 'sqlite://'
################################# PART II - Table Metadata ###########################################
############################################ PART VI - Searching for Paths #######################################
# manually search for a document which contains "/somefile/header/field1:hi"
-print "\nManual search for /somefile/header/field1=='hi':", line
-n1 = elements.alias('n1')
-n2 = elements.alias('n2')
-n3 = elements.alias('n3')
-j = documents.join(n1).join(n2, n1.c.element_id==n2.c.parent_id).join(n3, n2.c.element_id==n3.c.parent_id)
-d = session.query(Document).select_from(j).filter(n1.c.tag=='somefile').filter(n2.c.tag=='header').filter(and_(n3.c.tag=='field1', n3.c.text=='hi')).one()
+d = session.query(Document).join('_root', aliased=True).filter(_Node.tag=='somefile').\
+ join('children', aliased=True, from_joinpoint=True).filter(_Node.tag=='header').\
+ join('children', aliased=True, from_joinpoint=True).filter(and_(_Node.tag=='field1', _Node.text=='hi')).\
+ one()
print d
# generalize the above approach into an extremely impoverished xpath function:
j = documents
prev_elements = None
query = session.query(Document)
+ attribute = '_root'
for i, match in enumerate(re.finditer(r'/([\w_]+)(?:\[@([\w_]+)(?:=(.*))?\])?', path)):
(token, attrname, attrvalue) = match.group(1, 2, 3)
- a = elements.alias("n%d" % i)
- query = query.filter(a.c.tag==token)
+ query = query.join(attribute, aliased=True, from_joinpoint=True).filter(_Node.tag==token)
+ attribute = 'children'
if attrname:
- attr_alias = attributes.alias('a%d' % i)
if attrvalue:
- query = query.filter(and_(a.c.element_id==attr_alias.c.element_id, attr_alias.c.name==attrname, attr_alias.c.value==attrvalue))
+ query = query.join('attributes', aliased=True, from_joinpoint=True).filter(and_(_Attribute.name==attrname, _Attribute.value==attrvalue))
else:
- query = query.filter(and_(a.c.element_id==attr_alias.c.element_id, attr_alias.c.name==attrname))
- if prev_elements is not None:
- j = j.join(a, prev_elements.c.element_id==a.c.parent_id)
- else:
- j = j.join(a)
- prev_elements = a
- return query.options(lazyload('_root')).select_from(j).filter(prev_elements.c.text==compareto).all()
+ query = query.join('attributes', aliased=True, from_joinpoint=True).filter(_Attribute.name==attrname)
+ return query.options(lazyload('_root')).filter(_Node.text==compareto).all()
for path, compareto in (
('/somefile/header/field1', 'hi'),
#logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
# uncomment to show SQL statements and result sets
-logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
+#logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
from elementtree import ElementTree
from elementtree.ElementTree import Element, SubElement
meta = MetaData()
-meta.engine = 'sqlite://'
+meta.bind = 'sqlite://'
################################# PART II - Table Metadata ###########################################
})
# the _Node objects change the way they load so that a list of _Nodes will organize
-# themselves hierarchically using the HierarchicalLoader. this depends on the ordering of
+# themselves hierarchically using the ElementTreeMarshal. this depends on the ordering of
# nodes being hierarchical as well; relation() always applies at least ROWID/primary key
# ordering to rows which will suffice.
mapper(_Node, elements, properties={
# manually search for a document which contains "/somefile/header/field1:hi"
print "\nManual search for /somefile/header/field1=='hi':", line
-n1 = elements.alias('n1')
-n2 = elements.alias('n2')
-n3 = elements.alias('n3')
-j = documents.join(n1).join(n2, n1.c.element_id==n2.c.parent_id).join(n3, n2.c.element_id==n3.c.parent_id)
-d = session.query(Document).select_from(j).filter(n1.c.tag=='somefile').filter(n2.c.tag=='header').filter(and_(n3.c.tag=='field1', n3.c.text=='hi')).one()
+d = session.query(Document).join('_nodes', aliased=True).filter(and_(_Node.parent_id==None, _Node.tag=='somefile')).\
+ join('children', aliased=True, from_joinpoint=True).filter(_Node.tag=='header').\
+ join('children', aliased=True, from_joinpoint=True).filter(and_(_Node.tag=='field1', _Node.text=='hi')).\
+ one()
print d
# generalize the above approach into an extremely impoverished xpath function:
j = documents
prev_elements = None
query = session.query(Document)
+ first = True
for i, match in enumerate(re.finditer(r'/([\w_]+)(?:\[@([\w_]+)(?:=(.*))?\])?', path)):
(token, attrname, attrvalue) = match.group(1, 2, 3)
- a = elements.alias("n%d" % i)
- query = query.filter(a.c.tag==token)
+ if first:
+ query = query.join('_nodes', aliased=True).filter(_Node.parent_id==None)
+ first = False
+ else:
+ query = query.join('children', aliased=True, from_joinpoint=True)
+ query = query.filter(_Node.tag==token)
if attrname:
- attr_alias = attributes.alias('a%d' % i)
+ query = query.join('attributes', aliased=True, from_joinpoint=True)
if attrvalue:
- query = query.filter(and_(a.c.element_id==attr_alias.c.element_id, attr_alias.c.name==attrname, attr_alias.c.value==attrvalue))
+ query = query.filter(and_(_Attribute.name==attrname, _Attribute.value==attrvalue))
else:
- query = query.filter(and_(a.c.element_id==attr_alias.c.element_id, attr_alias.c.name==attrname))
- if prev_elements is not None:
- j = j.join(a, prev_elements.c.element_id==a.c.parent_id)
- else:
- j = j.join(a)
- prev_elements = a
- return query.options(lazyload('_nodes')).select_from(j).filter(prev_elements.c.text==compareto).all()
+ query = query.filter(_Attribute.name==attrname)
+ return query.options(lazyload('_nodes')).filter(_Node.text==compareto).all()
for path, compareto in (
('/somefile/header/field1', 'hi'),
return aliased_column
def adapt_clause(self, clause):
- return self.aliased_column(clause)
-# return sql_util.ClauseAdapter(self.alias).traverse(clause, clone=True)
+ return sql_util.ClauseAdapter(self.alias).traverse(clause, clone=True)
def _create_row_adapter(self):
"""Return a callable which,
q = sess.query(User).join('addresses', aliased=True).filter(Address.email_address=='jack@bean.com')
assert [User(id=7)] == q.all()
+ q = sess.query(User).join('addresses', aliased=True).filter(or_(Address.email_address=='jack@bean.com', Address.email_address=='fred@fred.com'))
+ assert [User(id=7), User(id=9)] == q.all()
+
# test two aliasized paths, one to 'orders' and the other to 'orders','items'.
# one row is returned because user 7 has order 3 and also has order 1 which has item 1
# this tests a o2m join and a m2m join.