to polymorphic mappers that are using a straight "outerjoin"
clause
- sql
+ - fixed "where"/"from" criterion of select() to accept a unicode string
+ in addition to regular string - both convert to text()
+ - added standalone distinct() function in addition to column.distinct()
+ [ticket:558]
- result.last_inserted_ids() should return a list that is identically
sized to the primary key constraint of the table. values that were
"passively" created and not available via cursor.lastrowid will be None.
- oracle
- datetime fixes: got subsecond TIMESTAMP to work [ticket:604],
added OracleDate which supports types.Date with only year/month/day
+ - added dialect flag "auto_convert_lobs", defaults to True; will cause any
+ LOB objects detected in a result set to be forced into OracleBinary
+ so that the LOB is read() automatically, if no typemap was present
+ (i.e., if a textual execute() was issued).
- sqlite
- sqlite better handles datetime/date/time objects mixed and matched
with various Date/Time/DateTime columns
- string PK column inserts dont get overwritten with OID [ticket:603]
- extensions
- added selectone_by() to assignmapper
-
+
0.3.8
- engines
- added detach() to Connection, allows underlying DBAPI connection
def get_result_proxy(self):
if self.cursor.description is not None:
- for column in self.cursor.description:
- type_code = column[1]
- if type_code in self.dialect.ORACLE_BINARY_TYPES:
+ if self.dialect.auto_convert_lobs and self.typemap is None:
+ typemap = {}
+ binary = False
+ for column in self.cursor.description:
+ type_code = column[1]
+ if type_code in self.dialect.ORACLE_BINARY_TYPES:
+ binary = True
+ typemap[column[0].lower()] = OracleBinary()
+ self.typemap = typemap
+ if binary:
return base.BufferedColumnResultProxy(self)
+ else:
+ for column in self.cursor.description:
+ type_code = column[1]
+ if type_code in self.dialect.ORACLE_BINARY_TYPES:
+ return base.BufferedColumnResultProxy(self)
return base.ResultProxy(self)
class OracleDialect(ansisql.ANSIDialect):
- def __init__(self, use_ansi=True, auto_setinputsizes=True, threaded=True, **kwargs):
+ def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, **kwargs):
ansisql.ANSIDialect.__init__(self, default_paramstyle='named', **kwargs)
self.use_ansi = use_ansi
self.threaded = threaded
self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
self.auto_setinputsizes = auto_setinputsizes
+ self.auto_convert_lobs = auto_convert_lobs
if self.dbapi is not None:
self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB", "LONG_BINARY", "LONG_STRING"] if hasattr(self.dbapi, k)]
else:
m = mapper(class_, extension=extension, *args, **kwargs)
class_.mapper = m
class_.query = classmethod(lambda cls: Query(class_, session=ctx.current))
- for name in ['get', 'filter', 'filter_by', 'select', 'select_by', 'selectfirst', 'selectfirst_by', 'selectone', 'get_by', 'join', 'count', 'count_by', 'options', 'instances']:
+ for name in ['get', 'filter', 'filter_by', 'select', 'select_by', 'selectfirst', 'selectfirst_by', 'selectone', 'selectone_by', 'get_by', 'join', 'count', 'count_by', 'options', 'instances']:
monkeypatch_query_method(ctx, class_, name)
for name in ['delete', 'expire', 'refresh', 'expunge', 'save', 'update', 'save_or_update']:
monkeypatch_objectstore_method(ctx, class_, name)
'CompoundSelect', 'Delete', 'FromClause', 'Insert', 'Join',
'Select', 'Selectable', 'TableClause', 'Update', 'alias', 'and_', 'asc',
'between_', 'bindparam', 'case', 'cast', 'column', 'delete',
- 'desc', 'except_', 'except_all', 'exists', 'extract', 'func', 'modifier',
+ 'desc', 'distinct', 'except_', 'except_all', 'exists', 'extract', 'func', 'modifier',
'insert', 'intersect', 'intersect_all', 'join', 'literal',
'literal_column', 'not_', 'null', 'or_', 'outerjoin', 'select',
'subquery', 'table', 'text', 'union', 'union_all', 'update',]
return clause._negate()
+def distinct(expr):
+ """return a ``DISTINCT`` clause."""
+
+ return _UnaryExpression(expr, operator="DISTINCT")
+
def between(ctest, cleft, cright):
"""Return a ``BETWEEN`` predicate clause.
return _Function(self.__names[-1], packagenames=self.__names[0:-1], *c, **o)
func = _FunctionGenerator()
+
+# TODO: use UnaryExpression for this instead ?
modifier = _FunctionGenerator(group=False)
+
def _compound_select(keyword, *selects, **kwargs):
return CompoundSelect(keyword, *selects, **kwargs)
s,
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
)
+
def testorderbysubquery(self):
self.runtest(
checkparams = {'myothertable_othername': 'asdf', 'myothertable_othername_1':'foo', 'myothertable_otherid': 9, 'mytable_myid': 12}
)
+ def testdistinct(self):
+ self.runtest(
+ select([table1.c.myid.distinct()]), "SELECT DISTINCT mytable.myid FROM mytable"
+ )
+
+ self.runtest(
+ select([distinct(table1.c.myid)]), "SELECT DISTINCT mytable.myid FROM mytable"
+ )
+
def testoperators(self):
self.runtest(
table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
"select * from foo where lala = bar"
)
+ # test bytestring
self.runtest(select(
["foobar(a)", "pk_foo_bar(syslaal)"],
"a = 12",
),
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+ # test unicode
+ self.runtest(select(
+ [u"foobar(a)", u"pk_foo_bar(syslaal)"],
+ u"a = 12",
+ from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
+ ),
+ u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+
# test building a select query programmatically with text
s = select()
s.append_column("column1")
Column('pickled', PickleType)
)
binary_table.create()
+
+ def tearDown(self):
+ binary_table.delete().execute()
+
def tearDownAll(self):
binary_table.drop()
binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1)
binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
- l = binary_table.select().execute().fetchall()
+ l = binary_table.select(order_by=binary_table.c.primary_id).execute().fetchall()
+ print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])
print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
self.assert_(list(stream1) == list(l[0]['data']))
self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
f = os.path.join(os.path.dirname(testbase.__file__), name)
# put a number less than the typical MySQL default BLOB size
return file(f).read(len)
-
+
+ @testbase.supported('oracle')
+ def test_oracle_autobinary(self):
+ stream1 =self.load_stream('binary_data_one.dat')
+ stream2 =self.load_stream('binary_data_two.dat')
+ binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100])
+ binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99])
+ binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
+ result = testbase.db.connect().execute("select primary_id, misc, data, data_slice from binary_table")
+ l = result.fetchall()
+ l[0]['data']
+ self.assert_(list(stream1) == list(l[0]['data']))
+ self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
+ self.assert_(list(stream2) == list(l[1]['data']))
+
+
class DateTest(AssertMixin):
def setUpAll(self):
global users_with_date, insert_data