- fix for non-integer relationships [ticket:473]
- DB-API module now selectable at run-time [ticket:419]
- preliminary support for pyodbc (Yay!) [ticket:419]
- - now passes more unit tests [ticket:422]
+ - now passes many more unit tests [tickets:422, 481, 415]
- better unittest compatibility with ANSI functions [ticket:479]
- improved support for implicit sequence PK columns with auto-insert [ticket:415]
+ - fix for blank password in adodbapi [ticket:371]
+ - fixes to get unit tests working with pyodbc [ticket:481]
+
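The run-time DB-API selection from [ticket:419] is driven by the module-level switch functions that appear further down in this patch (use_adodbapi, use_pymssql, use_pyodbc, and use_default, which tries each in turn). A minimal sketch of how a caller might force a particular driver, assuming those helpers are importable from sqlalchemy.databases.mssql and that the dialect falls back to the module-level dbmodule global the way MSSQLDialect.__init__ does below:

    from sqlalchemy import create_engine
    from sqlalchemy.databases import mssql

    # rebind the module globals (dbmodule, connect, dialect, ...) to pyodbc;
    # without this call, use_default() picks the first DB-API that imports
    mssql.use_pyodbc()

    # URL values are illustrative
    engine = create_engine('mssql://scott:tiger@localhost/test')
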
0.3.4
- general:
import adodbapi as dbmodule
# ADODBAPI has a non-standard Connection method
connect = dbmodule.Connection
- make_connect_string = lambda keys: \
- [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % (
- keys.get("host"), keys.get("user"), keys.get("password"), keys.get("database"))], {}]
+ def make_connect_string(keys):
+ return [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % (
+ keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}]
sane_rowcount = True
dialect = MSSQLDialect
colspecs[sqltypes.Unicode] = AdoMSUnicode
global dbmodule, connect, make_connect_string, do_commit, sane_rowcount, dialect, colspecs, ischema_names
import pyodbc as dbmodule
connect = dbmodule.connect
- make_connect_string = lambda keys: \
- [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % (
- keys.get("host"), keys.get("user"), keys.get("password"), keys.get("database"))], {}]
+ def make_connect_string(keys):
+ return [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % (
+ keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}]
do_commit = True
- sane_rowcount = True
- dialect = MSSQLDialect # XXX - find out whether this needs to be tweaked for pyodbc
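+ # assumption: pyodbc does not report reliable rowcounts for every kind of
+ # statement, so stop letting SQLAlchemy trust cursor.rowcount under this driver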
+ sane_rowcount = False
+ dialect = MSSQLDialect
import warnings
warnings.warn('pyodbc support in sqlalchemy.databases.mssql is extremely experimental - use at your own risk.')
- colspecs[sqltypes.Unicode] = MSUnicode # Ado?
- ischema_names['nvarchar'] = MSUnicode # Ado?
+ colspecs[sqltypes.Unicode] = AdoMSUnicode
+ ischema_names['nvarchar'] = AdoMSUnicode
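As an aside, each make_connect_string builds the [args, kwargs] pair that create_connect_args presumably hands to dbmodule.connect(). A worked example against the pyodbc variant above, with illustrative values (the keys.get("password", "") default is presumably the blank-password fix noted for [ticket:371]):

    keys = {'host': 'localhost', 'user': 'sa', 'password': '', 'database': 'test'}
    make_connect_string(keys)
    # -> [['Driver={SQL Server};Server=localhost;UID=sa;PWD=;Database=test'], {}]
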
def use_default():
import_errors = []
use_adodbapi,
use_pymssql,
use_pyodbc,
- ]:
+ ]:
if try_use(f):
return dbmodule # informational return, so the user knows what he's using.
else:
class MSSQLDialect(ansisql.ANSIDialect):
- def __init__(self, module=None, auto_identity_insert=False, encoding=None, **params):
+ def __init__(self, module=None, auto_identity_insert=False, **params):
self.module = module or dbmodule or use_default()
self.auto_identity_insert = auto_identity_insert
- ansisql.ANSIDialect.__init__(self, encoding=encoding, **params)
+ ansisql.ANSIDialect.__init__(self, **params)
self.set_default_schema_name("dbo")
def create_connect_args(self, url):
def last_inserted_ids(self):
return self.context.last_inserted_ids
+ def do_execute(self, cursor, statement, params, **kwargs):
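+ # assumption: pyodbc's qmark paramstyle wants a parameter sequence and rejects
+ # the empty dict SQLAlchemy passes for statements with no bind params, hence
+ # the normalization to an empty tuple here and in _execute below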
+ if params == {}:
+ params = ()
+ super(MSSQLDialect, self).do_execute(cursor, statement, params, **kwargs)
+
def _execute(self, c, statement, parameters):
try:
+ if parameters == {}:
+ parameters = ()
c.execute(statement, parameters)
self.context.rowcount = c.rowcount
c.DBPROP_COMMITPRESERVE = "Y"
func.name = self.function_rewrites.get(func.name, func.name)
super(MSSQLCompiler, self).visit_function(func)
+ def for_update_clause(self, select):
+ # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
+ return ''
+
class MSSQLSchemaGenerator(ansisql.ANSISchemaGenerator):
def get_column_specification(self, column, **kwargs):
colspec = self.preparer.format_column(column) + " " + column.type.engine_impl(self.engine).get_col_spec()
#TODO: determine MSSQL's case folding rules
return value
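+ # pick a DB-API module at import time so the dbmodule/connect/dialect globals
+ # are populated before any MSSQLDialect is constructed (see use_default above)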
+use_default()
+
table = Table(
'engine_multi', meta,
Column('multi_id', Integer, Sequence('multi_id_seq'), primary_key=True),
- Column('multi_rev', Integer, Sequence('multi_rev_seq'), primary_key=True),
+ Column('multi_rev', Integer, primary_key=True),
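+ # note: MSSQL allows only one IDENTITY column per table, so only multi_id
+ # carries the implicit sequence here (presumably why multi_rev lost its Sequence)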
Column('name', String(50), nullable=False),
Column('val', String(100))
)
def define_tables(self, metadata):
global table_Employee, table_Engineer, table_Manager
table_Employee = Table( 'Employee', metadata,
- Column( 'name', type= String, ),
+ Column( 'name', type= String(100), ),
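+ # explicit lengths: an unsized String renders as bare VARCHAR, which MSSQL
+ # treats as VARCHAR(1)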
Column( 'id', primary_key= True, type= Integer, ),
- Column( 'atype', type= String, ),
+ Column( 'atype', type= String(100), ),
)
table_Engineer = Table( 'Engineer', metadata,
- Column( 'machine', type= String, ),
+ Column( 'machine', type= String(100), ),
Column( 'id', Integer, ForeignKey( 'Employee.id', ), primary_key= True, ),
)
table_Manager = Table( 'Manager', metadata,
- Column( 'duties', type= String, ),
+ Column( 'duties', type= String(100), ),
Column( 'id', Integer, ForeignKey( 'Engineer.id', ), primary_key= True, ),
)
def test_threelevels(self):
table2 = Table("mytable2", metadata,
Column('col1', Integer, primary_key=True),
Column('col2', String(30)),
- Column('col3', String(30), ForeignKey("mytable.col1"))
+ Column('col3', Integer, ForeignKey("mytable.col1"))
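+ # the FK column type now matches the Integer mytable.col1 it references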
)
metadata.create_all()
clear_mappers()
if __name__ == '__main__':
- testbase.main()
\ No newline at end of file
+ testbase.main()
Column('col5', deftype, PassiveDefault(def2)),
# preexecute + update timestamp
- Column('col6', Date, default=currenttime, onupdate=currenttime),
+ Column('col6', DateTime, default=currenttime, onupdate=currenttime),
Column('boolcol1', Boolean, default=True),
Column('boolcol2', Boolean, default=False)
finally:
table.drop()
+ def testfetchid(self):
+ meta = BoundMetaData(testbase.db)
+ table = Table("aitest", meta,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(20)))
+ table.create()
+
+ try:
+ # simulate working against a table that this metadata did not itself create
+ meta2 = BoundMetaData(testbase.db)
+ table2 = Table("aitest", meta2,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(20)))
+ class AiTest(object):
+ pass
+ mapper(AiTest, table2)
+
+ s = create_session()
+ u = AiTest()
+ s.save(u)
+ s.flush()
+ assert u.id is not None
+ s.clear()
+ finally:
+ table.drop()
+
+
class SequenceTest(PersistTest):
@testbase.supported('postgres', 'oracle')
def setUpAll(self):
self.users.insert().execute(user_id=7, user_name='fido')
r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
+
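+ # MSSQL can emulate LIMIT via TOP but has no OFFSET, so skip this test there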
+ @testbase.unsupported('mssql')
+ def testselectlimitoffset(self):
+ self.users.insert().execute(user_id=1, user_name='john')
+ self.users.insert().execute(user_id=2, user_name='jack')
+ self.users.insert().execute(user_id=3, user_name='ed')
+ self.users.insert().execute(user_id=4, user_name='wendy')
+ self.users.insert().execute(user_id=5, user_name='laura')
+ self.users.insert().execute(user_id=6, user_name='ralph')
+ self.users.insert().execute(user_id=7, user_name='fido')
r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
self.echo(repr(x['plain_data']))
self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata)
if isinstance(x['plain_data'], unicode):
- # SQLLite returns even non-unicode data as unicode
- self.assert_(db.name == 'sqlite')
+ # SQLite and MSSQL return non-unicode data as unicode
+ self.assert_(db.name in ('sqlite', 'mssql'))
self.assert_(x['plain_data'] == unicodedata)
- self.echo("its sqlite !")
+ self.echo("it's %s!" % db.name)
else:
self.assert_(not isinstance(x['plain_data'], unicode) and x['plain_data'] == rawdata)
def testengineparam(self):