From: Rick Morrison Date: Sun, 18 Feb 2007 19:43:05 +0000 (+0000) Subject: Completed previously missed patches from tickets 422 and 415 X-Git-Tag: rel_0_3_5~17 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e7ac502b811aeede49a82fd6fb077d31513d0f29;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Completed previously missed patches from tickets 422 and 415 get unit tests to work with pyodbc [ticket:481] fix blank password on adodbapi [ticket:371] --- diff --git a/CHANGES b/CHANGES index c2e16c3c78..04ddf51992 100644 --- a/CHANGES +++ b/CHANGES @@ -72,9 +72,12 @@ - fix for non-integer relationships [ticket:473] - DB-API module now selectable at run-time [ticket:419] - preliminary support for pyodbc (Yay!) [ticket:419] - - now passes more unit tests [ticket:422] + - now passes many more unit tests [tickets:422, 481, 415] - better unittest compatibility with ANSI functions [ticket:479] - improved support for implicit sequence PK columns with auto-insert [ticket:415] + - fix for blank password in adodbapi [ticket:371] + - fixes to get unit tests working with pyodbc [ticket:481] + 0.3.4 - general: diff --git a/lib/sqlalchemy/databases/mssql.py b/lib/sqlalchemy/databases/mssql.py index b9ae115498..c8a73f8056 100644 --- a/lib/sqlalchemy/databases/mssql.py +++ b/lib/sqlalchemy/databases/mssql.py @@ -52,9 +52,9 @@ def use_adodbapi(): import adodbapi as dbmodule # ADODBAPI has a non-standard Connection method connect = dbmodule.Connection - make_connect_string = lambda keys: \ - [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % ( - keys.get("host"), keys.get("user"), keys.get("password"), keys.get("database"))], {}] + def make_connect_string(keys): + return [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % ( + keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}] sane_rowcount = True dialect = MSSQLDialect colspecs[sqltypes.Unicode] = AdoMSUnicode @@ -82,16 +82,16 @@ def use_pyodbc(): global dbmodule, connect, make_connect_string, do_commit, sane_rowcount, dialect, colspecs, ischema_names import pyodbc as dbmodule connect = dbmodule.connect - make_connect_string = lambda keys: \ - [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % ( - keys.get("host"), keys.get("user"), keys.get("password"), keys.get("database"))], {}] + def make_connect_string(keys): + return [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % ( + keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}] do_commit = True - sane_rowcount = True - dialect = MSSQLDialect # XXX - find out whether this needs to be tweaked for pyodbc + sane_rowcount = False + dialect = MSSQLDialect import warnings warnings.warn('pyodbc support in sqlalchemy.databases.mssql is extremely experimental - use at your own risk.') - colspecs[sqltypes.Unicode] = MSUnicode # Ado? - ischema_names['nvarchar'] = MSUnicode # Ado? + colspecs[sqltypes.Unicode] = AdoMSUnicode + ischema_names['nvarchar'] = AdoMSUnicode def use_default(): import_errors = [] @@ -110,7 +110,7 @@ def use_default(): use_adodbapi, use_pymssql, use_pyodbc, - ]: + ]: if try_use(f): return dbmodule # informational return, so the user knows what he's using. else: @@ -348,10 +348,10 @@ class MSSQLExecutionContext(default.DefaultExecutionContext): class MSSQLDialect(ansisql.ANSIDialect): - def __init__(self, module=None, auto_identity_insert=False, encoding=None, **params): + def __init__(self, module=None, auto_identity_insert=False, **params): self.module = module or dbmodule or use_default() self.auto_identity_insert = auto_identity_insert - ansisql.ANSIDialect.__init__(self, encoding=encoding, **params) + ansisql.ANSIDialect.__init__(self, **params) self.set_default_schema_name("dbo") def create_connect_args(self, url): @@ -397,8 +397,15 @@ class MSSQLDialect(ansisql.ANSIDialect): def last_inserted_ids(self): return self.context.last_inserted_ids + def do_execute(self, cursor, statement, params, **kwargs): + if params == {}: + params = () + super(MSSQLDialect, self).do_execute(cursor, statement, params, **kwargs) + def _execute(self, c, statement, parameters): try: + if parameters == {}: + parameters = () c.execute(statement, parameters) self.context.rowcount = c.rowcount c.DBPROP_COMMITPRESERVE = "Y" @@ -648,6 +655,10 @@ class MSSQLCompiler(ansisql.ANSICompiler): func.name = self.function_rewrites.get(func.name, func.name) super(MSSQLCompiler, self).visit_function(func) + def for_update_clause(self, select): + # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use + return '' + class MSSQLSchemaGenerator(ansisql.ANSISchemaGenerator): def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) + " " + column.type.engine_impl(self.engine).get_col_spec() @@ -690,3 +701,5 @@ class MSSQLIdentifierPreparer(ansisql.ANSIIdentifierPreparer): #TODO: determin MSSQL's case folding rules return value +use_default() + diff --git a/test/engine/reflection.py b/test/engine/reflection.py index 388ed30c80..8d9e555492 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -273,7 +273,7 @@ class ReflectionTest(PersistTest): table = Table( 'engine_multi', meta, Column('multi_id', Integer, Sequence('multi_id_seq'), primary_key=True), - Column('multi_rev', Integer, Sequence('multi_rev_seq'), primary_key=True), + Column('multi_rev', Integer, primary_key=True), Column('name', String(50), nullable=False), Column('val', String(100)) ) diff --git a/test/orm/inheritance5.py b/test/orm/inheritance5.py index 29bec2aa86..b81ae47a97 100644 --- a/test/orm/inheritance5.py +++ b/test/orm/inheritance5.py @@ -600,18 +600,18 @@ class MultiLevelTest(testbase.ORMTest): def define_tables(self, metadata): global table_Employee, table_Engineer, table_Manager table_Employee = Table( 'Employee', metadata, - Column( 'name', type= String, ), + Column( 'name', type= String(100), ), Column( 'id', primary_key= True, type= Integer, ), - Column( 'atype', type= String, ), + Column( 'atype', type= String(100), ), ) table_Engineer = Table( 'Engineer', metadata, - Column( 'machine', type= String, ), + Column( 'machine', type= String(100), ), Column( 'id', Integer, ForeignKey( 'Employee.id', ), primary_key= True, ), ) table_Manager = Table( 'Manager', metadata, - Column( 'duties', type= String, ), + Column( 'duties', type= String(100), ), Column( 'id', Integer, ForeignKey( 'Engineer.id', ), primary_key= True, ), ) def test_threelevels(self): diff --git a/test/orm/memusage.py b/test/orm/memusage.py index 84d3ebdb2e..b456a1abbc 100644 --- a/test/orm/memusage.py +++ b/test/orm/memusage.py @@ -43,7 +43,7 @@ class MapperCleanoutTest(testbase.AssertMixin): table2 = Table("mytable2", metadata, Column('col1', Integer, primary_key=True), Column('col2', String(30)), - Column('col3', String(30), ForeignKey("mytable.col1")) + Column('col3', Integer, ForeignKey("mytable.col1")) ) metadata.create_all() @@ -76,4 +76,4 @@ class MapperCleanoutTest(testbase.AssertMixin): clear_mappers() if __name__ == '__main__': - testbase.main() \ No newline at end of file + testbase.main() diff --git a/test/sql/defaults.py b/test/sql/defaults.py index d018c5efbf..7e346aef40 100644 --- a/test/sql/defaults.py +++ b/test/sql/defaults.py @@ -63,7 +63,7 @@ class DefaultTest(PersistTest): Column('col5', deftype, PassiveDefault(def2)), # preexecute + update timestamp - Column('col6', Date, default=currenttime, onupdate=currenttime), + Column('col6', DateTime, default=currenttime, onupdate=currenttime), Column('boolcol1', Boolean, default=True), Column('boolcol2', Boolean, default=False) @@ -158,6 +158,33 @@ class AutoIncrementTest(PersistTest): finally: table.drop() + def testfetchid(self): + meta = BoundMetaData(testbase.db) + table = Table("aitest", meta, + Column('id', Integer, primary_key=True), + Column('data', String(20))) + table.create() + + try: + # simulate working on a table that doesn't already exist + meta2 = BoundMetaData(testbase.db) + table2 = Table("aitest", meta2, + Column('id', Integer, primary_key=True), + Column('data', String(20))) + class AiTest(object): + pass + mapper(AiTest, table2) + + s = create_session() + u = AiTest() + s.save(u) + s.flush() + assert u.id is not None + s.clear() + finally: + table.drop() + + class SequenceTest(PersistTest): @testbase.supported('postgres', 'oracle') def setUpAll(self): diff --git a/test/sql/query.py b/test/sql/query.py index fe119a63df..a7cb52dc45 100644 --- a/test/sql/query.py +++ b/test/sql/query.py @@ -159,6 +159,16 @@ class QueryTest(PersistTest): self.users.insert().execute(user_id=7, user_name='fido') r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall() self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r)) + + @testbase.unsupported('mssql') + def testselectlimitoffset(self): + self.users.insert().execute(user_id=1, user_name='john') + self.users.insert().execute(user_id=2, user_name='jack') + self.users.insert().execute(user_id=3, user_name='ed') + self.users.insert().execute(user_id=4, user_name='wendy') + self.users.insert().execute(user_id=5, user_name='laura') + self.users.insert().execute(user_id=6, user_name='ralph') + self.users.insert().execute(user_id=7, user_name='fido') r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall() self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')]) r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall() diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index e924d08ca4..411e39406e 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -158,10 +158,10 @@ class UnicodeTest(AssertMixin): self.echo(repr(x['plain_data'])) self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata) if isinstance(x['plain_data'], unicode): - # SQLLite returns even non-unicode data as unicode - self.assert_(db.name == 'sqlite') + # SQLLite and MSSQL return non-unicode data as unicode + self.assert_(db.name in ('sqlite', 'mssql')) self.assert_(x['plain_data'] == unicodedata) - self.echo("its sqlite !") + self.echo("it's %s!" % db.name) else: self.assert_(not isinstance(x['plain_data'], unicode) and x['plain_data'] == rawdata) def testengineparam(self):