.. seealso:: See :ref:`row-factories` for details about defining the
objects returned by cursors.
- .. automethod:: execute(query, params=None, prepare=None) -> Cursor
+ .. automethod:: execute
:param query: The query to execute.
:type query: `!str`, `!bytes`, or `sql.Composable`
:param prepare: Force (`!True`) or disallow (`!False`) preparation of
the query. By default (`!None`) prepare automatically. See
:ref:`prepared-statements`.
+ :param binary: If `!True` the cursor will return binary values from the
+ database. All the types returned by the query must have a binary
+ loader. See :ref:`binary-data` for details.
- The cursor is what returned calling `cursor()` without parameters. The
- parameters are passed to its `~Cursor.execute()` and the cursor is
- returned.
+ The method simply creates a `Cursor` instance, `~Cursor.execute()` the
+ query requested, and return it.
See :ref:`query-parameters` for all the details about executing
queries.
.. autoattribute:: row_factory
- .. automethod:: execute(query, params=None, prepare=None) -> AsyncCursor
+ .. automethod:: execute
.. automethod:: commit
.. automethod:: rollback
.. rubric:: Methods to send commands
- .. automethod:: execute(query, params=None, *, prepare=None) -> Cursor
+ .. automethod:: execute
:param query: The query to execute.
:type query: `!str`, `!bytes`, or `sql.Composable`
:param prepare: Force (`!True`) or disallow (`!False`) preparation of
the query. By default (`!None`) prepare automatically. See
:ref:`prepared-statements`.
+ :param binary: Specify whether the server shoul return data in binary
+ format (`!True`) or in text format (`!False`). By default
+ (`!None`) return data as requested by the cursor's `~Cursor.format`.
Return the cursor itself, so that it will be possible to chain a fetch
operation after the call.
The format of the data returned by the queries. It can be selected
initially e.g. specifying `Connection.cursor`\ ``(binary=True)`` and
- changed during the cursor's lifetime.
+ changed during the cursor's lifetime. It is also possible to override
+ the value for single queries, e.g. specifying `execute`\
+ ``(binary=True)``.
- :type: pq.Format
+ :type: `pq.Format`
+ :default: `~pq.Format.TEXT`
- .. admonition:: TODO
-
- Add `execute`\ ``(binary=True)`` too?
+ .. seealso:: :ref:`binary-data`
.. rubric:: Methods to retrieve results
is especially useful so that the cursor is closed at the end of
the block.
- .. automethod:: execute(query, params=None, *) -> ServerCursor
+ .. automethod:: execute
:param query: The query to execute.
:type query: `!str`, `!bytes`, or `sql.Composable`
:param params: The parameters to pass to the query, if any.
:type params: Sequence or Mapping
+ :param binary: Specify whether the server shoul return data in binary
+ format (`!True`) or in text format (`!False`). By default
+ (`!None`) return data as requested by the cursor's `~Cursor.format`.
Create a server cursor with given `name` and the *query* in argument.
to close the cursor automatically when the block is exited.
- .. automethod:: execute(query, params=None, *, prepare=None) -> AsyncCursor
- .. automethod:: executemany(query: Query, params_seq: Sequence[Args])
- .. automethod:: copy(statement: Query) -> AsyncCopy
+ .. automethod:: execute
+ .. automethod:: executemany
+ .. automethod:: copy
.. note::
async with conn.cursor("name") as cursor:
...
- .. automethod:: execute(query, params=None) -> AsyncServerCursor
- .. automethod:: executemany(query: Query, params_seq: Sequence[Args])
+ .. automethod:: execute
+ .. automethod:: executemany
.. automethod:: fetchone
.. automethod:: fetchmany
.. automethod:: fetchall
Binary parameters and results
-----------------------------
-PostgreSQL has two different ways to represent data type on the wire:
-`~psycopg.pq.Format.TEXT`, always available, and
-`~psycopg.pq.Format.BINARY`, available most of the times. Usually the binary
-format is more efficient to use.
+PostgreSQL has two different ways to transmit data between client and server:
+`~psycopg.pq.Format.TEXT`, always available, and `~psycopg.pq.Format.BINARY`,
+available most of the times but not always. Usually the binary format is more
+efficient to use.
-Psycopg can support both the formats of each data type. Whenever a value
+Psycopg can support both the formats for each data type. Whenever a value
is passed to a query using the normal ``%s`` placeholder, the best format
available is chosen (often, but not always, the binary format is picked as the
best choice).
If you have a reason to select explicitly the binary format or the text format
for a value you can use respectively a ``%b`` placeholder or a ``%t``
-placeholder instead. `~Cursor.execute()` will fail if a
-`~psycopg.adapt.Dumper` for the right parameter type and format is not
-available.
+placeholder instead of the normal ``%s``. `~Cursor.execute()` will fail if a
+`~psycopg.adapt.Dumper` for the right data type and format is not available.
The same two formats, text or binary, are used by PostgreSQL to return data
from a query to the client. Unlike with parameters, where you can choose the
format value-by-value, all the columns returned by a query will have the same
-format. For each of the types returned by the query, a
-`~psycopg.adapt.Loader` must be available, otherwise the data will be
-returned as unparsed `!str` or buffer.
-
-.. warning::
-
- Currently the default is to return values from database queries in textual
- type, for the simple reason that not all the PostgreSQL data types have a
- binary `!Loader` implemented. The plan is to support the binary format for
- all PostgreSQL builtins before the first Psycopg 3 released: TODO!
-
-By default the data will be returned in text format. In order to return data
-in binary format you can create the cursor using `Connection.cursor`\
-``(binary=True)``.
-
-.. admonition:: TODO
-
- add a `Cursor.execute`\ ``(binary=True)`` parameter?
-
+format. Every type returned by the query should have a `~psycopg.adapt.Loader`
+configured, otherwise the data will be returned as unparsed `!str` (for text
+results) or buffer (for binary results).
+
+.. note::
+ The `pg_type`_ table defines which format is supported for each PostgreSQL
+ data type. Text input/output is managed by the functions declared in the
+ ``typinput`` and ``typoutput`` fields (always present), binary
+ input/output is managed by the ``typsend`` and ``typreceive`` (which are
+ optional).
+
+ .. _pg_type: https://www.postgresql.org/docs/current/catalog-pg-type.html
+
+Because not every PostgreSQL type supports binary output, By default the data
+will be returned in text format. In order to return data in binary format you
+can create the cursor using `Connection.cursor`\ ``(binary=True)`` or execute
+the query using `Cursor.execute`\ ``(binary=True)``. A case in which
+requesting binary results is a clear winner is when you have large binary data
+in the database, such as images::
+
+ cur.execute(
+ "select image_data from images where id = %s", [image_id], binary=True)
+ data = cur.fetchone()[0]
.. admonition:: TODO
- - pass parameters in binary with ``%b``
- - return parameters in binary with `!cursor(binary=True)`
- cannot pass multiple statements in binary
return self._pgresult
def set_pgresult(
- self, result: Optional["PGresult"], set_loaders: bool = True
+ self,
+ result: Optional["PGresult"],
+ *,
+ set_loaders: bool = True,
+ format: Optional[pq.Format] = None,
) -> None:
self._pgresult = result
rc = self._row_loaders = []
for i in range(nf):
oid = result.ftype(i)
- fmt = result.fformat(i)
+ fmt = result.fformat(i) if format is None else format
rc.append(self.get_loader(oid, fmt).load) # type: ignore
def set_row_types(
...
def set_pgresult(
- self, result: Optional["PGresult"], set_loaders: bool = True
+ self,
+ result: Optional["PGresult"],
+ *,
+ set_loaders: bool = True,
+ format: Optional[pq.Format] = None
) -> None:
...
conn._autocommit = bool(autocommit)
return conn
- def _exec_command(self, command: Query) -> PQGen["PGresult"]:
+ def _exec_command(
+ self, command: Query, result_format: Format = Format.TEXT
+ ) -> PQGen["PGresult"]:
"""
Generator to send a command and receive the result to the backend.
elif isinstance(command, Composable):
command = command.as_bytes(self)
- self.pgconn.send_query(command)
+ if result_format == Format.TEXT:
+ self.pgconn.send_query(command)
+ else:
+ self.pgconn.send_query_params(
+ command, None, result_format=result_format
+ )
+
result = (yield from execute(self.pgconn))[-1]
if result.status not in (ExecStatus.COMMAND_OK, ExecStatus.TUPLES_OK):
if result.status == ExecStatus.FATAL_ERROR:
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
+ binary: bool = False,
) -> Cursor[Row]:
"""Execute a query and return a cursor to read its results."""
cur = self.cursor()
+ if binary:
+ cur.format = Format.BINARY
+
try:
return cur.execute(query, params, prepare=prepare)
except e.Error as ex:
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
+ binary: bool = False,
) -> AsyncCursor[Row]:
cur = self.cursor()
+ if binary:
+ cur.format = Format.BINARY
+
try:
return await cur.execute(query, params, prepare=prepare)
except e.Error as ex:
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
+ binary: Optional[bool] = None,
) -> PQGen[None]:
"""Generator implementing `Cursor.execute()`."""
yield from self._start_query(query)
pgq = self._convert_query(query, params)
- results = yield from self._maybe_prepare_gen(pgq, prepare)
+ results = yield from self._maybe_prepare_gen(
+ pgq, prepare=prepare, binary=binary
+ )
self._execute_results(results)
self._last_query = query
else:
pgq.dump(params)
- results = yield from self._maybe_prepare_gen(pgq, True)
+ results = yield from self._maybe_prepare_gen(pgq, prepare=True)
self._execute_results(results)
self._last_query = query
def _maybe_prepare_gen(
- self, pgq: PostgresQuery, prepare: Optional[bool]
+ self,
+ pgq: PostgresQuery,
+ *,
+ prepare: Optional[bool] = None,
+ binary: Optional[bool] = None,
) -> PQGen[Sequence["PGresult"]]:
# Check if the query is prepared or needs preparing
prep, name = self._conn._prepared.get(pgq, prepare)
if prep is Prepare.YES:
# The query is already prepared
- self._send_query_prepared(name, pgq)
+ self._send_query_prepared(name, pgq, binary=binary)
elif prep is Prepare.NO:
# The query must be executed without preparing
- self._execute_send(pgq)
+ self._execute_send(pgq, binary=binary)
else:
# The query must be prepared and executed
raise e.error_from_result(
result, encoding=self._conn.client_encoding
)
- self._send_query_prepared(name, pgq)
+ self._send_query_prepared(name, pgq, binary=binary)
# run the query
results = yield from execute(self._conn.pgconn)
return results
def _stream_send_gen(
- self, query: Query, params: Optional[Params] = None
+ self,
+ query: Query,
+ params: Optional[Params] = None,
+ *,
+ binary: Optional[bool] = None,
) -> PQGen[None]:
"""Generator to send the query for `Cursor.stream()`."""
yield from self._start_query(query)
pgq = self._convert_query(query, params)
- self._execute_send(pgq, no_pqexec=True)
+ self._execute_send(pgq, binary=binary, no_pqexec=True)
self._conn.pgconn.set_single_row_mode()
self._last_query = query
self._tx.set_pgresult(result)
def _execute_send(
- self, query: PostgresQuery, no_pqexec: bool = False
+ self,
+ query: PostgresQuery,
+ *,
+ no_pqexec: bool = False,
+ binary: Optional[bool] = None,
) -> None:
"""
Implement part of execute() before waiting common to sync and async.
This is not a generator, but a normal non-blocking function.
"""
+ if binary is None:
+ fmt = self.format
+ else:
+ fmt = Format.BINARY if binary else Format.TEXT
+
self._query = query
- if query.params or no_pqexec or self.format == Format.BINARY:
+ if query.params or no_pqexec or fmt == Format.BINARY:
self._conn.pgconn.send_query_params(
query.query,
query.params,
param_formats=query.formats,
param_types=query.types,
- result_format=self.format,
+ result_format=fmt,
)
else:
# if we don't have to, let's use exec_ as it can run more than
ExecStatus.COPY_BOTH,
)
- def _execute_results(self, results: Sequence["PGresult"]) -> None:
+ def _execute_results(
+ self, results: Sequence["PGresult"], format: Optional[Format] = None
+ ) -> None:
"""
Implement part of execute() after waiting common to sync and async
self._results = list(results)
self.pgresult = results[0]
- self._tx.set_pgresult(results[0])
+
+ # Note: the only reason to override format is to correclty set
+ # binary loaders on server-side cursors, because send_describe_portal
+ # only returns a text result.
+ self._tx.set_pgresult(results[0], format=format)
+
self._make_row = self._make_row_maker()
nrows = self.pgresult.command_tuples
if nrows is not None:
name, query.query, param_types=query.types
)
- def _send_query_prepared(self, name: bytes, pgq: PostgresQuery) -> None:
+ def _send_query_prepared(
+ self, name: bytes, pgq: PostgresQuery, *, binary: Optional[bool] = None
+ ) -> None:
+ if binary is None:
+ fmt = self.format
+ else:
+ fmt = Format.BINARY if binary else Format.TEXT
+
self._conn.pgconn.send_query_prepared(
name,
pgq.params,
param_formats=pgq.formats,
- result_format=self.format,
+ result_format=fmt,
)
def _check_result(self) -> None:
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
+ binary: Optional[bool] = None,
) -> AnyCursor:
"""
Execute a query or command to the database.
try:
with self._conn.lock:
self._conn.wait(
- self._execute_gen(query, params, prepare=prepare)
+ self._execute_gen(
+ query, params, prepare=prepare, binary=binary
+ )
)
except e.Error as ex:
raise ex.with_traceback(None)
self._conn.wait(self._executemany_gen(query, params_seq))
def stream(
- self, query: Query, params: Optional[Params] = None
+ self,
+ query: Query,
+ params: Optional[Params] = None,
+ *,
+ binary: Optional[bool] = None,
) -> Iterator[Row]:
"""
Iterate row-by-row on a result from the database.
"""
with self._conn.lock:
- self._conn.wait(self._stream_send_gen(query, params))
+ self._conn.wait(
+ self._stream_send_gen(query, params, binary=binary)
+ )
first = True
while self._conn.wait(self._stream_fetchone_gen(first)):
rec = self._tx.load_row(0, self._make_row)
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
+ binary: Optional[bool] = None,
) -> AnyCursor:
try:
async with self._conn.lock:
await self._conn.wait(
- self._execute_gen(query, params, prepare=prepare)
+ self._execute_gen(
+ query, params, prepare=prepare, binary=binary
+ )
)
except e.Error as ex:
raise ex.with_traceback(None)
await self._conn.wait(self._executemany_gen(query, params_seq))
async def stream(
- self, query: Query, params: Optional[Params] = None
+ self,
+ query: Query,
+ params: Optional[Params] = None,
+ *,
+ binary: Optional[bool] = None,
) -> AsyncIterator[Row]:
async with self._conn.lock:
- await self._conn.wait(self._stream_send_gen(query, params))
+ await self._conn.wait(
+ self._stream_send_gen(query, params, binary=binary)
+ )
first = True
while await self._conn.wait(self._stream_fetchone_gen(first)):
rec = self._tx.load_row(0, self._make_row)
class ServerCursorHelper(Generic[ConnectionType, Row]):
- __slots__ = ("name", "scrollable", "withhold", "described")
+ __slots__ = ("name", "scrollable", "withhold", "described", "format")
"""Helper object for common ServerCursor code.
TODO: this should be a mixin, but couldn't find a way to work it
self.scrollable = scrollable
self.withhold = withhold
self.described = False
+ self.format = pq.Format.TEXT
def _repr(self, cur: BaseCursor[ConnectionType, Row]) -> str:
cls = f"{cur.__class__.__module__}.{cur.__class__.__qualname__}"
self.name.encode(conn.client_encoding)
)
results = yield from execute(conn.pgconn)
- cur._execute_results(results)
+ cur._execute_results(results, format=self.format)
self.described = True
def _close_gen(self, cur: BaseCursor[ConnectionType, Row]) -> PQGen[None]:
query = sql.SQL("FETCH FORWARD {} FROM {}").format(
howmuch, sql.Identifier(self.name)
)
- res = yield from cur._conn._exec_command(query)
+ res = yield from cur._conn._exec_command(
+ query, result_format=self.format
+ )
cur.pgresult = res
cur._tx.set_pgresult(res, set_loaders=False)
self: AnyCursor,
query: Query,
params: Optional[Params] = None,
+ *,
+ binary: Optional[bool] = None,
**kwargs: Any,
) -> AnyCursor:
"""
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
helper = cast(ServerCursor[Row], self)._helper
query = helper._make_declare_statement(self, query)
+
+ if binary is None:
+ helper.format = self.format
+ else:
+ helper.format = pq.Format.BINARY if binary else pq.Format.TEXT
+
with self._conn.lock:
self._conn.wait(helper._declare_gen(self, query, params))
return self
self: AnyCursor,
query: Query,
params: Optional[Params] = None,
+ *,
+ binary: Optional[bool] = None,
**kwargs: Any,
) -> AnyCursor:
if kwargs:
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
helper = cast(AsyncServerCursor[Row], self)._helper
query = helper._make_declare_statement(self, query)
+
+ if binary is None:
+ helper.format = self.format
+ else:
+ helper.format = pq.Format.BINARY if binary else pq.Format.TEXT
+
async with self._conn.lock:
await self._conn.wait(helper._declare_gen(self, query, params))
return self
@property
def pgresult(self) -> Optional[PGresult]: ...
def set_pgresult(
- self, result: Optional["PGresult"], set_loaders: bool = True
+ self,
+ result: Optional["PGresult"],
+ *,
+ set_loaders: bool = True,
+ format: Optional[pq.Format] = None,
) -> None: ...
def set_row_types(
self, types: Sequence[int], formats: Sequence[pq.Format]
def pgresult(self) -> Optional[PGresult]:
return self._pgresult
- cpdef void set_pgresult(self, pq.PGresult result, object set_loaders = True):
+ cpdef void set_pgresult(
+ self,
+ pq.PGresult result,
+ object set_loaders = True,
+ object format = None
+ ):
self._pgresult = result
if result is None:
Py_INCREF(tmp)
PyList_SET_ITEM(types, i, tmp)
- tmp = libpq.PQfformat(res, i)
+ tmp = libpq.PQfformat(res, i) if format is None else format
Py_INCREF(tmp)
PyList_SET_ITEM(formats, i, tmp)
_orig_exec_command = conn._exec_command
L = ListPopAll()
- def _exec_command(command):
+ def _exec_command(command, *args, **kwargs):
cmdcopy = command
if isinstance(cmdcopy, bytes):
cmdcopy = cmdcopy.decode(conn.client_encoding)
cmdcopy = cmdcopy.as_string(conn)
L.insert(0, cmdcopy)
- return _orig_exec_command(command)
+ return _orig_exec_command(command, *args, **kwargs)
monkeypatch.setattr(conn, "_exec_command", _exec_command)
return L
def test_execute(conn):
cur = conn.execute("select %s, %s", [10, 20])
assert cur.fetchone() == (10, 20)
+ assert cur.format == 0
+ assert cur.pgresult.fformat(0) == 0
cur = conn.execute("select %(a)s, %(b)s", {"a": 11, "b": 21})
assert cur.fetchone() == (11, 21)
assert cur.fetchone() == (12, 22)
+def test_execute_binary(conn):
+ cur = conn.execute("select %s, %s", [10, 20], binary=True)
+ assert cur.fetchone() == (10, 20)
+ assert cur.format == 1
+ assert cur.pgresult.fformat(0) == 1
+
+
def test_row_factory(dsn):
conn = Connection.connect(dsn)
assert conn.row_factory is tuple_row
async def test_execute(aconn):
cur = await aconn.execute("select %s, %s", [10, 20])
assert await cur.fetchone() == (10, 20)
+ assert cur.format == 0
+ assert cur.pgresult.fformat(0) == 0
cur = await aconn.execute("select %(a)s, %(b)s", {"a": 11, "b": 21})
assert await cur.fetchone() == (11, 21)
assert await cur.fetchone() == (12, 22)
+async def test_execute_binary(aconn):
+ cur = await aconn.execute("select %s, %s", [10, 20], binary=True)
+ assert await cur.fetchone() == (10, 20)
+ assert cur.format == 1
+ assert cur.pgresult.fformat(0) == 1
+
+
async def test_row_factory(dsn):
conn = await AsyncConnection.connect(dsn)
assert conn.row_factory is tuple_row
assert cur.pgresult.fformat(0) == 0
row = cur.fetchone()
- assert row[0] == 1
- assert row[1] == "foo"
- assert row[2] is None
+ assert row == (1, "foo", None)
row = cur.fetchone()
assert row is None
-def test_execute_binary_result(conn):
+def test_binary_cursor_execute(conn):
cur = conn.cursor(binary=True)
- cur.execute("select %s::text, %s::text", ["foo", None])
+ cur.execute("select %s, %s", [1, None])
+ assert cur.fetchone() == (1, None)
assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
- row = cur.fetchone()
- assert row[0] == "foo"
- assert row[1] is None
- row = cur.fetchone()
- assert row is None
+
+def test_execute_binary(conn):
+ cur = conn.cursor()
+ cur.execute("select %s, %s", [1, None], binary=True)
+ assert cur.fetchone() == (1, None)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
+
+
+def test_binary_cursor_text_override(conn):
+ cur = conn.cursor(binary=True)
+ cur.execute("select %s, %s", [1, None], binary=False)
+ assert cur.fetchone() == (1, None)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
pass
+def test_stream_binary_cursor(conn):
+ cur = conn.cursor(binary=True)
+ recs = []
+ for rec in cur.stream("select generate_series(1, 2)"):
+ recs.append(rec)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+ assert recs == [(1,), (2,)]
+
+
+def test_stream_execute_binary(conn):
+ cur = conn.cursor()
+ recs = []
+ for rec in cur.stream("select generate_series(1, 2)", binary=True):
+ recs.append(rec)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+ assert recs == [(1,), (2,)]
+
+
+def test_stream_binary_cursor_text_override(conn):
+ cur = conn.cursor(binary=True)
+ recs = []
+ for rec in cur.stream("select generate_series(1, 2)", binary=False):
+ recs.append(rec)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode("utf8")
+
+ assert recs == [(1,), (2,)]
+
+
class TestColumn:
def test_description_attribs(self, conn):
curs = conn.cursor()
assert cur.pgresult.fformat(0) == 0
row = await cur.fetchone()
- assert row[0] == 1
- assert row[1] == "foo"
- assert row[2] is None
+ assert row == (1, "foo", None)
row = await cur.fetchone()
assert row is None
-async def test_execute_binary_result(aconn):
+async def test_binary_cursor_execute(aconn):
cur = aconn.cursor(binary=True)
- await cur.execute("select %s::text, %s::text", ["foo", None])
+ await cur.execute("select %s, %s", [1, None])
+ assert (await cur.fetchone()) == (1, None)
assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
- row = await cur.fetchone()
- assert row[0] == "foo"
- assert row[1] is None
- row = await cur.fetchone()
- assert row is None
+
+async def test_execute_binary(aconn):
+ cur = aconn.cursor()
+ await cur.execute("select %s, %s", [1, None], binary=True)
+ assert (await cur.fetchone()) == (1, None)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
+
+
+async def test_binary_cursor_text_override(aconn):
+ cur = aconn.cursor(binary=True)
+ await cur.execute("select %s, %s", [1, None], binary=False)
+ assert (await cur.fetchone()) == (1, None)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
pass
+async def test_stream_binary_cursor(aconn):
+ cur = aconn.cursor(binary=True)
+ recs = []
+ async for rec in cur.stream("select generate_series(1, 2)"):
+ recs.append(rec)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+ assert recs == [(1,), (2,)]
+
+
+async def test_stream_execute_binary(aconn):
+ cur = aconn.cursor()
+ recs = []
+ async for rec in cur.stream("select generate_series(1, 2)", binary=True):
+ recs.append(rec)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+
+ assert recs == [(1,), (2,)]
+
+
+async def test_stream_binary_cursor_text_override(aconn):
+ cur = aconn.cursor(binary=True)
+ recs = []
+ async for rec in cur.stream("select generate_series(1, 2)", binary=False):
+ recs.append(rec)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode("utf8")
+
+ assert recs == [(1,), (2,)]
+
+
async def test_str(aconn):
cur = aconn.cursor()
assert "[IDLE]" in str(cur)
assert cur._query.params == [bytes([0, 3])] # 3 as binary int2
+def test_binary_cursor_execute(conn):
+ cur = conn.cursor("foo", binary=True)
+ cur.execute("select generate_series(1, 2)")
+ assert cur.fetchone() == (1,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+ assert cur.fetchone() == (2,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+
+def test_execute_binary(conn):
+ cur = conn.cursor("foo")
+ cur.execute("select generate_series(1, 2)", binary=True)
+ assert cur.fetchone() == (1,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+ assert cur.fetchone() == (2,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+ cur.execute("select generate_series(1, 1)")
+ assert cur.fetchone() == (1,)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
+
+
+def test_binary_cursor_text_override(conn):
+ cur = conn.cursor("foo", binary=True)
+ cur.execute("select generate_series(1, 2)", binary=False)
+ assert cur.fetchone() == (1,)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
+ assert cur.fetchone() == (2,)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"2"
+
+ cur.execute("select generate_series(1, 2)")
+ assert cur.fetchone() == (1,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+
+
def test_close(conn, recwarn, retries):
for retry in retries:
with retry:
assert cur._query.params == [bytes([0, 3])] # 3 as binary int2
+async def test_binary_cursor_execute(aconn):
+ cur = aconn.cursor("foo", binary=True)
+ await cur.execute("select generate_series(1, 2)")
+ assert (await cur.fetchone()) == (1,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+ assert (await cur.fetchone()) == (2,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+
+async def test_execute_binary(aconn):
+ cur = aconn.cursor("foo")
+ await cur.execute("select generate_series(1, 2)", binary=True)
+ assert (await cur.fetchone()) == (1,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+ assert (await cur.fetchone()) == (2,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+
+ await cur.execute("select generate_series(1, 1)")
+ assert (await cur.fetchone()) == (1,)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
+
+
+async def test_binary_cursor_text_override(aconn):
+ cur = aconn.cursor("foo", binary=True)
+ await cur.execute("select generate_series(1, 2)", binary=False)
+ assert (await cur.fetchone()) == (1,)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
+ assert (await cur.fetchone()) == (2,)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"2"
+
+ await cur.execute("select generate_series(1, 2)")
+ assert (await cur.fetchone()) == (1,)
+ assert cur.pgresult.fformat(0) == 1
+ assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+
+
async def test_close(aconn, recwarn, retries):
async for retry in retries:
with retry: