]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add binary parameter to cursor.execute()
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 25 Aug 2021 17:57:46 +0000 (19:57 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 25 Aug 2021 17:57:46 +0000 (19:57 +0200)
19 files changed:
docs/api/connections.rst
docs/api/cursors.rst
docs/basic/params.rst
psycopg/psycopg/_transform.py
psycopg/psycopg/abc.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
psycopg/psycopg/server_cursor.py
psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg/transform.pyx
tests/fix_db.py
tests/test_connection.py
tests/test_connection_async.py
tests/test_cursor.py
tests/test_cursor_async.py
tests/test_server_cursor.py
tests/test_server_cursor_async.py

index 552f70a78e3217dbe284c7c4823b11ef531235fd..9bb1901007065928fc720fd372dbb03889049fa0 100644 (file)
@@ -128,7 +128,7 @@ The `!Connection` class
         .. seealso:: See :ref:`row-factories` for details about defining the
             objects returned by cursors.
 
-    .. automethod:: execute(query, params=None, prepare=None) -> Cursor
+    .. automethod:: execute
 
         :param query: The query to execute.
         :type query: `!str`, `!bytes`, or `sql.Composable`
@@ -137,10 +137,12 @@ The `!Connection` class
         :param prepare: Force (`!True`) or disallow (`!False`) preparation of
             the query. By default (`!None`) prepare automatically. See
             :ref:`prepared-statements`.
+        :param binary: If `!True` the cursor will return binary values from the
+            database. All the types returned by the query must have a binary
+            loader. See :ref:`binary-data` for details.
 
-        The cursor is what returned calling `cursor()` without parameters. The
-        parameters are passed to its `~Cursor.execute()` and the cursor is
-        returned.
+        The method simply creates a `Cursor` instance, `~Cursor.execute()` the
+        query requested, and return it.
 
         See :ref:`query-parameters` for all the details about executing
         queries.
@@ -332,7 +334,7 @@ The `!AsyncConnection` class
 
     .. autoattribute:: row_factory
 
-    .. automethod:: execute(query, params=None, prepare=None) -> AsyncCursor
+    .. automethod:: execute
     .. automethod:: commit
     .. automethod:: rollback
 
index 8f84650841ab4c14391e0e580ca58525b5c3df66..218d1e84b31dcb7ca67c68340c4358985277a9f1 100644 (file)
@@ -56,7 +56,7 @@ The `!Cursor` class
 
     .. rubric:: Methods to send commands
 
-    .. automethod:: execute(query, params=None, *, prepare=None) -> Cursor
+    .. automethod:: execute
 
         :param query: The query to execute.
         :type query: `!str`, `!bytes`, or `sql.Composable`
@@ -65,6 +65,9 @@ The `!Cursor` class
         :param prepare: Force (`!True`) or disallow (`!False`) preparation of
             the query. By default (`!None`) prepare automatically. See
             :ref:`prepared-statements`.
+        :param binary: Specify whether the server shoul return data in binary
+            format (`!True`) or in text format (`!False`). By default
+            (`!None`) return data as requested by the cursor's `~Cursor.format`.
 
         Return the cursor itself, so that it will be possible to chain a fetch
         operation after the call.
@@ -119,13 +122,14 @@ The `!Cursor` class
 
         The format of the data returned by the queries. It can be selected
         initially e.g. specifying `Connection.cursor`\ ``(binary=True)`` and
-        changed during the cursor's lifetime.
+        changed during the cursor's lifetime. It is also possible  to override
+        the value for single queries, e.g. specifying `execute`\
+        ``(binary=True)``.
 
-        :type: pq.Format
+        :type: `pq.Format`
+        :default: `~pq.Format.TEXT`
 
-        .. admonition:: TODO
-
-            Add `execute`\ ``(binary=True)`` too?
+        .. seealso:: :ref:`binary-data`
 
 
     .. rubric:: Methods to retrieve results
@@ -265,12 +269,15 @@ The `!ServerCursor` class
             is especially useful so that the cursor is closed at the end of
             the block.
 
-    .. automethod:: execute(query, params=None, *) -> ServerCursor
+    .. automethod:: execute
 
         :param query: The query to execute.
         :type query: `!str`, `!bytes`, or `sql.Composable`
         :param params: The parameters to pass to the query, if any.
         :type params: Sequence or Mapping
+        :param binary: Specify whether the server shoul return data in binary
+            format (`!True`) or in text format (`!False`). By default
+            (`!None`) return data as requested by the cursor's `~Cursor.format`.
 
         Create a server cursor with given `name` and the *query* in argument.
 
@@ -350,9 +357,9 @@ The `!AsyncCursor` class
 
             to close the cursor automatically when the block is exited.
 
-    .. automethod:: execute(query, params=None, *, prepare=None) -> AsyncCursor
-    .. automethod:: executemany(query: Query, params_seq: Sequence[Args])
-    .. automethod:: copy(statement: Query) -> AsyncCopy
+    .. automethod:: execute
+    .. automethod:: executemany
+    .. automethod:: copy
 
         .. note::
 
@@ -405,8 +412,8 @@ The `!AsyncServerCursor` class
                 async with conn.cursor("name") as cursor:
                     ...
 
-    .. automethod:: execute(query, params=None) -> AsyncServerCursor
-    .. automethod:: executemany(query: Query, params_seq: Sequence[Args])
+    .. automethod:: execute
+    .. automethod:: executemany
     .. automethod:: fetchone
     .. automethod:: fetchmany
     .. automethod:: fetchall
index 9f58caa73b079bcd28b3887f20b8697e21ba206a..cfd48eb59546d02642c46c26f8c89d139c4a0468 100644 (file)
@@ -194,47 +194,48 @@ argument of the `Cursor.execute()` method::
 Binary parameters and results
 -----------------------------
 
-PostgreSQL has two different ways to represent data type on the wire:
-`~psycopg.pq.Format.TEXT`, always available, and
-`~psycopg.pq.Format.BINARY`, available most of the times. Usually the binary
-format is more efficient to use.
+PostgreSQL has two different ways to transmit data between client and server:
+`~psycopg.pq.Format.TEXT`, always available, and `~psycopg.pq.Format.BINARY`,
+available most of the times but not always. Usually the binary format is more
+efficient to use.
 
-Psycopg can support both the formats of each data type. Whenever a value
+Psycopg can support both the formats for each data type. Whenever a value
 is passed to a query using the normal ``%s`` placeholder, the best format
 available is chosen (often, but not always, the binary format is picked as the
 best choice).
 
 If you have a reason to select explicitly the binary format or the text format
 for a value you can use respectively a ``%b`` placeholder or a ``%t``
-placeholder instead. `~Cursor.execute()` will fail if a
-`~psycopg.adapt.Dumper` for the right parameter type and format is not
-available.
+placeholder instead of the normal ``%s``. `~Cursor.execute()` will fail if a
+`~psycopg.adapt.Dumper` for the right data type and format is not available.
 
 The same two formats, text or binary, are used by PostgreSQL to return data
 from a query to the client. Unlike with parameters, where you can choose the
 format value-by-value, all the columns returned by a query will have the same
-format. For each of the types returned by the query, a
-`~psycopg.adapt.Loader` must be available, otherwise the data will be
-returned as unparsed `!str` or buffer.
-
-.. warning::
-
-    Currently the default is to return values from database queries in textual
-    type, for the simple reason that not all the PostgreSQL data types have a
-    binary `!Loader` implemented. The plan is to support the binary format for
-    all PostgreSQL builtins before the first Psycopg 3 released: TODO!
-
-By default the data will be returned in text format. In order to return data
-in binary format you can create the cursor using `Connection.cursor`\
-``(binary=True)``.
-
-.. admonition:: TODO
-
-    add a `Cursor.execute`\ ``(binary=True)`` parameter?
-
+format. Every type returned by the query should have a `~psycopg.adapt.Loader`
+configured, otherwise the data will be returned as unparsed `!str` (for text
+results) or buffer (for binary results).
+
+.. note::
+    The `pg_type`_ table defines which format is supported for each PostgreSQL
+    data type. Text input/output is managed by the functions declared in the
+    ``typinput`` and ``typoutput`` fields (always present), binary
+    input/output is managed by the ``typsend`` and ``typreceive`` (which are
+    optional).
+
+    .. _pg_type: https://www.postgresql.org/docs/current/catalog-pg-type.html
+
+Because not every PostgreSQL type supports binary output, By default the data
+will be returned in text format. In order to return data in binary format you
+can create the cursor using `Connection.cursor`\ ``(binary=True)`` or execute
+the query using `Cursor.execute`\ ``(binary=True)``. A case in which
+requesting binary results is a clear winner is when you have large binary data
+in the database, such as images::
+
+    cur.execute(
+        "select image_data from images where id = %s", [image_id], binary=True)
+    data = cur.fetchone()[0]
 
 .. admonition:: TODO
 
-    - pass parameters in binary with ``%b``
-    - return parameters in binary with `!cursor(binary=True)`
     - cannot pass multiple statements in binary
index 4f7dae357a3ff124fa8d4e7b4af0e53935587c64..e51ca083bc1fa2b77bb88a66910e879afbe14b39 100644 (file)
@@ -77,7 +77,11 @@ class Transformer(AdaptContext):
         return self._pgresult
 
     def set_pgresult(
-        self, result: Optional["PGresult"], set_loaders: bool = True
+        self,
+        result: Optional["PGresult"],
+        *,
+        set_loaders: bool = True,
+        format: Optional[pq.Format] = None,
     ) -> None:
         self._pgresult = result
 
@@ -96,7 +100,7 @@ class Transformer(AdaptContext):
             rc = self._row_loaders = []
             for i in range(nf):
                 oid = result.ftype(i)
-                fmt = result.fformat(i)
+                fmt = result.fformat(i) if format is None else format
                 rc.append(self.get_loader(oid, fmt).load)  # type: ignore
 
     def set_row_types(
index a0158e2d3101165baa68bdbb55b51bbe3314ea5f..fffc5ba4d8beb4a0d9dd1ccaa7eeb3ab79a3fde4 100644 (file)
@@ -183,7 +183,11 @@ class Transformer(Protocol):
         ...
 
     def set_pgresult(
-        self, result: Optional["PGresult"], set_loaders: bool = True
+        self,
+        result: Optional["PGresult"],
+        *,
+        set_loaders: bool = True,
+        format: Optional[pq.Format] = None
     ) -> None:
         ...
 
index a362b50622e9883331a366890ebd429e77bee836..ee6b3981a731f3c238bf405f3eaee60a9a0a23c2 100644 (file)
@@ -416,7 +416,9 @@ class BaseConnection(Generic[Row]):
         conn._autocommit = bool(autocommit)
         return conn
 
-    def _exec_command(self, command: Query) -> PQGen["PGresult"]:
+    def _exec_command(
+        self, command: Query, result_format: Format = Format.TEXT
+    ) -> PQGen["PGresult"]:
         """
         Generator to send a command and receive the result to the backend.
 
@@ -436,7 +438,13 @@ class BaseConnection(Generic[Row]):
         elif isinstance(command, Composable):
             command = command.as_bytes(self)
 
-        self.pgconn.send_query(command)
+        if result_format == Format.TEXT:
+            self.pgconn.send_query(command)
+        else:
+            self.pgconn.send_query_params(
+                command, None, result_format=result_format
+            )
+
         result = (yield from execute(self.pgconn))[-1]
         if result.status not in (ExecStatus.COMMAND_OK, ExecStatus.TUPLES_OK):
             if result.status == ExecStatus.FATAL_ERROR:
@@ -700,9 +708,13 @@ class Connection(BaseConnection[Row]):
         params: Optional[Params] = None,
         *,
         prepare: Optional[bool] = None,
+        binary: bool = False,
     ) -> Cursor[Row]:
         """Execute a query and return a cursor to read its results."""
         cur = self.cursor()
+        if binary:
+            cur.format = Format.BINARY
+
         try:
             return cur.execute(query, params, prepare=prepare)
         except e.Error as ex:
index 89fd1c243e75450e249121444f8e60288b6c4aa6..9376a56f98fa4d9c73e8341cbda9a1df758eddf8 100644 (file)
@@ -224,8 +224,12 @@ class AsyncConnection(BaseConnection[Row]):
         params: Optional[Params] = None,
         *,
         prepare: Optional[bool] = None,
+        binary: bool = False,
     ) -> AsyncCursor[Row]:
         cur = self.cursor()
+        if binary:
+            cur.format = Format.BINARY
+
         try:
             return await cur.execute(query, params, prepare=prepare)
         except e.Error as ex:
index 2df57ca0c541a784250eeb80e0fd068ea7e45f4b..1dceedd05f1375f6231cd8385dd2ef0eee545db4 100644 (file)
@@ -189,11 +189,14 @@ class BaseCursor(Generic[ConnectionType, Row]):
         params: Optional[Params] = None,
         *,
         prepare: Optional[bool] = None,
+        binary: Optional[bool] = None,
     ) -> PQGen[None]:
         """Generator implementing `Cursor.execute()`."""
         yield from self._start_query(query)
         pgq = self._convert_query(query, params)
-        results = yield from self._maybe_prepare_gen(pgq, prepare)
+        results = yield from self._maybe_prepare_gen(
+            pgq, prepare=prepare, binary=binary
+        )
         self._execute_results(results)
         self._last_query = query
 
@@ -211,23 +214,27 @@ class BaseCursor(Generic[ConnectionType, Row]):
             else:
                 pgq.dump(params)
 
-            results = yield from self._maybe_prepare_gen(pgq, True)
+            results = yield from self._maybe_prepare_gen(pgq, prepare=True)
             self._execute_results(results)
 
         self._last_query = query
 
     def _maybe_prepare_gen(
-        self, pgq: PostgresQuery, prepare: Optional[bool]
+        self,
+        pgq: PostgresQuery,
+        *,
+        prepare: Optional[bool] = None,
+        binary: Optional[bool] = None,
     ) -> PQGen[Sequence["PGresult"]]:
         # Check if the query is prepared or needs preparing
         prep, name = self._conn._prepared.get(pgq, prepare)
         if prep is Prepare.YES:
             # The query is already prepared
-            self._send_query_prepared(name, pgq)
+            self._send_query_prepared(name, pgq, binary=binary)
 
         elif prep is Prepare.NO:
             # The query must be executed without preparing
-            self._execute_send(pgq)
+            self._execute_send(pgq, binary=binary)
 
         else:
             # The query must be prepared and executed
@@ -237,7 +244,7 @@ class BaseCursor(Generic[ConnectionType, Row]):
                 raise e.error_from_result(
                     result, encoding=self._conn.client_encoding
                 )
-            self._send_query_prepared(name, pgq)
+            self._send_query_prepared(name, pgq, binary=binary)
 
         # run the query
         results = yield from execute(self._conn.pgconn)
@@ -251,12 +258,16 @@ class BaseCursor(Generic[ConnectionType, Row]):
         return results
 
     def _stream_send_gen(
-        self, query: Query, params: Optional[Params] = None
+        self,
+        query: Query,
+        params: Optional[Params] = None,
+        *,
+        binary: Optional[bool] = None,
     ) -> PQGen[None]:
         """Generator to send the query for `Cursor.stream()`."""
         yield from self._start_query(query)
         pgq = self._convert_query(query, params)
-        self._execute_send(pgq, no_pqexec=True)
+        self._execute_send(pgq, binary=binary, no_pqexec=True)
         self._conn.pgconn.set_single_row_mode()
         self._last_query = query
 
@@ -317,21 +328,30 @@ class BaseCursor(Generic[ConnectionType, Row]):
         self._tx.set_pgresult(result)
 
     def _execute_send(
-        self, query: PostgresQuery, no_pqexec: bool = False
+        self,
+        query: PostgresQuery,
+        *,
+        no_pqexec: bool = False,
+        binary: Optional[bool] = None,
     ) -> None:
         """
         Implement part of execute() before waiting common to sync and async.
 
         This is not a generator, but a normal non-blocking function.
         """
+        if binary is None:
+            fmt = self.format
+        else:
+            fmt = Format.BINARY if binary else Format.TEXT
+
         self._query = query
-        if query.params or no_pqexec or self.format == Format.BINARY:
+        if query.params or no_pqexec or fmt == Format.BINARY:
             self._conn.pgconn.send_query_params(
                 query.query,
                 query.params,
                 param_formats=query.formats,
                 param_types=query.types,
-                result_format=self.format,
+                result_format=fmt,
             )
         else:
             # if we don't have to, let's use exec_ as it can run more than
@@ -356,7 +376,9 @@ class BaseCursor(Generic[ConnectionType, Row]):
         ExecStatus.COPY_BOTH,
     )
 
-    def _execute_results(self, results: Sequence["PGresult"]) -> None:
+    def _execute_results(
+        self, results: Sequence["PGresult"], format: Optional[Format] = None
+    ) -> None:
         """
         Implement part of execute() after waiting common to sync and async
 
@@ -371,7 +393,12 @@ class BaseCursor(Generic[ConnectionType, Row]):
 
         self._results = list(results)
         self.pgresult = results[0]
-        self._tx.set_pgresult(results[0])
+
+        # Note: the only reason to override format is to correclty set
+        # binary loaders on server-side cursors, because send_describe_portal
+        # only returns a text result.
+        self._tx.set_pgresult(results[0], format=format)
+
         self._make_row = self._make_row_maker()
         nrows = self.pgresult.command_tuples
         if nrows is not None:
@@ -402,12 +429,19 @@ class BaseCursor(Generic[ConnectionType, Row]):
             name, query.query, param_types=query.types
         )
 
-    def _send_query_prepared(self, name: bytes, pgq: PostgresQuery) -> None:
+    def _send_query_prepared(
+        self, name: bytes, pgq: PostgresQuery, *, binary: Optional[bool] = None
+    ) -> None:
+        if binary is None:
+            fmt = self.format
+        else:
+            fmt = Format.BINARY if binary else Format.TEXT
+
         self._conn.pgconn.send_query_prepared(
             name,
             pgq.params,
             param_formats=pgq.formats,
-            result_format=self.format,
+            result_format=fmt,
         )
 
     def _check_result(self) -> None:
@@ -505,6 +539,7 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         params: Optional[Params] = None,
         *,
         prepare: Optional[bool] = None,
+        binary: Optional[bool] = None,
     ) -> AnyCursor:
         """
         Execute a query or command to the database.
@@ -512,7 +547,9 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         try:
             with self._conn.lock:
                 self._conn.wait(
-                    self._execute_gen(query, params, prepare=prepare)
+                    self._execute_gen(
+                        query, params, prepare=prepare, binary=binary
+                    )
                 )
         except e.Error as ex:
             raise ex.with_traceback(None)
@@ -526,13 +563,19 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
             self._conn.wait(self._executemany_gen(query, params_seq))
 
     def stream(
-        self, query: Query, params: Optional[Params] = None
+        self,
+        query: Query,
+        params: Optional[Params] = None,
+        *,
+        binary: Optional[bool] = None,
     ) -> Iterator[Row]:
         """
         Iterate row-by-row on a result from the database.
         """
         with self._conn.lock:
-            self._conn.wait(self._stream_send_gen(query, params))
+            self._conn.wait(
+                self._stream_send_gen(query, params, binary=binary)
+            )
             first = True
             while self._conn.wait(self._stream_fetchone_gen(first)):
                 rec = self._tx.load_row(0, self._make_row)
index 36a6f152ada6a7f8041f57a05bd026ef5361fe4b..516b6f32fbb23db3b0a58ce995da208e1536bc6a 100644 (file)
@@ -66,11 +66,14 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         params: Optional[Params] = None,
         *,
         prepare: Optional[bool] = None,
+        binary: Optional[bool] = None,
     ) -> AnyCursor:
         try:
             async with self._conn.lock:
                 await self._conn.wait(
-                    self._execute_gen(query, params, prepare=prepare)
+                    self._execute_gen(
+                        query, params, prepare=prepare, binary=binary
+                    )
                 )
         except e.Error as ex:
             raise ex.with_traceback(None)
@@ -83,10 +86,16 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
             await self._conn.wait(self._executemany_gen(query, params_seq))
 
     async def stream(
-        self, query: Query, params: Optional[Params] = None
+        self,
+        query: Query,
+        params: Optional[Params] = None,
+        *,
+        binary: Optional[bool] = None,
     ) -> AsyncIterator[Row]:
         async with self._conn.lock:
-            await self._conn.wait(self._stream_send_gen(query, params))
+            await self._conn.wait(
+                self._stream_send_gen(query, params, binary=binary)
+            )
             first = True
             while await self._conn.wait(self._stream_fetchone_gen(first)):
                 rec = self._tx.load_row(0, self._make_row)
index c8d956b6a410a81a7999a620696b5fa2a562e3f7..d74ee321a0832a3d87454385caacbab9e1a63008 100644 (file)
@@ -24,7 +24,7 @@ DEFAULT_ITERSIZE = 100
 
 
 class ServerCursorHelper(Generic[ConnectionType, Row]):
-    __slots__ = ("name", "scrollable", "withhold", "described")
+    __slots__ = ("name", "scrollable", "withhold", "described", "format")
     """Helper object for common ServerCursor code.
 
     TODO: this should be a mixin, but couldn't find a way to work it
@@ -41,6 +41,7 @@ class ServerCursorHelper(Generic[ConnectionType, Row]):
         self.scrollable = scrollable
         self.withhold = withhold
         self.described = False
+        self.format = pq.Format.TEXT
 
     def _repr(self, cur: BaseCursor[ConnectionType, Row]) -> str:
         cls = f"{cur.__class__.__module__}.{cur.__class__.__qualname__}"
@@ -85,7 +86,7 @@ class ServerCursorHelper(Generic[ConnectionType, Row]):
             self.name.encode(conn.client_encoding)
         )
         results = yield from execute(conn.pgconn)
-        cur._execute_results(results)
+        cur._execute_results(results, format=self.format)
         self.described = True
 
     def _close_gen(self, cur: BaseCursor[ConnectionType, Row]) -> PQGen[None]:
@@ -128,7 +129,9 @@ class ServerCursorHelper(Generic[ConnectionType, Row]):
         query = sql.SQL("FETCH FORWARD {} FROM {}").format(
             howmuch, sql.Identifier(self.name)
         )
-        res = yield from cur._conn._exec_command(query)
+        res = yield from cur._conn._exec_command(
+            query, result_format=self.format
+        )
 
         cur.pgresult = res
         cur._tx.set_pgresult(res, set_loaders=False)
@@ -239,6 +242,8 @@ class ServerCursor(Cursor[Row]):
         self: AnyCursor,
         query: Query,
         params: Optional[Params] = None,
+        *,
+        binary: Optional[bool] = None,
         **kwargs: Any,
     ) -> AnyCursor:
         """
@@ -248,6 +253,12 @@ class ServerCursor(Cursor[Row]):
             raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
         helper = cast(ServerCursor[Row], self)._helper
         query = helper._make_declare_statement(self, query)
+
+        if binary is None:
+            helper.format = self.format
+        else:
+            helper.format = pq.Format.BINARY if binary else pq.Format.TEXT
+
         with self._conn.lock:
             self._conn.wait(helper._declare_gen(self, query, params))
         return self
@@ -355,12 +366,20 @@ class AsyncServerCursor(AsyncCursor[Row]):
         self: AnyCursor,
         query: Query,
         params: Optional[Params] = None,
+        *,
+        binary: Optional[bool] = None,
         **kwargs: Any,
     ) -> AnyCursor:
         if kwargs:
             raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
         helper = cast(AsyncServerCursor[Row], self)._helper
         query = helper._make_declare_statement(self, query)
+
+        if binary is None:
+            helper.format = self.format
+        else:
+            helper.format = pq.Format.BINARY if binary else pq.Format.TEXT
+
         async with self._conn.lock:
             await self._conn.wait(helper._declare_gen(self, query, params))
         return self
index e4f6b3e14763fb3f365b447b1bc27d7f02d4750b..04fe7b990c0f4d5284ef471642b7e431fb16117e 100644 (file)
@@ -25,7 +25,11 @@ class Transformer(abc.AdaptContext):
     @property
     def pgresult(self) -> Optional[PGresult]: ...
     def set_pgresult(
-        self, result: Optional["PGresult"], set_loaders: bool = True
+        self,
+        result: Optional["PGresult"],
+        *,
+        set_loaders: bool = True,
+        format: Optional[pq.Format] = None,
     ) -> None: ...
     def set_row_types(
         self, types: Sequence[int], formats: Sequence[pq.Format]
index 66627c2f24e084ba58ab5c1baaeebd8db02eaf77..3029bbc6ff8ad89d605fc2266eb673e03afabdd9 100644 (file)
@@ -98,7 +98,12 @@ cdef class Transformer:
     def pgresult(self) -> Optional[PGresult]:
         return self._pgresult
 
-    cpdef void set_pgresult(self, pq.PGresult result, object set_loaders = True):
+    cpdef void set_pgresult(
+        self,
+        pq.PGresult result,
+        object set_loaders = True,
+        object format = None
+    ):
         self._pgresult = result
 
         if result is None:
@@ -123,7 +128,7 @@ cdef class Transformer:
                 Py_INCREF(tmp)
                 PyList_SET_ITEM(types, i, tmp)
 
-                tmp = libpq.PQfformat(res, i)
+                tmp = libpq.PQfformat(res, i) if format is None else format
                 Py_INCREF(tmp)
                 PyList_SET_ITEM(formats, i, tmp)
 
index 6d301dc5fc374def235314dca20ba58e69dfd077..3954a3ce5f88127e3004646d74c3cf697e83baab 100644 (file)
@@ -109,7 +109,7 @@ def patch_exec(conn, monkeypatch):
     _orig_exec_command = conn._exec_command
     L = ListPopAll()
 
-    def _exec_command(command):
+    def _exec_command(command, *args, **kwargs):
         cmdcopy = command
         if isinstance(cmdcopy, bytes):
             cmdcopy = cmdcopy.decode(conn.client_encoding)
@@ -117,7 +117,7 @@ def patch_exec(conn, monkeypatch):
             cmdcopy = cmdcopy.as_string(conn)
 
         L.insert(0, cmdcopy)
-        return _orig_exec_command(command)
+        return _orig_exec_command(command, *args, **kwargs)
 
     monkeypatch.setattr(conn, "_exec_command", _exec_command)
     return L
index 8171881fe1f997729eace9a0eff3753664c2dd7e..23e8407fdf04825a309466e9f46ffd9ea75b9721 100644 (file)
@@ -504,6 +504,8 @@ def test_notify_handlers(conn):
 def test_execute(conn):
     cur = conn.execute("select %s, %s", [10, 20])
     assert cur.fetchone() == (10, 20)
+    assert cur.format == 0
+    assert cur.pgresult.fformat(0) == 0
 
     cur = conn.execute("select %(a)s, %(b)s", {"a": 11, "b": 21})
     assert cur.fetchone() == (11, 21)
@@ -512,6 +514,13 @@ def test_execute(conn):
     assert cur.fetchone() == (12, 22)
 
 
+def test_execute_binary(conn):
+    cur = conn.execute("select %s, %s", [10, 20], binary=True)
+    assert cur.fetchone() == (10, 20)
+    assert cur.format == 1
+    assert cur.pgresult.fformat(0) == 1
+
+
 def test_row_factory(dsn):
     conn = Connection.connect(dsn)
     assert conn.row_factory is tuple_row
index 46ccdc43554e2b7353cccbe31bae067a1dc18a84..57ecfa6b81ced120dc166fd13e5c3efd6c6ee74c 100644 (file)
@@ -523,6 +523,8 @@ async def test_notify_handlers(aconn):
 async def test_execute(aconn):
     cur = await aconn.execute("select %s, %s", [10, 20])
     assert await cur.fetchone() == (10, 20)
+    assert cur.format == 0
+    assert cur.pgresult.fformat(0) == 0
 
     cur = await aconn.execute("select %(a)s, %(b)s", {"a": 11, "b": 21})
     assert await cur.fetchone() == (11, 21)
@@ -531,6 +533,13 @@ async def test_execute(aconn):
     assert await cur.fetchone() == (12, 22)
 
 
+async def test_execute_binary(aconn):
+    cur = await aconn.execute("select %s, %s", [10, 20], binary=True)
+    assert await cur.fetchone() == (10, 20)
+    assert cur.format == 1
+    assert cur.pgresult.fformat(0) == 1
+
+
 async def test_row_factory(dsn):
     conn = await AsyncConnection.connect(dsn)
     assert conn.row_factory is tuple_row
index 214971f0d318fc500b5b3598a67088fca3cd5126..686b20902aabf034da2b2f5889a45735512d3d57 100644 (file)
@@ -109,23 +109,33 @@ def test_fetchone(conn):
     assert cur.pgresult.fformat(0) == 0
 
     row = cur.fetchone()
-    assert row[0] == 1
-    assert row[1] == "foo"
-    assert row[2] is None
+    assert row == (1, "foo", None)
     row = cur.fetchone()
     assert row is None
 
 
-def test_execute_binary_result(conn):
+def test_binary_cursor_execute(conn):
     cur = conn.cursor(binary=True)
-    cur.execute("select %s::text, %s::text", ["foo", None])
+    cur.execute("select %s, %s", [1, None])
+    assert cur.fetchone() == (1, None)
     assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
 
-    row = cur.fetchone()
-    assert row[0] == "foo"
-    assert row[1] is None
-    row = cur.fetchone()
-    assert row is None
+
+def test_execute_binary(conn):
+    cur = conn.cursor()
+    cur.execute("select %s, %s", [1, None], binary=True)
+    assert cur.fetchone() == (1, None)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
+
+
+def test_binary_cursor_text_override(conn):
+    cur = conn.cursor(binary=True)
+    cur.execute("select %s, %s", [1, None], binary=False)
+    assert cur.fetchone() == (1, None)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"1"
 
 
 @pytest.mark.parametrize("encoding", ["utf8", "latin9"])
@@ -464,6 +474,39 @@ def test_stream_badquery(conn, query):
             pass
 
 
+def test_stream_binary_cursor(conn):
+    cur = conn.cursor(binary=True)
+    recs = []
+    for rec in cur.stream("select generate_series(1, 2)"):
+        recs.append(rec)
+        assert cur.pgresult.fformat(0) == 1
+        assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+    assert recs == [(1,), (2,)]
+
+
+def test_stream_execute_binary(conn):
+    cur = conn.cursor()
+    recs = []
+    for rec in cur.stream("select generate_series(1, 2)", binary=True):
+        recs.append(rec)
+        assert cur.pgresult.fformat(0) == 1
+        assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+    assert recs == [(1,), (2,)]
+
+
+def test_stream_binary_cursor_text_override(conn):
+    cur = conn.cursor(binary=True)
+    recs = []
+    for rec in cur.stream("select generate_series(1, 2)", binary=False):
+        recs.append(rec)
+        assert cur.pgresult.fformat(0) == 0
+        assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode("utf8")
+
+    assert recs == [(1,), (2,)]
+
+
 class TestColumn:
     def test_description_attribs(self, conn):
         curs = conn.cursor()
index 7f3c1c22e54040ae5ba9f372dc4b08bea73e9899..4f51334563b74876b8194f269ef8630885521563 100644 (file)
@@ -112,23 +112,33 @@ async def test_fetchone(aconn):
     assert cur.pgresult.fformat(0) == 0
 
     row = await cur.fetchone()
-    assert row[0] == 1
-    assert row[1] == "foo"
-    assert row[2] is None
+    assert row == (1, "foo", None)
     row = await cur.fetchone()
     assert row is None
 
 
-async def test_execute_binary_result(aconn):
+async def test_binary_cursor_execute(aconn):
     cur = aconn.cursor(binary=True)
-    await cur.execute("select %s::text, %s::text", ["foo", None])
+    await cur.execute("select %s, %s", [1, None])
+    assert (await cur.fetchone()) == (1, None)
     assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
 
-    row = await cur.fetchone()
-    assert row[0] == "foo"
-    assert row[1] is None
-    row = await cur.fetchone()
-    assert row is None
+
+async def test_execute_binary(aconn):
+    cur = aconn.cursor()
+    await cur.execute("select %s, %s", [1, None], binary=True)
+    assert (await cur.fetchone()) == (1, None)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
+
+
+async def test_binary_cursor_text_override(aconn):
+    cur = aconn.cursor(binary=True)
+    await cur.execute("select %s, %s", [1, None], binary=False)
+    assert (await cur.fetchone()) == (1, None)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"1"
 
 
 @pytest.mark.parametrize("encoding", ["utf8", "latin9"])
@@ -460,6 +470,39 @@ async def test_stream_badquery(aconn, query):
             pass
 
 
+async def test_stream_binary_cursor(aconn):
+    cur = aconn.cursor(binary=True)
+    recs = []
+    async for rec in cur.stream("select generate_series(1, 2)"):
+        recs.append(rec)
+        assert cur.pgresult.fformat(0) == 1
+        assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+    assert recs == [(1,), (2,)]
+
+
+async def test_stream_execute_binary(aconn):
+    cur = aconn.cursor()
+    recs = []
+    async for rec in cur.stream("select generate_series(1, 2)", binary=True):
+        recs.append(rec)
+        assert cur.pgresult.fformat(0) == 1
+        assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+    assert recs == [(1,), (2,)]
+
+
+async def test_stream_binary_cursor_text_override(aconn):
+    cur = aconn.cursor(binary=True)
+    recs = []
+    async for rec in cur.stream("select generate_series(1, 2)", binary=False):
+        recs.append(rec)
+        assert cur.pgresult.fformat(0) == 0
+        assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode("utf8")
+
+    assert recs == [(1,), (2,)]
+
+
 async def test_str(aconn):
     cur = aconn.cursor()
     assert "[IDLE]" in str(cur)
index b595658b6725aafbe797d9f4892b2e6539e000f0..67773c27036d83b61556a19e91966ff681508b64 100644 (file)
@@ -50,6 +50,49 @@ def test_query_params(conn):
         assert cur._query.params == [bytes([0, 3])]  # 3 as binary int2
 
 
+def test_binary_cursor_execute(conn):
+    cur = conn.cursor("foo", binary=True)
+    cur.execute("select generate_series(1, 2)")
+    assert cur.fetchone() == (1,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+    assert cur.fetchone() == (2,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+
+def test_execute_binary(conn):
+    cur = conn.cursor("foo")
+    cur.execute("select generate_series(1, 2)", binary=True)
+    assert cur.fetchone() == (1,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+    assert cur.fetchone() == (2,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+    cur.execute("select generate_series(1, 1)")
+    assert cur.fetchone() == (1,)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"1"
+
+
+def test_binary_cursor_text_override(conn):
+    cur = conn.cursor("foo", binary=True)
+    cur.execute("select generate_series(1, 2)", binary=False)
+    assert cur.fetchone() == (1,)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"1"
+    assert cur.fetchone() == (2,)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"2"
+
+    cur.execute("select generate_series(1, 2)")
+    assert cur.fetchone() == (1,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+
+
 def test_close(conn, recwarn, retries):
     for retry in retries:
         with retry:
index bfaa959c41f4e12e00d9867242b53bf9272c4f59..5814ed7f4b68035ca621d834dd2f2e20e6116b8a 100644 (file)
@@ -52,6 +52,49 @@ async def test_query_params(aconn):
         assert cur._query.params == [bytes([0, 3])]  # 3 as binary int2
 
 
+async def test_binary_cursor_execute(aconn):
+    cur = aconn.cursor("foo", binary=True)
+    await cur.execute("select generate_series(1, 2)")
+    assert (await cur.fetchone()) == (1,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+    assert (await cur.fetchone()) == (2,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+
+async def test_execute_binary(aconn):
+    cur = aconn.cursor("foo")
+    await cur.execute("select generate_series(1, 2)", binary=True)
+    assert (await cur.fetchone()) == (1,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+    assert (await cur.fetchone()) == (2,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+    await cur.execute("select generate_series(1, 1)")
+    assert (await cur.fetchone()) == (1,)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"1"
+
+
+async def test_binary_cursor_text_override(aconn):
+    cur = aconn.cursor("foo", binary=True)
+    await cur.execute("select generate_series(1, 2)", binary=False)
+    assert (await cur.fetchone()) == (1,)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"1"
+    assert (await cur.fetchone()) == (2,)
+    assert cur.pgresult.fformat(0) == 0
+    assert cur.pgresult.get_value(0, 0) == b"2"
+
+    await cur.execute("select generate_series(1, 2)")
+    assert (await cur.fetchone()) == (1,)
+    assert cur.pgresult.fformat(0) == 1
+    assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+
+
 async def test_close(aconn, recwarn, retries):
     async for retry in retries:
         with retry: