]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- fixed argument passing to straight textual execute() on engine, connection.
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 13 Feb 2007 22:53:05 +0000 (22:53 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 13 Feb 2007 22:53:05 +0000 (22:53 +0000)
can handle *args or a list instance for positional, **kwargs or a dict instance
for named args, or a list of list or dicts to invoke executemany()

CHANGES
lib/sqlalchemy/engine/base.py
test/engine/alltests.py
test/engine/execute.py [new file with mode: 0644]

diff --git a/CHANGES b/CHANGES
index 61f43d426431cbe99e4acff2f5e6d8b15769650f..7cbf6c85ee9e86773ef773a42f49da31bec8ba27 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,9 @@
   - added a "supports_execution()" method to ClauseElement, so that individual
   kinds of clauses can express if they are appropriate for executing...such as, 
   you can execute a "select", but not a "Table" or a "Join".
+  - fixed argument passing to straight textual execute() on engine, connection.
+  can handle *args or a list instance for positional, **kwargs or a dict instance
+  for named args, or a list of list or dicts to invoke executemany()
 - orm:
   - another refactoring to relationship calculation.  Allows more accurate ORM behavior
   with relationships from/to/between mappers, particularly polymorphic mappers,
index 3ec3e028f77cd5db67dfa30f510d70345b93508e..1985bcec1fbc48f9b5786dddd7023a0febac2c43 100644 (file)
@@ -260,13 +260,19 @@ class Connection(Connectable):
         self.__connection.close()
         self.__connection = None
         del self.__connection
-    def scalar(self, object, parameters=None, **kwargs):
-        return self.execute(object, parameters, **kwargs).scalar()
+    def scalar(self, object, *multiparams, **params):
+        return self.execute(object, *multiparams, **params).scalar()
     def execute(self, object, *multiparams, **params):
         return Connection.executors[type(object).__mro__[-2]](self, object, *multiparams, **params)
     def execute_default(self, default, **kwargs):
         return default.accept_schema_visitor(self.__engine.dialect.defaultrunner(self.__engine, self.proxy, **kwargs))
-    def execute_text(self, statement, parameters=None):
+    def execute_text(self, statement, *multiparams, **params):
+        if len(multiparams) == 0:
+            parameters = params
+        elif len(multiparams) == 1 and (isinstance(multiparams[0], list) or isinstance(multiparams[0], dict)):
+            parameters = multiparams[0]
+        else:
+            parameters = list(multiparams)
         cursor = self._execute_raw(statement, parameters)
         rpargs = self.__engine.dialect.create_result_proxy_args(self, cursor)
         return ResultProxy(self.__engine, self, cursor, **rpargs)
index c63cb2861cb1d97ef152d67c213b702ccc5cb1e5..7bb9acb72fa443195543b0cfc2bd3e117a8ae5d0 100644 (file)
@@ -5,8 +5,9 @@ import unittest
 def suite():
     modules_to_test = (
         # connectivity, execution
-       'engine.parseconnect',
+           'engine.parseconnect',
         'engine.pool', 
+        'engine.execute',
         'engine.transaction',
         
         # schema/tables
diff --git a/test/engine/execute.py b/test/engine/execute.py
new file mode 100644 (file)
index 0000000..ce60a21
--- /dev/null
@@ -0,0 +1,56 @@
+
+import testbase
+import unittest, sys, datetime
+import tables
+db = testbase.db
+from sqlalchemy import *
+
+
+class ExecuteTest(testbase.PersistTest):
+    def setUpAll(self):
+        global users, metadata
+        metadata = BoundMetaData(testbase.db)
+        users = Table('users', metadata,
+            Column('user_id', INT, primary_key = True),
+            Column('user_name', VARCHAR(20)),
+            mysql_engine='InnoDB'
+        )
+        metadata.create_all()
+    
+    def tearDown(self):
+        testbase.db.connect().execute(users.delete())
+    def tearDownAll(self):
+        metadata.drop_all()
+        
+    @testbase.supported('sqlite')
+    def test_raw_qmark(self):
+        for conn in (testbase.db, testbase.db.connect()):
+            conn.execute("insert into users (user_id, user_name) values (?, ?)", [1,"jack"])
+            conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"ed"], [3,"horse"])
+            conn.execute("insert into users (user_id, user_name) values (?, ?)", 4, 'sally')
+            res = conn.execute("select * from users")
+            assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
+            conn.execute("delete from users")
+
+    @testbase.supported('mysql')
+    def test_raw_sprintf(self):
+        for conn in (testbase.db, testbase.db.connect()):
+            conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
+            conn.execute("insert into users (user_id, user_name) values (%s, %s)", [2,"ed"], [3,"horse"])
+            conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally')
+            res = conn.execute("select * from users")
+            assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
+            conn.execute("delete from users")
+            
+    @testbase.supported('postgres')
+    def test_raw_python(self):
+        for conn in (testbase.db, testbase.db.connect()):
+            conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
+            conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
+            conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", id=4, name='sally')
+            res = conn.execute("select * from users")
+            assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
+            conn.execute("delete from users")
+        
+if __name__ == "__main__":
+    testbase.main()