def __set__(self, document, element):
def traverse(node):
n = _Node()
- n.tag = node.tag
- n.text = node.text
- n.tail = node.tail
+ n.tag = unicode(node.tag)
+ n.text = unicode(node.text)
+ n.tail = unicode(node.tail)
n.children = [traverse(n2) for n2 in node]
- n.attributes = [_Attribute(k, v) for k, v in node.attrib.iteritems()]
+ n.attributes = [_Attribute(unicode(k), unicode(v)) for k, v in node.attrib.iteritems()]
return n
document._root = traverse(element.getroot())
############################################ PART VI - Searching for Paths #######################################
# manually search for a document which contains "/somefile/header/field1:hi"
-d = session.query(Document).join('_root', aliased=True).filter(_Node.tag=='somefile').\
- join('children', aliased=True, from_joinpoint=True).filter(_Node.tag=='header').\
- join('children', aliased=True, from_joinpoint=True).filter(and_(_Node.tag=='field1', _Node.text=='hi')).\
+d = session.query(Document).join('_root', aliased=True).filter(_Node.tag==u'somefile').\
+ join('children', aliased=True, from_joinpoint=True).filter(_Node.tag==u'header').\
+ join('children', aliased=True, from_joinpoint=True).filter(and_(_Node.tag==u'field1', _Node.text==u'hi')).\
one()
print d
return query.options(lazyload('_root')).filter(_Node.text==compareto).all()
for path, compareto in (
- ('/somefile/header/field1', 'hi'),
- ('/somefile/field1', 'hi'),
- ('/somefile/header/field2', 'there'),
- ('/somefile/header/field2[@attr=foo]', 'there')
+ (u'/somefile/header/field1', u'hi'),
+ (u'/somefile/field1', u'hi'),
+ (u'/somefile/header/field2', u'there'),
+ (u'/somefile/header/field2[@attr=foo]', u'there')
):
print "\nDocuments containing '%s=%s':" % (path, compareto), line
print [d.filename for d in find_document(path, compareto)]
def __set__(self, document, element):
def traverse(node):
n = _Node()
- n.tag = node.tag
- n.text = node.text
- n.tail = node.tail
+ n.tag = unicode(node.tag)
+ n.text = unicode(node.text)
+ n.tail = unicode(node.tail)
document._nodes.append(n)
n.children = [traverse(n2) for n2 in node]
- n.attributes = [_Attribute(k, v) for k, v in node.attrib.iteritems()]
+ n.attributes = [_Attribute(unicode(k), unicode(v)) for k, v in node.attrib.iteritems()]
return n
traverse(element.getroot())
# manually search for a document which contains "/somefile/header/field1:hi"
print "\nManual search for /somefile/header/field1=='hi':", line
-d = session.query(Document).join('_nodes', aliased=True).filter(and_(_Node.parent_id==None, _Node.tag=='somefile')).\
- join('children', aliased=True, from_joinpoint=True).filter(_Node.tag=='header').\
- join('children', aliased=True, from_joinpoint=True).filter(and_(_Node.tag=='field1', _Node.text=='hi')).\
+d = session.query(Document).join('_nodes', aliased=True).filter(and_(_Node.parent_id==None, _Node.tag==u'somefile')).\
+ join('children', aliased=True, from_joinpoint=True).filter(_Node.tag==u'header').\
+ join('children', aliased=True, from_joinpoint=True).filter(and_(_Node.tag==u'field1', _Node.text==u'hi')).\
one()
print d
return query.options(lazyload('_nodes')).filter(_Node.text==compareto).all()
for path, compareto in (
- ('/somefile/header/field1', 'hi'),
- ('/somefile/field1', 'hi'),
- ('/somefile/header/field2', 'there'),
- ('/somefile/header/field2[@attr=foo]', 'there')
+ (u'/somefile/header/field1', u'hi'),
+ (u'/somefile/field1', u'hi'),
+ (u'/somefile/header/field2', u'there'),
+ (u'/somefile/header/field2[@attr=foo]', u'there')
):
print "\nDocuments containing '%s=%s':" % (path, compareto), line
print [d.filename for d in find_document(path, compareto)]
from elementtree import ElementTree
-meta = MetaData()
-meta.engine = 'sqlite://'
+engine = create_engine('sqlite://')
+meta = MetaData(engine)
# stores a top level record of an XML document.
# the "element" column will store the ElementTree document as a BLOB.
return value.encode(dialect.encoding)
elif assert_unicode and not isinstance(value, (unicode, NoneType)):
if assert_unicode == 'warn':
- warnings.warn(RuntimeWarning("Unicode type received non-unicode bind param value %r" % value))
+ warnings.warn(RuntimeWarning("Unicode type received non-unicode bind param value %r" % value))
+ return value
else:
raise exceptions.InvalidRequestError("Unicode type received non-unicode bind param value %r" % value)
else:
def testassert(self):
import warnings
+
+ warnings.filterwarnings("always", r".*non-unicode bind")
+
+ # test that data still goes in if warning is emitted....
+ unicode_table.insert().execute(unicode_varchar='im not unicode')
+ assert select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('im not unicode', )]
+
warnings.filterwarnings("error", r".*non-unicode bind")
try:
assert False
except RuntimeWarning, e:
assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'", str(e)
-
+
unicode_engine = engines.utf8_engine(options={'convert_unicode':True, 'assert_unicode':True})
try:
try: