From: Tony Locke Date: Thu, 29 Apr 2021 18:25:09 +0000 (-0400) Subject: Support stream_results in the pg8000 dialect X-Git-Tag: rel_1_4_13~5^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=95523c78dd2e2aeb2603734d20383c6d3dac5dfb;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Support stream_results in the pg8000 dialect ### Description This change adds support for stream_results for the pg8000 dialect by adding a server side cursor. The server-side cursor is a wrapper around a standard DBAPI cursor, and uses the SQL-level cursors. This is being discussed in issue https://github.com/sqlalchemy/sqlalchemy/issues/6198 and this pull request is really to give a concrete example of what I was suggesting. ### Checklist This pull request is: - [ ] A documentation / typographical error fix - Good to go, no issue or tests are needed - [ ] A short code fix - please include the issue number, and create an issue if none exists, which must include a complete example of the issue. one line code fixes without an issue and demonstration will not be accepted. - Please include: `Fixes: #` in the commit message - please include tests. one line code fixes without tests will not be accepted. - [x] A new feature implementation - please include the issue number, and create an issue if none exists, which must include a complete example of how the feature would look. - Please include: `Fixes: #` in the commit message - please include tests. **Have a nice day!** Closes: #6356 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/6356 Pull-request-sha: 071e118a6b09a26c511b39b0d589ebd2de8d508c Change-Id: Id1a865adf0ff64294c71814681f5b4d593939db6 --- diff --git a/doc/build/changelog/unreleased_14/pg8000_sscursor.rst b/doc/build/changelog/unreleased_14/pg8000_sscursor.rst new file mode 100644 index 0000000000..cfa69f2b0b --- /dev/null +++ b/doc/build/changelog/unreleased_14/pg8000_sscursor.rst @@ -0,0 +1,6 @@ +.. change:: + :tags: pg8000, postgresql + :tickets: 6198 + + Add support for server side cursors in the pg8000 dialect for PostgreSQL. This + allows use of the :class: `Connection.execution_options.stream_results` option. diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py index eaf6ccbb87..d999cdf6fe 100644 --- a/lib/sqlalchemy/dialects/postgresql/pg8000.py +++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py @@ -229,12 +229,75 @@ class _PGBoolean(sqltypes.Boolean): return dbapi.BOOLEAN +_server_side_id = util.counter() + + class PGExecutionContext_pg8000(PGExecutionContext): + def create_server_side_cursor(self): + ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) + return ServerSideCursor(self._dbapi_connection.cursor(), ident) + def pre_exec(self): if not self.compiled: return +class ServerSideCursor: + server_side = True + + def __init__(self, cursor, ident): + self.ident = ident + self.cursor = cursor + + @property + def connection(self): + return self.cursor.connection + + @property + def rowcount(self): + return self.cursor.rowcount + + @property + def description(self): + return self.cursor.description + + def execute(self, operation, args=(), stream=None): + op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation + self.cursor.execute(op, args, stream=stream) + return self + + def executemany(self, operation, param_sets): + self.cursor.executemany(operation, param_sets) + return self + + def fetchone(self): + self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident) + return self.cursor.fetchone() + + def fetchmany(self, num=None): + if num is None: + return self.fetchall() + else: + self.cursor.execute( + "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident + ) + return self.cursor.fetchall() + + def fetchall(self): + self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident) + return self.cursor.fetchall() + + def close(self): + self.cursor.execute("CLOSE " + self.ident) + self.cursor.close() + + def setinputsizes(self, *sizes): + self.cursor.setinputsizes(*sizes) + + def setoutputsize(self, size, column=None): + pass + + class PGCompiler_pg8000(PGCompiler): def visit_mod_binary(self, binary, operator, **kw): return ( @@ -263,6 +326,7 @@ class PGDialect_pg8000(PGDialect): execution_ctx_cls = PGExecutionContext_pg8000 statement_compiler = PGCompiler_pg8000 preparer = PGIdentifierPreparer_pg8000 + supports_server_side_cursors = True use_setinputsizes = True diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index e8ad88f24a..982ac498d4 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -243,6 +243,8 @@ class ServerSideCursorsTest( return not cursor.buffered elif self.engine.dialect.driver in ("asyncpg", "aiosqlite"): return cursor.server_side + elif self.engine.dialect.driver == "pg8000": + return getattr(cursor, "server_side", False) else: return False