]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Support stream_results in the pg8000 dialect
authorTony Locke <tlocke@tlocke.org.uk>
Thu, 29 Apr 2021 18:25:09 +0000 (14:25 -0400)
committersqla-tester <sqla-tester@sqlalchemy.org>
Thu, 29 Apr 2021 18:25:09 +0000 (14:25 -0400)
### Description
This change adds support for stream_results for the pg8000 dialect by adding a server side cursor. The server-side cursor is a wrapper around a standard DBAPI cursor, and uses the SQL-level cursors. This is being discussed in issue https://github.com/sqlalchemy/sqlalchemy/issues/6198 and this pull request is really to give a concrete example of what I was suggesting.

### Checklist

<!-- go over following points. check them with an `x` if they do apply, (they turn into clickable checkboxes once the PR is submitted, so no need to do everything at once)

-->

This pull request is:

- [ ] A documentation / typographical error fix
- Good to go, no issue or tests are needed
- [ ] A short code fix
- please include the issue number, and create an issue if none exists, which
  must include a complete example of the issue.  one line code fixes without an
  issue and demonstration will not be accepted.
- Please include: `Fixes: #<issue number>` in the commit message
- please include tests.   one line code fixes without tests will not be accepted.
- [x] A new feature implementation
- please include the issue number, and create an issue if none exists, which must
  include a complete example of how the feature would look.
- Please include: `Fixes: #<issue number>` in the commit message
- please include tests.

**Have a nice day!**

Closes: #6356
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/6356
Pull-request-sha: 071e118a6b09a26c511b39b0d589ebd2de8d508c

Change-Id: Id1a865adf0ff64294c71814681f5b4d593939db6

doc/build/changelog/unreleased_14/pg8000_sscursor.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/pg8000.py
lib/sqlalchemy/testing/suite/test_results.py

diff --git a/doc/build/changelog/unreleased_14/pg8000_sscursor.rst b/doc/build/changelog/unreleased_14/pg8000_sscursor.rst
new file mode 100644 (file)
index 0000000..cfa69f2
--- /dev/null
@@ -0,0 +1,6 @@
+.. change::
+    :tags: pg8000, postgresql
+    :tickets: 6198
+
+    Add support for server side cursors in the pg8000 dialect for PostgreSQL. This
+    allows use of the :class: `Connection.execution_options.stream_results` option.
index eaf6ccbb87d4e909459b85378138448ac6af57cd..d999cdf6fe14a532ef6e55bfab3aaab55f6cf00e 100644 (file)
@@ -229,12 +229,75 @@ class _PGBoolean(sqltypes.Boolean):
         return dbapi.BOOLEAN
 
 
+_server_side_id = util.counter()
+
+
 class PGExecutionContext_pg8000(PGExecutionContext):
+    def create_server_side_cursor(self):
+        ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
+        return ServerSideCursor(self._dbapi_connection.cursor(), ident)
+
     def pre_exec(self):
         if not self.compiled:
             return
 
 
+class ServerSideCursor:
+    server_side = True
+
+    def __init__(self, cursor, ident):
+        self.ident = ident
+        self.cursor = cursor
+
+    @property
+    def connection(self):
+        return self.cursor.connection
+
+    @property
+    def rowcount(self):
+        return self.cursor.rowcount
+
+    @property
+    def description(self):
+        return self.cursor.description
+
+    def execute(self, operation, args=(), stream=None):
+        op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
+        self.cursor.execute(op, args, stream=stream)
+        return self
+
+    def executemany(self, operation, param_sets):
+        self.cursor.executemany(operation, param_sets)
+        return self
+
+    def fetchone(self):
+        self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
+        return self.cursor.fetchone()
+
+    def fetchmany(self, num=None):
+        if num is None:
+            return self.fetchall()
+        else:
+            self.cursor.execute(
+                "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
+            )
+            return self.cursor.fetchall()
+
+    def fetchall(self):
+        self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
+        return self.cursor.fetchall()
+
+    def close(self):
+        self.cursor.execute("CLOSE " + self.ident)
+        self.cursor.close()
+
+    def setinputsizes(self, *sizes):
+        self.cursor.setinputsizes(*sizes)
+
+    def setoutputsize(self, size, column=None):
+        pass
+
+
 class PGCompiler_pg8000(PGCompiler):
     def visit_mod_binary(self, binary, operator, **kw):
         return (
@@ -263,6 +326,7 @@ class PGDialect_pg8000(PGDialect):
     execution_ctx_cls = PGExecutionContext_pg8000
     statement_compiler = PGCompiler_pg8000
     preparer = PGIdentifierPreparer_pg8000
+    supports_server_side_cursors = True
 
     use_setinputsizes = True
 
index e8ad88f24ae78839169edf4f5528eca54357e2de..982ac498d4017644feff72a6c1570779f935419d 100644 (file)
@@ -243,6 +243,8 @@ class ServerSideCursorsTest(
             return not cursor.buffered
         elif self.engine.dialect.driver in ("asyncpg", "aiosqlite"):
             return cursor.server_side
+        elif self.engine.dialect.driver == "pg8000":
+            return getattr(cursor, "server_side", False)
         else:
             return False