- the "else_" parameter to the case statement now properly works when
set to zero.
+
- oracle:
- got binary working for any size input ! cx_oracle works fine,
it was my fault as BINARY was being passed and not BLOB for
- query() method is added by assignmapper. this helps with
navigating to all the new generative methods on Query.
+
+- ms-sql:
+ - removed seconds input on DATE column types (probably
+ should remove the time altogether)
+
+ - null values in float fields no longer raise errors
+
+ - LIMIT with OFFSET now raises an error (MS-SQL has no OFFSET support)
+
+
0.3.5
- sql:
# ADODBAPI has a non-standard Connection method
connect = dbmodule.Connection
def make_connect_string(keys):
- return [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % (
- keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}]
+ connectors = ["Provider=SQLOLEDB"]
+ connectors.append ("Data Source=%s" % keys.get("host"))
+ connectors.append ("Initial Catalog=%s" % keys.get("database"))
+ user = keys.get("user")
+ if user:
+ connectors.append("User Id=%s" % user)
+ connectors.append("Password=%s" % keys.get("password", ""))
+ else:
+ connectors.append("Integrated Security=SSPI")
+ return [[";".join (connectors)], {}]
sane_rowcount = True
dialect = MSSQLDialect
colspecs[sqltypes.Unicode] = AdoMSUnicode
import pyodbc as dbmodule
connect = dbmodule.connect
def make_connect_string(keys):
- return [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % (
- keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}]
+ connectors = ["Driver={SQL Server}"]
+ connectors.append("Server=%s" % keys.get("host"))
+ connectors.append("Database=%s" % keys.get("database"))
+ user = keys.get("user")
+ if user:
+ connectors.append("UID=%s" % user)
+ connectors.append("PWD=%s" % keys.get("password", ""))
+ else:
+ connectors.append ("TrustedConnection=Yes")
+ return [[";".join (connectors)], {}]
do_commit = True
sane_rowcount = False
dialect = MSSQLDialect
def convert_bind_param(self, value, dialect):
"""By converting to string, we can use Decimal types round-trip."""
- return str(value)
+ return value and str(value) or None
class MSInteger(sqltypes.Integer):
def get_col_spec(self):
def convert_bind_param(self, value, dialect):
if value and hasattr(value, "isoformat"):
- return value.strftime('%Y-%m-%d %H:%M:%S')
+ return value.strftime('%Y-%m-%d %H:%M')
return value
def convert_result_value(self, value, dialect):
def __init__(self, dialect):
self.IINSERT = self.HASIDENT = False
super(MSSQLExecutionContext, self).__init__(dialect)
-
+
+ def _has_implicit_sequence(self, column):
+ if column.primary_key and column.autoincrement:
+ if isinstance(column.type, sqltypes.Integer) and not column.foreign_key:
+ if column.default is None or (isinstance(column.default, schema.Sequence) and \
+ column.default.optional):
+ return True
+ return False
+
def pre_exec(self, engine, proxy, compiled, parameters, **kwargs):
"""MS-SQL has a special mode for inserting non-NULL values
into IDENTITY columns.
Activate it if the feature is turned on and needed.
"""
-
if getattr(compiled, "isinsert", False):
tbl = compiled.statement.table
- if not hasattr(tbl, 'has_sequence'):
+ if not hasattr(tbl, 'has_sequence'):
+ tbl.has_sequence = False
for column in tbl.c:
- if column.primary_key and column.autoincrement and \
- isinstance(column.type, sqltypes.Integer) and not column.foreign_key:
- if column.default is None or (isinstance(column.default, schema.Sequence) and \
- column.default.optional):
- tbl.has_sequence = column
- break
- else:
- tbl.has_sequence = False
+ if getattr(column, 'sequence', False) or self._has_implicit_sequence(column):
+ tbl.has_sequence = column
+ break
self.HASIDENT = bool(tbl.has_sequence)
if engine.dialect.auto_identity_insert and self.HASIDENT:
row[columns.c.column_default]
)
+ # cope with varchar(max)
+ if charlen == -1:
+ charlen = None
+
args = []
for a in (charlen, numericprec, numericscale):
if a is not None:
def visit_select_precolumns(self, select):
""" MS-SQL puts TOP, it's version of LIMIT here """
s = select.distinct and "DISTINCT " or ""
- if (select.limit):
+ if select.limit:
s += "TOP %s " % (select.limit,)
+ if select.offset:
+ raise exceptions.InvalidRequestError('MSSQL does not support LIMIT with an offset')
return s
- def limit_clause(self, select):
- # Limit in mssql is after the select keyword; MSsql has no support for offset
+ def limit_clause(self, select):
+ # Limit in mssql is after the select keyword
return ""
def visit_table(self, table):
use_default()
+
c2 = p.connect()
assert id(c2.connection) == c_id
c2.close()
- time.sleep(3)
+ time.sleep(4)
c3= p.connect()
assert id(c3.connection) != c_id
testbase.db.execute("drop table django_admin_log")
testbase.db.execute("drop table django_content_type")
+ @testbase.unsupported('mssql')
def testmultipk(self):
"""test that creating a table checks for a sequence before creating it"""
meta = BoundMetaData(testbase.db)
res = self.query.select_by(range=5)
assert res.order_by([Foo.c.bar])[0].bar == 5
assert res.order_by([desc(Foo.c.bar)])[0].bar == 95
-
+
+ @testbase.unsupported('mssql')
def test_slice(self):
assert self.res[1] == self.orig[1]
assert list(self.res[10:20]) == self.orig[10:20]
assert list(self.res[-5:]) == self.orig[-5:]
assert self.res[10:20][5] == self.orig[10:20][5]
+ @testbase.supported('mssql')
+ def test_slice_mssql(self):
+ assert list(self.res[:10]) == self.orig[:10]
+ assert list(self.res[:10]) == self.orig[:10]
+
def test_aggregate(self):
assert self.res.count() == 100
assert self.res.filter(foo.c.bar<30).min(foo.c.bar) == 0
# this one fails in mysql as the result comes back as a string
assert self.res.filter(foo.c.bar<30).sum(foo.c.bar) == 435
- @testbase.unsupported('postgres', 'mysql', 'firebird')
+ @testbase.unsupported('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_2(self):
- # this one fails with postgres, the floating point comparison fails
assert self.res.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
+ @testbase.supported('postgres', 'mysql', 'firebird', 'mssql')
+ def test_aggregate_2_int(self):
+ assert int(self.res.filter(foo.c.bar<30).avg(foo.c.bar)) == 14
+
def test_filter(self):
assert self.res.count() == 100
assert self.res.filter(Foo.c.bar < 30).count() == 30
assert res.order_by([Foo.c.bar])[0].bar == 5
assert res.order_by([desc(Foo.c.bar)])[0].bar == 95
+ @testbase.unsupported('mssql')
def test_slice(self):
assert self.query[1] == self.orig[1]
assert list(self.query[10:20]) == self.orig[10:20]
assert list(self.query[-5:]) == self.orig[-5:]
assert self.query[10:20][5] == self.orig[10:20][5]
+ @testbase.supported('mssql')
+ def test_slice_mssql(self):
+ assert list(self.query[:10]) == self.orig[:10]
+ assert list(self.query[:10]) == self.orig[:10]
+
def test_aggregate(self):
assert self.query.count() == 100
assert self.query.filter(foo.c.bar<30).min(foo.c.bar) == 0
# this one fails in mysql as the result comes back as a string
assert self.query.filter(foo.c.bar<30).sum(foo.c.bar) == 435
- @testbase.unsupported('postgres', 'mysql', 'firebird')
+ @testbase.unsupported('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_2(self):
- # this one fails with postgres, the floating point comparison fails
- assert self.query.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
+ assert self.res.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
+
+ @testbase.supported('postgres', 'mysql', 'firebird', 'mssql')
+ def test_aggregate_2_int(self):
+ assert int(self.res.filter(foo.c.bar<30).avg(foo.c.bar)) == 14
def test_filter(self):
assert self.query.count() == 100
))
sess= create_session()
q = sess.query(m)
- l = q.select(limit=2, offset=1)
- self.assert_result(l, User, *user_all_result[1:3])
+
+ if db.engine.name == 'mssql':
+ l = q.select(limit=2)
+ self.assert_result(l, User, *user_all_result[:2])
+ else:
+ l = q.select(limit=2, offset=1)
+ self.assert_result(l, User, *user_all_result[1:3])
+
# use a union all to get a lot of rows to join against
u2 = users.alias('u2')
s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
sess = create_session()
q = sess.query(m)
- l = q.select(limit=2, offset=1)
- self.assert_result(l, User, *user_all_result[1:3])
+ if db.engine.name == 'mssql':
+ l = q.select(limit=2)
+ self.assert_result(l, User, *user_all_result[:2])
+ else:
+ l = q.select(limit=2, offset=1)
+ self.assert_result(l, User, *user_all_result[1:3])
+
# this is an involved 3x union of the users table to get a lot of rows.
# then see if the "distinct" works its way out. you actually get the same
# result with or without the distinct, just via less or more rows.
sess = create_session()
q = sess.query(m)
- l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1)
- self.assert_result(l, User, *(user_all_result[2], user_all_result[0]))
+ if db.engine.name != 'mssql':
+ l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1)
+ self.assert_result(l, User, *(user_all_result[2], user_all_result[0]))
l = q.select(q.join_to('addresses'), order_by=desc(addresses.c.email_address), limit=1, offset=0)
self.assert_result(l, User, *(user_all_result[0],))
r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
+ @testbase.supported('mssql')
+ def testselectlimitoffset_mssql(self):
+ try:
+ r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ assert False # InvalidRequestError should have been raised
+ except exceptions.InvalidRequestError:
+ pass
+
@testbase.unsupported('mysql')
def test_scalar_select(self):
"""test that scalar subqueries with labels get their type propigated to the result set."""